This commit is contained in:
2025-10-09 16:47:16 +08:00
parent c8feb4deb5
commit e27e3f16bb
5248 changed files with 1778505 additions and 0 deletions

View File

@@ -0,0 +1,300 @@
# Copyright 2025 The Qwen Team and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch Qwen3-VL model."""
import copy
import unittest
from transformers import (
Qwen3VLConfig,
Qwen3VLForConditionalGeneration,
Qwen3VLModel,
is_torch_available,
)
from transformers.testing_utils import (
require_torch,
torch_device,
)
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import (
ModelTesterMixin,
floats_tensor,
ids_tensor,
)
if is_torch_available():
import torch
class Qwen3VLVisionText2TextModelTester:
    """
    Builds a tiny `Qwen3VLConfig` together with matching dummy inputs for the model tests.

    Args:
        parent: The test case that owns this tester.
        batch_size: Number of sequences (and images) in the dummy batch.
        seq_length: Number of tokens added on top of `num_image_tokens` to obtain the final sequence length.
        num_channels: Number of channels of the dummy images.
        ignore_index: Stored on the tester; not read by the methods of this class.
        image_size: Side length of the square dummy images (an image yields
            `image_size**2 // patch_size**2` patch rows in `pixel_values`).
        text_config: Keyword arguments of the text backbone configuration.
        vision_config: Keyword arguments of the vision tower configuration.
        image_token_id: Token id used as the image placeholder in `input_ids`.
        video_token_id: Token id used as the video placeholder in `input_ids`.
        vision_start_token_id: Token id written right before the image placeholder.
        vision_end_token_id: Token id forwarded to the config as the vision end marker.
        tie_word_embeddings: Forwarded to the config.
        is_training: Stored on the tester; not read by the methods of this class.
    """

    def __init__(
        self,
        parent,
        batch_size=3,
        seq_length=7,
        num_channels=3,
        ignore_index=-100,
        image_size=16,
        text_config={
            "bos_token_id": 0,
            "eos_token_id": 1,
            "pad_token_id": 2,
            "hidden_act": "silu",
            "head_dim": 8,
            "hidden_size": 32,
            "vocab_size": 99,
            "intermediate_size": 37,
            "max_position_embeddings": 512,
            "model_type": "qwen3_vl",
            "num_attention_heads": 4,
            "num_hidden_layers": 2,
            "num_key_value_heads": 2,
            "rope_theta": 10000,
            "tie_word_embeddings": True,
            "rope_scaling": {"rope_type": "default", "mrope_section": [16, 8, 8], "mrope_interleaved": True},
        },
        vision_config={
            "depth": 2,
            "in_chans": 3,
            "hidden_act": "gelu_pytorch_tanh",
            "intermediate_size": 32,
            "out_hidden_size": 32,
            "hidden_size": 32,
            "num_heads": 4,
            "patch_size": 16,
            "spatial_merge_size": 1,
            "temporal_patch_size": 2,
            "num_position_embeddings": 16,
            "deepstack_visual_indexes": [0, 1],
        },
        image_token_id=3,
        video_token_id=4,
        vision_start_token_id=5,
        vision_end_token_id=6,
        tie_word_embeddings=True,
        is_training=True,
    ):
        self.parent = parent
        self.ignore_index = ignore_index
        self.is_training = is_training

        # The dict defaults in the signature are created once and shared by every call.
        # Keep private deep copies so that mutating one tester's config (or the dict a
        # caller handed in) can never leak into testers that are created later.
        self.vision_config = copy.deepcopy(vision_config)
        self.text_config = copy.deepcopy(text_config)

        # Mirror the text settings as plain attributes.
        self.vocab_size = self.text_config["vocab_size"]
        self.bos_token_id = self.text_config["bos_token_id"]
        self.eos_token_id = self.text_config["eos_token_id"]
        self.pad_token_id = self.text_config["pad_token_id"]
        self.head_dim = self.text_config["head_dim"]
        self.hidden_size = self.text_config["hidden_size"]
        self.intermediate_size = self.text_config["intermediate_size"]
        self.num_hidden_layers = self.text_config["num_hidden_layers"]
        self.num_attention_heads = self.text_config["num_attention_heads"]
        self.num_key_value_heads = self.text_config["num_key_value_heads"]
        self.rope_theta = self.text_config["rope_theta"]
        self.rope_scaling = self.text_config["rope_scaling"]
        self.hidden_act = self.text_config["hidden_act"]
        self.max_position_embeddings = self.text_config["max_position_embeddings"]
        self.model_type = self.text_config["model_type"]

        self.vision_start_token_id = vision_start_token_id
        self.vision_end_token_id = vision_end_token_id
        self.image_token_id = image_token_id
        self.video_token_id = video_token_id
        self.tie_word_embeddings = tie_word_embeddings

        self.batch_size = batch_size
        self.num_channels = num_channels
        self.image_size = image_size
        # Used twice: it extends the sequence length and it is the column at which the
        # image placeholder is written in `prepare_config_and_inputs_for_common`.
        self.num_image_tokens = 32
        self.seq_length = seq_length + self.num_image_tokens

    def get_config(self):
        """Returns a `Qwen3VLConfig` built from the tester's text/vision dicts and special token ids."""
        return Qwen3VLConfig(
            text_config=self.text_config,
            vision_config=self.vision_config,
            image_token_id=self.image_token_id,
            video_token_id=self.video_token_id,
            vision_start_token_id=self.vision_start_token_id,
            vision_end_token_id=self.vision_end_token_id,
            tie_word_embeddings=self.tie_word_embeddings,
        )

    def prepare_config_and_inputs(self):
        """
        Returns `(config, pixel_values)` where `pixel_values` is a random 2D float tensor with
        `batch_size * image_size**2 // patch_size**2` rows and
        `num_channels * patch_size**2 * temporal_patch_size` columns.
        """
        config = self.get_config()
        patch_size = config.vision_config.patch_size
        temporal_patch_size = config.vision_config.temporal_patch_size
        pixel_values = floats_tensor(
            [
                self.batch_size * (self.image_size**2) // (patch_size**2),
                self.num_channels * (patch_size**2) * temporal_patch_size,
            ]
        )
        return config, pixel_values

    def prepare_config_and_inputs_for_common(self):
        """
        Returns `(config, inputs_dict)` for the common tests.

        `inputs_dict` holds `pixel_values`, `image_grid_thw` (one `[1, 1, 1]` row per batch item),
        `input_ids` and an all-ones `attention_mask`.
        """
        config_and_inputs = self.prepare_config_and_inputs()
        config, pixel_values = config_and_inputs
        input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
        attention_mask = torch.ones(input_ids.shape, dtype=torch.long, device=torch_device)

        # The last position is always padding.
        input_ids[:, -1] = self.pad_token_id
        # Randomly drawn ids may collide with the special vision ids; replace them by padding
        # so that the only placeholders left are the ones written explicitly below.
        input_ids[input_ids == self.video_token_id] = self.pad_token_id
        input_ids[input_ids == self.image_token_id] = self.pad_token_id
        input_ids[input_ids == self.vision_start_token_id] = self.pad_token_id
        # Exactly one image placeholder per row, preceded by the vision start token.
        input_ids[:, self.num_image_tokens] = self.image_token_id
        input_ids[:, self.num_image_tokens - 1] = self.vision_start_token_id

        inputs_dict = {
            "pixel_values": pixel_values,
            "image_grid_thw": torch.tensor([[1, 1, 1]] * self.batch_size, device=torch_device),
            "input_ids": input_ids,
            "attention_mask": attention_mask,
        }
        return config, inputs_dict
@require_torch
class Qwen3VLModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
    """
    Model tester for `Qwen3VLForConditionalGeneration`.
    """

    all_model_classes = (
        (
            Qwen3VLModel,
            Qwen3VLForConditionalGeneration,
        )
        if is_torch_available()
        else ()
    )
    test_pruning = False
    test_head_masking = False

    def setUp(self):
        self.model_tester = Qwen3VLVisionText2TextModelTester(self)
        self.config_tester = ConfigTester(self, config_class=Qwen3VLConfig, has_text_modality=False)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_mismatching_num_image_tokens(self):
        """
        Tests that VLMs throw an error with an explicit message saying what is wrong
        when the number of images doesn't match the number of image tokens in the text.
        Also we need to test multi-image cases when one prompt has multiple image tokens.
        """
        config, input_dict = self.model_tester.prepare_config_and_inputs_for_common()
        for model_class in self.all_model_classes:
            model = model_class(config).to(torch_device)
            model.eval()
            _ = model(**input_dict)  # successful forward with no modifications
            curr_input_dict = copy.deepcopy(input_dict)

            # remove one image but leave the image token in text
            patch_size = config.vision_config.patch_size
            one_img_length = (self.model_tester.image_size**2) // (patch_size**2)
            curr_input_dict["pixel_values"] = curr_input_dict["pixel_values"][-one_img_length:, ...]
            curr_input_dict["image_grid_thw"] = curr_input_dict["image_grid_thw"][-1:, ...]
            with self.assertRaises(ValueError):
                _ = model(**curr_input_dict)

            # simulate multi-image case by concatenating inputs where each has exactly one image/image-token
            input_ids = curr_input_dict["input_ids"][:1]
            pixel_values = curr_input_dict["pixel_values"][:one_img_length]
            image_grid_thw = curr_input_dict["image_grid_thw"][:1]
            input_ids = torch.cat([input_ids, input_ids], dim=0)

            # one image and two image tokens raise an error
            with self.assertRaises(ValueError):
                _ = model(
                    input_ids=input_ids,
                    pixel_values=pixel_values,
                    image_grid_thw=image_grid_thw,
                )

            # two images and two image tokens don't raise an error
            pixel_values = torch.cat([pixel_values, pixel_values], dim=0)
            image_grid_thw = torch.cat([image_grid_thw, image_grid_thw], dim=0)
            _ = model(
                input_ids=input_ids,
                pixel_values=pixel_values,
                image_grid_thw=image_grid_thw,
            )

    def test_video_forward(self):
        """
        Runs a forward pass with video inputs only and checks that every model class returns an output.

        Each row of `input_ids` gets `patch_T` consecutive groups made of one vision start token
        followed by one video placeholder per patch of the frame group.
        """
        tester = self.model_tester
        config, _ = tester.prepare_config_and_inputs_for_common()

        B = tester.batch_size
        C = config.vision_config.in_chans
        T = config.vision_config.temporal_patch_size
        P = config.vision_config.patch_size

        input_ids = ids_tensor([B, tester.seq_length], tester.vocab_size)

        F = 4
        patch_H = tester.image_size // P
        patch_W = tester.image_size // P
        patch_T = F // T
        patches_per_video = patch_T * patch_H * patch_W
        patches_per_frame = patch_H * patch_W

        pixel_values_videos = floats_tensor(
            [
                # first dim: batch_size * num_patches
                B * patches_per_video,
                # second dim: in_channels * temporal_patch_size * patch_size^2
                C * T * (P**2),
            ]
        )

        # qwen3vl use timestamps for video, so split it into patch_T sub-videos.
        # Created on `torch_device` like every other input (and like `image_grid_thw` in the tester).
        video_grid_thw = torch.tensor([[1, patch_H, patch_W] for _ in range(patch_T)] * B, device=torch_device)

        # sanity check
        self.assertEqual(pixel_values_videos.shape[0], video_grid_thw.prod(dim=1).sum().item())

        # Randomly drawn ids may collide with the special vision ids; replace them by padding first.
        input_ids[:, -1] = tester.pad_token_id
        input_ids[input_ids == tester.video_token_id] = tester.pad_token_id
        input_ids[input_ids == tester.image_token_id] = tester.pad_token_id
        input_ids[input_ids == tester.vision_start_token_id] = tester.pad_token_id

        # Insert video token sequence
        insertion_point = tester.num_image_tokens
        # one vision start token + the video placeholders of one frame group
        tokens_per_frame = patches_per_frame + 1
        # every row receives `patch_T` such groups starting at `insertion_point`; they must fit in the row
        self.assertLessEqual(insertion_point + patch_T * tokens_per_frame, tester.seq_length)

        for b in range(B):
            # each frame is separated by a vision_start_token_id
            for frame_idx in range(patch_T):
                frame_start = insertion_point + frame_idx * tokens_per_frame
                input_ids[b, frame_start] = tester.vision_start_token_id
                input_ids[b, frame_start + 1 : frame_start + tokens_per_frame] = tester.video_token_id

        for model_class in self.all_model_classes:
            # TODO:we should remove this because we use timestamps for video
            model = model_class(config).to(torch_device)
            model.eval()
            outputs = model(
                input_ids=input_ids,
                pixel_values_videos=pixel_values_videos,
                video_grid_thw=video_grid_thw,
            )
            self.assertIsNotNone(outputs)

View File

@@ -0,0 +1,379 @@
# Copyright 2025 The Qwen Team and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import shutil
import tempfile
import unittest
import numpy as np
import pytest
from transformers import AutoProcessor, Qwen2TokenizerFast
from transformers.testing_utils import require_av, require_torch, require_torchvision, require_vision
from transformers.utils import is_torch_available, is_vision_available
from ...test_processing_common import ProcessorTesterMixin
if is_vision_available():
from transformers import Qwen2VLImageProcessorFast, Qwen3VLProcessor
if is_torch_available():
import torch
@require_vision
@require_torch
@require_torchvision
@unittest.skip("The checkpoint is not yet released")
class Qwen3VLProcessorTest(ProcessorTesterMixin, unittest.TestCase):
    """Processor tests for `Qwen3VLProcessor` (tokenizer + image processor + video processor)."""

    processor_class = Qwen3VLProcessor

    @classmethod
    def setUpClass(cls):
        # Load the reference processor once with small pixel limits and keep a saved copy in a
        # temporary directory; the `get_*` helpers below reload from that directory.
        cls.tmpdirname = tempfile.mkdtemp()
        processor = Qwen3VLProcessor.from_pretrained(
            "Qwen/Qwen3-VL-4B-Instruct", patch_size=4, max_pixels=56 * 56, min_pixels=28 * 28
        )
        processor.save_pretrained(cls.tmpdirname)
        cls.image_token = processor.image_token

    def get_tokenizer(self, **kwargs):
        return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).tokenizer

    def get_image_processor(self, **kwargs):
        return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).image_processor

    def get_video_processor(self, **kwargs):
        return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).video_processor

    def get_processor(self, **kwargs):
        return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tmpdirname, ignore_errors=True)

    # NOTE: the method below carries a "Copied from" marker, so its body is intentionally left untouched.
    # Copied from tests.models.llava.test_processing_llava.LlavaProcessorTest.test_get_num_vision_tokens
    def test_get_num_vision_tokens(self):
        "Tests general functionality of the helper used internally in vLLM"
        processor = self.get_processor()
        output = processor._get_num_multimodal_tokens(image_sizes=[(100, 100), (300, 100), (500, 30)])
        self.assertTrue("num_image_tokens" in output)
        self.assertEqual(len(output["num_image_tokens"]), 3)
        self.assertTrue("num_image_patches" in output)
        self.assertEqual(len(output["num_image_patches"]), 3)

    def test_save_load_pretrained_default(self):
        """A processor assembled from its parts must survive a save/load round trip."""
        tokenizer = self.get_tokenizer()
        image_processor = self.get_image_processor()
        video_processor = self.get_video_processor()
        processor = Qwen3VLProcessor(
            tokenizer=tokenizer, image_processor=image_processor, video_processor=video_processor
        )

        # Round-trip through a throwaway directory instead of `self.tmpdirname`, so the fixture
        # written by `setUpClass` (which all other tests reload from) is never overwritten.
        with tempfile.TemporaryDirectory() as save_dir:
            processor.save_pretrained(save_dir)
            processor = Qwen3VLProcessor.from_pretrained(save_dir, use_fast=True)

        self.assertEqual(processor.tokenizer.get_vocab(), tokenizer.get_vocab())
        self.assertEqual(processor.image_processor.to_json_string(), image_processor.to_json_string())
        self.assertIsInstance(processor.tokenizer, Qwen2TokenizerFast)
        self.assertIsInstance(processor.image_processor, Qwen2VLImageProcessorFast)

    def test_image_processor(self):
        """Image features produced through the processor must match the bare image processor."""
        image_processor = self.get_image_processor()
        tokenizer = self.get_tokenizer()
        video_processor = self.get_video_processor()
        processor = Qwen3VLProcessor(
            tokenizer=tokenizer, image_processor=image_processor, video_processor=video_processor
        )

        image_input = self.prepare_image_inputs()
        input_image_proc = image_processor(image_input, return_tensors="pt")
        input_processor = processor(images=image_input, text="dummy", return_tensors="pt")

        for key in input_image_proc:
            self.assertAlmostEqual(input_image_proc[key].sum(), input_processor[key].sum(), delta=1e-2)

    def test_processor(self):
        """Checks the output keys for text + image input and the errors raised for missing inputs."""
        image_processor = self.get_image_processor()
        tokenizer = self.get_tokenizer()
        video_processor = self.get_video_processor()
        processor = Qwen3VLProcessor(
            tokenizer=tokenizer, image_processor=image_processor, video_processor=video_processor
        )

        input_str = "lower newer"
        image_input = self.prepare_image_inputs()
        inputs = processor(text=input_str, images=image_input)
        self.assertListEqual(
            list(inputs.keys()),
            ["input_ids", "attention_mask", "pixel_values", "image_grid_thw"],
        )

        # test if it raises when no input is passed
        with pytest.raises(ValueError):
            processor()

        # test if it raises when no text is passed
        with pytest.raises(TypeError):
            processor(images=image_input)

    def test_model_input_names(self):
        """With text, images and videos the output keys must equal `processor.model_input_names`."""
        image_processor = self.get_image_processor()
        tokenizer = self.get_tokenizer()
        video_processor = self.get_video_processor()
        processor = Qwen3VLProcessor(
            tokenizer=tokenizer, image_processor=image_processor, video_processor=video_processor
        )

        input_str = "lower newer"
        image_input = self.prepare_image_inputs()
        video_inputs = self.prepare_video_inputs()
        inputs = processor(text=input_str, images=image_input, videos=video_inputs, do_sample_frames=False)
        self.assertListEqual(list(inputs.keys()), processor.model_input_names)

    @require_torch
    @require_av
    def _test_apply_chat_template(
        self,
        modality: str,
        batch_size: int,
        return_tensors: str,
        input_name: str,
        processor_name: str,
        input_data: list[str],
    ):
        """
        Shared body of the chat-template tests.

        Args:
            modality: Content type inserted into the messages (compared against `"video"` below).
            batch_size: Number of conversations in the batch.
            return_tensors: Tensor type requested from the processor (`"pt"`, `"np"` or `None`).
            input_name: Name of the attribute on `self` holding the key of the modality inputs.
            processor_name: Attribute of the processor class that must exist for the test to run.
            input_data: URLs of the media; the first `batch_size` entries are used, one per conversation.
        """
        processor = self.get_processor()
        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        if processor_name not in self.processor_class.attributes:
            self.skipTest(f"{processor_name} attribute not present in {self.processor_class}")

        # Build one independent conversation per sample. `[[...]] * batch_size` would repeat
        # references to a single message dict, so attaching a URL to one sample further below
        # would silently attach it to all of them.
        batch_messages = [
            [
                {
                    "role": "user",
                    "content": [{"type": "text", "text": "Describe this."}],
                },
            ]
            for _ in range(batch_size)
        ]

        # Test that jinja can be applied
        formatted_prompt = processor.apply_chat_template(batch_messages, add_generation_prompt=True, tokenize=False)
        self.assertEqual(len(formatted_prompt), batch_size)

        # Test that tokenizing with template and directly with `self.tokenizer` gives same output
        formatted_prompt_tokenized = processor.apply_chat_template(
            batch_messages, add_generation_prompt=True, tokenize=True, return_tensors=return_tensors
        )
        add_special_tokens = True
        if processor.tokenizer.bos_token is not None and formatted_prompt[0].startswith(processor.tokenizer.bos_token):
            add_special_tokens = False
        tok_output = processor.tokenizer(
            formatted_prompt, return_tensors=return_tensors, add_special_tokens=add_special_tokens
        )
        expected_output = tok_output.input_ids
        self.assertListEqual(expected_output.tolist(), formatted_prompt_tokenized.tolist())

        # Test that kwargs passed to processor's `__call__` are actually used
        tokenized_prompt_100 = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            padding="max_length",
            truncation=True,
            return_tensors=return_tensors,
            max_length=100,
        )
        self.assertEqual(len(tokenized_prompt_100[0]), 100)

        # Test that `return_dict=True` returns text related inputs in the dict
        out_dict_text = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors=return_tensors,
        )
        self.assertTrue(all(key in out_dict_text for key in ["input_ids", "attention_mask"]))
        self.assertEqual(len(out_dict_text["input_ids"]), batch_size)
        self.assertEqual(len(out_dict_text["attention_mask"]), batch_size)

        # Test that with modality URLs and `return_dict=True`, we get modality inputs in the dict
        for idx, url in enumerate(input_data[:batch_size]):
            batch_messages[idx][0]["content"] = [batch_messages[idx][0]["content"][0], {"type": modality, "url": url}]

        out_dict = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors=return_tensors,
            max_frames=2,  # by default no more than 2 frames, otherwise too slow
        )
        # `input_name` names an attribute of the test case; the attribute holds the actual output key.
        modality_key = getattr(self, input_name)
        self.assertIn(modality_key, out_dict)
        self.assertEqual(len(out_dict["input_ids"]), batch_size)
        self.assertEqual(len(out_dict["attention_mask"]), batch_size)

        if modality == "video":
            # qwen pixels don't scale with bs same way as other models, calculate expected video token count based on video_grid_thw
            expected_video_token_count = 0
            for thw in out_dict["video_grid_thw"]:
                expected_video_token_count += thw[0] * thw[1] * thw[2]
            mm_len = expected_video_token_count
        else:
            mm_len = batch_size * 192
        self.assertEqual(len(out_dict[modality_key]), mm_len)

        return_tensor_to_type = {"pt": torch.Tensor, "np": np.ndarray, None: list}
        for k in out_dict:
            self.assertIsInstance(out_dict[k], return_tensor_to_type[return_tensors])

    @require_av
    @unittest.skip("qwen3_vl can't sample frames from image frames directly, user can use `qwen-vl-utils`")
    def test_apply_chat_template_video_1(self):
        pass

    @require_av
    @unittest.skip("qwen3_vl can't sample frames from image frames directly, user can use `qwen-vl-utils`")
    def test_apply_chat_template_video_2(self):
        pass

    @require_av
    def test_apply_chat_template_video_frame_sampling(self):
        """Checks `apply_chat_template` with a video under `num_frames`, `fps`, both, neither and frame lists."""
        processor = self.get_processor()
        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        # Skip when `__call__` has no `videos` parameter or leaves it without annotation.
        videos_param = inspect.signature(processor.__call__).parameters.get("videos")
        if videos_param is None or videos_param.annotation is inspect.Parameter.empty:
            self.skipTest("Processor doesn't accept videos at input")

        messages = [
            [
                {
                    "role": "user",
                    "content": [
                        {"type": "video"},
                        {"type": "text", "text": "What is shown in this video?"},
                    ],
                },
            ]
        ]

        formatted_prompt = processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
        self.assertEqual(len(formatted_prompt), 1)

        formatted_prompt_tokenized = processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=True)
        expected_output = processor.tokenizer(formatted_prompt, return_tensors=None).input_ids
        self.assertListEqual(expected_output, formatted_prompt_tokenized)

        out_dict = processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=True, return_dict=True)
        self.assertListEqual(list(out_dict.keys()), ["input_ids", "attention_mask"])

        # Add video URL for return dict and load with `num_frames` arg
        messages[0][0]["content"][0] = {
            "type": "video",
            "url": "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/tiny_video.mp4",
        }
        num_frames = 3
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            num_frames=num_frames,
        )
        self.assertIn(self.videos_input_name, out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 360)

        # Load with `fps` arg
        fps = 1
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            fps=fps,
        )
        self.assertIn(self.videos_input_name, out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 900)

        # Load with `fps` and `num_frames` args, should raise an error
        with self.assertRaises(ValueError):
            out_dict_with_video = processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                fps=fps,
                num_frames=num_frames,
            )

        # Load without any arg should load the whole video
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
        )
        self.assertIn(self.videos_input_name, out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 27000)

        # Load video as a list of frames (i.e. images). NOTE: each frame should have same size
        # because we assume they come from one video
        messages[0][0]["content"][0] = {
            "type": "video",
            "url": [
                "https://www.ilankelman.org/stopsigns/australia.jpg",
                "https://www.ilankelman.org/stopsigns/australia.jpg",
            ],
        }
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            do_sample_frames=False,
        )
        self.assertIn(self.videos_input_name, out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 160)

    def test_kwargs_overrides_custom_image_processor_kwargs(self):
        """A `max_pixels` passed at call time must override the value stored in the image processor."""
        processor = self.get_processor()
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs()
        image_input = self.prepare_image_inputs()
        inputs = processor(text=input_str, images=image_input, max_pixels=56 * 56 * 4, return_tensors="pt")
        self.assertEqual(inputs[self.images_input_name].shape[0], 612)
        inputs = processor(text=input_str, images=image_input, return_tensors="pt")
        self.assertEqual(inputs[self.images_input_name].shape[0], 100)

View File

@@ -0,0 +1,330 @@
# coding=utf-8
# Copyright 2025 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from transformers.image_utils import IMAGENET_STANDARD_MEAN, IMAGENET_STANDARD_STD
from transformers.testing_utils import require_torch, require_vision
from transformers.utils import is_torch_available, is_torchvision_available, is_vision_available
from ...test_video_processing_common import VideoProcessingTestMixin, prepare_video_inputs
# Pillow is what provides `Image`, so its import must be guarded by the vision check: guarding on
# torch alone raises ImportError at import time when torch is installed but Pillow is not.
# The torch check is kept because every test using `Image` also requires torch.
if is_torch_available() and is_vision_available():
    from PIL import Image

if is_vision_available() and is_torchvision_available():
    from transformers import Qwen3VLVideoProcessor
    from transformers.models.qwen3_vl.video_processing_qwen3_vl import smart_resize
class Qwen3VLVideoProcessingTester:
    """
    Holds the settings of the video processing tests and derives from them the processor
    kwargs, the dummy videos, their metadata and the expected output shape.
    """

    def __init__(
        self,
        parent,
        batch_size=5,
        num_frames=8,
        num_channels=3,
        min_resolution=32,
        max_resolution=80,
        temporal_patch_size=2,
        patch_size=16,
        merge_size=2,
        do_resize=True,
        size=None,
        do_normalize=True,
        image_mean=IMAGENET_STANDARD_MEAN,
        image_std=IMAGENET_STANDARD_STD,
        do_convert_rgb=True,
    ):
        self.parent = parent

        # shape of the generated dummy videos
        self.batch_size = batch_size
        self.num_frames = num_frames
        self.num_channels = num_channels
        self.min_resolution = min_resolution
        self.max_resolution = max_resolution

        # processor settings
        self.do_resize = do_resize
        self.size = {"longest_edge": 20, "shortest_edge": 10} if size is None else size
        self.do_normalize = do_normalize
        self.image_mean = image_mean
        self.image_std = image_std
        self.do_convert_rgb = do_convert_rgb

        # patching settings used to compute the expected output shape
        self.temporal_patch_size = temporal_patch_size
        self.patch_size = patch_size
        self.merge_size = merge_size

    def prepare_video_processor_dict(self):
        """Returns the keyword arguments used to instantiate the video processor under test."""
        processor_kwargs = {
            "do_resize": self.do_resize,
            "size": self.size,
            "do_normalize": self.do_normalize,
            "image_mean": self.image_mean,
            "image_std": self.image_std,
            "do_convert_rgb": self.do_convert_rgb,
        }
        processor_kwargs["do_sample_frames"] = True
        return processor_kwargs

    def prepare_video_metadata(self, videos):
        """
        Returns one metadata dict per video with a fixed `fps` of 2.

        The frame count is the list length for list inputs, the first dimension for 4D arrays,
        1 for any other object exposing `shape`, and `self.num_frames` otherwise.
        """
        all_metadata = []
        for clip in videos:
            if isinstance(clip, list):
                frame_count = len(clip)
            elif not hasattr(clip, "shape"):
                frame_count = self.num_frames
            elif len(clip.shape) == 4:  # (T, H, W, C)
                frame_count = clip.shape[0]
            else:
                frame_count = 1
            all_metadata.append(
                {
                    "fps": 2,
                    "duration": frame_count / 2,
                    "total_num_frames": frame_count,
                }
            )
        return all_metadata

    def expected_output_video_shape(self, videos):
        """
        Returns `[seq_len, hidden_dim]`, the shape expected for the flattened patches of `videos`.

        `seq_len` sums `grid_t * grid_h * grid_w` over all videos, where the spatial grid comes from
        `smart_resize` and `grid_t` from `self.num_frames`; `hidden_dim` is
        `num_channels * temporal_patch_size * patch_size**2`.
        """
        hidden_dim = self.num_channels * self.temporal_patch_size * self.patch_size * self.patch_size
        grid_t = self.num_frames // self.temporal_patch_size
        # dimensions assumed when they cannot be read from the input
        fallback_dims = (self.num_frames, self.min_resolution, self.min_resolution)
        resize_factor = self.patch_size * self.merge_size

        total_patches = 0
        for clip in videos:
            # bring every input to something exposing `.shape`
            if isinstance(clip, list) and isinstance(clip[0], Image.Image):
                clip = np.stack([np.array(frame) for frame in clip])
            elif not hasattr(clip, "shape"):
                clip = np.array(clip)

            rank = len(clip.shape) if hasattr(clip, "shape") else 0
            if rank == 4:
                t, height, width = clip.shape[:3]
            elif rank == 3:
                height, width = clip.shape[:2]
                t = 1
            else:
                t, height, width = fallback_dims

            resized_height, resized_width = smart_resize(
                t,
                height,
                width,
                factor=resize_factor,
                min_pixels=self.size["shortest_edge"],
                max_pixels=self.size["longest_edge"],
            )
            grid_h = resized_height // self.patch_size
            grid_w = resized_width // self.patch_size
            total_patches += grid_t * grid_h * grid_w

        return [total_patches, hidden_dim]

    def prepare_video_inputs(self, equal_resolution=False, return_tensors="pil"):
        """Returns a batch of dummy videos generated from the tester's settings."""
        return prepare_video_inputs(
            batch_size=self.batch_size,
            num_frames=self.num_frames,
            num_channels=self.num_channels,
            min_resolution=self.min_resolution,
            max_resolution=self.max_resolution,
            equal_resolution=equal_resolution,
            return_tensors=return_tensors,
        )
@require_torch
@require_vision
class Qwen3VLVideoProcessingTest(VideoProcessingTestMixin, unittest.TestCase):
    """Video processor tests for `Qwen3VLVideoProcessor`."""

    fast_video_processing_class = Qwen3VLVideoProcessor if is_torchvision_available() else None
    input_name = "pixel_values_videos"

    def setUp(self):
        super().setUp()
        self.video_processor_tester = Qwen3VLVideoProcessingTester(self)

    @property
    def video_processor_dict(self):
        return self.video_processor_tester.prepare_video_processor_dict()

    def _check_single_and_batched_shapes(self, video_processing, video_inputs, fed_inputs=None):
        """
        Encodes the first video alone and then the whole batch, comparing each output shape with
        the tester's expectation.

        `fed_inputs` is what is handed to the processor when it differs from `video_inputs`;
        metadata and expected shapes are always derived from `video_inputs`.
        """
        tester = self.video_processor_tester
        to_encode = video_inputs if fed_inputs is None else fed_inputs
        video_metadata = tester.prepare_video_metadata(video_inputs)

        # Test not batched input
        single_output = video_processing(to_encode[0], video_metadata=[video_metadata[0]], return_tensors="pt")
        single_encoded = single_output[self.input_name]
        single_expected = tester.expected_output_video_shape([video_inputs[0]])
        self.assertEqual(list(single_encoded.shape), single_expected)

        # Test batched
        batch_output = video_processing(to_encode, video_metadata=video_metadata, return_tensors="pt")
        batch_encoded = batch_output[self.input_name]
        batch_expected = tester.expected_output_video_shape(video_inputs)
        self.assertEqual(list(batch_encoded.shape), batch_expected)

    def test_video_processor_from_dict_with_kwargs(self):
        """`from_dict` must keep the dict's `size` unless an explicit `size` kwarg overrides it."""
        processor_cls = self.fast_video_processing_class

        from_plain_dict = processor_cls.from_dict(self.video_processor_dict)
        self.assertEqual(from_plain_dict.size, {"longest_edge": 20, "shortest_edge": 10})

        overridden_size = {"longest_edge": 42, "shortest_edge": 42}
        with_override = processor_cls.from_dict(self.video_processor_dict, size=overridden_size)
        self.assertEqual(with_override.size, {"longest_edge": 42, "shortest_edge": 42})

    def test_call_pil(self):
        """Videos given as lists of PIL frames."""
        for video_processing_class in self.video_processor_list:
            video_processing = video_processing_class(**self.video_processor_dict)
            pil_videos = self.video_processor_tester.prepare_video_inputs(
                equal_resolution=False, return_tensors="pil"
            )
            for frames in pil_videos:
                self.assertIsInstance(frames[0], Image.Image)
            self._check_single_and_batched_shapes(video_processing, pil_videos)

    def test_call_numpy(self):
        """Videos given as NumPy arrays."""
        for video_processing_class in self.video_processor_list:
            video_processing = video_processing_class(**self.video_processor_dict)
            np_videos = self.video_processor_tester.prepare_video_inputs(equal_resolution=False, return_tensors="np")
            self._check_single_and_batched_shapes(video_processing, np_videos)

    def test_call_pytorch(self):
        """Videos requested from the tester with `return_tensors="pt"`."""
        for video_processing_class in self.video_processor_list:
            video_processing = video_processing_class(**self.video_processor_dict)
            # NOTE(review): `test_call_sample_frames` asks the tester for "torch" inputs while this
            # test asks for "pt" — confirm which key `prepare_video_inputs` actually understands.
            pt_videos = self.video_processor_tester.prepare_video_inputs(equal_resolution=False, return_tensors="pt")
            self._check_single_and_batched_shapes(video_processing, pt_videos)

    @unittest.skip("Skip for now, the test needs adjustment for Qwen3VL")
    def test_call_numpy_4_channels(self):
        tester = self.video_processor_tester
        # kwargs shared by the single-video and the batched call
        call_kwargs = {
            "return_tensors": "pt",
            "input_data_format": "channels_last",
            "image_mean": 0,
            "image_std": 1,
        }
        for video_processing_class in self.video_processor_list:
            # Test that can process videos which have an arbitrary number of channels
            # Initialize video_processing
            video_processor = video_processing_class(**self.video_processor_dict)

            # create random numpy tensors
            tester.num_channels = 4
            video_inputs = tester.prepare_video_inputs(equal_resolution=False, return_tensors="np")

            # Test not batched input
            encoded_videos = video_processor(video_inputs[0], **call_kwargs)[self.input_name]
            expected_output_video_shape = tester.expected_output_video_shape([video_inputs[0]])
            self.assertEqual(list(encoded_videos.shape), expected_output_video_shape)

            # Test batched
            encoded_videos = video_processor(video_inputs, **call_kwargs)[self.input_name]
            expected_output_video_shape = tester.expected_output_video_shape(video_inputs)
            self.assertEqual(list(encoded_videos.shape), expected_output_video_shape)

    def test_nested_input(self):
        """Tests that the processor can work with nested list where each video is a list of arrays"""
        for video_processing_class in self.video_processor_list:
            video_processing = video_processing_class(**self.video_processor_dict)
            np_videos = self.video_processor_tester.prepare_video_inputs(equal_resolution=False, return_tensors="np")
            nested_videos = [list(frames) for frames in np_videos]
            # The nested lists are fed to the processor; metadata and expected shapes still come
            # from the array inputs.
            self._check_single_and_batched_shapes(video_processing, np_videos, fed_inputs=nested_videos)

    def test_call_sample_frames(self):
        """Frame sampling with explicit metadata must yield 2D outputs for single and batched videos."""
        tester = self.video_processor_tester
        for video_processing_class in self.video_processor_list:
            video_processing = video_processing_class(**self.video_processor_dict.copy())

            # Remember the tester settings that are temporarily overridden below.
            saved_num_frames = tester.num_frames
            saved_min_resolution = getattr(tester, "min_resolution", None)
            saved_max_resolution = getattr(tester, "max_resolution", None)
            tester.num_frames = 8
            tester.min_resolution = 56
            tester.max_resolution = 112

            video_inputs = tester.prepare_video_inputs(
                equal_resolution=False,
                return_tensors="torch",
            )

            metadata = [[{"total_num_frames": 8, "fps": 4}]]
            batched_metadata = metadata * len(video_inputs)

            single_result = video_processing(video_inputs[0], return_tensors="pt", video_metadata=metadata)
            encoded_videos = single_result[self.input_name]
            batched_result = video_processing(video_inputs, return_tensors="pt", video_metadata=batched_metadata)
            encoded_videos_batched = batched_result[self.input_name]

            self.assertIsNotNone(encoded_videos)
            self.assertIsNotNone(encoded_videos_batched)
            self.assertEqual(len(encoded_videos.shape), 2)
            self.assertEqual(len(encoded_videos_batched.shape), 2)

            # Put the tester back into its previous state.
            tester.num_frames = saved_num_frames
            if saved_min_resolution is not None:
                tester.min_resolution = saved_min_resolution
            if saved_max_resolution is not None:
                tester.max_resolution = saved_max_resolution