From 4df8e0027c58e332660c3335c5c7b012ca2d9151 Mon Sep 17 00:00:00 2001 From: leo-pony Date: Tue, 29 Jul 2025 19:38:30 +0800 Subject: [PATCH] [e2e]Fixed the issue that pyhccl e2e cannot run continuously with other tests (#1246) ### What this PR does / why we need it? 1. Fixed the issue that the pyhccl e2e test cannot run continuously with other tests. 2. Cleaned up the resources occupied by the dynamic_npugraph_batchsize e2e test. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is an e2e test change; the e2e multi-card tests ran successfully locally. - vLLM version: v0.9.2 - vLLM main: https://github.com/vllm-project/vllm/commit/0df4d9b06b15fa39eeb2d440e7742da93afd5e6c Signed-off-by: leo-pony --- .../test_dynamic_npugraph_batchsize.py | 36 +++++------ .../e2e/multicard/test_pyhccl_distributed.py | 59 +++++++++++-------- 2 files changed, 54 insertions(+), 41 deletions(-) diff --git a/tests/e2e/multicard/test_dynamic_npugraph_batchsize.py b/tests/e2e/multicard/test_dynamic_npugraph_batchsize.py index e5c7042..8d0ad49 100644 --- a/tests/e2e/multicard/test_dynamic_npugraph_batchsize.py +++ b/tests/e2e/multicard/test_dynamic_npugraph_batchsize.py @@ -16,7 +16,9 @@ # import pytest import torch -from vllm import LLM, SamplingParams +from vllm import SamplingParams + +from tests.e2e.conftest import VllmRunner MODELS = [ "Qwen/Qwen2.5-0.5B-Instruct", @@ -38,20 +40,20 @@ prompts = [ def test_models(model: str, tp_size: int, max_tokens: int, temperature: int, ignore_eos: bool) -> None: # Create an LLM. 
- llm = LLM( - model=model, - tensor_parallel_size=tp_size, - ) - # Prepare sampling_parames - sampling_params = SamplingParams( - max_tokens=max_tokens, - temperature=temperature, - ignore_eos=ignore_eos, - ) + with VllmRunner( + model_name=model, + tensor_parallel_size=tp_size, + ) as vllm_model: + # Prepare sampling_parames + sampling_params = SamplingParams( + max_tokens=max_tokens, + temperature=temperature, + ignore_eos=ignore_eos, + ) - # Generate texts from the prompts. - # The output is a list of RequestOutput objects - outputs = llm.generate(prompts, sampling_params) - torch.npu.synchronize() - # The output length should be equal to prompts length. - assert len(outputs) == len(prompts) + # Generate texts from the prompts. + # The output is a list of RequestOutput objects + outputs = vllm_model.generate(prompts, sampling_params) + torch.npu.synchronize() + # The output length should be equal to prompts length. + assert len(outputs) == len(prompts) diff --git a/tests/e2e/multicard/test_pyhccl_distributed.py b/tests/e2e/multicard/test_pyhccl_distributed.py index 42918c9..e3d9aed 100644 --- a/tests/e2e/multicard/test_pyhccl_distributed.py +++ b/tests/e2e/multicard/test_pyhccl_distributed.py @@ -24,9 +24,39 @@ from vllm.distributed.parallel_state import (get_world_group, init_distributed_environment) from vllm.utils import update_environment_variables +from tests.e2e.conftest import cleanup_dist_env_and_memory from vllm_ascend.distributed.device_communicators.pyhccl import \ PyHcclCommunicator +os.environ["TOKENIZERS_PARALLELISM"] = "true" + +multiprocessing.set_start_method("spawn", force=True) + + +def _worker_entry(env, fn): + # `multiprocessing.Process` cannot accept environment variables directly + # so we need to pass the environment variables as arguments + # and update the environment variables in the function + update_environment_variables(env) + + rank = int(os.environ['RANK']) + local_rank = int(os.environ['LOCAL_RANK']) + word_size = 
int(os.environ['WORLD_SIZE']) + + distributed_init_method = "tcp://localhost:12345" + + device = torch.device(f"npu:{local_rank}") + torch.npu.set_device(device) + + init_distributed_environment( + world_size=word_size, + rank=rank, + distributed_init_method=distributed_init_method, + local_rank=local_rank, + backend="hccl") + fn() + cleanup_dist_env_and_memory() + def distributed_run(fn, world_size): number_of_processes = world_size @@ -37,9 +67,7 @@ def distributed_run(fn, world_size): env['LOCAL_RANK'] = str(i) env['WORLD_SIZE'] = str(number_of_processes) env['LOCAL_WORLD_SIZE'] = str(number_of_processes) - env['MASTER_ADDR'] = 'localhost' - env['MASTER_PORT'] = '12345' - p = multiprocessing.Process(target=fn, args=(env, )) + p = multiprocessing.Process(target=_worker_entry, args=(env, fn)) processes.append(p) p.start() @@ -50,22 +78,6 @@ def distributed_run(fn, world_size): assert p.exitcode == 0 -def worker_fn_wrapper(fn): - # `multiprocessing.Process` cannot accept environment variables directly - # so we need to pass the environment variables as arguments - # and update the environment variables in the function - def wrapped_fn(env): - update_environment_variables(env) - local_rank = os.environ['LOCAL_RANK'] - device = torch.device(f"npu:{local_rank}") - torch.npu.set_device(device) - init_distributed_environment(backend="hccl") - fn() - - return wrapped_fn - - -@worker_fn_wrapper def worker_fn(): pynccl_comm = PyHcclCommunicator(get_world_group().cpu_group, device=get_world_group().device) @@ -76,11 +88,10 @@ def worker_fn(): assert torch.all(tensor == pynccl_comm.world_size).cpu().item() -# def test_pyhccl(): -# distributed_run(worker_fn, 2) +def test_pyhccl(): + distributed_run(worker_fn, 4) -@worker_fn_wrapper def broadcast_worker_fn(): # Test broadcast for every root rank. # Essentially this is an all-gather operation. 
@@ -106,5 +117,5 @@ def broadcast_worker_fn(): assert torch.all(recv_tensors[i] == i).cpu().item() -# def test_pyhccl_broadcast(): -# distributed_run(broadcast_worker_fn, 4) +def test_pyhccl_broadcast(): + distributed_run(broadcast_worker_fn, 4)