[bugfix] some bugs maybe fail to run (#896)

### What this PR does / why we need it?
Fixes a bug where graph mode behaved identically for prefill (P) and decode (D), along with several other bugs.
### Does this PR introduce _any_ user-facing change?
No, there are no user-facing changes.
### How was this patch tested?
Verified with the existing end-to-end tests.

Signed-off-by: ningbenzhe1 <ningbenzhe@huawei.com>
This commit is contained in:
NINGBENZHE
2025-06-03 11:07:33 +08:00
committed by GitHub
parent 92bc5576d8
commit 6ec64a3f96
7 changed files with 15 additions and 7 deletions

View File

@@ -129,6 +129,9 @@ class AscendMetadata:
attn_state: AscendAttentionState = AscendAttentionState.ChunkedPrefill attn_state: AscendAttentionState = AscendAttentionState.ChunkedPrefill
attn_mask: Optional[torch.Tensor] = None attn_mask: Optional[torch.Tensor] = None
# For logging.
num_input_tokens: int = 0 # Number of tokens including padding.
class AscendAttentionMetadataBuilder: class AscendAttentionMetadataBuilder:

View File

@@ -21,12 +21,18 @@ def get_etp_group() -> GroupCoordinator:
return _ETP return _ETP
def model_parallel_initialized():
return (_ETP is not None and _EP is not None)
def init_ascend_model_parallel( def init_ascend_model_parallel(
expert_parallel_size: int = 1, expert_parallel_size: int = 1,
expert_tensor_parallel_size: int = 1, expert_tensor_parallel_size: int = 1,
world_size: Optional[int] = None, world_size: Optional[int] = None,
backend: Optional[str] = None, backend: Optional[str] = None,
): ):
if model_parallel_initialized():
return
assert torch.distributed.is_initialized() assert torch.distributed.is_initialized()
world_size = world_size or torch.distributed.get_world_size() world_size = world_size or torch.distributed.get_world_size()
backend = backend or torch.distributed.get_backend( backend = backend or torch.distributed.get_backend(

View File

@@ -66,8 +66,7 @@ def fused_experts_with_mc2(
local_rank = torch.distributed.get_rank(group=ep_group) local_rank = torch.distributed.get_rank(group=ep_group)
all_to_all_group_size = torch.distributed.get_world_size(ep_group) all_to_all_group_size = torch.distributed.get_world_size(ep_group)
world_szie = torch.distributed.get_world_size() tp_size = get_etp_group().world_size
tp_size = world_szie // all_to_all_group_size
tp_rank = rank % tp_size tp_rank = rank % tp_size
stage1_kwargs = { stage1_kwargs = {

View File

@@ -20,6 +20,7 @@
import torch import torch
import vllm import vllm
import vllm.distributed import vllm.distributed
import vllm.envs as envs
from torch.distributed import ProcessGroup from torch.distributed import ProcessGroup
from torch.distributed.distributed_c10d import (Backend, PrefixStore, from torch.distributed.distributed_c10d import (Backend, PrefixStore,
_get_default_timeout, _get_default_timeout,
@@ -164,10 +165,9 @@ def parallel_config_get_dp_port(self) -> int:
""" """
answer = self.data_parallel_master_port answer = self.data_parallel_master_port
self.data_parallel_master_port += 1 self.data_parallel_master_port += 1
import os
# NOTE: Get port from envs directly when using torchrun # NOTE: Get port from envs directly when using torchrun
port = int(os.environ.get("MASTER_PORT", answer)) # type: ignore port = envs.VLLM_DP_MASTER_PORT if envs.VLLM_DP_MASTER_PORT else answer
return port return port

View File

@@ -173,7 +173,7 @@ class NPUWorker(WorkerBase):
scheduler_output: "SchedulerOutput", scheduler_output: "SchedulerOutput",
) -> Optional[ModelRunnerOutput]: ) -> Optional[ModelRunnerOutput]:
output = self.model_runner.execute_model(scheduler_output) output = self.model_runner.execute_model(scheduler_output)
return output if self.rank == 0 else None return output if self.is_driver_worker else None
def load_model(self) -> None: def load_model(self) -> None:
self.model_runner.load_model() self.model_runner.load_model()