add qwen3

This commit is contained in:
Chranos
2026-02-04 17:22:39 +08:00
parent d1c0f68ab4
commit 8511fe8530
1932 changed files with 300426 additions and 0 deletions

View File

@@ -0,0 +1,48 @@
# 背景
此示例用于在vLLM中演示chunked parallel pipeline功能,通过mlu_hijack机制将需要修改的代码劫持到当前目录,避免修改主仓库代码。
# 支持模型
- LlamaForCausalLM
- CustomForCausalLM
# Demo运行方式
当前Chunked Parallel Pipeline仅支持通过AsyncLLMEngine方式用paged mode运行。
- 设置环境变量
```bash
export CHUNKED_PIPELINE_PARALLEL_EN=true
```
- 启动server进程
```bash
# 设置engine超时阈值。
export VLLM_ENGINE_ITERATION_TIMEOUT_S=180
python -m vllm.entrypoints.openai.api_server \
--port ${PORT} \
--model ${MODEL_PATH} \
--swap-space 16 \
--pipeline-parallel-size ${PP_SIZE} \
--max-num-batched-tokens ${MAX_TOKENS_NUM} \
--enable-chunked-prefill \
--worker-use-ray \
--enforce-eager
```
- 启动client进程
这里以随机数为例,可以选用真实数据集。
```bash
python benchmarks/benchmark_serving.py \
--backend vllm \
--model ${MODEL_PATH} \
--dataset-name random \
--num-prompts ${NUM_PROMPT} \
--port ${PORT} \
--random-input-len ${INPUT_LEN} \
--random-output-len 1 \
--request-rate inf
```

View File

@@ -0,0 +1 @@
from . import parallel_state

View File

@@ -0,0 +1,223 @@
# Copyright 2023 The vLLM team.
# Adapted from
# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
"""vLLM distributed state.
It takes over the control of the distributed environment from PyTorch.
The typical workflow is:
- call `init_distributed_environment` to initialize the distributed environment.
- call `initialize_model_parallel` or `ensure_model_parallel_initialized` to
initialize the model parallel groups.
- any code dealing with the distributed stuff
- call `destroy_model_parallel` to destroy the model parallel groups.
- call `destroy_distributed_environment` to destroy the distributed environment.
If you only need to use the distributed environment without model/pipeline
parallelism, you can skip the model parallel initialization and destruction
steps.
"""
from typing import Any, Dict, List, Optional, Tuple, Union
import torch
import torch.distributed
from vllm.distributed.parallel_state import (
GroupCoordinator,
_split_tensor_dict,
TensorMetadata,
)
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
def vllm__distributed__GroupCoordinator__send_tensor_dict(
    self,
    tensor_dict: Dict[str, Union[torch.Tensor, Any]],
    dst: Optional[int] = None,
    all_gather_group: Optional["GroupCoordinator"] = None,
) -> Optional[Dict[str, Union[torch.Tensor, Any]]]:
    """Send every tensor in ``tensor_dict`` to the ``dst`` rank.

    NOTE: ``dst`` is the local rank of the destination rank.

    MLU hijack changes versus upstream vLLM:
      * the tensor metadata list is NOT transmitted (the receiver is
        expected to obtain it out-of-band via ``recv_metadata_list``);
      * device tensors are sent with non-blocking ``isend`` instead of
        blocking ``send``.
    """
    # Single-process case: nothing to transmit, hand the dict back.
    if not torch.distributed.is_initialized() or self.world_size == 1:
        return tensor_dict

    ag_world = 1 if all_gather_group is None else all_gather_group.world_size
    ag_rank = 0 if all_gather_group is None else all_gather_group.rank_in_group

    if dst is None:
        dst = (self.rank_in_group + 1) % self.world_size
    assert dst < self.world_size, f"Invalid dst rank ({dst})"

    assert isinstance(
        tensor_dict,
        dict), f"Expecting a dictionary, got {type(tensor_dict)}"
    # MLU hijack: skip sending the tensor metadata list; only the tensor
    # payloads go over the wire.
    _, payload = _split_tensor_dict(tensor_dict)

    for item in payload:
        # Empty tensors carry no data; skip them.
        if item.numel() == 0:
            continue
        # send-allgather: transmit only this rank's slice; the receiver
        # reassembles the full tensor via allgather.
        if (all_gather_group is not None
                and item.numel() % ag_world == 0):
            item = item.reshape(ag_world, -1)[ag_rank]
        if item.is_cpu:
            # CPU tensors travel over the CPU (metadata) group.
            torch.distributed.send(item,
                                   dst=self.ranks[dst],
                                   group=self.cpu_group)
        else:
            # MLU hijack: fire-and-forget isend on the device group.
            # NOTE(review): the returned work handle is discarded —
            # presumably the matching irecv+wait on the peer provides the
            # completion guarantee; confirm against torch.distributed docs.
            torch.distributed.isend(item,
                                    dst=self.ranks[dst],
                                    group=self.device_group)
    return None
def vllm__distributed__GroupCoordinator__recv_tensor_dict(
    self,
    src: Optional[int] = None,
    all_gather_group: Optional["GroupCoordinator"] = None,
    recv_metadata_list: Optional[List[Tuple[str, Any]]] = None,
) -> Optional[Dict[str, Union[torch.Tensor, Any]]]:
    """Recv the input tensor dictionary.

    NOTE: ``src`` is the local rank of the source rank.

    MLU hijack changes versus upstream vLLM:
      * adds the ``recv_metadata_list`` parameter: the tensor metadata
        list is NOT received from the peer, it is supplied by the caller;
      * device tensors are received with ``irecv`` + ``wait`` instead of
        blocking ``recv``.

    Args:
        src: local rank (within this group) to receive from; defaults to
            the previous pipeline rank.
        all_gather_group: if given, each rank receives only its slice and
            the full tensor is reassembled by an all-gather.
        recv_metadata_list: (key, metadata) pairs describing the incoming
            tensors; non-TensorMetadata values are passed through as-is.
            Defaults to an empty list.

    Returns:
        The reconstructed tensor dict, or None when torch.distributed is
        not initialized or world_size == 1.
    """
    # Bug fix: the original used a mutable default argument
    # (`recv_metadata_list=[]`); use a None sentinel instead.
    if recv_metadata_list is None:
        recv_metadata_list = []
    # Bypass the function if we are using only 1 GPU.
    if not torch.distributed.is_initialized() or self.world_size == 1:
        return None
    all_gather_size = (1 if all_gather_group is None else
                       all_gather_group.world_size)
    all_gather_rank = (0 if all_gather_group is None else
                       all_gather_group.rank_in_group)
    group = self.device_group
    metadata_group = self.cpu_group
    if src is None:
        src = (self.rank_in_group - 1) % self.world_size
    assert src < self.world_size, f"Invalid src rank ({src})"
    # MLU hijack: skip receiving the tensor metadata list; it comes from
    # the caller via `recv_metadata_list`.
    tensor_dict: Dict[str, Any] = {}
    for key, value in recv_metadata_list:
        if isinstance(value, TensorMetadata):
            tensor = torch.empty(value.size,
                                 dtype=value.dtype,
                                 device=value.device)
            if tensor.numel() == 0:
                # Skip broadcasting empty tensors.
                tensor_dict[key] = tensor
                continue
            # send-allgather: send only a slice, then do allgather.
            use_all_gather = (all_gather_group is not None
                              and tensor.numel() % all_gather_size == 0)
            if use_all_gather:
                orig_shape = tensor.shape
                tensor = tensor.reshape(all_gather_size,
                                        -1)[all_gather_rank]
            if tensor.is_cpu:
                # use metadata_group for CPU tensors
                torch.distributed.recv(tensor,
                                       src=self.ranks[src],
                                       group=metadata_group)
            else:
                # MLU hijack: non-blocking irecv, then wait so the tensor
                # is complete before use.
                req = torch.distributed.irecv(tensor,
                                              src=self.ranks[src],
                                              group=group)
                req.wait()
            if use_all_gather:
                # do the allgather
                tensor = all_gather_group.all_gather(  # type: ignore
                    tensor, dim=0)
                tensor = tensor.reshape(orig_shape)
            tensor_dict[key] = tensor
        else:
            tensor_dict[key] = value
    return tensor_dict
# Install the hijacked send/recv implementations on GroupCoordinator.
for _orig_fn, _hijack_fn in (
    (GroupCoordinator.send_tensor_dict,
     vllm__distributed__GroupCoordinator__send_tensor_dict),
    (GroupCoordinator.recv_tensor_dict,
     vllm__distributed__GroupCoordinator__recv_tensor_dict),
):
    MluHijackObject.apply_hijack(GroupCoordinator, _orig_fn, _hijack_fn)

View File

@@ -0,0 +1 @@
from . import async_llm_engine

View File

@@ -0,0 +1,310 @@
import asyncio
from typing import (List, Optional, Union)
from vllm.envs import VLLM_ENGINE_ITERATION_TIMEOUT_S as ENGINE_ITERATION_TIMEOUT_S
from vllm.core.scheduler import ScheduledSequenceGroup
from vllm.engine.async_timeout import asyncio_timeout
from vllm.outputs import EmbeddingRequestOutput, RequestOutput
from vllm.sequence import ExecuteModelRequest, SequenceGroup, SequenceGroupMetadata
from vllm.engine.async_llm_engine import (_AsyncLLMEngine, AsyncLLMEngine)
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from vllm.engine.llm_engine import LLMEngine
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
def vllm__engine__async_llm_engine___AsyncLLMEngine____init__(self, *args, **kwargs):
    """Hijacked ``_AsyncLLMEngine.__init__``.

    Runs the stock ``LLMEngine`` initializer, then adds bookkeeping for
    chunked-pipeline-parallel prefill: ``self.step_tasks`` records, per
    virtual engine, the in-flight chunked-prefill tasks, i.e.
    virtual_engine -> {request_id: [task, ...]}.
    """
    LLMEngine.__init__(self, *args, **kwargs)
    # One task registry per virtual engine (one scheduler per engine).
    self.step_tasks = [{} for _ in self.scheduler]
def _update_scheduler_status(
    self,
    scheduled_seq_groups: List[ScheduledSequenceGroup],
    ignored_seq_groups: List[SequenceGroup],
    seq_group_metadata_list: List[SequenceGroupMetadata]
) -> None:
    """Advance scheduler bookkeeping right after a prefill chunk is emitted.

    Chunked pipeline parallel dispatches prefill chunks asynchronously, so
    the token accounting has to happen at emit time rather than when the
    model output eventually arrives.
    """
    # Credit each scheduled group with the tokens of the chunk just emitted.
    for scheduled, _meta in zip(scheduled_seq_groups,
                                seq_group_metadata_list):
        scheduled.seq_group.update_num_computed_tokens(
            scheduled.token_chunk_size)
    # Reap any sequence groups that finished as a result of the update.
    for sched in self.scheduler:
        sched.free_finished_seq_groups()
async def vllm__engine__async_llm_engine___AsyncLLMEngine__step_async(
    self, virtual_engine: int
) -> Optional[List[Union[RequestOutput, EmbeddingRequestOutput]]]:
    """Performs one decoding iteration and returns newly generated results.
    The workers are ran asynchronously if possible.

    This function performs one decoding iteration of the engine. It first
    schedules the sequences to be executed in the next iteration and the
    token blocks to be swapped in/out/copy. Then, it executes the model
    and updates the scheduler with the model outputs. Finally, it decodes
    the sequences and returns the newly generated results.

    MLU hijack: during the prefill stage, each chunked-prefill task is
    dispatched as an asyncio task. Only the last chunk of a request awaits
    all previously dispatched chunks and yields output; intermediate chunks
    update scheduler bookkeeping and return None.
    """
    # Bug fix: SchedulerOutputState is referenced below (multi-step path)
    # but was never imported in this hijack module; upstream
    # async_llm_engine imports it from vllm.engine.llm_engine.
    from vllm.engine.llm_engine import SchedulerOutputState

    # these are cached outputs from previous iterations. None if on first
    # iteration
    cached_outputs = self.cached_scheduler_outputs[virtual_engine]
    seq_group_metadata_list = cached_outputs.seq_group_metadata_list
    scheduler_outputs = cached_outputs.scheduler_outputs
    allow_async_output_proc = cached_outputs.allow_async_output_proc

    ctx = self.scheduler_contexts[virtual_engine]

    # Clear outputs for each new scheduler iteration
    ctx.request_outputs.clear()

    # skip the scheduler if there are any remaining steps in the seq groups.
    # This ensures that the scheduler is only called again when the current
    # batch has completed.
    if not self._has_remaining_steps(seq_group_metadata_list):
        # Schedule iteration
        (seq_group_metadata_list, scheduler_outputs,
         allow_async_output_proc
         ) = self.scheduler[virtual_engine].schedule()

        ctx.seq_group_metadata_list = seq_group_metadata_list
        ctx.scheduler_outputs = scheduler_outputs

        # Maybe switch from async mode to sync mode
        if not allow_async_output_proc and len(ctx.output_queue) > 0:
            self._process_model_outputs(ctx=ctx)

        if (self.scheduler_config.is_multi_step
                and scheduler_outputs.num_lookahead_slots > 0):
            # cache the scheduler outputs for the next iteration if we have
            # lookahead slots
            self._cache_scheduler_outputs_for_multi_step(
                virtual_engine, seq_group_metadata_list, scheduler_outputs,
                allow_async_output_proc)

    assert seq_group_metadata_list is not None
    assert scheduler_outputs is not None

    if not scheduler_outputs.is_empty():
        finished_requests_ids = self.scheduler[
            virtual_engine].get_and_reset_finished_requests_ids()

        # Check if we have a cached last_output from the previous iteration.
        # For supporting PP this is probably the best way to pass the
        # sampled_token_ids, as a separate broadcast over all the PP stages
        # will cause one virtual engine's microbatch to block the pipeline.
        last_sampled_token_ids = \
            self._get_last_sampled_token_ids(virtual_engine)

        execute_model_req = ExecuteModelRequest(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
            blocks_to_copy=scheduler_outputs.blocks_to_copy,
            virtual_engine=virtual_engine,
            num_lookahead_slots=scheduler_outputs.num_lookahead_slots,
            running_queue_size=scheduler_outputs.running_queue_size,
            finished_requests_ids=finished_requests_ids,
            # We use ExecuteModelRequest to pass the last sampled_token_ids
            # to each of the non-last PP stages for in-place prepare_input.
            last_sampled_token_ids=last_sampled_token_ids)

        if allow_async_output_proc:
            execute_model_req.async_callback = self.async_callbacks[
                virtual_engine]

        # Execute the model.
        # MLU hijack: prefill chunks (except the final one of a request)
        # are dispatched asynchronously; the final chunk gathers all
        # previously dispatched tasks and keeps the last output.
        if seq_group_metadata_list[0].is_prompt:
            assert len(seq_group_metadata_list) == 1, \
                "Currently we only support schedule single batch in " \
                "prefill stage for chunked pipeline parallel."
            token_chunk_size = seq_group_metadata_list[0].token_chunk_size
            seq_data = list(seq_group_metadata_list[0].seq_data.values())[0]
            prefill_loc = seq_data.get_num_computed_tokens()
            task = asyncio.create_task(
                self.model_executor.execute_model_async(
                    execute_model_req, [prefill_loc], [token_chunk_size]))
            request_id = seq_group_metadata_list[0].request_id
            self.step_tasks[virtual_engine].setdefault(request_id,
                                                       []).append(task)
            # Gather point: if all prefill tasks for current sequence group
            # have been dispatched, we wait all prompt tasks and get the
            # final output.
            seq_len = seq_data.get_len()
            if token_chunk_size + prefill_loc == seq_len:
                outputs = await asyncio.gather(
                    *self.step_tasks[virtual_engine][request_id])
                # Only the last chunk's output carries the sampled tokens.
                outputs = outputs[-1]
            else:
                # Since prefill stage has not been completely finished, we
                # just update scheduler and sequence status and return None.
                _update_scheduler_status(
                    self, scheduler_outputs.scheduled_seq_groups,
                    scheduler_outputs.ignored_seq_groups,
                    seq_group_metadata_list)
                return None
        else:
            # Decode (or non-hijacked) path: plain synchronous-await call.
            outputs = await self.model_executor.execute_model_async(
                execute_model_req)

        # we need to do this here so that last step's sampled_token_ids can
        # be passed to the next iteration for PP.
        if self.scheduler_config.is_multi_step:
            self._update_cached_scheduler_output(virtual_engine, outputs)
    else:
        if len(ctx.output_queue) > 0:
            self._process_model_outputs(ctx=ctx)
        outputs = []

    # Finish the current step for all the sequence groups.
    if self.scheduler_config.is_multi_step:
        for seq_group in seq_group_metadata_list:
            seq_group.finish_step()

    if not self._has_remaining_steps(seq_group_metadata_list):
        # Clear the cache if we have finished all the steps
        if self.scheduler_config.is_multi_step:
            self.cached_scheduler_outputs[
                virtual_engine] = SchedulerOutputState()

        # is_first_step_output is True only when the num_steps of all
        # the sequences are 1. When the num_steps > 1,
        # multi_step_model_runner does the first-step output append.
        is_first_step_output: bool = False if not seq_group_metadata_list \
            else seq_group_metadata_list[0].state.num_steps == 1

        ctx.append_output(outputs=outputs,
                          seq_group_metadata_list=seq_group_metadata_list,
                          scheduler_outputs=scheduler_outputs,
                          is_async=allow_async_output_proc,
                          is_last_step=True,
                          is_first_step_output=is_first_step_output)

        if outputs and allow_async_output_proc:
            assert len(
                outputs
            ) == 1, "Async postprocessor expects only a single output set"
            self._advance_to_next_step(
                outputs[0], seq_group_metadata_list,
                scheduler_outputs.scheduled_seq_groups)

        if not allow_async_output_proc:
            self._process_model_outputs(ctx=ctx)

            # Log stats.
            self.do_log_stats(scheduler_outputs, outputs)

            # Tracing
            self.do_tracing(scheduler_outputs)
    else:
        # Multi-step case
        return ctx.request_outputs

    if not self.has_unfinished_requests():
        # Drain async postprocessor (if exists)
        if len(ctx.output_queue) > 0:
            self._process_model_outputs(ctx=ctx)
        assert len(ctx.output_queue) == 0

    return ctx.request_outputs
async def vllm__engine__async_llm_engine__AsyncLLMEngine__engine_step(
    self, virtual_engine: int
) -> bool:
    """Kick the engine to process the waiting requests.

    Returns True if there are in-progress requests.
    """
    new_requests, aborted_requests = (
        self._request_tracker.get_new_and_aborted_requests())

    # Feed freshly submitted requests into the engine's waiting queue,
    # routing validation failures back through the request tracker.
    for req in new_requests:
        try:
            await self.engine.add_request_async(**req)
        except ValueError as err:
            # TODO: use a vLLM specific error for failed validation
            self._request_tracker.process_exception(
                req["request_id"],
                err,
                verbose=self.log_requests,
            )

    if aborted_requests:
        await self._engine_abort(aborted_requests)

    request_outputs = await self.engine.step_async(virtual_engine)

    # Chunked Parallel Pipeline: a None result means intermediate prefill
    # chunks are still in flight, so the engine definitely has work left.
    if request_outputs is None:
        return True

    # Put the outputs into the corresponding streams.
    # If used as a callback, then already invoked inside
    # LLMEngine's _process_model_outputs
    if self.use_process_request_outputs_callback:
        # For callback case, we only need to detect when all
        # requests are finished
        all_finished = all(out.finished for out in request_outputs)
    else:
        all_finished = self.process_request_outputs(request_outputs)

    return not all_finished
# Install the hijacked engine entry points.
for _cls, _orig_fn, _hijack_fn in (
    (_AsyncLLMEngine, _AsyncLLMEngine.__init__,
     vllm__engine__async_llm_engine___AsyncLLMEngine____init__),
    (_AsyncLLMEngine, _AsyncLLMEngine.step_async,
     vllm__engine__async_llm_engine___AsyncLLMEngine__step_async),
    (AsyncLLMEngine, AsyncLLMEngine.engine_step,
     vllm__engine__async_llm_engine__AsyncLLMEngine__engine_step),
):
    MluHijackObject.apply_hijack(_cls, _orig_fn, _hijack_fn)

View File

@@ -0,0 +1,3 @@
from . import distributed_gpu_executor
from . import distributed_mlu_executor
from . import ray_mlu_executor

View File

@@ -0,0 +1,75 @@
import asyncio
from abc import abstractmethod
from typing import List, Optional
from vllm.executor.distributed_gpu_executor import DistributedGPUExecutorAsync
from vllm.sequence import ExecuteModelRequest
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
async def vllm__executor__distributed_gpu_executor__DistributedGPUExecutorAsync__execute_model_async(
    self,
    execute_model_req: ExecuteModelRequest,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    """Hijacked ``DistributedGPUExecutorAsync.execute_model_async``.

    MLU hijack: two parameters are added — ``prefill_locs`` gives the
    start location and ``token_chunk_sizes`` the chunk size of each
    chunked-prefill task; both are forwarded to the driver worker.
    """
    # Lazily spin up the worker execution loop on first use.
    if self.parallel_worker_tasks is None:
        self.parallel_worker_tasks = asyncio.create_task(
            self._start_worker_execution_loop())

    # Only the driver worker returns the sampling results.
    return await self._driver_execute_model_async(
        execute_model_req, prefill_locs, token_chunk_sizes)
@abstractmethod
async def vllm__executor__distributed_gpu_executor__DistributedGPUExecutorAsync___driver_execute_model_async(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    """Execute the model asynchronously in the driver worker.

    Passing None will cause the driver to stop the model execution
    loop running in each of the remote workers.

    MLU hijack: adds ``prefill_locs`` (per-task prefill start locations)
    and ``token_chunk_sizes`` (per-task chunk sizes); concrete executors
    must honor them.
    """
    raise NotImplementedError
# Swap the async GPU executor entry points for the hijacked versions.
for _orig_fn, _hijack_fn in (
    (DistributedGPUExecutorAsync.execute_model_async,
     vllm__executor__distributed_gpu_executor__DistributedGPUExecutorAsync__execute_model_async),
    (DistributedGPUExecutorAsync._driver_execute_model_async,
     vllm__executor__distributed_gpu_executor__DistributedGPUExecutorAsync___driver_execute_model_async),
):
    MluHijackObject.apply_hijack(DistributedGPUExecutorAsync, _orig_fn,
                                 _hijack_fn)

View File

@@ -0,0 +1,75 @@
import asyncio
from abc import abstractmethod
from typing import List, Optional
from vllm.executor.distributed_mlu_executor import DistributedMLUExecutorAsync
from vllm.sequence import ExecuteModelRequest
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
async def vllm__executor__distributed_mlu_executor__DistributedMLUExecutorAsync__execute_model_async(
    self,
    execute_model_req: ExecuteModelRequest,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    """Hijacked ``DistributedMLUExecutorAsync.execute_model_async``.

    MLU hijack: two parameters are added — ``prefill_locs`` gives the
    start location and ``token_chunk_sizes`` the chunk size of each
    chunked-prefill task; both are forwarded to the driver worker.
    """
    # Lazily spin up the worker execution loop on first use.
    if self.parallel_worker_tasks is None:
        self.parallel_worker_tasks = asyncio.create_task(
            self._start_worker_execution_loop())

    # Only the driver worker returns the sampling results.
    return await self._driver_execute_model_async(
        execute_model_req, prefill_locs, token_chunk_sizes)
@abstractmethod
async def vllm__executor__distributed_mlu_executor__DistributedMLUExecutorAsync___driver_execute_model_async(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    """Execute the model asynchronously in the driver worker.

    Passing None will cause the driver to stop the model execution
    loop running in each of the remote workers.

    MLU hijack: adds ``prefill_locs`` (per-task prefill start locations)
    and ``token_chunk_sizes`` (per-task chunk sizes); concrete executors
    must honor them.
    """
    raise NotImplementedError
# Swap the async MLU executor entry points for the hijacked versions.
for _orig_fn, _hijack_fn in (
    (DistributedMLUExecutorAsync.execute_model_async,
     vllm__executor__distributed_mlu_executor__DistributedMLUExecutorAsync__execute_model_async),
    (DistributedMLUExecutorAsync._driver_execute_model_async,
     vllm__executor__distributed_mlu_executor__DistributedMLUExecutorAsync___driver_execute_model_async),
):
    MluHijackObject.apply_hijack(DistributedMLUExecutorAsync, _orig_fn,
                                 _hijack_fn)

View File

@@ -0,0 +1,175 @@
import asyncio
from typing import List, Optional
from vllm.executor.distributed_mlu_executor import DistributedMLUExecutorAsync
from vllm.executor.ray_mlu_executor import RayMLUExecutorAsync
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
from ..lock_utils import (_run_task_with_priority_lock, PriorityLock)
logger = init_logger(__name__)
# Keep a reference to the stock constructor so the hijacked one can chain
# to it.
vllm__executor__ray_mlu_executor__RayMLUExecutorAsync____init____org = RayMLUExecutorAsync.__init__

def vllm__executor__ray_mlu_executor__RayMLUExecutorAsync____init__(self, *args, **kwargs):
    """Hijacked ``RayMLUExecutorAsync.__init__``.

    Chunked Parallel Pipeline: prefill tasks of one request must execute in
    dispatch order within each pp rank, implemented with a priority lock.
    Each request reserves a priority interval of length ``max_model_len``
    (no less than the model execution rounds), so each execution round runs
    at priority ``request_id * max_model_len + model_execution_time``.
    """
    vllm__executor__ray_mlu_executor__RayMLUExecutorAsync____init____org(
        self, *args, **kwargs)
    # request_id -> next priority value for that request.
    self.priority = {}
    self.priority_interval = self.model_config.max_model_len
    # Guards creation of one chunk's per-pp-stage task group so the whole
    # group is enqueued atomically.
    self.lock = asyncio.Lock()
async def vllm__executor__ray_mlu_executor__RayMLUExecutorAsync__execute_model_async(
    self,
    execute_model_req: ExecuteModelRequest,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    """Hijacked ``RayMLUExecutorAsync.execute_model_async``.

    MLU hijack: forwards ``prefill_locs`` (per-task prefill start
    locations) and ``token_chunk_sizes`` (per-task chunk sizes) to the
    distributed executor implementation.
    """
    assert not self.use_ray_spmd_worker, (
        "RayMLUExecutorAsync is not supported for spmd mode.")
    return await DistributedMLUExecutorAsync.execute_model_async(
        self, execute_model_req, prefill_locs, token_chunk_sizes)
'''
=============================
Modify by vllm_mlu
=============================
@brief: Add two parameters, in which prefill_locs indicates the start location
and token_chunk_sizes indicates the chunk size for each task.
'''
async def vllm__executor__ray_mlu_executor__RayMLUExecutorAsync___driver_execute_model_async(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> List[SamplerOutput]:
    '''
    ==================
    End of MLU Hijack
    ==================

    Dispatch one model execution to every pipeline-parallel driver worker,
    serialized per pp rank by a PriorityLock so chunked-prefill tasks of
    the same request run in dispatch order.

    Args:
        execute_model_req: request to execute; None tells the workers to
            stop their execution loops (bypasses priority ordering).
        prefill_locs: per-task prefill start offsets (chunked prefill).
        token_chunk_sizes: per-task prefill chunk lengths.

    Returns:
        The results from the last PP stage (only it holds final outputs).
    '''
    assert not self.use_ray_spmd_worker, (
        "driver_worker does not exist for VLLM_USE_RAY_SPMD_WORKER=1")
    # Fast path: no separate TP driver workers — call the driver directly.
    if not self.tp_driver_workers:
        return await self.driver_exec_method(
            "execute_model", execute_model_req, prefill_locs, token_chunk_sizes)
    """
    ======================================
    Modified by Chunked Parallel Pipeline.
    ======================================
    Use PriorityLock instead of lock to ensure that tasks in the same pp rank
    are executed with the dispatched order.
    """
    request_id = 'dummy'
    update_priority_threshold = False
    is_prompt = False
    if execute_model_req is not None:
        assert len(execute_model_req.seq_group_metadata_list) == 1, \
            "Only single batch is supported for chunked pipeline parallel mode."
        request_id = execute_model_req.seq_group_metadata_list[0].request_id
        seq_group_metadata = execute_model_req.seq_group_metadata_list[0]
        # First round of a request reserves a priority slot whose base is
        # (number of requests seen so far) * max_model_len; later rounds
        # reuse (and advance) the stored value.
        request_priority = self.priority.setdefault(
            request_id, len(self.priority)*self.model_config.max_model_len)
        seq_data = list(seq_group_metadata.seq_data.values())[0]
        seq_len = seq_data.get_len()
        # Update priority threshold to schedule next request.
        # The final prefill chunk (start + chunk size == prompt length)
        # unblocks the next request's priority window in the lock.
        is_prompt = seq_group_metadata.is_prompt
        if is_prompt and seq_len == prefill_locs[0] + token_chunk_sizes[0]:
            update_priority_threshold = True
    else:
        # Stop signal: priority -1 passes any threshold.
        request_priority = -1
    if self.pp_locks is None:
        # This locks each pipeline parallel stage so multiple virtual
        # engines can't execute on the same stage at the same time
        # We create the locks here to avoid creating them in the constructor
        # which uses a different asyncio loop.
        self.pp_locks = [
            PriorityLock(init_priority_threshold=self.model_config.max_model_len,
                         priority_interval=self.priority_interval)
            for _ in range(self.parallel_config.pipeline_parallel_size)
        ]
    # self.lock makes the whole per-stage task group enqueue atomically so
    # another chunk cannot interleave its tasks between these.
    async with self.lock:
        # NOTE(review): request_priority is passed twice (as the lock
        # priority and again as a trailing positional argument) — confirm
        # against _run_task_with_priority_lock's signature.
        tasks = [
            asyncio.create_task(
                _run_task_with_priority_lock(
                    self.driver_exec_method, self.pp_locks[0], request_priority,
                    update_priority_threshold,
                    "execute_model", execute_model_req, prefill_locs, token_chunk_sizes,
                    request_priority))
        ]
        for pp_rank, driver_worker in enumerate(self.tp_driver_workers,
                                                start=1):
            tasks.append(
                asyncio.create_task(
                    _run_task_with_priority_lock(
                        driver_worker.execute_method.remote,
                        self.pp_locks[pp_rank], request_priority,
                        update_priority_threshold,
                        "execute_model", execute_model_req, prefill_locs, token_chunk_sizes,
                        request_priority)))
        if execute_model_req is not None:
            # Advance this request's priority: prefill rounds advance by
            # the chunk size, decode rounds by 1.
            # NOTE(review): entries are never removed from self.priority,
            # so it grows with the number of distinct requests — confirm
            # this is acceptable for long-running servers.
            self.priority[request_id] += (token_chunk_sizes[0] if is_prompt else 1)
    """
    ======================================
    End by Chunked Parallel Pipeline.
    ======================================
    """
    results = await asyncio.gather(*tasks)
    # Only the last PP stage has the final results.
    return results[-1]
# Install the hijacked constructor and execution entry points on
# RayMLUExecutorAsync.
for _orig_fn, _hijack_fn in (
    (RayMLUExecutorAsync.__init__,
     vllm__executor__ray_mlu_executor__RayMLUExecutorAsync____init__),
    (RayMLUExecutorAsync.execute_model_async,
     vllm__executor__ray_mlu_executor__RayMLUExecutorAsync__execute_model_async),
    (RayMLUExecutorAsync._driver_execute_model_async,
     vllm__executor__ray_mlu_executor__RayMLUExecutorAsync___driver_execute_model_async),
):
    MluHijackObject.apply_hijack(RayMLUExecutorAsync, _orig_fn, _hijack_fn)

View File

@@ -0,0 +1,218 @@
import asyncio
from typing import Callable
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
class PriorityLock:
"""
A lock class that prioritizes tasks based on their priority level and supports dynamic
updating of priority thresholds after each lock release.
Attributes:
-----------
_lock : asyncio.Lock
An internal asyncio lock used to ensure mutual exclusion.
_queue : asyncio.PriorityQueue
A priority queue to store tasks by their priority. Tasks with lower numerical priority
values have higher priority.
_condition : asyncio.Condition
A condition variable to manage the waiting and notification of tasks.
_active_task : asyncio.Task or None
Tracks the task currently holding the lock, or None if the lock is not held.
_current_priority_threshold : int
The current priority threshold for tasks allowed to acquire the lock.
_priority_interval : int
The value by which the priority threshold is incremented after a lock release when
`update_priority_threshold` is enabled.
"""
def __init__(self, init_priority_threshold: int, priority_interval: int):
"""
Initializes a PriorityLock with an initial priority threshold and interval.
Parameters:
-----------
init_priority_threshold : int
The initial threshold for task priorities that can acquire the lock.
priority_interval : int
The interval by which the priority threshold increases after each lock release.
"""
self._lock = asyncio.Lock() # Internal asyncio lock
self._queue = asyncio.PriorityQueue() # Priority queue to manage tasks by priority
self._condition = asyncio.Condition() # Condition variable to manage waiting tasks
self._active_task = None # Keep track of the current active task holding the lock
self._current_priority_threshold = init_priority_threshold
self._priority_interval = priority_interval
async def acquire(self, priority):
"""
Acquires the lock for a task based on its priority.
Parameters:
-----------
priority : int
The priority level of the task attempting to acquire the lock.
Behavior:
---------
- The task is enqueued based on its priority.
- The task waits until it is the highest-priority task in the queue, has a priority
below the current threshold, and the lock is available.
"""
queue_item = (priority, asyncio.current_task())
async with self._condition:
await self._queue.put(queue_item)
# Wait until the current task is the one with the highest priority and the lock is available
while True:
# Check if the current task is at the front of the queue and the lock is available
current_priority, current_task = self._queue._queue[0] # Peek at the highest priority task
if current_priority < self._current_priority_threshold and current_task is asyncio.current_task() and not self._lock.locked():
await self._lock.acquire() # Acquire the lock
self._active_task = current_task # Mark the current task as holding the lock
await self._queue.get() # Remove the task from the queue
break
# If not the highest priority task, wait until notified
await self._condition.wait()
async def release(self, update_priority_threshold):
    """Release the underlying lock and wake every parked waiter.

    Parameters:
    -----------
    update_priority_threshold : bool
        When True, the admission threshold is raised by the configured
        interval before the waiters re-check their eligibility.
    """
    async with self._condition:
        # Drop ownership bookkeeping first, then free the mutex itself.
        self._active_task = None
        self._lock.release()
        # Optionally widen the band of priorities allowed to run next.
        if update_priority_threshold:
            raised = self._current_priority_threshold + self._priority_interval
            self._current_priority_threshold = raised
        # Every waiter re-evaluates its queue position and the threshold.
        self._condition.notify_all()
async def __aenter__(self, priority):
    """
    Async context manager entry. Acquires the lock with the specified priority.
    Parameters:
    -----------
    priority : int
        The priority level of the task acquiring the lock.
    Returns:
    --------
    self : PriorityLock
        The lock instance.
    NOTE(review): the `async with` statement invokes `__aenter__()` with
    no extra arguments, so this signature cannot be used via
    `async with lock` (it would raise TypeError). Callers must either
    await `acquire(priority)` directly or use PriorityLockManager,
    which supplies the priority for them.
    """
    await self.acquire(priority)
    return self
async def __aexit__(self, exc_type, exc, tb, update_priority_threshold):
    """
    Async context manager exit. Releases the lock and optionally updates the priority threshold.
    Parameters:
    -----------
    exc_type : Exception or None
        The exception type, if any, raised in the 'async with' block.
    exc : Exception or None
        The exception instance, if any, raised in the 'async with' block.
    tb : traceback or None
        The traceback object, if any, associated with the exception.
    update_priority_threshold : bool
        If True, increments the priority threshold after releasing the lock.
    NOTE(review): this takes four arguments instead of the standard
    three, so it is not a conforming async context manager exit hook;
    it is invoked explicitly by PriorityLockManager.__aexit__, which
    forwards the extra flag. The exception arguments are ignored here.
    """
    await self.release(update_priority_threshold)  # Now release is async
class PriorityLockManager:
    """Async context-manager adapter for a PriorityLock.

    A PriorityLock needs a per-task priority and a threshold-update flag,
    which the plain ``async with`` protocol cannot pass. This wrapper
    captures both up front so the lock can be used as::

        async with PriorityLockManager(lock, priority, update):
            ...

    Attributes:
    -----------
    _lock : PriorityLock
        The wrapped lock instance.
    _priority : int
        Priority used when acquiring the lock.
    _update_priority_threshold : bool
        Whether the lock's threshold is bumped after release.
    """

    def __init__(self, lock, priority, update_priority_threshold):
        """Capture the lock and the per-task acquisition parameters.

        Parameters:
        -----------
        lock : PriorityLock
            The lock instance to manage.
        priority : int
            The priority level for the current task.
        update_priority_threshold : bool
            Whether to update the priority threshold after releasing.
        """
        self._lock = lock
        self._priority = priority
        self._update_priority_threshold = update_priority_threshold

    async def __aenter__(self):
        """Acquire the wrapped lock at the stored priority.

        Returns:
        --------
        PriorityLock
            The acquired lock instance.
        """
        await self._lock.acquire(self._priority)
        return self._lock

    async def __aexit__(self, exc_type, exc, tb):
        """Release the wrapped lock.

        Forwards the exception triple plus the stored threshold-update
        flag to the lock's own (non-standard, four-argument) __aexit__.
        """
        await self._lock.__aexit__(exc_type, exc, tb,
                                   self._update_priority_threshold)
async def _run_task_with_priority_lock(
        task: Callable, lock: "PriorityLock", priority: int,
        update_priority_threshold: bool, *args, **kwargs):
    """
    Runs a task within the context of a PriorityLock, ensuring proper acquisition and release.
    Parameters:
    -----------
    task : Callable
        The async function representing the task to be executed.
    lock : PriorityLock
        The PriorityLock instance managing access.
    priority : int
        The priority level for the task.
    update_priority_threshold : bool
        Whether to update the priority threshold after releasing the lock.
    *args, **kwargs:
        Additional arguments to pass to the task function.
    Returns:
    --------
    result : Any
        The result of the task execution.
    """
    # The manager supplies priority on enter and the update flag on exit;
    # the lock is released even if `task` raises.
    async with PriorityLockManager(lock, priority, update_priority_threshold):  # Acquire the lock based on priority
        return await task(*args, **kwargs)

View File

@@ -0,0 +1,14 @@
from vllm_mlu._mlu_utils import *
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
# NOTE: importing these subpackages is what applies the hijacks — each
# module registers its patches via MluHijackObject at import time, so
# the import order below is also the patch-application order.
from . import distributed
from . import engine
from . import executor
from . import model_executor
from . import worker
logger.info("Apply Chunked Pipeline Parallel Demo!")

View File

@@ -0,0 +1,2 @@
# hijack vllm models
from .models import custom, llama

View File

@@ -0,0 +1,25 @@
from typing import Any, List, Tuple
import torch
from vllm.distributed.parallel_state import TensorMetadata
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from vllm_mlu.model_executor.custom_model.custom import CustomForCausalLM
def vllm__module_executor__models__custom_model__CustomForCausalLM__get_intermediate_tensor_metadata(
    self,
    batch_size: int,
    dtype: torch.dtype,
    device: torch.device,
) -> List[Tuple[str, Any]]:
    """Describe the intermediate tensors exchanged between pipeline stages.

    Returns (name, metadata) pairs used to receive tensors without a
    host-side metadata broadcast: `hidden_states` is a
    [batch_size, hidden_size] tensor of the given dtype/device, while
    `residual` is declared with no pre-built metadata (None).
    """
    hidden_shape = torch.Size([batch_size, self.config.hidden_size])
    hidden_meta = TensorMetadata(device.type, dtype, hidden_shape)
    return [
        ("hidden_states", hidden_meta),
        ("residual", None),
    ]
# Attach the new method to CustomForCausalLM. The attribute is passed as a
# string (not `CustomForCausalLM.get_intermediate_tensor_metadata`) because
# the method does not exist on the upstream class yet.
MluHijackObject.apply_hijack(
    CustomForCausalLM,
    "get_intermediate_tensor_metadata",
    vllm__module_executor__models__custom_model__CustomForCausalLM__get_intermediate_tensor_metadata
)

View File

@@ -0,0 +1,24 @@
from typing import Any, List, Tuple
import torch
from vllm.distributed.parallel_state import TensorMetadata
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from vllm.model_executor.models.llama import LlamaForCausalLM
def vllm__module_executor__models__llama__LlamaForCausalLM__get_intermediate_tensor_metadata(
    self,
    batch_size: int,
    dtype: torch.dtype,
    device: torch.device,
) -> List[Tuple[str, Any]]:
    """Describe the intermediate tensors received between pipeline stages.

    Builds (name, metadata) pairs so a stage can receive tensors without a
    host-side metadata broadcast; `hidden_states` is declared as a
    [batch_size, hidden_size] tensor of the given dtype/device.

    NOTE(review): the CustomForCausalLM counterpart additionally declares
    ("residual", None). Upstream Llama pipeline parallelism also carries a
    residual tensor between stages — confirm the hijacked Llama forward
    really sends only `hidden_states`, otherwise this list is incomplete.
    """
    metadata_list: List[Tuple[str, Any]] = []
    size = torch.Size([batch_size, self.config.hidden_size])
    metadata_list.append(("hidden_states", TensorMetadata(device.type, dtype, size)))
    return metadata_list
# Attach the new method to LlamaForCausalLM. The attribute is passed as a
# string because the method does not exist on the upstream class yet.
MluHijackObject.apply_hijack(
    LlamaForCausalLM,
    "get_intermediate_tensor_metadata",
    vllm__module_executor__models__llama__LlamaForCausalLM__get_intermediate_tensor_metadata
)

View File

@@ -0,0 +1,3 @@
from . import mlu_model_runner
from . import model_runner
from . import worker_base

View File

@@ -0,0 +1,176 @@
import weakref
from typing import (List, Optional)
import torch
import torch.distributed
from vllm.compilation.compile_context import set_compile_context
from vllm.distributed import get_pp_group
from vllm.inputs import INPUT_REGISTRY
from vllm.lora.request import LoRARequest
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.sampling_params import SamplingParams
from vllm.sequence import SequenceGroupMetadata
from vllm.worker.model_runner import (
TModelInputForGPU,
LORA_WARMUP_RANK,
_BATCH_SIZES_TO_CAPTURE
)
from vllm.worker.mlu_model_runner import (
MLUModelRunnerBase,
ModelInputForMLUBuilder
)
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
@torch.inference_mode()
def vllm__worker__mlu_model_runner__MLUModelRunnerBase__profile_run(self) -> None:
    """Run one forward pass with dummy requests sized to the scheduler
    limits, so peak device memory usage can be measured.

    Hijack deltas vs. upstream: kv caches are built as
    [cache, cache_scale] pairs (int8 kv-cache support), and
    `prepare_model_input` is called with the extra `prefill_locs` /
    `token_chunk_sizes` arguments introduced by chunked pipeline
    parallelism.
    """
    # Enable top-k sampling to reflect the accurate memory usage.
    sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1)
    max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
    max_num_seqs = self.scheduler_config.max_num_seqs
    # This represents the maximum number of different requests
    # that will have unique loras, and therefore the max amount of memory
    # consumption. Create dummy lora request copies from the lora request
    # passed in, which contains a lora from the lora warmup path.
    dummy_lora_requests: List[LoRARequest] = []
    dummy_lora_requests_per_seq: List[LoRARequest] = []
    if self.lora_config:
        assert self.lora_manager is not None
        with self.lora_manager.dummy_lora_cache():
            for idx in range(self.lora_config.max_loras):
                lora_id = idx + 1
                dummy_lora_request = LoRARequest(
                    lora_name=f"warmup_{lora_id}",
                    lora_int_id=lora_id,
                    lora_path="/not/a/real/path",
                )
                self.lora_manager.add_dummy_lora(dummy_lora_request,
                                                 rank=LORA_WARMUP_RANK)
                dummy_lora_requests.append(dummy_lora_request)
            dummy_lora_requests_per_seq = [
                dummy_lora_requests[idx % len(dummy_lora_requests)]
                for idx in range(max_num_seqs)
            ]
    # Profile memory usage with max_num_sequences sequences and the total
    # number of tokens equal to max_num_batched_tokens.
    seqs: List[SequenceGroupMetadata] = []
    # Additional GPU memory may be needed for multi-modal encoding, which
    # needs to be accounted for when calculating the GPU blocks for
    # vLLM blocker manager.
    # To exercise the worst scenario for GPU memory consumption,
    # the number of seqs (batch_size) is chosen to maximize the number
    # of images processed.
    max_mm_tokens = self.mm_registry.get_max_multimodal_tokens(
        self.model_config)
    if max_mm_tokens > 0:
        max_num_seqs_orig = max_num_seqs
        max_num_seqs = min(max_num_seqs,
                           max_num_batched_tokens // max_mm_tokens)
        if max_num_seqs < 1:
            expr = (f"min({max_num_seqs_orig}, "
                    f"{max_num_batched_tokens} // {max_mm_tokens})")
            logger.warning(
                "Computed max_num_seqs (%s) to be less than 1. "
                "Setting it to the minimum value of 1.", expr)
            max_num_seqs = 1
    batch_size = 0
    for group_id in range(max_num_seqs):
        # Spread max_num_batched_tokens across the groups; the first
        # (max_num_batched_tokens % max_num_seqs) groups get one extra token.
        seq_len = (max_num_batched_tokens // max_num_seqs +
                   (group_id < max_num_batched_tokens % max_num_seqs))
        batch_size += seq_len
        dummy_data = self.input_registry \
            .dummy_data_for_profiling(self.model_config,
                                      seq_len,
                                      self.mm_registry)
        seq = SequenceGroupMetadata(
            request_id=str(group_id),
            is_prompt=True,
            seq_data={group_id: dummy_data.seq_data},
            sampling_params=sampling_params,
            block_tables=None,
            lora_request=dummy_lora_requests_per_seq[group_id]
            if dummy_lora_requests_per_seq else None,
            multi_modal_data=dummy_data.multi_modal_data,
            multi_modal_placeholders=dummy_data.multi_modal_placeholders,
        )
        seqs.append(seq)
    # Run the model with the dummy inputs.
    num_layers = self.model_config.get_num_layers(self.parallel_config)
    # use an empty tensor instead of `None`` to force Dynamo to pass
    # it by reference, rather by specializing on the value ``None``.
    # the `dtype` argument does not matter, and we use `float32` as
    # a placeholder (it has wide hardware support).
    # it is important to create tensors inside the loop, rather than
    # multiplying the list, to avoid Dynamo from treating them as
    # tensor aliasing.
    '''
    =============================
    Modify by vllm_mlu
    =============================
    @brief: support kv cache int8
    '''
    # Each layer's cache entry is a [cache, scale] pair (int8 kv cache).
    kv_caches = []
    for _ in range(num_layers):
        kv_cache_ = torch.tensor([], dtype=torch.float32, device=self.device)
        kv_cache_scale_ = torch.tensor([], dtype=torch.float32, device=self.device)
        kv_caches.append([kv_cache_, kv_cache_scale_])
    '''
    ==================
    End of MLU Hijack
    ==================
    '''
    finished_requests_ids = [seq.request_id for seq in seqs]
    """
    ======================================
    Modified by Chunked Parallel Pipeline.
    ======================================
    @brief: Add two parameters: prefill_loc and token_chunk_size.
    """
    # All dummy groups are fresh prompts, so every prefill starts at 0.
    token_chunk_sizes = [seq.token_chunk_size for seq in seqs]
    model_input = self.prepare_model_input(
        seqs,
        finished_requests_ids=finished_requests_ids,
        prefill_locs=[0]*len(seqs),
        token_chunk_sizes=token_chunk_sizes,
    )
    """
    ======================================
    End by Chunked Parallel Pipeline.
    ======================================
    """
    intermediate_tensors = None
    if not get_pp_group().is_first_rank:
        intermediate_tensors = self.model.make_empty_intermediate_tensors(
            batch_size=batch_size,
            dtype=self.model_config.dtype,
            device=self.device)
    graph_batch_size = self.max_batchsize_to_capture
    batch_size_capture_list = [
        bs for bs in _BATCH_SIZES_TO_CAPTURE if bs <= graph_batch_size
    ]
    if self.model_config.enforce_eager:
        batch_size_capture_list = []
    with set_compile_context(batch_size_capture_list):
        self.execute_model(model_input, kv_caches, intermediate_tensors)
    torch.mlu.synchronize()
    return
# Replace the existing MLUModelRunnerBase.profile_run with the
# chunked-pipeline-aware version defined above.
MluHijackObject.apply_hijack(
    MLUModelRunnerBase,
    MLUModelRunnerBase.profile_run,
    vllm__worker__mlu_model_runner__MLUModelRunnerBase__profile_run
)

View File

@@ -0,0 +1,304 @@
import dataclasses
import weakref
from typing import (List, Optional, TypeVar)

from vllm.distributed import get_pp_group
from vllm.model_executor import SamplingMetadata
# Needed by the hijacked _compute_lens (mrope branch); the original file
# referenced MRotaryEmbedding without importing it.
from vllm.model_executor.layers.rotary_embedding import MRotaryEmbedding
from vllm.sequence import SequenceGroupMetadata
from vllm.worker.model_runner import (
    GPUModelRunnerBase,
    ModelInputForGPUBuilder,
    ModelInputForGPUWithSamplingMetadata,
    ModelRunner,
    TModelInputForGPU
)

from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
@brief: Add two parameters, prefill_loc and token_chunk_size.
"""
def vllm__worker__model_runner__ModelInputForGPUBuilder___compute_lens(
self, inter_data: ModelInputForGPUBuilder.InterDataForSeqGroup,
seq_idx: int, seq_group_metadata: SequenceGroupMetadata,
prefill_loc: Optional[int] = None,
token_chunk_size: Optional[int] = None,
):
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
"""Compute context length, sequence length and tokens
for the given sequence data.
"""
seq_data = seq_group_metadata.seq_data[inter_data.seq_ids[seq_idx]]
if token_chunk_size is None:
token_chunk_size = seq_group_metadata.token_chunk_size
# Compute context length (the number of tokens that are
# already computed) and sequence length (total number of tokens).
seq_len = seq_data.get_len()
if inter_data.is_prompt:
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
@brief: For chunked pipeline parallel, since multiple tasks
use the same sequence data with different prefill location,
an extra parameter is provided to indicate the prefill location.
"""
context_len = (
prefill_loc if prefill_loc is not None
else seq_data.get_num_computed_tokens()
)
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
seq_len = min(seq_len, context_len + token_chunk_size)
elif self.runner.scheduler_config.is_multi_step or \
self.runner.model_config.is_encoder_decoder:
assert prefill_loc is None, "Chunked Parallel Pipeline does not support multi-step."
context_len = seq_len - 1
else:
context_len = seq_data.get_num_computed_tokens()
# Compute tokens.
tokens = seq_data.get_token_ids()[context_len:seq_len]
inter_data.seq_lens[seq_idx] = seq_len
inter_data.orig_seq_lens[seq_idx] = seq_len
inter_data.context_lens[seq_idx] = context_len
inter_data.input_tokens[seq_idx].extend(tokens)
inter_data.input_positions[seq_idx].extend(range(context_len, seq_len))
inter_data.query_lens[seq_idx] = seq_len - context_len
if seq_data.mrope_position_delta is not None:
if inter_data.mrope_input_positions is None:
inter_data.mrope_input_positions = [None] * inter_data.n_seqs
inter_data.mrope_input_positions[
seq_idx] = MRotaryEmbedding.get_next_input_positions(
seq_data.mrope_position_delta,
context_len,
seq_len,
)
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
@brief: Add two parameters, prefill_loc and token_chunk_size.
"""
def vllm__worker__model_runner__ModelInputForGPUBuilder__add_seq_group(
self, seq_group_metadata: SequenceGroupMetadata,
prefill_loc: Optional[int] = None,
token_chunk_size: Optional[int] = None,
):
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
"""Add a sequence group to the builder."""
seq_ids = seq_group_metadata.seq_data.keys()
n_seqs = len(seq_ids)
is_prompt = seq_group_metadata.is_prompt
if is_prompt:
assert n_seqs == 1
self.decode_only = False
encoder_seq_len = 0
if self.runner.model_config.is_encoder_decoder:
encoder_seq_len = seq_group_metadata.encoder_seq_data.get_len()
inter_data = self.init_cached_inter_data(
request_id=seq_group_metadata.request_id,
seq_ids=seq_ids,
is_prompt=is_prompt,
block_tables=seq_group_metadata.block_tables,
computed_block_nums=seq_group_metadata.computed_block_nums,
reinit=True,
reinit_use_defaults=True,
encoder_seq_len=encoder_seq_len)
self.inter_data_list.append(inter_data)
for seq_idx in range(n_seqs):
for per_seq_fn in self.per_seq_compute_fns:
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
@brief: Add prefill location and token chunk size parameters.
"""
if per_seq_fn.__qualname__ == \
"vllm__worker__model_runner__ModelInputForGPUBuilder___compute_lens":
per_seq_fn(inter_data, seq_idx, seq_group_metadata, prefill_loc, token_chunk_size)
else:
per_seq_fn(inter_data, seq_idx, seq_group_metadata)
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
for per_seq_group_fn in self.per_seq_group_compute_fns:
per_seq_group_fn(inter_data, seq_group_metadata)
def vllm__worker__model_runner__GPUModelRunnerBase___prepare_model_input_tensors(
    self,
    seq_group_metadata_list: List[SequenceGroupMetadata],
    finished_requests_ids: Optional[List[str]] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> TModelInputForGPU:
    """Prepare the base model input for a batch of sequence groups.

    Prepares metadata needed for the base model forward pass but not for
    additional steps (e.g. sampling). `seq_group_metadata_list` must be
    sorted prefill -> decode, and the resulting batch keeps that order.

    Chunked-pipeline change: optional per-group `prefill_locs` and
    `token_chunk_sizes` (defaulting to all-None) are forwarded to
    `add_seq_group`; both must match the group list in length.
    """
    builder = self._builder_cls(weakref.proxy(self), finished_requests_ids)

    num_groups = len(seq_group_metadata_list)
    if prefill_locs is None:
        prefill_locs = [None] * num_groups
    assert len(prefill_locs) == num_groups, \
        "the lengths of prefill locs and seq_group_metadata are different."
    if token_chunk_sizes is None:
        token_chunk_sizes = [None] * num_groups
    assert len(token_chunk_sizes) == num_groups, \
        "the lengths of token_chunk_sizes and seq_group_metadata are different."

    for group_meta, loc, chunk in zip(seq_group_metadata_list,
                                      prefill_locs, token_chunk_sizes):
        builder.add_seq_group(group_meta, loc, chunk)

    builder.reset_cached_inter_data()
    return builder.build()  # type: ignore
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
@brief: Add two parameters, prefill_loc and token_chunk_size.
"""
def vllm__worker__model_runner__ModelRunner__prepare_model_input(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
virtual_engine: int = 0,
finished_requests_ids: Optional[List[str]] = None,
prefill_locs: Optional[List[int]] = None,
token_chunk_sizes: Optional[List[int]] = None,
) -> ModelInputForGPUWithSamplingMetadata:
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
"""Prepare the model input based on a given sequence group, including
metadata for the sampling step.
The API assumes seq_group_metadata_list is sorted by prefill -> decode.
The result tensors and data structure also batches input in prefill
-> decode order. For example,
- input_tokens[:num_prefill_tokens] contains prefill tokens.
- input_tokens[num_prefill_tokens:] contains decode tokens.
If cuda graph is required, this API automatically pads inputs.
"""
"""
======================================
Modified by Chunked Parallel Pipeline.
======================================
Add prefill location parameter.
"""
model_input = self._prepare_model_input_tensors(
seq_group_metadata_list,
finished_requests_ids,
prefill_locs,
token_chunk_sizes)
"""
======================================
End by Chunked Parallel Pipeline.
======================================
"""
if get_pp_group().is_last_rank:
# Sampling metadata is only required for the final pp group
generators = self.get_generators(finished_requests_ids)
sampling_metadata = SamplingMetadata.prepare(
seq_group_metadata_list, model_input.seq_lens,
model_input.query_lens, self.device, self.pin_memory,
generators, self.sampling_metadata_cache)
else:
sampling_metadata = None
is_prompt = (seq_group_metadata_list[0].is_prompt
if seq_group_metadata_list else None)
return dataclasses.replace(model_input,
sampling_metadata=sampling_metadata,
is_prompt=is_prompt,
virtual_engine=virtual_engine)
# Swap in the chunked-pipeline-aware variants of the upstream methods.
# The original method objects are passed so the hijack utility can
# identify exactly which attribute is being replaced.
MluHijackObject.apply_hijack(
    ModelInputForGPUBuilder,
    ModelInputForGPUBuilder._compute_lens,
    vllm__worker__model_runner__ModelInputForGPUBuilder___compute_lens
)
MluHijackObject.apply_hijack(
    ModelInputForGPUBuilder,
    ModelInputForGPUBuilder.add_seq_group,
    vllm__worker__model_runner__ModelInputForGPUBuilder__add_seq_group
)
MluHijackObject.apply_hijack(
    GPUModelRunnerBase,
    GPUModelRunnerBase._prepare_model_input_tensors,
    vllm__worker__model_runner__GPUModelRunnerBase___prepare_model_input_tensors
)
MluHijackObject.apply_hijack(
    ModelRunner,
    ModelRunner.prepare_model_input,
    vllm__worker__model_runner__ModelRunner__prepare_model_input
)

View File

@@ -0,0 +1,219 @@
import dataclasses
import importlib
import os
import time
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
import torch
from vllm.distributed import broadcast_tensor_dict, get_pp_group, get_tp_group
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.platforms import current_platform
from vllm.sequence import (ExecuteModelRequest, IntermediateTensors)
from vllm.utils import (enable_trace_function_call_for_thread,
update_environment_variables)
from vllm.worker.model_runner_base import (BroadcastableModelInput,
ModelRunnerBase,
ModelRunnerInputBase)
from vllm.worker.worker_base import (LocalOrDistributedWorkerBase,
WorkerInput,
extract_previous_hidden_states)
from vllm_mlu.mlu_hijack_utils import MluHijackObject
from examples.cambricon_custom_func.vllm.mlu_hijack.common import init_logger
logger = init_logger(__name__)
def vllm__worker__worker_base__LocalOrDistributedWorkerBase___get_driver_input_and_broadcast(
    self, execute_model_req: ExecuteModelRequest,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]:
    """ Get the driver input and broadcast it to other workers.

    Chunked-pipeline change: the optional per-group `prefill_locs` and
    `token_chunk_sizes` are forwarded to the (hijacked)
    `prepare_model_input`.
    """
    assert self.is_driver_worker
    worker_input: WorkerInput = self.prepare_worker_input(
        execute_model_req=execute_model_req)
    """
    ======================================
    Modified by Chunked Parallel Pipeline.
    ======================================
    Pass prefill location and chunk size parameters.
    """
    model_input: ModelRunnerInputBase = (
        self.model_runner.prepare_model_input(
            execute_model_req.seq_group_metadata_list,
            execute_model_req.virtual_engine,
            execute_model_req.finished_requests_ids,
            prefill_locs,
            token_chunk_sizes))
    """
    ======================================
    End by Chunked Parallel Pipeline.
    ======================================
    """
    kwargs = extract_previous_hidden_states(execute_model_req)
    # When TP > 1 (metadata broadcast enabled), ship the prepared inputs
    # to the non-driver workers of this rank.
    if self.do_metadata_broadcast:
        broadcast_data = worker_input.as_broadcastable_tensor_dict()
        broadcast_data.update(model_input.as_broadcastable_tensor_dict())
        broadcast_data.update(kwargs)
        broadcast_tensor_dict(broadcast_data, src=0)
    if execute_model_req.async_callback:
        model_input = dataclasses.replace(  # type: ignore
            model_input,
            async_callback=execute_model_req.async_callback)
    return model_input, worker_input, kwargs
def vllm__worker__worker_base__LocalOrDistributedWorkerBase__prepare_input(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
) -> Optional[Tuple[BroadcastableModelInput, WorkerInput, Dict[
        str, torch.Tensor]]]:
    """
    Prepare the inputs to ModelRunner and workers.

    Driver workers build the inputs (and broadcast them when TP > 1);
    non-driver workers receive them from the broadcast. Returns None on
    the driver when there is no request (shutdown signal).

    Chunked-pipeline change: forwards `prefill_locs` and
    `token_chunk_sizes` to `_get_driver_input_and_broadcast`.
    """
    if self.is_driver_worker:
        if execute_model_req is None:
            if self.do_metadata_broadcast:
                # This signals that there's no more requests to process for
                # now. All workers are running infinite loop with
                # broadcast_tensor_dict, and it stops the loop when the
                # driver broadcasts an empty input. Send an empty input to
                # notify all other workers to stop their execution loop.
                broadcast_tensor_dict({}, src=0)
            return None
        """
        ======================================
        Modified by Chunked Parallel Pipeline.
        ======================================
        Pass prefill location and chunk size parameters.
        """
        return self._get_driver_input_and_broadcast(
            execute_model_req, prefill_locs, token_chunk_sizes)
        # NOTE: the banner string below sits after the return and is
        # therefore unreachable; it is kept only as a marker.
        """
        ======================================
        End by Chunked Parallel Pipeline.
        ======================================
        """
    else:
        return self._get_worker_input_from_broadcast()
def vllm__worker__worker_base__LocalOrDistributedWorkerBase__execute_model(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
    prefill_locs: Optional[List[int]] = None,
    token_chunk_sizes: Optional[List[int]] = None,
    priority: int = -1,
) -> Optional[List[SamplerOutput]]:
    """Executes at least one model step on the given sequences, unless no
    sequences are provided.

    Chunked-pipeline changes: forwards `prefill_locs` /
    `token_chunk_sizes` to input preparation, and replaces the host-side
    metadata exchange of intermediate tensors with a locally computed
    metadata list (via the model's `get_intermediate_tensor_metadata`)
    so the device pipeline is not interrupted by host communication.

    NOTE(review): the `priority` parameter is accepted but never used in
    this body — presumably consumed by a caller-side PriorityLock;
    confirm, or drop it.
    """
    start_time = time.perf_counter()
    """
    ======================================
    Modified by Chunked Parallel Pipeline.
    ======================================
    Pass prefill location and chunk size parameters.
    """
    inputs = self.prepare_input(execute_model_req, prefill_locs, token_chunk_sizes)
    """
    ======================================
    End by Chunked Parallel Pipeline.
    ======================================
    """
    if inputs is None:
        return None
    model_input, worker_input, kwargs = inputs
    num_steps = worker_input.num_steps
    self.execute_worker(worker_input)
    # If there is no input, we don't need to execute the model.
    if worker_input.num_seq_groups == 0:
        return []
    """
    ======================================
    Modified by Chunked Parallel Pipeline.
    ======================================
    @brief: To prevent the execution of mlu pipeline interrupted by host communication,
    cancel the host communication and prepare metadata list directly.
    """
    # This path requires exactly one chunk size per step, i.e. a single
    # sequence group (chunk) is executed at a time.
    assert (token_chunk_sizes is not None and len(token_chunk_sizes) == 1)
    batch_size = token_chunk_sizes[0]
    # Metadata is derived locally on every rank (it is only consumed by
    # the recv below on non-first ranks).
    metadata_list = self.model_runner.model.get_intermediate_tensor_metadata(
        batch_size,
        dtype=self.model_runner.model_config.dtype,
        device=self.model_runner.device)
    intermediate_tensors = None
    orig_model_execute_time = 0.0
    if not get_pp_group().is_first_rank:
        intermediate_tensors = IntermediateTensors(
            get_pp_group().recv_tensor_dict(
                all_gather_group=get_tp_group(),
                recv_metadata_list=metadata_list))
        if (self.observability_config is not None
                and self.observability_config.collect_model_execute_time):
            orig_model_execute_time = intermediate_tensors.tensors.get(
                "model_execute_time", torch.tensor(0)).item()
    """
    ======================================
    End by Chunked Parallel Pipeline.
    ======================================
    """
    output = self.model_runner.execute_model(
        model_input=model_input,
        kv_caches=self.kv_cache[worker_input.virtual_engine]
        if self.kv_cache is not None else None,
        intermediate_tensors=intermediate_tensors,
        num_steps=num_steps,
        **kwargs,
    )
    # Measured from method entry, so it includes input preparation time.
    model_execute_time = time.perf_counter() - start_time
    if not get_pp_group().is_last_rank:
        # output is IntermediateTensors
        if (self.observability_config is not None
                and self.observability_config.collect_model_execute_time):
            output.tensors["model_execute_time"] = torch.tensor(
                model_execute_time + orig_model_execute_time)
        get_pp_group().send_tensor_dict(output.tensors,
                                        all_gather_group=get_tp_group())
        return [None]
    if (self.observability_config is not None
            and self.observability_config.collect_model_execute_time
            and output is not None):
        for o in output:
            o.model_execute_time = (orig_model_execute_time +
                                    model_execute_time)
    # output is List[SamplerOutput]
    return output
# Replace the worker-base request path with the chunked-pipeline-aware
# variants defined above (same attribute names, extended signatures).
MluHijackObject.apply_hijack(
    LocalOrDistributedWorkerBase,
    LocalOrDistributedWorkerBase.prepare_input,
    vllm__worker__worker_base__LocalOrDistributedWorkerBase__prepare_input
)
MluHijackObject.apply_hijack(
    LocalOrDistributedWorkerBase,
    LocalOrDistributedWorkerBase._get_driver_input_and_broadcast,
    vllm__worker__worker_base__LocalOrDistributedWorkerBase___get_driver_input_and_broadcast
)
MluHijackObject.apply_hijack(
    LocalOrDistributedWorkerBase,
    LocalOrDistributedWorkerBase.execute_model,
    vllm__worker__worker_base__LocalOrDistributedWorkerBase__execute_model
)