"""Inference-only Qwen3VL model compatible with HuggingFace weights.""" from typing import Any, Callable, Optional, Union import torch import torch.nn as nn import torch.nn.functional as F from vllm.model_executor.models.interfaces import (MultiModalEmbeddings, SupportsLoRA, SupportsMultiModal, SupportsPP) from vllm.model_executor.models.utils import (AutoWeightsLoader, PPMissingLayer, WeightsMapper, maybe_prefix, merge_multimodal_embeddings) from vllm.logger import init_logger from .hf_processor.qwenvl_processor import Qwen3VLProcessorWithVacc from .hf_processor.qwen2vl_image_processor import Qwen2VLImageProcessorFastWithVacc from vllm.distributed import (get_tp_group, tensor_model_parallel_all_reduce) from .vars import USE_FUSED_QWEN_ATTENTION # from vacc_tools.trace_logger import get_trace_api # trace_time, register_module_trace, trace_autograd_function, register_optimizer_trace = ( # get_trace_api("Qwen3vl") # ) logger = init_logger(__name__) class Qwen3_VisionPatchEmbed(nn.Module): def forward(self, x: torch.Tensor) -> torch.Tensor: if hasattr(self.proj, 'bias') and self.proj.bias is not None: return torch.nn.functional.linear(x, self.proj.weight.view(self.hidden_size, -1), self.proj.bias) return torch.matmul(x, self.proj.weight.view(self.hidden_size, -1).T) class Qwen3_VisionBlock(nn.Module): def forward( self, x: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor | list[torch.Tensor], max_seqlen: Optional[int] = None, # Only used for Flash Attention seqlens: Optional[list[int]] = None, # Only used for xFormers ) -> torch.Tensor: if USE_FUSED_QWEN_ATTENTION: assert isinstance(rotary_pos_emb, list), "qwen3vl vit-attention need rotary_pos_emb is list[torch.Tensor]" total_bytes = x.numel() * x.element_size() * get_tp_group().world_size reduce_result = get_tp_group().world_size > 1 and total_bytes < 4194304 # hidden_states = self.norm1(x) attn_outs = torch.vacc.fuse_atten_vit( hidden_states=x.view(-1, x.shape[-1]), hidden_states_norm_weight = self.norm1.weight, hidden_states_norm_bias = self.norm1.bias, # hidden_states_norm_weight = torch.Tensor(), # hidden_states_norm_bias = torch.Tensor(), qkv_proj_weight=self.attn.qkv.weight, qkv_proj_bias=self.attn.qkv.bias, sin_cache=rotary_pos_emb[0], cos_cache=rotary_pos_emb[1], o_proj_weight=self.attn.proj.weight, o_proj_bias=self.attn.proj.bias if self.attn.proj.tp_rank == 0 else torch.Tensor(), seq_lens=cu_seqlens, sm_scale=-1, num_attention_heads=self.attn.num_attention_heads_per_partition * get_tp_group().world_size, flash_attention=True, reduce_result=reduce_result, world_size=get_tp_group().world_size, rank=get_tp_group().rank_in_group, group_id=get_tp_group().group_id, dev_info=get_tp_group().rank_device_infos ) attn_out = attn_outs[0] if reduce_result else tensor_model_parallel_all_reduce(attn_outs[0]) attn_out = attn_out.view(x.shape) x = x + attn_out else: x = x + self.attn(self.norm1(x), cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb, max_seqlen=max_seqlen, seqlens=seqlens) x = x + self.mlp(self.norm2(x)) return x class Qwen3_VisionTransformer(nn.Module): def rot_pos_emb(self, grid_thw): if USE_FUSED_QWEN_ATTENTION: try: from torch_vacc.vacc.custom_qwen3_ops import rot_pos_emb_qwenvl return rot_pos_emb_qwenvl(grid_thw, self.hidden_size, self.num_heads, self.spatial_merge_size, self.dtype, self.device) except Exception as e: logger.error(f"rot_pos_emb fused ops run fail, e:{e}") pos_ids = [] # Support both Tensor and list inputs for DP path if isinstance(grid_thw, list): grid_list = grid_thw max_grid_size = 


class Qwen3_VisionTransformer(nn.Module):

    def rot_pos_emb(self, grid_thw):
        if USE_FUSED_QWEN_ATTENTION:
            try:
                from torch_vacc.vacc.custom_qwen3_ops import rot_pos_emb_qwenvl
                return rot_pos_emb_qwenvl(grid_thw, self.hidden_size,
                                          self.num_heads,
                                          self.spatial_merge_size, self.dtype,
                                          self.device)
            except Exception as e:
                logger.error(f"rot_pos_emb fused op failed, e: {e}")
        pos_ids = []
        # Support both Tensor and list inputs for DP path
        if isinstance(grid_thw, list):
            grid_list = grid_thw
            max_grid_size = max(max(h, w) for _, h, w in grid_list)
        else:
            grid_list = grid_thw.tolist()
            max_grid_size = int(grid_thw[:, 1:].max().item())
        for t, h, w in grid_list:
            hpos_ids = torch.arange(h).unsqueeze(1).expand(-1, w)
            hpos_ids = hpos_ids.reshape(
                h // self.spatial_merge_size,
                self.spatial_merge_size,
                w // self.spatial_merge_size,
                self.spatial_merge_size,
            )
            hpos_ids = hpos_ids.permute(0, 2, 1, 3)
            hpos_ids = hpos_ids.flatten()

            wpos_ids = torch.arange(w).unsqueeze(0).expand(h, -1)
            wpos_ids = wpos_ids.reshape(
                h // self.spatial_merge_size,
                self.spatial_merge_size,
                w // self.spatial_merge_size,
                self.spatial_merge_size,
            )
            wpos_ids = wpos_ids.permute(0, 2, 1, 3)
            wpos_ids = wpos_ids.flatten()
            pos_ids.append(
                torch.stack([hpos_ids, wpos_ids], dim=-1).repeat(t, 1))
        pos_ids = torch.cat(pos_ids, dim=0)
        rotary_pos_emb_full = self.rotary_pos_emb(max_grid_size)
        rotary_pos_emb = rotary_pos_emb_full[pos_ids].flatten(1)
        return rotary_pos_emb

    def fast_pos_embed_interpolate(self,
                                   grid_thw: list[list[int]]) -> torch.Tensor:
        num_grid_per_side = self.num_grid_per_side
        m_size = self.spatial_merge_size
        hidden_dim = self.pos_embed.embedding_dim

        try:
            from torch_vacc.vacc.custom_qwen3_ops import fast_pos_embed_interpolate_qwenvl
            return fast_pos_embed_interpolate_qwenvl(self.pos_embed.weight,
                                                     grid_thw,
                                                     num_grid_per_side, m_size,
                                                     hidden_dim)
        except Exception as e:
            logger.error(f"fast_pos_embed_interpolate fused op failed, e: {e}")

        outputs = []
        for t, h, w in grid_thw:
            h_idxs = torch.linspace(0,
                                    num_grid_per_side - 1,
                                    h,
                                    dtype=torch.float32,
                                    device=self.device)
            w_idxs = torch.linspace(0,
                                    num_grid_per_side - 1,
                                    w,
                                    dtype=torch.float32,
                                    device=self.device)

            h_floor = h_idxs.to(torch.long)
            w_floor = w_idxs.to(torch.long)
            h_ceil = torch.clamp(h_floor + 1, max=num_grid_per_side - 1)
            w_ceil = torch.clamp(w_floor + 1, max=num_grid_per_side - 1)

            dh = h_idxs - h_floor
            dw = w_idxs - w_floor

            # Create meshgrid view for all h, w vars
            dh_grid, dw_grid = torch.meshgrid(dh, dw, indexing='ij')
            h_floor_grid, w_floor_grid = torch.meshgrid(h_floor,
                                                        w_floor,
                                                        indexing='ij')
            h_ceil_grid, w_ceil_grid = torch.meshgrid(h_ceil,
                                                      w_ceil,
                                                      indexing='ij')
            h_floor_grid_idx = h_floor_grid * num_grid_per_side
            h_ceil_grid_idx = h_ceil_grid * num_grid_per_side

            # original computation of weights
            # w00 = (1 - dh_grid) * (1 - dw_grid)
            # w01 = (1 - dh_grid) * dw_grid
            # w10 = dh_grid * (1 - dw_grid)
            # w11 = dh_grid * dw_grid
            # we reuse w11 here to avoid duplicate
            # dh_grid * dw_grid computation
            w11 = dh_grid * dw_grid
            w10 = dh_grid - w11
            w01 = dw_grid - w11
            w00 = 1 - dh_grid - dw_grid + w11

            idx00 = h_floor_grid_idx + w_floor_grid
            idx01 = h_floor_grid_idx + w_ceil_grid
            idx10 = h_ceil_grid_idx + w_floor_grid
            idx11 = h_ceil_grid_idx + w_ceil_grid

            indices = torch.stack([idx00, idx01, idx10, idx11],
                                  dim=0).reshape(4, -1)
            weights = torch.stack([w00, w01, w10, w11],
                                  dim=0).reshape(4, -1, 1)
            weights = weights.to(dtype=self.dtype, device=self.device)

            embeds = self.pos_embed(indices)
            weighted_embeds = embeds * weights
            p0, p1, p2, p3 = weighted_embeds.unbind(dim=0)
            combined = p0 + p1 + p2 + p3

            combined = combined.view(h * w, hidden_dim)
            repeated = combined.unsqueeze(0).expand(t, -1, -1).contiguous()
            repeated = repeated.view(t, h // m_size, m_size, w // m_size,
                                     m_size, hidden_dim)
            repeated = repeated.permute(0, 1, 3, 2, 4,
                                        5).reshape(-1, hidden_dim)
            outputs.append(repeated)

        return torch.cat(outputs, dim=0)

    def forward(
        self,
        x: torch.Tensor,
        grid_thw: list[list[int]],
    ) -> torch.Tensor:
        hidden_states = x.to(device=self.device, dtype=self.dtype)
        hidden_states = self.patch_embed(hidden_states)

        pos_embeds = self.fast_pos_embed_interpolate(grid_thw)
        hidden_states = hidden_states + pos_embeds
        rotary_pos_emb = self.rot_pos_emb(grid_thw)

        grid_thw_tensor = torch.tensor(grid_thw, dtype=torch.int32)

        # Cumulative sequence lengths, one entry per frame of each image/video.
        cu_seqlens = torch.repeat_interleave(
            grid_thw_tensor[:, 1] * grid_thw_tensor[:, 2],
            grid_thw_tensor[:, 0]).cumsum(
                dim=0,
                dtype=grid_thw_tensor.dtype
                if torch.jit.is_tracing() else torch.int32,
            )
        cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0)

        hidden_states = hidden_states.unsqueeze(1)
        if isinstance(rotary_pos_emb, torch.Tensor):
            rotary_pos_emb = rotary_pos_emb.to(hidden_states.device)

        if USE_FUSED_QWEN_ATTENTION:
            max_seqlen, seqlens = None, None
            cu_seqlens = cu_seqlens.tolist()
        else:
            max_seqlen, seqlens = self.compute_attn_mask_seqlen(cu_seqlens)

        deepstack_feature_lists = []
        for layer_num, blk in enumerate(self.blocks):
            hidden_states = blk(hidden_states,
                                cu_seqlens=cu_seqlens,
                                rotary_pos_emb=rotary_pos_emb,
                                max_seqlen=max_seqlen,
                                seqlens=seqlens)
            if layer_num in self.deepstack_visual_indexes:
                deepstack_merger_idx = self.deepstack_visual_indexes.index(
                    layer_num)
                deepstack_feature = self.deepstack_merger_list[
                    deepstack_merger_idx](hidden_states)
                deepstack_feature_lists.append(deepstack_feature)

        hidden_states = self.merger(hidden_states)
        hidden_states = torch.cat(
            [hidden_states] + deepstack_feature_lists,
            dim=1)  # [seq_len, hidden_size * (1 + depth_of_deepstack)]
        return hidden_states
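

# Illustrative sketch (not part of the model): a self-check that the factored
# bilinear weights used in fast_pos_embed_interpolate (w11 = dh * dw,
# w10 = dh - w11, w01 = dw - w11, w00 = 1 - dh - dw + w11) are algebraically
# identical to the textbook weights (1 - dh) * (1 - dw), (1 - dh) * dw,
# dh * (1 - dw) and dh * dw. The helper name is ours, for illustration only.
def _check_bilinear_weight_factorization(dh: torch.Tensor,
                                         dw: torch.Tensor) -> bool:
    """Return True if the reuse-w11 factorization matches the naive weights."""
    w11 = dh * dw
    w10 = dh - w11
    w01 = dw - w11
    w00 = 1 - dh - dw + w11
    naive = ((1 - dh) * (1 - dw), (1 - dh) * dw, dh * (1 - dw), dh * dw)
    return all(
        torch.allclose(a, b) for a, b in zip((w00, w01, w10, w11), naive))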


class Qwen3VLForConditionalGeneration(nn.Module, SupportsMultiModal,
                                      SupportsLoRA, SupportsPP):

    def get_input_embeddings(
        self,
        input_ids: torch.Tensor,
        multimodal_embeddings: Optional[MultiModalEmbeddings] = None,
    ) -> torch.Tensor:
        deepstack_input_embeds = None
        inputs_embeds = self.language_model.get_input_embeddings(input_ids)
        if multimodal_embeddings is not None:
            if self.use_deepstack:
                deepstack_input_embeds, multimodal_embeddings = self._compute_deepstack_embeds(  # noqa: E501
                    input_ids, inputs_embeds, multimodal_embeddings)
                self._set_deepstack_input_embeds(deepstack_input_embeds)
            inputs_embeds = merge_multimodal_embeddings(
                input_ids, inputs_embeds, multimodal_embeddings,
                [self.config.image_token_id, self.config.video_token_id])

        # Commented out here to remove the deepstack_input_embeds copy; the
        # buffer is now set only when multimodal embeddings are present.
        # if self.use_deepstack:
        #     if deepstack_input_embeds is None:
        #         deepstack_input_embeds = torch.zeros_like(
        #             inputs_embeds).unsqueeze(0).repeat(
        #                 self.deepstack_num_level, 1, 1).contiguous()
        #     self._set_deepstack_input_embeds(deepstack_input_embeds)
        return inputs_embeds

    def _clear_deepstack_input_embeds(self, num_tokens: int) -> None:
        # Patched here to optimize deepstack_input_embeds: skip clearing the buffer.
        return
        # clear deepstack_input_embeds in buffer
        if num_tokens > 0:
            for idx in range(self.deepstack_num_level):
                self.deepstack_input_embeds[idx][:num_tokens].zero_()


class Qwen3VLProcessingInfo():

    def get_hf_processor(self, **kwargs: object) -> Qwen3VLProcessorWithVacc:
        processor = self.ctx.get_hf_processor(
            Qwen3VLProcessorWithVacc,
            use_fast=kwargs.pop("use_fast", True),
            **kwargs,
        )
        return processor

    def get_image_processor(
            self, **kwargs: object) -> Qwen2VLImageProcessorFastWithVacc:
        return self.get_hf_processor(**kwargs).image_processor

    # def get_video_processor(self, **kwargs: object) -> Qwen3VLVideoProcessor:
    #     return self.get_hf_processor(**kwargs).video_processor


class Qwen3_VisionPatchMerger():

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.use_postshuffle_norm:
            x = self.norm(x.view(-1, self.hidden_size))
        else:
            x = self.norm(x).view(-1, self.hidden_size)
        try:
            from torch_vacc.vacc import patch_merger_vision
            tp_rank_id = get_tp_group().rank_in_group
            # Only rank 0 adds the fc2 bias so that the all-reduce across
            # tensor-parallel ranks does not duplicate it.
            fc2_bias = None if tp_rank_id > 0 else self.linear_fc2.bias
            hidden_states = patch_merger_vision(x, self.linear_fc1.weight,
                                                self.linear_fc2.weight,
                                                self.linear_fc1.bias, fc2_bias,
                                                0)  # 0 is gelu, 1 is silu
            return tensor_model_parallel_all_reduce(hidden_states)
        except Exception as e:
            logger.error(f"patch merger fused vision mlp failed, caused by: {e}")
            x_parallel, _ = self.linear_fc1(x)
            x_parallel = self.act_fn(x_parallel)
            out, _ = self.linear_fc2(x_parallel)
            return out
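

# Illustrative sketch (not part of the model, plain torch only): why the patch
# merger above and Qwen3_VisionMLP below pass linear_fc2.bias to the fused
# kernels only on tensor-parallel rank 0. Each rank produces a partial matmul
# result that the all-reduce sums; adding the bias on every rank would add it
# world_size times. The helper name and shapes are ours, for illustration only.
def _row_parallel_bias_demo(world_size: int = 2) -> None:
    torch.manual_seed(0)
    x = torch.randn(4, 8)
    w = torch.randn(6, 8)
    b = torch.randn(6)
    # Partition the reduction dimension across "ranks" (row-parallel linear).
    partials = [
        x[:, r::world_size] @ w[:, r::world_size].T for r in range(world_size)
    ]
    # Bias is added once (rank 0 only); the sum stands in for the all-reduce.
    out = sum(partials) + b
    assert torch.allclose(out, x @ w.T + b, atol=1e-5)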


class Qwen3_VisionMLP():

    def forward(self, x: torch.Tensor):
        try:
            from torch_vacc.vacc import fuse_mlp_vision
            hiddens_shape = x.shape
            tp_rank_id = get_tp_group().rank_in_group
            # Only rank 0 adds the fc2 bias so that the all-reduce across
            # tensor-parallel ranks does not duplicate it.
            fc2_bias = None if tp_rank_id > 0 else self.linear_fc2.bias
            hidden_states = fuse_mlp_vision(x.view(-1, hiddens_shape[-1]),
                                            self.linear_fc1.weight,
                                            self.linear_fc2.weight,
                                            self.linear_fc1.bias, fc2_bias,
                                            0)  # 0 is gelu, 1 is silu
            return tensor_model_parallel_all_reduce(hidden_states).view(
                hiddens_shape)
        except Exception as e:
            logger.error(f"qwen3vl fused vision mlp failed, caused by: {e}")
            # Reference path; linear_fc1/linear_fc2 are parallel linear layers
            # and return (output, bias) tuples, as in the patch merger above.
            x_parallel, _ = self.linear_fc1(x)
            x_parallel = self.act_fn(x_parallel)
            out, _ = self.linear_fc2(x_parallel)
            return out