[CI] Remove compatibility maintenance for vllm v0.10.1 and v0.10.1.1 (#2840)

### What this PR does / why we need it?
Remove the compatibility-maintenance code for vLLM v0.10.1 and v0.10.1.1 (the version-gated imports and code paths guarded by `vllm_version_is`).
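
Every removed shim follows the same shape. A simplified sketch (adapted from the scheduler hunk in this diff, not a verbatim excerpt):

```python
# Before this PR: imports and call sites were gated on the running vLLM
# release via the vllm_ascend.utils.vllm_version_is helper.
from vllm_ascend.utils import vllm_version_is

if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
    from vllm.v1.core.kv_cache_manager import KVCacheBlocks
else:
    KVCacheBlocks = None

# After this PR: the gate is dropped and only the current-vLLM path remains,
# e.g. a plain top-level import:
#     from vllm.v1.core.kv_cache_manager import KVCacheBlocks
```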

### Does this PR introduce _any_ user-facing change?
Yes. The `main` branch of vllm-ascend will no longer be compatible with vLLM v0.10.1 or v0.10.1.1.
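
Users who still need those vLLM releases should typically stay on an earlier vllm-ascend release. As a rough illustration only (this guard is hypothetical and not part of this PR), running `main` against a dropped release would be expected to fail fast rather than be worked around:

```python
# Hypothetical fail-fast check, for illustration only. vllm_version_is is the
# real helper from vllm_ascend.utils; this guard itself is not in this PR.
from vllm_ascend.utils import vllm_version_is

if vllm_version_is("0.10.1") or vllm_version_is("0.10.1.1"):
    raise RuntimeError(
        "vllm-ascend main no longer supports vLLM v0.10.1 / v0.10.1.1; "
        "upgrade vLLM or use an older vllm-ascend release.")
```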

### How was this patch tested?
CI passed with the existing tests.

- vLLM version: v0.10.1.1
- vLLM main: 6fb2788163

---------

Signed-off-by: MengqingCao <cmq0113@163.com>
Author: Mengqing Cao
Date: 2025-09-10 08:43:10 +08:00 (committed by GitHub)
Parent: 93e28e6862
Commit: edf1f600ad
22 changed files with 340 additions and 876 deletions

View File

@@ -23,6 +23,7 @@ from vllm.distributed.kv_events import KVEventBatch
from vllm.logger import logger
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
from vllm.utils import cdiv
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.core.sched.output import NewRequestData, SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutputs
@@ -31,13 +32,6 @@ from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager
from vllm_ascend.utils import vllm_version_is
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
else:
KVCacheBlocks = None
class AscendScheduler(Scheduler):
"""This Scheduler extends vllm's original v1 scheduler
@@ -66,10 +60,7 @@ class AscendScheduler(Scheduler):
scheduled_running_reqs: list[Request] = []
preempted_reqs: list[Request] = []
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
req_to_new_block_ids: dict[str, list[list[int]]] = {}
else:
req_to_new_blocks: dict[str, KVCacheBlocks] = {}
req_to_new_blocks: dict[str, KVCacheBlocks] = {}
num_scheduled_tokens: dict[str, int] = {}
token_budget = self.max_num_scheduled_tokens
# Spec decode-related.
@@ -227,13 +218,10 @@ class AscendScheduler(Scheduler):
if self.lora_config and request.lora_request:
scheduled_loras.add(request.lora_request.lora_int_id)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
req_to_new_block_ids[request.request_id] = (
self.kv_cache_manager.get_block_ids(request.request_id))
else:
req_to_new_blocks[
request.request_id] = self.kv_cache_manager.get_blocks(
request.request_id)
req_to_new_blocks[
request.request_id] = self.kv_cache_manager.get_blocks(
request.request_id)
# Update request info.
num_scheduled_tokens[request.request_id] = num_new_tokens
token_budget -= num_new_tokens
@@ -322,11 +310,7 @@ class AscendScheduler(Scheduler):
# Schedule the request.
scheduled_running_reqs.append(request)
self.scheduled_req_ids.add(request.request_id)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
req_to_new_block_ids[request.request_id] = (
new_blocks.get_block_ids())
else:
req_to_new_blocks[request.request_id] = new_blocks
req_to_new_blocks[request.request_id] = new_blocks
num_scheduled_tokens[request.request_id] = num_new_tokens
token_budget -= num_new_tokens
req_index += 1
@@ -365,67 +349,36 @@ class AscendScheduler(Scheduler):
any_request, len(self.running)))
# Construct the scheduler output.
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
new_reqs_data = [
NewRequestData.from_request(
req, req_to_new_block_ids[req.request_id])
for req in scheduled_new_reqs
]
cached_reqs_data = self._make_cached_request_data(
scheduled_running_reqs, scheduled_resumed_reqs,
num_scheduled_tokens, scheduled_spec_decode_tokens,
req_to_new_block_ids)
else:
new_reqs_data = [
NewRequestData.from_request(
req, req_to_new_blocks[req.request_id].get_block_ids())
for req in scheduled_new_reqs
]
new_reqs_data = [
NewRequestData.from_request(
req, req_to_new_blocks[req.request_id].get_block_ids())
for req in scheduled_new_reqs
]
cached_reqs_data = self._make_cached_request_data(
scheduled_running_reqs, scheduled_resumed_reqs,
num_scheduled_tokens, scheduled_spec_decode_tokens,
req_to_new_blocks)
cached_reqs_data = self._make_cached_request_data(
scheduled_running_reqs, scheduled_resumed_reqs,
num_scheduled_tokens, scheduled_spec_decode_tokens,
req_to_new_blocks)
scheduled_cached_reqs = cached_reqs_data
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
scheduler_output = SchedulerOutput(
scheduled_new_reqs=new_reqs_data,
scheduled_cached_reqs=scheduled_cached_reqs,
num_scheduled_tokens=num_scheduled_tokens,
total_num_scheduled_tokens=total_num_scheduled_tokens,
scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
scheduled_encoder_inputs={},
num_common_prefix_blocks=num_common_prefix_blocks,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids=self.finished_req_ids, # type: ignore
free_encoder_input_ids=self.encoder_cache_manager.
get_freed_ids(),
structured_output_request_ids={},
grammar_bitmask=None,
)
else:
scheduler_output = SchedulerOutput(
scheduled_new_reqs=new_reqs_data,
scheduled_cached_reqs=scheduled_cached_reqs,
num_scheduled_tokens=num_scheduled_tokens,
total_num_scheduled_tokens=total_num_scheduled_tokens,
scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
scheduled_encoder_inputs={},
num_common_prefix_blocks=num_common_prefix_blocks,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids=self.finished_req_ids, # type: ignore
free_encoder_mm_hashes=self.encoder_cache_manager.
get_freed_mm_hashes(),
structured_output_request_ids={},
grammar_bitmask=None,
)
scheduler_output = SchedulerOutput(
scheduled_new_reqs=new_reqs_data,
scheduled_cached_reqs=scheduled_cached_reqs,
num_scheduled_tokens=num_scheduled_tokens,
total_num_scheduled_tokens=total_num_scheduled_tokens,
scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
scheduled_encoder_inputs={},
num_common_prefix_blocks=num_common_prefix_blocks,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids=self.finished_req_ids, # type: ignore
free_encoder_mm_hashes=self.encoder_cache_manager.
get_freed_mm_hashes(),
structured_output_request_ids={},
grammar_bitmask=None,
)
# NOTE(Kuntai): this function is designed for multiple purposes:
# 1. Plan the KV cache store

View File

@@ -51,7 +51,6 @@ from vllm.sequence import IntermediateTensors
from vllm_ascend.ops.fused_moe import AscendFusedMoE
from vllm_ascend.ops.sequence_parallel import (MetadataForPadding,
init_metadata_for_sp)
from vllm_ascend.utils import vllm_version_is
class CustomSparseMoeBlock(Qwen3MoeSparseMoeBlock):
@@ -255,11 +254,8 @@ class CustomQwen3MoeModel(Qwen3MoeModel):
quant_config = vllm_config.quant_config
parallel_config = vllm_config.parallel_config
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
self.num_redundant_experts = parallel_config.num_redundant_experts
else:
eplb_config = parallel_config.eplb_config
self.num_redundant_experts = eplb_config.num_redundant_experts
eplb_config = parallel_config.eplb_config
self.num_redundant_experts = eplb_config.num_redundant_experts
self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
self.config = config

View File

@@ -34,7 +34,7 @@ from vllm_ascend.ops.moe.experts_selector import select_experts
from vllm_ascend.ops.moe.moe_comm_method import (AllGatherCommImpl,
AlltoAllCommImpl, MC2CommImpl)
from vllm_ascend.ops.moe.token_dispatcher import setup_token_dispatchers
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_310p, vllm_version_is
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_310p
original_unquantized_fused_moe_init_func = UnquantizedFusedMoEMethod.__init__
@@ -137,67 +137,6 @@ def unquantized_fused_moe_init_func(self, *args, **kwargs):
self.transpose = True
def forward_oot_v01011(
self,
layer: torch.nn.Module,
x: torch.Tensor,
use_grouped_topk: bool,
top_k: int,
router_logits: torch.Tensor,
renormalize: bool,
topk_group: Optional[int] = None,
num_expert_group: Optional[int] = None,
custom_routing_function: Optional[Callable] = None,
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
global_num_experts: int = -1,
expert_map: Optional[torch.Tensor] = None,
apply_router_weight_on_input: bool = False,
activation: str = "silu",
enable_eplb: bool = False,
expert_load_view: Optional[torch.Tensor] = None,
logical_to_physical_map: Optional[torch.Tensor] = None,
logical_replica_count: Optional[torch.Tensor] = None) -> torch.Tensor:
topk_weights, topk_ids, row_idx = select_experts(
hidden_states=x,
router_logits=router_logits,
top_k=top_k,
use_grouped_topk=use_grouped_topk,
renormalize=renormalize,
topk_group=topk_group,
num_expert_group=num_expert_group,
custom_routing_function=custom_routing_function,
scoring_func=scoring_func,
routed_scaling_factor=1.0,
e_score_correction_bias=e_score_correction_bias,
global_num_experts=global_num_experts)
if topk_ids.shape[1] < top_k or is_310p():
assert global_num_experts is not None
return fused_experts_moge(
hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
moe_parallel_config=self.moe.moe_parallel_config,
topk_weights=topk_weights,
topk_ids=topk_ids,
top_k=top_k,
global_num_experts=global_num_experts,
expert_map=expert_map,
apply_router_weight_on_input=apply_router_weight_on_input)
moe_comm_method = get_forward_context().moe_comm_method
return moe_comm_method.fused_experts(hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
topk_weights=topk_weights,
topk_ids=topk_ids,
row_idx=row_idx,
global_num_experts=global_num_experts,
expert_map=expert_map)
def forward_oot(
self,
layer: torch.nn.Module,
@@ -315,59 +254,32 @@ class AscendFusedMoE(FusedMoE):
num_redundant_experts=0,
has_bias=False,
):
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
super().__init__(
num_experts,
top_k,
hidden_size,
intermediate_size,
params_dtype,
reduce_results,
renormalize,
use_grouped_topk,
num_expert_group,
topk_group,
quant_config,
tp_size,
ep_size,
dp_size,
prefix,
custom_routing_function,
scoring_func,
e_score_correction_bias,
apply_router_weight_on_input,
activation,
enable_eplb,
num_redundant_experts,
has_bias,
)
else:
super().__init__(
num_experts,
top_k,
hidden_size,
intermediate_size,
params_dtype,
reduce_results,
renormalize,
use_grouped_topk,
num_expert_group,
topk_group,
quant_config,
tp_size,
ep_size,
dp_size,
prefix,
custom_routing_function,
scoring_func,
routed_scaling_fator,
e_score_correction_bias,
apply_router_weight_on_input,
activation,
enable_eplb,
num_redundant_experts,
has_bias,
)
super().__init__(
num_experts,
top_k,
hidden_size,
intermediate_size,
params_dtype,
reduce_results,
renormalize,
use_grouped_topk,
num_expert_group,
topk_group,
quant_config,
tp_size,
ep_size,
dp_size,
prefix,
custom_routing_function,
scoring_func,
routed_scaling_fator,
e_score_correction_bias,
apply_router_weight_on_input,
activation,
enable_eplb,
num_redundant_experts,
has_bias,
)
setup_token_dispatchers(self.moe_config.ep_size,
top_k=self.top_k,
num_experts=self.global_num_experts,
@@ -529,8 +441,4 @@ class AscendSharedFusedMoE(AscendFusedMoE):
UnquantizedFusedMoEMethod.__init__ = unquantized_fused_moe_init_func
UnquantizedFusedMoEMethod.process_weights_after_loading = process_weights_after_loading
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
UnquantizedFusedMoEMethod.forward_oot = forward_oot_v01011
else:
UnquantizedFusedMoEMethod.forward_oot = forward_oot
UnquantizedFusedMoEMethod.forward_oot = forward_oot

View File

@@ -1,16 +1,12 @@
import torch
import torch_npu
from vllm.config import LogprobsMode
from vllm.v1.sample.ops.topk_topp_sampler import TopKTopPSampler, random_sample
from vllm.v1.sample.sampler import Sampler
from vllm_ascend.utils import is_310p, vllm_version_is
from vllm_ascend.utils import is_310p
if not (vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1")):
from vllm.config import LogprobsMode
DEFAULT_LOGPROBS_MODE = LogprobsMode.RAW_LOGPROBS
else:
LogprobsMode = None
DEFAULT_LOGPROBS_MODE = "raw_logprobs"
DEFAULT_LOGPROBS_MODE = LogprobsMode.RAW_LOGPROBS
class AscendSampler(Sampler):
@@ -68,19 +64,11 @@ class AscendTopKTopPSampler(TopKTopPSampler):
def forward_native(self, logits, generators, k, p):
"""Override pytorch native implementation to torch_npu"""
logits = self._apply_top_k_top_p(logits, k, p)
if not (vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1")):
logits_to_return = None
if self.logprobs_mode == LogprobsMode.PROCESSED_LOGITS:
logits_to_return = logits
elif self.logprobs_mode == LogprobsMode.PROCESSED_LOGPROBS:
logits_to_return = logits.log_softmax(dim=-1,
dtype=torch.float32)
logits_to_return = None
if self.logprobs_mode == LogprobsMode.PROCESSED_LOGITS:
logits_to_return = logits
elif self.logprobs_mode == LogprobsMode.PROCESSED_LOGPROBS:
logits_to_return = logits.log_softmax(dim=-1, dtype=torch.float32)
probs = logits.softmax(dim=-1, dtype=torch.float32)
output = None
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
output = random_sample(probs, generators)
else:
output = (random_sample(probs, generators), logits_to_return)
return output
return random_sample(probs, generators), logits_to_return

View File

@@ -63,8 +63,8 @@ from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler,
from vllm.v1.cudagraph_dispatcher import CudagraphDispatcher
from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig,
KVCacheSpec)
from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, LogprobsTensors,
ModelRunnerOutput)
from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, DraftTokenIds,
LogprobsTensors, ModelRunnerOutput)
from vllm.v1.pool.metadata import PoolingMetadata
from vllm.v1.sample.logits_processor import build_logitsprocs
from vllm.v1.sample.metadata import SamplingMetadata
@@ -96,14 +96,9 @@ from vllm_ascend.torchair.torchair_mla import AscendMLATorchairMetadata
from vllm_ascend.utils import (ACL_FORMAT_FRACTAL_ND, ACL_FORMAT_FRACTAL_NZ,
AscendSocVersion, ProfileExecuteDuration,
get_ascend_soc_version, is_310p,
lmhead_tp_enable, vllm_version_is)
lmhead_tp_enable)
from vllm_ascend.worker.npu_input_batch import CachedRequestState, InputBatch
if not (vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1")):
from vllm.v1.outputs import DraftTokenIds
else:
DraftTokenIds = None
if TYPE_CHECKING:
import xgrammar as xgr # type: ignore[import-untyped]
from vllm.v1.core.sched.output import SchedulerOutput
@@ -195,9 +190,7 @@ class NPUModelRunner(LoRAModelRunnerMixin):
# Lazy initialization, these will be set after __init__
self.kv_caches: List[torch.Tensor] = []
# TODO: remove Dict[str, Dict[int, torch.Tensor]] type after 0.10.1.1
self.encoder_cache: Union[Dict[str, Dict[int, torch.Tensor]],
Dict[str, torch.Tensor]] = {}
self.encoder_cache: Dict[str, torch.Tensor] = {}
self.attn_mask = None
self.attn_state = None
self.requests: Dict[str, CachedRequestState] = {}
@@ -369,8 +362,7 @@ class NPUModelRunner(LoRAModelRunnerMixin):
# Remove finished requests from the cached states.
for req_id in scheduler_output.finished_req_ids:
self.requests.pop(req_id, None)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
self.encoder_cache.pop(req_id, None)
# Remove the finished requests from the persistent batch.
# NOTE(woosuk): There could be an edge case where finished_req_ids and
# scheduled_req_ids overlap. This happens when a request is aborted and
@@ -379,17 +371,8 @@ class NPUModelRunner(LoRAModelRunnerMixin):
# and handling the second as a new request.
for req_id in scheduler_output.finished_req_ids:
self.input_batch.remove_request(req_id)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
# Free the cached encoder outputs.
for req_id, input_id in scheduler_output.free_encoder_input_ids:
encoder_outputs = self.encoder_cache.get(req_id)
if encoder_outputs is not None:
encoder_outputs.pop(input_id, None)
if not encoder_outputs:
self.encoder_cache.pop(req_id, None)
else:
for mm_hash in scheduler_output.free_encoder_mm_hashes:
self.encoder_cache.pop(mm_hash, None)
for mm_hash in scheduler_output.free_encoder_mm_hashes:
self.encoder_cache.pop(mm_hash, None)
# Remove the unscheduled requests from the persistent batch.
# NOTE(woosuk): The unscheduled requests are either preempted requests
# or running requests that are not scheduled in this step. We remove
@@ -438,12 +421,7 @@ class NPUModelRunner(LoRAModelRunnerMixin):
num_computed_tokens=new_req_data.num_computed_tokens,
output_token_ids=[],
lora_request=new_req_data.lora_request,
**({
"mm_hashes": new_req_data.mm_hashes
} if not (vllm_version_is("0.10.1.1")
or vllm_version_is("0.10.1")) else {
"mm_hashes": None
}),
mm_hashes=new_req_data.mm_hashes,
)
# Only relevant for models using M-RoPE (e.g, Qwen2-VL)
@@ -750,25 +728,14 @@ class NPUModelRunner(LoRAModelRunnerMixin):
# Batch the multi-modal inputs.
mm_kwargs = list[MultiModalKwargsItem]()
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
req_ids_pos = list[tuple[str, int, PlaceholderRange]]()
else:
mm_hashes_pos = list[tuple[str, PlaceholderRange]]()
mm_hashes_pos = list[tuple[str, PlaceholderRange]]()
for req_id, encoder_input_ids in scheduled_encoder_inputs.items():
req_state = self.requests[req_id]
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
for mm_input_id in encoder_input_ids:
mm_kwargs.append(req_state.mm_kwargs[mm_input_id])
req_ids_pos.append((req_id, mm_input_id,
req_state.mm_positions[mm_input_id]))
else:
for mm_input_id in encoder_input_ids:
# TODO remove this assert after 0.10.1.1
assert req_state.mm_hashes is not None
mm_hash = req_state.mm_hashes[mm_input_id]
mm_kwargs.append(req_state.mm_kwargs[mm_input_id])
mm_hashes_pos.append(
(mm_hash, req_state.mm_positions[mm_input_id]))
for mm_input_id in encoder_input_ids:
mm_hash = req_state.mm_hashes[mm_input_id]
mm_kwargs.append(req_state.mm_kwargs[mm_input_id])
mm_hashes_pos.append(
(mm_hash, req_state.mm_positions[mm_input_id]))
# Batch mm inputs as much as we can: if a request in the batch has
# multiple modalities or a different modality than the previous one,
# we process it separately to preserve item order.
@@ -799,26 +766,12 @@ class NPUModelRunner(LoRAModelRunnerMixin):
for output in curr_group_outputs:
encoder_outputs.append(output)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
# Cache the encoder outputs.
for (req_id, input_id, pos_info), output in zip(
req_ids_pos,
encoder_outputs,
):
if req_id not in self.encoder_cache:
self.encoder_cache[req_id] = {}
self.encoder_cache[req_id][input_id] = scatter_mm_placeholders(
output,
is_embed=pos_info.is_embed,
)
else:
for (mm_hash, pos_info), output in zip(mm_hashes_pos,
encoder_outputs):
self.encoder_cache[mm_hash] = scatter_mm_placeholders(
output,
is_embed=pos_info.is_embed,
)
for (mm_hash, pos_info), output in zip(mm_hashes_pos, encoder_outputs):
self.encoder_cache[mm_hash] = scatter_mm_placeholders(
output,
is_embed=pos_info.is_embed,
)
def _gather_mm_embeddings(
self,
@@ -831,8 +784,7 @@ class NPUModelRunner(LoRAModelRunnerMixin):
req_state = self.requests[req_id]
num_computed_tokens = req_state.num_computed_tokens
mm_positions = req_state.mm_positions
if not (vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1")):
mm_hashes = req_state.mm_hashes
mm_hashes = req_state.mm_hashes
for i, pos_info in enumerate(mm_positions):
start_pos = pos_info.offset
num_encoder_tokens = pos_info.length
@@ -850,26 +802,15 @@ class NPUModelRunner(LoRAModelRunnerMixin):
continue
start_idx = max(num_computed_tokens - start_pos, 0)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
end_idx = min(
num_computed_tokens - start_pos + num_scheduled_tokens,
num_encoder_tokens)
assert start_idx < end_idx
assert req_id in self.encoder_cache
assert i in self.encoder_cache[req_id]
encoder_output = self.encoder_cache[req_id][i]
else:
end_idx = min(
num_computed_tokens - start_pos + num_scheduled_tokens,
num_encoder_tokens,
)
assert start_idx < end_idx
# TODO remove this assert after 0.10.1.1
assert mm_hashes is not None
mm_hash = mm_hashes[i]
encoder_output = self.encoder_cache.get(mm_hash, None)
assert encoder_output is not None,\
f"Encoder cache miss for {mm_hash}."
end_idx = min(
num_computed_tokens - start_pos + num_scheduled_tokens,
num_encoder_tokens,
)
assert start_idx < end_idx
mm_hash = mm_hashes[i]
encoder_output = self.encoder_cache.get(mm_hash, None)
assert encoder_output is not None,\
f"Encoder cache miss for {mm_hash}."
if (is_embed := pos_info.is_embed) is not None:
is_embed = is_embed[start_idx:end_idx]
@@ -1389,52 +1330,6 @@ class NPUModelRunner(LoRAModelRunnerMixin):
hidden_states, attn_metadata, aux_hidden_states)
return draft_token_ids
def _pool_v010(
self,
hidden_states: torch.Tensor,
num_scheduled_tokens: int,
num_scheduled_tokens_np: np.ndarray,
finished_sending: Optional[set[str]] = None,
finished_recving: Optional[set[str]] = None,
kv_connector_output: Optional["KVConnectorOutput"] = None,
) -> ModelRunnerOutput:
assert self.input_batch.num_reqs ==\
len(self.input_batch.pooling_params), \
"Either all or none of the requests in" \
" a batch must be pooling request"
extracted_hidden_states = list(
torch.split(hidden_states[:num_scheduled_tokens],
num_scheduled_tokens_np.tolist()))
pooling_metadata = self.input_batch.pooling_metadata
raw_pooler_output = self.model.pooler(
hidden_states=extracted_hidden_states,
pooling_metadata=pooling_metadata)
pooler_output: list[Optional[torch.Tensor]] = []
seq_lens = self.seq_lens[:self.input_batch.num_reqs]
for raw_output, seq_len, prompt_len in zip(
raw_pooler_output, seq_lens, pooling_metadata.prompt_lens):
if seq_len == prompt_len:
pooler_output.append(raw_output.data.cpu())
else:
pooler_output.append(None)
extra_args = ({"kv_connector_output": kv_connector_output})
modelrunner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=[],
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
pooler_output=pooler_output,
**extra_args,
)
return modelrunner_output
def _pool(
self,
hidden_states: torch.Tensor,
@@ -1606,19 +1501,11 @@ class NPUModelRunner(LoRAModelRunnerMixin):
logits = None
else:
if self.input_batch.pooling_params:
if vllm_version_is("0.10.1.1") or vllm_version_is(
"0.10.1"):
return self._pool_v010(
hidden_states,
scheduler_output.total_num_scheduled_tokens,
num_scheduled_tokens_np, finished_sending,
finished_recving, kv_connector_output)
else:
return self._pool(
hidden_states,
scheduler_output.total_num_scheduled_tokens,
num_scheduled_tokens_np, finished_sending,
finished_recving, kv_connector_output)
return self._pool(
hidden_states,
scheduler_output.total_num_scheduled_tokens,
num_scheduled_tokens_np, finished_sending,
finished_recving, kv_connector_output)
sample_hidden_states = hidden_states[logits_indices]
logits = self.model.compute_logits(sample_hidden_states, None)
if broadcast_pp_output:
@@ -1759,27 +1646,15 @@ class NPUModelRunner(LoRAModelRunnerMixin):
extra_args = ({"kv_connector_output": kv_connector_output})
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
logprobs=logprobs_lists,
spec_token_ids=self._draft_token_ids,
prompt_logprobs_dict=prompt_logprobs_dict,
pooler_output=[],
**extra_args,
)
else:
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict=prompt_logprobs_dict,
pooler_output=[],
**extra_args,
)
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict=prompt_logprobs_dict,
pooler_output=[],
**extra_args,
)
durations = ProfileExecuteDuration().pop_captured_sync()
if durations:
@@ -2079,8 +1954,6 @@ class NPUModelRunner(LoRAModelRunnerMixin):
assert sum(num_scheduled_tokens_list) == num_tokens
assert len(num_scheduled_tokens_list) == num_reqs
hidden_states_list = list(
torch.split(hidden_states, num_scheduled_tokens_list))
req_num_tokens = num_tokens // num_reqs
dummy_token_ids = torch.zeros((num_reqs, req_num_tokens),
@@ -2091,55 +1964,32 @@ class NPUModelRunner(LoRAModelRunnerMixin):
dummy_pooling_params = PoolingParams(task=task)
to_update = model.pooler.get_pooling_updates(task)
to_update.apply(dummy_pooling_params)
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
dummy_prompt_lens = torch.tensor(
[h.shape[0] for h in hidden_states_list],
device=self.device,
)
dummy_metadata = PoolingMetadata(
prompt_lens=dummy_prompt_lens,
prompt_token_ids=dummy_token_ids,
pooling_params=[dummy_pooling_params] * num_reqs,
)
try:
return model.pooler(hidden_states=hidden_states_list,
pooling_metadata=dummy_metadata)
except RuntimeError as e:
if 'out of memory' in str(e):
raise RuntimeError(
"NPU out of memory occurred when warming up pooler "
f"({task=}) with {num_reqs} dummy requests. Please try "
"lowering `max_num_seqs` or `gpu_memory_utilization` when "
"initializing the engine.") from e
else:
raise e
else:
dummy_prompt_lens = torch.tensor(
num_scheduled_tokens_list,
device="cpu",
)
dummy_metadata = PoolingMetadata(
prompt_lens=dummy_prompt_lens,
prompt_token_ids=dummy_token_ids,
pooling_params=[dummy_pooling_params] * num_reqs,
)
dummy_prompt_lens = torch.tensor(
num_scheduled_tokens_list,
device="cpu",
)
dummy_metadata = PoolingMetadata(
prompt_lens=dummy_prompt_lens,
prompt_token_ids=dummy_token_ids,
pooling_params=[dummy_pooling_params] * num_reqs,
)
dummy_metadata.build_pooling_cursor(num_scheduled_tokens_list,
device=hidden_states.device)
dummy_metadata.build_pooling_cursor(num_scheduled_tokens_list,
device=hidden_states.device)
try:
return model.pooler(hidden_states=hidden_states,
pooling_metadata=dummy_metadata)
except RuntimeError as e:
if 'out of memory' in str(e):
raise RuntimeError(
"CUDA out of memory occurred when warming up pooler "
f"({task=}) with {num_reqs} dummy requests. Please try "
"lowering `max_num_seqs` or `gpu_memory_utilization` when "
"initializing the engine.") from e
else:
raise e
try:
return model.pooler(hidden_states=hidden_states,
pooling_metadata=dummy_metadata)
except RuntimeError as e:
if 'out of memory' in str(e):
raise RuntimeError(
"CUDA out of memory occurred when warming up pooler "
f"({task=}) with {num_reqs} dummy requests. Please try "
"lowering `max_num_seqs` or `gpu_memory_utilization` when "
"initializing the engine.") from e
else:
raise e
@torch.inference_mode()
def _dummy_pooler_run(

View File

@@ -39,8 +39,6 @@ from vllm.v1.spec_decode.utils import is_spec_decode_unsupported
from vllm.v1.utils import copy_slice
from vllm.v1.worker.block_table import MultiGroupBlockTable
from vllm_ascend.utils import vllm_version_is
@dataclass
class CachedRequestState:
@@ -49,8 +47,7 @@ class CachedRequestState:
prompt_token_ids: list[int]
mm_kwargs: list[MultiModalKwargsItem]
mm_positions: list[PlaceholderRange]
# TODO: remove Optional after 0.10.1.1
mm_hashes: Optional[list[str]]
mm_hashes: list[str]
sampling_params: Optional[SamplingParams]
pooling_params: Optional[PoolingParams]
generator: Optional[torch.Generator]
@@ -726,20 +723,13 @@ class InputBatch:
pooling_params = [
self.pooling_params[req_id] for req_id in self.req_ids
]
if vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1"):
return PoolingMetadata(
prompt_lens=torch.from_numpy(
self.num_prompt_tokens[:self.num_reqs]).to(self.device),
prompt_token_ids=self.sampling_metadata.prompt_token_ids,
pooling_params=pooling_params,
)
else:
return PoolingMetadata(
prompt_lens=torch.from_numpy(
self.num_prompt_tokens[:self.num_reqs]),
prompt_token_ids=self.sampling_metadata.prompt_token_ids,
pooling_params=pooling_params,
)
return PoolingMetadata(
prompt_lens=torch.from_numpy(
self.num_prompt_tokens[:self.num_reqs]),
prompt_token_ids=self.sampling_metadata.prompt_token_ids,
pooling_params=pooling_params,
)
def _make_prompt_token_ids_tensor(self) -> torch.Tensor:
max_prompt_len = self.num_prompt_tokens[:self.num_reqs].max()

View File

@@ -38,7 +38,8 @@ from vllm.tasks import SupportedTask
from vllm.utils import STR_DTYPE_TO_TORCH_DTYPE, GiB_bytes
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput
from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, DraftTokenIds,
ModelRunnerOutput)
from vllm.v1.worker.worker_base import WorkerBase
from vllm_ascend.ascend_config import init_ascend_config
@@ -47,14 +48,9 @@ from vllm_ascend.distributed.parallel_state import init_ascend_model_parallel
from vllm_ascend.platform import NPUPlatform
from vllm_ascend.utils import (init_ascend_soc_version,
register_ascend_customop, sleep_mode_enabled,
try_register_lib, vllm_version_is)
try_register_lib)
from vllm_ascend.worker.model_runner_v1 import NPUModelRunner
if not (vllm_version_is("0.10.1.1") or vllm_version_is("0.10.1")):
from vllm.v1.outputs import DraftTokenIds
else:
DraftTokenIds = None
class NPUWorker(WorkerBase):