[Model] Support pooling models (#3122)

### What this PR does / why we need it?

Support pooling models (like `bge-reranker-v2-m3`) in vllm-ascend. This
PR covers the three embedding pooling types (cls_token, mean_token,
lasttoken).

After this
[commit](17373dcd93),
vLLM added support for pooling models on the v1 engine. This PR makes
the corresponding adaptations on the vllm-ascend side.
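
For reference, a minimal usage sketch on the pooling runner (model name and the `score`/`embed` APIs as in upstream vLLM; exact arguments may differ between versions):

```python
from vllm import LLM

# Cross-encoder reranker served as a pooling model, e.g. bge-reranker-v2-m3.
llm = LLM(model="BAAI/bge-reranker-v2-m3", task="score")
scores = llm.score("what is the capital of China?",
                   ["Beijing is the capital of China.",
                    "The sky is blue."])

# Embedding models use task="embed" and llm.embed([...]) instead.
```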

Fixes #1960

- vLLM version: v0.12.0
- vLLM main:
ad32e3e19c

---------

Signed-off-by: lianyibo <lianyibo1@kunlunit.com>
Signed-off-by: MengqingCao <cmq0113@163.com>
Co-authored-by: MengqingCao <cmq0113@163.com>
lianyibo
2025-12-10 11:37:57 +08:00
committed by GitHub
parent 1a7a34c5ec
commit e32014ac1d
17 changed files with 577 additions and 338 deletions

View File

@@ -35,7 +35,6 @@ class AttentionMaskBuilder:
self.attn_mask_cache = None
self._seq_len_cached = 0
self.device = device
self.pooling_mask = None
self.mla_mask = None
self.chunked_prefill_attn_mask = None
self.pcp_mla_mask = None
@@ -50,14 +49,6 @@ class AttentionMaskBuilder:
return self.attn_mask_cache[:max_seq_len, :max_seq_len].contiguous(
).to(self.device, non_blocking=True)
def get_pooling_mask(self):
if self.pooling_mask is None:
# the compressed attention mask for npu_fusion_attention sparse mode 4
self.pooling_mask = torch.triu(torch.ones(
2048, 2048), diagonal=1).to(torch.bool).to(self.device,
non_blocking=True)
return self.pooling_mask
def get_splitfuse_attn_mask(self) -> torch.Tensor:
if self.chunked_prefill_attn_mask is None:
self.chunked_prefill_attn_mask = torch.triu(
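
For context, the pooling path now reuses the generic 2048 x 2048 compressed causal mask (via `get_attn_mask`) instead of a dedicated `pooling_mask`; the mask itself is just an upper-triangular boolean matrix. A minimal sketch of the equivalent construction:

```python
import torch

# Compressed causal mask consumed by npu_fusion_attention's sparse modes:
# True strictly above the diagonal marks future positions to be masked out.
mask = torch.triu(torch.ones(2048, 2048), diagonal=1).to(torch.bool)
assert mask[0, 1] and not mask[1, 0] and not mask.diagonal().any()
```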

View File

@@ -221,6 +221,10 @@ class AscendMetadata:
# dcp
decode_meta: Optional[AscendMetadataForDecode] = None
# Whether this is a pooling model with causal attention,
# used to guide the attention computation for pooling models.
is_causal_pooling: Optional[bool] = None
class AscendAttentionMetadataBuilder:
# Does this backend/builder support ACL Graphs for attention (default: no).
@@ -319,6 +323,10 @@ class AscendAttentionMetadataBuilder:
query_start_loc = query_start_loc_cpu.pin_memory().to(
self.device, non_blocking=True)
is_causal_pooling = None
if self.model_config.runner_type == "pooling":
is_causal_pooling = common_attn_metadata.causal if hasattr(
common_attn_metadata, 'causal') else True
attn_metadata = AscendMetadata(
num_actual_tokens=num_actual_tokens,
@@ -336,7 +344,8 @@ class AscendAttentionMetadataBuilder:
attn_mask=attn_mask,
attn_state=attn_state,
num_prefills=num_prefills,
num_decodes=num_decodes)
num_decodes=num_decodes,
is_causal_pooling=is_causal_pooling)
return attn_metadata
def build_for_graph_capture(
@@ -597,30 +606,39 @@ class AscendAttentionBackendImpl(AttentionImpl):
out=output)
return output
def _forward_encode(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
attn_metadata: AscendMetadata,
output: torch.Tensor,
) -> torch.Tensor:
cum_seq_len = attn_metadata.query_start_loc[1:].tolist()
output = torch_npu.npu_fusion_attention(
query,
key,
value,
head_num=self.num_heads,
input_layout="TND",
scale=self.scale,
sparse_mode=4,
atten_mask=attn_metadata.attn_mask,
pre_tockens=attn_metadata.max_query_len,
next_tockens=attn_metadata.max_query_len,
actual_seq_qlen=cum_seq_len,
actual_seq_kvlen=cum_seq_len,
)[0]
return output
def _forward_encoder_attention(self, query: torch.Tensor,
key: torch.Tensor, value: torch.Tensor,
attn_metadata: AscendMetadata,
_: torch.Tensor) -> torch.Tensor:
assert attn_metadata is not None
assert attn_metadata.is_causal_pooling is not None
if attn_metadata.is_causal_pooling:
# use sparse_mode 3 in causal scenario
return torch_npu.npu_fusion_attention(
query=query,
key=key,
value=value,
head_num=self.num_heads,
input_layout="TND",
scale=self.scale,
sparse_mode=3,
atten_mask=attn_metadata.attn_mask,
actual_seq_qlen=attn_metadata.actual_seq_lengths_q,
actual_seq_kvlen=attn_metadata.actual_seq_lengths_q,
)[0]
else:
# use default sparse_mode 0 in the non-causal scenario, which means no mask is applied
return torch_npu.npu_fusion_attention(
query=query,
key=key,
value=value,
head_num=self.num_heads,
input_layout="TND",
scale=self.scale,
actual_seq_qlen=attn_metadata.actual_seq_lengths_q,
actual_seq_kvlen=attn_metadata.actual_seq_lengths_q,
)[0]
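
Semantically, the two branches above are causal vs. fully bidirectional attention over each pooled sequence; a rough plain-PyTorch reference (ignoring the TND layout and the fused NPU kernel; names here are illustrative):

```python
import torch.nn.functional as F

def encoder_attention_reference(q, k, v, is_causal_pooling: bool, scale: float):
    # q, k, v: [num_heads, seq_len, head_dim] for a single sequence.
    # sparse_mode=3 with the compressed triangular mask ~= causal attention;
    # the default mode with no mask ~= full bidirectional attention.
    return F.scaled_dot_product_attention(q, k, v,
                                          is_causal=is_causal_pooling,
                                          scale=scale)
```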
def reshape_and_cache(
self,
@@ -697,18 +715,22 @@ class AscendAttentionBackendImpl(AttentionImpl):
" for AscendAttentionBackendImpl")
assert layer._k_scale_float == 1.0 and layer._v_scale_float == 1.0
if self.attn_type != AttentionType.DECODER and self.attn_type != AttentionType.ENCODER_ONLY:
raise NotImplementedError("Encoder/decoder cross-attention "
"are not implemented for "
attn_type = self.attn_type
if attn_type not in [
AttentionType.DECODER, AttentionType.ENCODER_ONLY
]:
raise NotImplementedError("Encoder/Decoder cross-attention "
"is not implemented for "
"PallasAttentionBackendImpl")
num_tokens = query.shape[0]
if attn_metadata is None:
return output.fill_(0)
key, value = self.reshape_and_cache(key, value, kv_cache,
attn_metadata)
if self.attn_type == AttentionType.ENCODER_ONLY:
attn_output = self._forward_encode(query, key, value,
attn_metadata, output)
# pooling model branch
if isinstance(attn_metadata.is_causal_pooling, bool):
attn_output = self._forward_encoder_attention(
query, key, value, attn_metadata, output)
output[:num_tokens] = attn_output[:num_tokens]
return output
output = self.forward_impl(query, key, value, kv_cache, attn_metadata,

View File

@@ -106,16 +106,7 @@
#
# ** File: worker/patch_roberta.py **
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 1. `vllm.model_executor.models.roberta.RobertaEmbedding.forward`
# Why:
# shift operation in `_encode_token_type_ids` and `_decode_token_type_ids` cannot run in ascend aclgraph mode
# How
# Replace shift operation with multiplication and division.
# Related PR (if no, explain why):
# No, this need CANN add an aclnn shift operation
# Future Plan:
# Revert this when CANN support shift aclnn operation
# 2. `vllm.model_executor.models.roberta.RobertaForSequenceClassification.forward `
# 1. `vllm.model_executor.models.bert `
# Why:
# shift operation in `_encode_token_type_ids` and `_decode_token_type_ids` cannot run in ascend aclgraph mode
# How

View File

@@ -22,9 +22,9 @@ if HAS_TRITON:
# isort: off
import vllm_ascend.patch.platform.patch_sched_yield # noqa
import vllm_ascend.patch.worker.patch_bert # noqa
import vllm_ascend.patch.worker.patch_distributed # noqa
import vllm_ascend.patch.worker.patch_deepseek # noqa
import vllm_ascend.patch.worker.patch_roberta # noqa
import vllm_ascend.patch.worker.patch_weight_loader # noqa
import vllm_ascend.patch.worker.patch_multimodal_merge # noqa
import vllm_ascend.patch.worker.patch_minicpm # noqa

View File

@@ -0,0 +1,45 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import torch
from vllm.model_executor.models import bert
# aclgraph does not support shift operator for now
# TODO: revert me when aclgraph supports shift operator
TOKEN_TYPE_SHIFT = 30
TOKEN_TYPE_MULTIPLIER = 1 << 30
TOKEN_MASK = TOKEN_TYPE_MULTIPLIER - 1
def _encode_token_type_ids(input_ids: torch.Tensor,
token_type_ids: torch.Tensor) -> None:
# input_ids can be padded to the right
input_ids[:token_type_ids.shape[0]].bitwise_or_(token_type_ids *
TOKEN_TYPE_MULTIPLIER)
def _decode_token_type_ids(input_ids: torch.Tensor) -> torch.Tensor:
token_type_ids = input_ids // TOKEN_TYPE_MULTIPLIER
input_ids.bitwise_and_(TOKEN_MASK)
return token_type_ids
bert._encode_token_type_ids = _encode_token_type_ids
bert._decode_token_type_ids = _decode_token_type_ids
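
The multiply/divide packing is arithmetically identical to the shift-based original (vocab ids stay below 2^30, so the type bit lands above them); a quick standalone check of that equivalence:

```python
import torch

TOKEN_TYPE_MULTIPLIER = 1 << 30

input_ids = torch.tensor([101, 2023, 102])       # vocab ids are all < 2**30
token_type_ids = torch.tensor([0, 1, 1])

packed_shift = input_ids | (token_type_ids << 30)                   # original shift form
packed_mul = input_ids | (token_type_ids * TOKEN_TYPE_MULTIPLIER)   # aclgraph-friendly form
assert torch.equal(packed_shift, packed_mul)

# Division / masking recovers both tensors, matching _decode_token_type_ids.
assert torch.equal(packed_mul // TOKEN_TYPE_MULTIPLIER, token_type_ids)
assert torch.equal(packed_mul & (TOKEN_TYPE_MULTIPLIER - 1), input_ids)
```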

View File

@@ -1,91 +0,0 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Optional, Union
import torch
from vllm.model_executor.models.roberta import (
RobertaEmbedding, RobertaForSequenceClassification,
replace_roberta_positions)
from vllm.sequence import IntermediateTensors
# aclgraph does not support shift operator for now
# TODO: revert me when aclgraph supports shift operator
TOKEN_TYPE_SHIFT = 30
TOKEN_TYPE_MULTIPLIER = 1 << 30
TOKEN_MASK = TOKEN_TYPE_MULTIPLIER - 1
def _encode_token_type_ids(input_ids: torch.Tensor,
token_type_ids: torch.Tensor) -> None:
# input_ids can be padded to the right
input_ids[:token_type_ids.shape[0]].bitwise_or_(token_type_ids *
TOKEN_TYPE_MULTIPLIER)
def _decode_token_type_ids(input_ids: torch.Tensor) -> torch.Tensor:
token_type_ids = input_ids // TOKEN_TYPE_MULTIPLIER
input_ids.bitwise_and_(TOKEN_MASK)
return token_type_ids
def roberta_for_sequence_classification_forward(
self,
input_ids: Optional[torch.Tensor],
positions: torch.Tensor,
intermediate_tensors: Optional[IntermediateTensors] = None,
inputs_embeds: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
) -> torch.Tensor:
replace_roberta_positions(input_ids=input_ids,
position_ids=positions,
padding_idx=self.padding_idx)
if token_type_ids is not None:
assert self.roberta.config.vocab_size < (1 << TOKEN_TYPE_SHIFT)
assert input_ids is not None
_encode_token_type_ids(input_ids, token_type_ids)
return self.roberta(input_ids=input_ids,
positions=positions,
inputs_embeds=inputs_embeds,
intermediate_tensors=intermediate_tensors)
def roberta_embedding_forward(
self,
input_ids: torch.Tensor,
position_ids: torch.Tensor,
inputs_embeds: Union[torch.Tensor, None] = None,
) -> torch.Tensor:
token_type_ids = _decode_token_type_ids(input_ids)
if inputs_embeds is None:
inputs_embeds = self.word_embeddings(input_ids)
position_embeddings = self.position_embeddings(position_ids)
token_type_embeddings = self.token_type_embeddings(token_type_ids)
embeddings = inputs_embeds + token_type_embeddings + position_embeddings
embeddings = self.LayerNorm(embeddings)
return embeddings
RobertaEmbedding.forward = roberta_embedding_forward
RobertaForSequenceClassification.forward = roberta_for_sequence_classification_forward

View File

@@ -377,6 +377,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
self.block_size,
use_mla=self.model_config.use_mla,
use_sparse=self.use_sparse)
self.attn_mask_builder = AttentionMaskBuilder(self.device)
self._set_up_drafter()
@@ -1029,8 +1030,8 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
if self.attn_mask_builder is None:
raise ValueError("Attn mask builder is None")
# Pooling situation.
if self.model_config.runner_type == "pooling" and self.model_config.pooler_config.pooling_type == "CLS":
return self.attn_mask_builder.get_pooling_mask()
if self.model_config.runner_type == "pooling":
return self.attn_mask_builder.get_attn_mask(2048, torch.bool)
if self.vllm_config.model_config.use_mla:
if self.pcp_size > 1:
@@ -1933,8 +1934,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
common_prefix_len = 0
extra_attn_metadata_args = {}
builder = attn_group.get_metadata_builder()
if isinstance(builder, GDNAttentionMetadataBuilder
) or self.model_config.runner_type == "pooling":
if isinstance(builder, GDNAttentionMetadataBuilder):
if use_spec_decode:
extra_attn_metadata_args = dict(
num_accepted_tokens=self.num_accepted_tokens.
@@ -1946,6 +1946,11 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
common_prefix_len=common_prefix_len,
common_attn_metadata=common_attn_metadata,
**extra_attn_metadata_args)
elif self.model_config.runner_type == "pooling":
attn_metadata_i = builder.build(
common_prefix_len=common_prefix_len,
common_attn_metadata=common_attn_metadata,
**extra_attn_metadata_args)
else:
attn_metadata_i = builder.build(
common_prefix_len=common_prefix_len,
@@ -1968,18 +1973,52 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
input_ids, inputs_embeds, intermediate_tensors,
max_num_scheduled_tokens)
def _init_model_kwargs(self):
model_kwargs = dict[str, Any]()
num_reqs = self.input_batch.num_reqs
num_pooling_reqs = len(self.input_batch.pooling_params)
if num_pooling_reqs == 0:
return model_kwargs
pooling_params = self.input_batch.get_pooling_params()
assert num_pooling_reqs == num_reqs
token_type_id_requests = dict[int, Any]()
for i, param in enumerate(pooling_params):
if param.extra_kwargs is not None and \
(token_types := param.extra_kwargs.get(
"compressed_token_type_ids")) is not None:
token_type_id_requests[i] = token_types
if len(token_type_id_requests) == 0:
return model_kwargs
seq_lens = self.seq_lens[:num_reqs]
token_type_ids = []
for i in range(num_reqs):
pos = token_type_id_requests.get(i, seq_lens[i])
ids = (torch.arange(seq_lens[i]) >= pos).int()
token_type_ids.append(ids)
model_kwargs["token_type_ids"] = torch.concat(token_type_ids).to(
device=self.device)
return model_kwargs
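
The `compressed_token_type_ids` value stored in the pooling params is the index of the first segment-1 token; the loop above expands it back into a per-token vector (and the `seq_lens[i]` default yields all zeros). For example, with illustrative values:

```python
import torch

seq_len = 6
compressed = 4   # tokens 0..3 belong to segment 0, tokens 4..5 to segment 1
token_type_ids = (torch.arange(seq_len) >= compressed).int()
print(token_type_ids)  # tensor([0, 0, 0, 0, 1, 1], dtype=torch.int32)
```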
def _generate_process_reqs_hidden_states(self, attn_metadata, with_prefill,
maybe_padded_num_tokens,
input_ids, positions,
intermediate_tensors,
inputs_embeds):
assert self.model is not None
hidden_states = self.model(
input_ids=input_ids,
positions=positions,
intermediate_tensors=intermediate_tensors,
inputs_embeds=inputs_embeds,
)
hidden_states = self.model(input_ids=input_ids,
positions=positions,
intermediate_tensors=intermediate_tensors,
inputs_embeds=inputs_embeds,
**self._init_model_kwargs())
forward_context = get_forward_context()
if forward_context.cudagraph_runtime_mode == CUDAGraphMode.FULL \
@@ -2022,7 +2061,14 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
def _build_attn_state(self, num_reqs, num_scheduled_tokens,
num_valid_tokens):
if np.array_equal(self.seq_lens_np[:num_reqs], num_scheduled_tokens):
if self.model_config.runner_type == "pooling":
if isinstance(
self.kv_cache_config.kv_cache_groups[0].kv_cache_spec,
EncoderOnlyAttentionSpec):
attn_state = AscendAttentionState.PrefillNoCache
else:
attn_state = AscendAttentionState.PrefillCacheHit
elif np.array_equal(self.seq_lens_np[:num_reqs], num_scheduled_tokens):
attn_state = AscendAttentionState.PrefillNoCache
# We assume it is the decode stage, where prefill occurs but only one token is not hit in cache.
elif np.all(num_scheduled_tokens == 1):
@@ -2251,7 +2297,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
" a batch must be pooling request"
hidden_states = hidden_states[:num_scheduled_tokens]
pooling_metadata = self.input_batch.pooling_metadata
pooling_metadata = self.input_batch.get_pooling_metadata()
pooling_metadata.build_pooling_cursor(num_scheduled_tokens_np.tolist(),
device=hidden_states.device)
seq_lens_cpu = self.seq_lens_cpu[:self.input_batch.num_reqs]
@@ -4049,6 +4095,15 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
desc="Capturing ACL graphs ({}, {})".format(
"decode" if uniform_decode else "mixed prefill-decode",
aclgraph_runtime_mode.name))
force_attention = (aclgraph_runtime_mode == CUDAGraphMode.FULL)
# When the kv cache spec is empty, PiecewiseBackend is not initialized, and
# compilation_case=1 will cause the dynamic shape position to be incorrectly derived.
if not self.get_kv_cache_spec():
self._dummy_run(2,
aclgraph_runtime_mode=CUDAGraphMode.NONE,
force_attention=force_attention,
uniform_decode=uniform_decode)
# We skip EPLB here since we don't want to record dummy metrics
for num_tokens in compilation_cases:
for _ in range(self.compilation_config.cudagraph_num_of_warmups):
@@ -4057,7 +4112,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):
# if we want to warm up attention or not. This is
# different from the case where `FULL` implies capture
# attention while `PIECEWISE` implies no attention.
force_attention = (aclgraph_runtime_mode == CUDAGraphMode.FULL)
self._dummy_run(num_tokens,
aclgraph_runtime_mode=CUDAGraphMode.NONE,
force_attention=force_attention,

View File

@@ -793,17 +793,12 @@ class InputBatch:
logitsprocs=self.logitsprocs,
)
@property
def pooling_metadata(self) -> PoolingMetadata:
if len(self.pooling_params) == 0:
pooling_params = []
else:
# Note, for now this assumes that all request in the batch
# are either sampling or pooling requests
assert len(self.req_ids) == len(self.pooling_params)
pooling_params = [
self.pooling_params[req_id] for req_id in self.req_ids
]
def get_pooling_params(self) -> list[PoolingParams]:
assert len(self.req_ids) == len(self.pooling_params)
return [self.pooling_params[req_id] for req_id in self.req_ids]
def get_pooling_metadata(self) -> PoolingMetadata:
pooling_params = self.get_pooling_params()
return PoolingMetadata(
prompt_lens=torch.from_numpy(