Files
2026-01-19 10:38:50 +08:00

341 lines
11 KiB
Python

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import base64
import math
from abc import abstractmethod
from functools import partial
from io import BytesIO
from pathlib import Path
from typing import Any
import numpy as np
import numpy.typing as npt
from PIL import Image
from vllm import envs
from vllm.logger import init_logger
from vllm.utils.registry import ExtensionManager
from .base import MediaIO
from .image import ImageMediaIO
# Module-level logger created through vLLM's `init_logger`, keyed by this module's name.
logger = init_logger(__name__)
def resize_video(frames: npt.NDArray, size: tuple[int, int]) -> npt.NDArray:
    """Resize every frame of a video to the same spatial size.

    Args:
        frames: Array whose shape unpacks as
            ``(num_frames, height, width, channels)``.
        size: Target size given as ``(height, width)``.

    Returns:
        A newly allocated array of shape
        ``(num_frames, size[0], size[1], channels)`` with the dtype of
        ``frames``.
    """
    frame_count, _, _, num_channels = frames.shape
    target_h, target_w = size
    output = np.empty(
        (frame_count, target_h, target_w, num_channels), dtype=frames.dtype
    )
    # cv2 is imported lazily so that text-only users never need it installed
    import cv2

    # NOTE: cv2.resize is given its target size as (width, height)
    for position, single_frame in enumerate(frames):
        output[position] = cv2.resize(single_frame, (target_w, target_h))
    return output
def rescale_video_size(frames: npt.NDArray, size_factor: float) -> npt.NDArray:
    """Scale the height and width of all frames by ``size_factor``.

    Args:
        frames: Array whose shape unpacks as
            ``(num_frames, height, width, channels)``.
        size_factor: Multiplier applied to both height and width; the
            products are truncated with ``int()``.

    Returns:
        The result of ``resize_video`` for the scaled ``(height, width)``.
    """
    _, src_height, src_width, _ = frames.shape
    target_size = (int(src_height * size_factor), int(src_width * size_factor))
    return resize_video(frames, target_size)
def sample_frames_from_video(frames: npt.NDArray, num_frames: int) -> npt.NDArray:
    """Pick ``num_frames`` evenly spaced frames along the first axis.

    Args:
        frames: Array whose first axis indexes frames.
        num_frames: Number of frames to pick; ``-1`` means "keep everything".

    Returns:
        ``frames`` itself when ``num_frames == -1``; otherwise the frames at
        the integer positions produced by ``np.linspace`` between the first
        and the last frame (both included).
    """
    last_position = frames.shape[0] - 1
    if num_frames == -1:
        return frames

    # Evenly spaced positions, truncated to integers by dtype=int
    picked_positions = np.linspace(0, last_position, num_frames, dtype=int)
    return frames[picked_positions, ...]
class VideoLoader:
    """Base class for video decoding backends.

    Subclasses implement ``load_bytes``; ``_read_frames`` is a shared helper
    that pulls a chosen set of frames out of an already opened capture object.
    """

    @classmethod
    @abstractmethod
    def load_bytes(
        cls, data: bytes, num_frames: int = -1, **kwargs
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode raw video bytes into ``(frames, metadata)``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @staticmethod
    def _read_frames(
        cap,
        frame_indices: set[int],
        num_expected_frames: int,
        max_frame_idx: int,
    ) -> tuple[npt.NDArray, int, list[int]]:
        """Read the requested frames from ``cap`` and convert them to RGB.

        Frames ``0..max_frame_idx`` are grabbed one after another; only those
        whose index is in ``frame_indices`` are retrieved. Frames that cannot
        be grabbed or retrieved are skipped with a warning.

        Args:
            cap: Capture object providing ``get``, ``grab`` and ``retrieve``.
            frame_indices: Indices of the frames to keep.
            num_expected_frames: Capacity of the output buffer.
            max_frame_idx: Last frame index to step through (inclusive).

        Returns:
            A tuple ``(frames, valid_num_frames, valid_frame_indices)`` where
            ``frames`` is a uint8 array holding only the frames that were
            read successfully, in reading order.
        """
        import cv2

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        buffer = np.empty(
            (num_expected_frames, frame_height, frame_width, 3), dtype=np.uint8
        )

        loaded_indices: list[int] = []
        for idx in range(max_frame_idx + 1):
            wanted = idx in frame_indices

            if not cap.grab():
                # Broken/unreadable frame: only worth a warning if requested
                if wanted:
                    logger.warning(
                        "Failed to grab frame %d during video loading. "
                        "This frame will be skipped.",
                        idx,
                    )
                continue

            if not wanted:
                continue

            ret, frame = cap.retrieve()
            if not ret:
                # grab() succeeded but the frame could not be decoded
                logger.warning(
                    "Failed to retrieve frame %d during video loading. "
                    "This frame will be skipped.",
                    idx,
                )
                continue

            # The next free slot equals the number of frames stored so far
            buffer[len(loaded_indices)] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            loaded_indices.append(idx)

        loaded_count = len(loaded_indices)
        if loaded_count < num_expected_frames:
            logger.warning(
                "Video loading completed with %d broken/unreadable frames. "
                "Expected %d frames but only loaded %d frames.",
                num_expected_frames - loaded_count,
                num_expected_frames,
                loaded_count,
            )

        # Trim the unused tail of the preallocated buffer
        return buffer[:loaded_count], loaded_count, loaded_indices
# Registry of video loader backends: loader classes in this module register
# themselves under a string key via `register(...)`, and `VideoMediaIO`
# resolves a backend by key via `load(...)`.
VIDEO_LOADER_REGISTRY = ExtensionManager()
@VIDEO_LOADER_REGISTRY.register("opencv")
class OpenCVVideoBackend(VideoLoader):
    """Video loader that decodes in-memory video bytes with OpenCV.

    Frames are sampled uniformly across the video; the number of samples is
    bounded by ``num_frames`` and by ``duration * fps`` when those are given.
    """

    def get_cv2_video_api(self):
        """Return the first usable OpenCV stream-buffered backend.

        A backend is usable when OpenCV reports it as available and, for
        plugin (non built-in) backends, when its plugin ABI/API version is at
        least 1.2.

        Returns:
            The backend identifier, or ``None`` when no backend qualifies.
        """
        import cv2.videoio_registry as vr

        api_pref = None
        for backend in vr.getStreamBufferedBackends():
            if not vr.hasBackend(backend):
                continue
            if not vr.isBackendBuiltIn(backend):
                _, abi, api = vr.getStreamBufferedBackendPluginVersion(backend)
                # Skip plugins older than ABI 1 / API 2
                if abi < 1 or (abi == 1 and api < 2):
                    continue
            api_pref = backend
            break
        return api_pref

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = -1,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode ``data`` and return uniformly sampled RGB frames.

        Args:
            data: Encoded video bytes.
            num_frames: Upper bound on the number of sampled frames; values
                ``<= 0`` disable this bound.
            fps: Target sampling rate; when positive, the number of samples
                is additionally capped at ``floor(duration * fps)``.
            **kwargs: Ignored by this backend.

        Returns:
            ``(frames, metadata)`` where ``frames`` holds the successfully
            read frames and ``metadata`` follows the
            ``transformers.video_utils.VideoMetadata`` layout.

        Raises:
            ValueError: If OpenCV cannot open the video stream.
        """
        import cv2

        backend = cls().get_cv2_video_api()
        cap = cv2.VideoCapture(BytesIO(data), backend, [])
        # Always release the capture, including on the error paths, so the
        # decoder resources are not kept alive until garbage collection.
        try:
            if not cap.isOpened():
                raise ValueError("Could not open video stream")

            total_frames_num = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            original_fps = cap.get(cv2.CAP_PROP_FPS)
            duration = total_frames_num / original_fps if original_fps > 0 else 0

            # resample video to target num_frames and fps
            # - the minimum of the two will be used
            num_frames_to_sample = total_frames_num
            if num_frames > 0:
                num_frames_to_sample = min(num_frames, total_frames_num)
            if fps > 0:
                num_frames_to_sample = min(
                    num_frames_to_sample, math.floor(duration * fps)
                )
            num_frames_to_sample = max(1, num_frames_to_sample)  # at least one sample

            if num_frames_to_sample == total_frames_num:
                frame_idx = list(range(num_frames_to_sample))
            else:
                uniform_sampled_frames = np.linspace(
                    0, total_frames_num - 1, num_frames_to_sample, dtype=int
                )
                frame_idx = uniform_sampled_frames.tolist()

            # Convert to set for O(1) lookup performance
            frame_idx_set = set(frame_idx)
            frames, valid_num_frames, valid_frame_indices = cls._read_frames(
                cap, frame_idx_set, num_frames_to_sample, max(frame_idx)
            )
        finally:
            cap.release()

        # Use transformers transformers.video_utils.VideoMetadata format
        # NOTE(Isotr0py): For models like Qwen3-VL/GLM4.5V, this metadata
        # can cause incorrect timestamp calculation without num_frames=-1.
        metadata = {
            "total_num_frames": total_frames_num,
            "fps": original_fps,
            "duration": duration,
            "video_backend": "opencv",
            "frames_indices": valid_frame_indices,
            # extra field used to control hf processor's video
            # sampling behavior: only set when every frame was loaded
            "do_sample_frames": valid_num_frames == total_frames_num,
        }
        return frames, metadata
@VIDEO_LOADER_REGISTRY.register("opencv_dynamic")
class OpenCVDynamicVideoBackend(OpenCVVideoBackend):
    """OpenCV loader that picks frames by timestamp instead of by count.

    Videos no longer than ``max_duration`` seconds are sampled at ``fps``
    frames per second; longer videos are sampled at ``max_duration * fps``
    evenly spaced timestamps across the whole video.
    """

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = 2,
        max_duration: int = 300,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode ``data`` and return time-based sampled RGB frames.

        Args:
            data: Encoded video bytes.
            num_frames: Accepted for signature compatibility; not used by
                this backend.
            fps: Target sampling rate in frames per second.
            max_duration: Duration threshold (seconds) above which the number
                of samples is capped at ``max_duration * fps``.
            **kwargs: Ignored by this backend.

        Returns:
            ``(frames, metadata)`` where ``frames`` holds the successfully
            read frames and ``metadata`` follows the
            ``transformers.video_utils.VideoMetadata`` layout, with
            ``do_sample_frames`` always ``False``.

        Raises:
            ValueError: If OpenCV cannot open the video stream, or if the
                stream does not report a positive FPS (timestamps cannot be
                mapped to frame indices without it).
        """
        import cv2

        backend = cls().get_cv2_video_api()
        cap = cv2.VideoCapture(BytesIO(data), backend, [])
        # Always release the capture, including on the error paths, so the
        # decoder resources are not kept alive until garbage collection.
        try:
            if not cap.isOpened():
                raise ValueError("Could not open video stream")

            total_frames_num = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            original_fps = cap.get(cv2.CAP_PROP_FPS)
            # `not (x > 0)` also rejects NaN. Without this guard the duration
            # fallback below would divide by zero.
            if not original_fps > 0:
                raise ValueError(
                    "Could not determine a positive FPS for the video stream; "
                    "the opencv_dynamic backend needs it to sample frames."
                )

            # resample video to target num_frames
            max_frame_idx = total_frames_num - 1
            duration = total_frames_num / original_fps
            if not duration:
                # No frame count reported: fall back to a rounded estimate
                duration = round(max_frame_idx / original_fps) + 1

            # Refer to:
            # https://github.com/huggingface/transformers/blob/v4.55.4/src/transformers/models/glm4v/video_processing_glm4v.py#L103-L140
            frame_indices_list: list[int]
            if duration <= max_duration:
                n = int(math.floor(duration * fps))
                frame_indices_list = sorted(
                    {
                        min(max_frame_idx, int(math.ceil(i * original_fps / fps)))
                        for i in range(n)
                    }
                )
            else:
                num_samples = int(max_duration * fps)
                if num_samples >= total_frames_num:
                    frame_indices_list = list(range(total_frames_num))
                else:
                    target_seconds = np.linspace(
                        0, duration, num_samples, endpoint=True
                    )
                    frame_indices_list = sorted(
                        {
                            min(max_frame_idx, int(math.ceil(t * original_fps)))
                            for t in target_seconds
                        }
                    )

            # Convert to set for O(1) lookup performance. Reading stops at
            # the highest requested index; frames after it are never used,
            # so grabbing them would only cost time.
            frame_indices_set = set(frame_indices_list)
            frames, _, valid_frame_indices = cls._read_frames(
                cap,
                frame_indices_set,
                len(frame_indices_list),
                max(frame_indices_list, default=-1),
            )
        finally:
            cap.release()

        # Use transformers transformers.video_utils.VideoMetadata format
        metadata = {
            "total_num_frames": total_frames_num,
            "fps": original_fps,
            "duration": duration,
            "video_backend": "opencv_dynamic",
            "frames_indices": valid_frame_indices,
            "do_sample_frames": False,
        }
        return frames, metadata
class VideoMediaIO(MediaIO[tuple[npt.NDArray, dict[str, Any]]]):
    """Media IO for videos, producing ``(frames, metadata)`` tuples.

    Decoding of encoded video bytes is delegated to a loader obtained from
    ``VIDEO_LOADER_REGISTRY``; JPEG frame sequences are handled through the
    supplied ``ImageMediaIO``.
    """

    def __init__(
        self,
        image_io: ImageMediaIO,
        num_frames: int = 32,
        **kwargs,
    ) -> None:
        """Store the configuration and resolve the video loader backend.

        Args:
            image_io: Image IO used for per-frame JPEG encode/decode.
            num_frames: Frame count forwarded to the loader.
            **kwargs: Custom arguments from ``--media-io-kwargs`` for this
                modality. ``video_backend`` is consumed here; everything
                else is forwarded to the loader's ``load_bytes``.
        """
        super().__init__()

        self.image_io = image_io
        self.num_frames = num_frames

        # A per-request backend can be chosen through kwargs, e.g.:
        #   --media-io-kwargs '{"video": {"video_backend": "torchcodec"}}'
        # Otherwise the global VLLM_VIDEO_LOADER_BACKEND env var decides.
        backend_name = kwargs.pop("video_backend", None)
        if not backend_name:
            backend_name = envs.VLLM_VIDEO_LOADER_BACKEND

        # What remains is handed to the underlying media loader
        # (e.g. custom implementations) for flexible control.
        self.kwargs = kwargs
        self.video_loader = VIDEO_LOADER_REGISTRY.load(backend_name)

    def load_bytes(self, data: bytes) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode encoded video bytes with the configured loader."""
        loader_kwargs = self.kwargs
        return self.video_loader.load_bytes(
            data, num_frames=self.num_frames, **loader_kwargs
        )

    def load_base64(
        self, media_type: str, data: str
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode a base64 payload into ``(frames, metadata)``.

        For ``video/jpeg`` (case-insensitive) the payload is a
        comma-separated list of base64 JPEG frames, and the returned
        metadata is empty. Any other media type is base64-decoded and passed
        to ``load_bytes``.
        """
        if media_type.lower() != "video/jpeg":
            return self.load_bytes(base64.b64decode(data))

        decode_jpeg = self.image_io.load_base64
        decoded_frames = [
            np.asarray(decode_jpeg("image/jpeg", encoded_frame))
            for encoded_frame in data.split(",")
        ]
        return np.stack(decoded_frames), {}

    def load_file(self, filepath: Path) -> tuple[npt.NDArray, dict[str, Any]]:
        """Read a video file from disk and decode its bytes."""
        raw = filepath.read_bytes()
        return self.load_bytes(raw)

    def encode_base64(
        self,
        media: npt.NDArray,
        *,
        video_format: str = "JPEG",
    ) -> str:
        """Encode frames as comma-separated base64 JPEG images.

        Raises:
            NotImplementedError: If ``video_format`` is not ``"JPEG"``.
        """
        if video_format != "JPEG":
            msg = "Only JPEG format is supported for now."
            raise NotImplementedError(msg)

        encode_image = self.image_io.encode_base64
        return ",".join(
            encode_image(Image.fromarray(frame), image_format=video_format)
            for frame in media
        )