[main2main] upgrade vllm to 0308 (#7213)

### What this PR does / why we need it?
Update main2main to vLLM 0308.
Upstream changes that break vllm-ascend and are handled here (see the compatibility sketch after the version pins below):

* https://github.com/vllm-project/vllm/pull/30681
* https://github.com/vllm-project/vllm/pull/35552 removes self.cudagraph_batch_sizes
* https://github.com/vllm-project/vllm/pull/35158 clear_metadata -> defer_finalize
* https://github.com/vllm-project/vllm/pull/36006 removes CacheConfig.cpu_offload_gb
* https://github.com/vllm-project/vllm/pull/35472
* https://github.com/vllm-project/vllm/pull/34552 attn_metadata_builder
* https://github.com/vllm-project/vllm/pull/30515 profile_seq_lens
* https://github.com/vllm-project/vllm/pull/28053

- vLLM version: v0.16.0
- vLLM main: 4034c3d32e
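
Most of these moves are absorbed on the vllm-ascend side with version gates rather than hard switches. A minimal sketch of that pattern, assuming a hypothetical `builder` object affected by the `clear_metadata` -> `defer_finalize` rename (only `vllm_version_is` is taken from the diff below, where it is imported from `vllm_ascend.utils`):

```python
from vllm_ascend.utils import vllm_version_is


def finalize_attn_metadata(builder) -> None:
    # Hypothetical shim over the upstream clear_metadata -> defer_finalize rename.
    if vllm_version_is("0.17.0"):
        builder.clear_metadata()   # API of the previously pinned vLLM release
    else:
        builder.defer_finalize()   # API on current vLLM main
```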

---------

Signed-off-by: MrZ20 <2609716663@qq.com>
Signed-off-by: menogrey <1299267905@qq.com>
Co-authored-by: MrZ20 <2609716663@qq.com>
Authored by zhangyiming on 2026-03-18 09:24:43 +08:00, committed via GitHub
commit 1c954ff264, parent 79ef41a53d
16 changed files with 223 additions and 168 deletions


@@ -22,6 +22,7 @@ import vllm_ascend.patch.platform.patch_kv_cache_interface # noqa
import vllm_ascend.patch.platform.patch_mamba_config # noqa
import vllm_ascend.patch.platform.patch_minimax_m2_config # noqa
import vllm_ascend.patch.platform.patch_sched_yield # noqa
import vllm_ascend.patch.platform.patch_torch_accelerator # noqa
if os.getenv("DYNAMIC_EPLB", "false").lower() in ("true", "1") or os.getenv("EXPERT_MAP_RECORD", "false") == "true":
import vllm_ascend.patch.platform.patch_multiproc_executor # noqa
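
For reference, the patched multiproc executor in this file is pulled in only when one of two environment variables is set; a small usage sketch (assuming this hunk lives in the platform patch package's `__init__`, with illustrative values):

```python
import os

# Opt in to the patched multiproc executor before the platform patches are imported.
os.environ["DYNAMIC_EPLB"] = "1"                     # accepted values: "true" or "1", case-insensitive
os.environ.setdefault("EXPERT_MAP_RECORD", "false")  # compared against the literal string "true"

import vllm_ascend.patch.platform  # noqa: F401  triggers the conditional import above
```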


@@ -1,3 +1,5 @@
from __future__ import annotations
import threading
import weakref
from collections import deque
@@ -19,6 +21,8 @@ from vllm.v1.executor.multiproc_executor import (
set_multiprocessing_worker_envs,
)
from vllm_ascend.utils import vllm_version_is
class AscendMultiprocExecutor(MultiprocExecutor):
def _init_executor(self) -> None:
@@ -26,7 +30,8 @@ class AscendMultiprocExecutor(MultiprocExecutor):
# and ensure workers will be terminated.
self._finalizer = weakref.finalize(self, self.shutdown)
self.is_failed = False
self.shutdown_event = threading.Event()
if vllm_version_is("0.17.0"):
self.shutdown_event = threading.Event()
self.failure_callback: FailureCallback | None = None
tensor_parallel_size, pp_parallel_size, pcp_parallel_size = self._get_parallel_sizes()
@@ -66,11 +71,31 @@ class AscendMultiprocExecutor(MultiprocExecutor):
success = False
try:
global_start_rank = self.local_world_size * self.parallel_config.node_rank_within_dp
for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank)
unready_workers.append(
AscendWorkerProc.make_worker_process(
if vllm_version_is("0.17.0"):
for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank)
unready_workers.append(
AscendWorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
distributed_init_method=distributed_init_method,
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
)
)
else:
# When using fork, keep track of socket file descriptors that are
# inherited by the worker, so that we can close them in subsequent
# workers
inherited_fds: list[int] | None = [] if context.get_start_method() == "fork" else None
for local_rank in range(self.local_world_size):
global_rank = global_start_rank + local_rank
is_driver_worker = self._is_driver_worker(global_rank)
unready_worker_handle = AscendWorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=local_rank,
rank=global_rank,
@@ -78,8 +103,12 @@ class AscendMultiprocExecutor(MultiprocExecutor):
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
is_driver_worker=is_driver_worker,
inherited_fds=inherited_fds,
)
)
unready_workers.append(unready_worker_handle)
if inherited_fds is not None:
inherited_fds.append(unready_worker_handle.death_writer.fileno())
inherited_fds.append(unready_worker_handle.ready_pipe.fileno())
# Workers must be created before wait_for_ready to avoid
# deadlock, since worker.init_device() does a device sync.
@@ -124,6 +153,8 @@ class AscendMultiprocExecutor(MultiprocExecutor):
for uw in unready_workers:
if uw.death_writer is not None:
uw.death_writer.close()
if not vllm_version_is("0.17.0"):
uw.death_writer = None
self._ensure_worker_termination([uw.proc for uw in unready_workers])
self.output_rank = self._get_output_rank()
@@ -158,38 +189,76 @@ class AscendWorkerProc(WorkerProc):
input_shm_handle, # Receive SchedulerOutput
shared_worker_lock: LockType,
is_driver_worker: bool = False,
inherited_fds: list[int] | None = None,
) -> UnreadyWorkerProcHandle:
context = get_mp_context()
# (reader, writer)
reader, writer = context.Pipe(duplex=False)
if vllm_version_is("0.17.0"):
# (reader, writer)
reader, writer = context.Pipe(duplex=False)
# Create death pipe to detect parent process exit
death_reader, death_writer = context.Pipe(duplex=False)
# Create death pipe to detect parent process exit
death_reader, death_writer = context.Pipe(duplex=False)
process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"input_shm_handle": input_shm_handle,
"ready_pipe": (reader, writer),
"death_pipe": death_reader,
"shared_worker_lock": shared_worker_lock,
"is_driver_worker": is_driver_worker,
}
# Run EngineCore busy loop in background process.
proc = context.Process(
target=WorkerProc.worker_main,
kwargs=process_kwargs,
name=f"VllmWorker-{rank}",
daemon=False,
)
process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"input_shm_handle": input_shm_handle,
"ready_pipe": (reader, writer),
"death_pipe": death_reader,
"shared_worker_lock": shared_worker_lock,
"is_driver_worker": is_driver_worker,
}
# Run EngineCore busy loop in background process.
proc = context.Process(
target=WorkerProc.worker_main,
kwargs=process_kwargs,
name=f"VllmWorker-{rank}",
daemon=False,
)
proc.start()
writer.close()
# Keep death_writer open in parent - when parent exits,
# death_reader in child will get EOFError
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)
proc.start()
writer.close()
# Keep death_writer open in parent - when parent exits,
# death_reader in child will get EOFError
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)
else:
# Ready pipe to communicate readiness from child to parent
ready_reader, ready_writer = context.Pipe(duplex=False)
# Death pipe to let child detect parent process exit
death_reader, death_writer = context.Pipe(duplex=False)
if inherited_fds is not None:
inherited_fds = inherited_fds.copy()
inherited_fds.extend((ready_reader.fileno(), death_writer.fileno()))
process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"input_shm_handle": input_shm_handle,
"ready_pipe": ready_writer,
"death_pipe": death_reader,
"shared_worker_lock": shared_worker_lock,
"is_driver_worker": is_driver_worker,
# Have the worker close parent end of this worker's pipes too
"inherited_fds": inherited_fds if inherited_fds is not None else [],
}
# Run EngineCore busy loop in background process.
proc = context.Process(
target=WorkerProc.worker_main,
kwargs=process_kwargs,
name=f"VllmWorker-{rank}",
daemon=False,
)
proc.start()
# Close child ends of pipes here in the parent
ready_writer.close()
death_reader.close()
# Keep death_writer open in parent - when parent exits,
# death_reader in child will get EOFError
return UnreadyWorkerProcHandle(proc, rank, ready_reader, death_writer)
vllm.v1.executor.multiproc_executor.MultiprocExecutor = AscendMultiprocExecutor
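
The non-0.17.0 branch above keeps a list of parent-side pipe descriptors so that workers forked later can close the ends they inherit from earlier workers. A standalone sketch of that bookkeeping with plain `multiprocessing` and hypothetical names (Linux fork start method, no vLLM types):

```python
import multiprocessing as mp
import os


def worker_main(ready_writer, inherited_fds):
    # Close parent-side descriptors inherited from earlier forks so this
    # worker does not keep them alive.
    for fd in inherited_fds:
        try:
            os.close(fd)
        except OSError:
            pass
    ready_writer.send("ready")
    ready_writer.close()


if __name__ == "__main__":
    ctx = mp.get_context("fork")
    inherited_fds: list[int] = []
    handles = []
    for rank in range(2):
        reader, writer = ctx.Pipe(duplex=False)
        # Pass a snapshot so later appends do not leak into this child.
        proc = ctx.Process(target=worker_main,
                           args=(writer, inherited_fds.copy()),
                           name=f"Worker-{rank}")
        proc.start()
        writer.close()                         # parent only needs the reader end
        inherited_fds.append(reader.fileno())  # later forks inherit this reader
        handles.append((proc, reader))
    for proc, reader in handles:
        print(reader.recv())                   # "ready" from each worker
        proc.join()
```

Passing a copy of the list to each child matters: descriptors appended in the parent after a fork must not show up in children that were already created.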


@@ -0,0 +1,8 @@
import torch


def patch_empty_cache() -> None:
    torch.npu.empty_cache()


torch.accelerator.empty_cache = patch_empty_cache
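
Once this new patch module is imported, device-agnostic callers of `torch.accelerator.empty_cache()` are redirected to the NPU allocator. A minimal usage sketch (assumes an Ascend environment with `torch_npu` installed; the module path matches the import added in the first file):

```python
import torch
import torch_npu  # noqa: F401  registers the torch.npu backend
import vllm_ascend.patch.platform.patch_torch_accelerator  # noqa: F401  applies the override

# Generic code written against the accelerator-agnostic API now frees NPU cache blocks.
torch.accelerator.empty_cache()  # dispatches to torch.npu.empty_cache()
```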