Files
xc-llm-ascend/vllm_ascend/patch/platform/patch_distributed.py
zzzzwwjj 136ea9ff56 [refact] unified soc_version code (#4359)
### What this PR does / why we need it?

Currently, the code determines the chip type in two different ways:
`get_ascend_soc_version` uses the `get_soc_version` API in torch_npu, while
`is_310p` uses `_build_info.__soc_version__`, which is generated at
install time. We need to unify the two paths.

We need to unify these codes based on the following points:

1. Chip type detection must be consistent between compile time and
run time;
2. At compile time, we need the chip type to compile the ops,
but at run time we only need the device
type (910B/910_93/310P/910_95/etc.) to choose a code branch;
3. At compile time, torch_npu may not have been installed yet, so we
can't use torch_npu's API.

Based on the above points, we have made the following changes:

1. When the user sets the env variable `SOC_VERSION`, use it; when it is not
set, query soc_version via `npu-smi`;
2. Generate device_type from soc_version at compile time, and write
`__device_type__` instead of `__soc_version__` to `_build_info.py`;
3. At run time, use `__device_type__` to choose the code branch.

### Does this PR introduce _any_ user-facing change?

When the env variable `SOC_VERSION` is not set, it no longer defaults to
`ASCEND910B1`; soc_version is queried via `npu-smi` instead. If `SOC_VERSION`
is set, its value must be in the `soc_to_device` list in `setup.py`.

- vLLM version: v0.11.0
- vLLM main:
2918c1b49c

Signed-off-by: zzzzwwjj <1183291235@qq.com>
2025-11-26 14:28:55 +08:00

116 lines
4.0 KiB
Python

#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Adapted from vllm/model_executor/models/qwen2_vl.py
# This file is a part of the vllm-ascend project.
import torch
import vllm.envs as envs_vllm
from vllm.config import ParallelConfig
from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type
def parallel_config_get_dp_port(self) -> int:
    """Return the port to use for the next data-parallel process group.

    Process groups related to data parallelism may be initialized in
    several processes (e.g. both in the worker and in the engine). To
    avoid port conflicts, the stored master port is advanced by one on
    every call, so each initialization sees a fresh value.

    The counter is advanced even when ``VLLM_DP_MASTER_PORT`` is set; in
    that case the environment value is returned instead of the counter.
    """
    allocated_port = self.data_parallel_master_port
    self.data_parallel_master_port = allocated_port + 1

    # NOTE: Get port from envs directly when using torchrun
    env_port = envs_vllm.VLLM_DP_MASTER_PORT
    if env_port:
        return env_port
    return allocated_port
# Monkey-patch: rebind ParallelConfig.get_next_dp_init_port to the function
# defined above, so the VLLM_DP_MASTER_PORT environment value takes precedence
# over the incrementing port counter whenever it is set.
ParallelConfig.get_next_dp_init_port = parallel_config_get_dp_port
class NullHandle:
    """No-op stand-in for an asynchronous work handle.

    Exposes a ``wait()`` method that does nothing, for callers that expect
    a handle object back from an operation that has already completed.
    """

    def __init__(self) -> None:
        """Create the handle; there is no state to initialize."""

    def wait(self) -> None:
        """Return immediately, since there is nothing to wait for."""
        return None
def communication_adaptation_310p():
    """Patch ``torch.distributed`` broadcast/all_reduce with all_gather emulations.

    The ``broadcast`` and ``all_reduce`` attributes of both
    ``torch.distributed`` and ``torch.distributed.distributed_c10d`` are
    rebound to wrappers that:

    * broadcast: delegate CPU tensors to the original function; for any
      other device, gather every rank's tensor and copy the source rank's
      entry into ``tensor`` in place.
    * all_reduce: delegate every dtype except ``torch.int64`` to the
      original function; for ``int64``, gather every rank's tensor, reduce
      locally (SUM or MAX only) and write the result into ``tensor`` in
      place.

    NOTE(review): presumably the native collectives are unsupported for
    these cases on 310P devices -- confirm against the hardware docs.
    NOTE: each call wraps whatever is currently bound, so calling this
    function more than once stacks wrappers.
    """

    def broadcast310p_wrapper(fn):
        """Build the broadcast replacement around the original ``fn``."""

        def broadcast310p(tensor, src, group=None, async_op=False):
            # CPU tensors keep using the original implementation.
            if tensor.device == torch.device('cpu'):
                return fn(tensor, src, group, async_op)

            rank = torch.distributed.get_rank(group)
            world_size = torch.distributed.get_world_size(group)
            tensor_list = [
                torch.empty_like(tensor) for _ in range(world_size)
            ]
            tensor_list[rank] = tensor
            torch.distributed.all_gather(tensor_list, tensor, group=group)

            # ``src`` is a rank in the global process group, while the
            # gathered list is ordered by rank inside ``group``; translate
            # before indexing so sub-group broadcasts pick the right entry.
            if group is None:
                src_index = src
            else:
                src_index = torch.distributed.get_group_rank(group, src)
            tensor[...] = tensor_list[src_index]

            # The copy has already happened, so an async caller only needs
            # an object whose wait() is a no-op.
            return NullHandle() if async_op else None

        return broadcast310p

    torch.distributed.broadcast = broadcast310p_wrapper(
        torch.distributed.broadcast)
    torch.distributed.distributed_c10d.broadcast = broadcast310p_wrapper(
        torch.distributed.distributed_c10d.broadcast)

    def all_reduce_wrapper_310p(fn):
        """Build the all_reduce replacement around the original ``fn``."""

        def all_reduce(
            tensor,
            op=torch.distributed.ReduceOp.SUM,
            group=None,
            async_op=False,
        ):
            # Only int64 tensors take the emulated path.
            if tensor.dtype != torch.int64:
                return fn(tensor, op, group, async_op)

            rank = torch.distributed.get_rank(group)
            world_size = torch.distributed.get_world_size(group)
            tensor_list = [
                torch.empty_like(tensor) for _ in range(world_size)
            ]
            tensor_list[rank] = tensor
            torch.distributed.all_gather(tensor_list, tensor, group=group)

            if op == torch.distributed.ReduceOp.SUM:
                reduced = torch.stack(tensor_list).sum(0)
            elif op == torch.distributed.ReduceOp.MAX:
                # The max is computed on the host via numpy and moved back
                # to the tensor's device.
                reduced = torch.tensor(
                    torch.stack(tensor_list).cpu().numpy().max(0),
                    device=tensor.device,
                )
            else:
                raise RuntimeError(f"not implement op {op}")

            # torch.distributed.all_reduce is an in-place collective:
            # callers read the result from ``tensor`` itself, so the reduced
            # values must be written back rather than only returned.
            tensor[...] = reduced

            if async_op:
                # Work is already complete; hand back a no-op handle.
                return NullHandle()
            # The reduced tensor is still returned in the synchronous case
            # to stay compatible with callers that used the return value of
            # the earlier version of this patch.
            return tensor

        return all_reduce

    torch.distributed.all_reduce = all_reduce_wrapper_310p(
        torch.distributed.all_reduce)
    torch.distributed.distributed_c10d.all_reduce = all_reduce_wrapper_310p(
        torch.distributed.distributed_c10d.all_reduce)
# Apply the all_gather-based broadcast/all_reduce patches at import time,
# but only when the detected Ascend device type is 310P.
if get_ascend_device_type() == AscendDeviceType._310P:
    communication_adaptation_310p()