[Bugfix] Tweak distributed process group initialization and add dummy batch execution method (#816)
Fix batch execution method to enable DP in V1.
Signed-off-by: Yizhou Liu <liu_yizhou@outlook.com>
This commit is contained in:
@@ -47,7 +47,7 @@ def ascend_destroy_model_parallel():
|
|||||||
destory_ascend_model_parallel()
|
destory_ascend_model_parallel()
|
||||||
|
|
||||||
|
|
||||||
def ascend_stateless_init_torch_distributed_process_group(
|
def stateless_init_torch_distributed_process_group(
|
||||||
host: str, port: int, rank: int, world_size: int,
|
host: str, port: int, rank: int, world_size: int,
|
||||||
backend: str) -> ProcessGroup:
|
backend: str) -> ProcessGroup:
|
||||||
"""
|
"""
|
||||||
@@ -96,10 +96,16 @@ def ascend_stateless_init_torch_distributed_process_group(
|
|||||||
# different systems (e.g. RPC) in case the store is multi-tenant.
|
# different systems (e.g. RPC) in case the store is multi-tenant.
|
||||||
prefix_store = PrefixStore(init_method, store)
|
prefix_store = PrefixStore(init_method, store)
|
||||||
|
|
||||||
|
# TODO(Yizhou): The reason we need to set options while vllm does not
|
||||||
|
# seems to be related to the version of PyTorch. In the latest version,
|
||||||
|
# there is no need to set options. While in the older version, 2.5.1
|
||||||
|
# specifically, we need to set options.
|
||||||
|
options = ProcessGroup.Options(backend=backend)
|
||||||
pg: ProcessGroup = ProcessGroup(
|
pg: ProcessGroup = ProcessGroup(
|
||||||
prefix_store,
|
prefix_store,
|
||||||
group_rank,
|
group_rank,
|
||||||
group_size,
|
group_size,
|
||||||
|
options,
|
||||||
)
|
)
|
||||||
if backend == "gloo":
|
if backend == "gloo":
|
||||||
from torch.distributed.distributed_c10d import ProcessGroupGloo
|
from torch.distributed.distributed_c10d import ProcessGroupGloo
|
||||||
@@ -136,7 +142,10 @@ def ascend_stateless_init_torch_distributed_process_group(
|
|||||||
else:
|
else:
|
||||||
raise RuntimeError(f"Unsupported torch distributed backend: {backend}")
|
raise RuntimeError(f"Unsupported torch distributed backend: {backend}")
|
||||||
|
|
||||||
pg._set_default_backend(backend_type)
|
# TODO(Yizhou): Like we mentioned above, _set_default_backend is not
|
||||||
|
# implemented in the 2.5.1 version of PyTorch. But we need to set it
|
||||||
|
# after the latest version is released.
|
||||||
|
# pg._set_default_backend(backend_type)
|
||||||
backend_class._set_sequence_number_for_group()
|
backend_class._set_sequence_number_for_group()
|
||||||
|
|
||||||
pg._register_backend(device, backend_type, backend_class)
|
pg._register_backend(device, backend_type, backend_class)
|
||||||
@@ -163,20 +172,21 @@ def parallel_config_get_dp_port(self) -> int:
|
|||||||
|
|
||||||
|
|
||||||
def ascend_stateless_init_dp_group(self) -> "ProcessGroup":
|
def ascend_stateless_init_dp_group(self) -> "ProcessGroup":
|
||||||
from vllm.distributed.utils import \
|
# TODO(Yizhou): Currently we have to set the backend to gloo
|
||||||
stateless_init_torch_distributed_process_group
|
# because in vllm.config.ParallelConfig.has_unfinished_dp the
|
||||||
|
# device is set to cpu. We need to fix this in the future.
|
||||||
|
# We need to compare the performance of gloo and hccl and then
|
||||||
|
# decide which one to use.
|
||||||
dp_group = stateless_init_torch_distributed_process_group(
|
dp_group = stateless_init_torch_distributed_process_group(
|
||||||
self.data_parallel_master_ip,
|
self.data_parallel_master_ip,
|
||||||
self.get_next_dp_init_port(),
|
self.get_next_dp_init_port(),
|
||||||
self.data_parallel_rank,
|
self.data_parallel_rank,
|
||||||
self.data_parallel_size,
|
self.data_parallel_size,
|
||||||
backend="hccl")
|
backend="gloo")
|
||||||
|
|
||||||
return dp_group
|
return dp_group
|
||||||
|
|
||||||
|
|
||||||
vllm.distributed.parallel_state.destroy_model_parallel = ascend_destroy_model_parallel
|
vllm.distributed.parallel_state.destroy_model_parallel = ascend_destroy_model_parallel
|
||||||
vllm.distributed.stateless_init_torch_distributed_process_group = ascend_stateless_init_torch_distributed_process_group
|
|
||||||
ParallelConfig.get_next_dp_init_port = parallel_config_get_dp_port
|
ParallelConfig.get_next_dp_init_port = parallel_config_get_dp_port
|
||||||
ParallelConfig.stateless_init_dp_group = ascend_stateless_init_dp_group
|
ParallelConfig.stateless_init_dp_group = ascend_stateless_init_dp_group
|
||||||
|
|||||||
@@ -216,6 +216,9 @@ class NPUWorker(WorkerBase):
|
|||||||
else:
|
else:
|
||||||
self.profiler.stop()
|
self.profiler.stop()
|
||||||
|
|
||||||
|
def execute_dummy_batch(self) -> None:
|
||||||
|
self.model_runner._dummy_run(1)
|
||||||
|
|
||||||
def _init_worker_distributed_environment(self) -> None:
|
def _init_worker_distributed_environment(self) -> None:
|
||||||
"""Initialize the distributed environment."""
|
"""Initialize the distributed environment."""
|
||||||
additional_config = self.vllm_config.additional_config
|
additional_config = self.vllm_config.additional_config
|
||||||
|
|||||||
Reference in New Issue
Block a user