[e2e]Fixed the issue that pyhccl e2e cannot run continuously with other tests (#1246)
### What this PR does / why we need it?
1.Fixed the issue that pyhccl e2e cannot run continuously with other
tests.
2.Cleaned up the resources occupied by the dynamic_npugraph_batchsize
e2e test.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This is a e2e test
e2e multi-cards tests local running successfully.
- vLLM version: v0.9.2
- vLLM main:
0df4d9b06b
Signed-off-by: leo-pony <nengjunma@outlook.com>
This commit is contained in:
@@ -16,7 +16,9 @@
|
|||||||
#
|
#
|
||||||
import pytest
|
import pytest
|
||||||
import torch
|
import torch
|
||||||
from vllm import LLM, SamplingParams
|
from vllm import SamplingParams
|
||||||
|
|
||||||
|
from tests.e2e.conftest import VllmRunner
|
||||||
|
|
||||||
MODELS = [
|
MODELS = [
|
||||||
"Qwen/Qwen2.5-0.5B-Instruct",
|
"Qwen/Qwen2.5-0.5B-Instruct",
|
||||||
@@ -38,10 +40,10 @@ prompts = [
|
|||||||
def test_models(model: str, tp_size: int, max_tokens: int, temperature: int,
|
def test_models(model: str, tp_size: int, max_tokens: int, temperature: int,
|
||||||
ignore_eos: bool) -> None:
|
ignore_eos: bool) -> None:
|
||||||
# Create an LLM.
|
# Create an LLM.
|
||||||
llm = LLM(
|
with VllmRunner(
|
||||||
model=model,
|
model_name=model,
|
||||||
tensor_parallel_size=tp_size,
|
tensor_parallel_size=tp_size,
|
||||||
)
|
) as vllm_model:
|
||||||
# Prepare sampling_parames
|
# Prepare sampling_parames
|
||||||
sampling_params = SamplingParams(
|
sampling_params = SamplingParams(
|
||||||
max_tokens=max_tokens,
|
max_tokens=max_tokens,
|
||||||
@@ -51,7 +53,7 @@ def test_models(model: str, tp_size: int, max_tokens: int, temperature: int,
|
|||||||
|
|
||||||
# Generate texts from the prompts.
|
# Generate texts from the prompts.
|
||||||
# The output is a list of RequestOutput objects
|
# The output is a list of RequestOutput objects
|
||||||
outputs = llm.generate(prompts, sampling_params)
|
outputs = vllm_model.generate(prompts, sampling_params)
|
||||||
torch.npu.synchronize()
|
torch.npu.synchronize()
|
||||||
# The output length should be equal to prompts length.
|
# The output length should be equal to prompts length.
|
||||||
assert len(outputs) == len(prompts)
|
assert len(outputs) == len(prompts)
|
||||||
|
|||||||
@@ -24,9 +24,39 @@ from vllm.distributed.parallel_state import (get_world_group,
|
|||||||
init_distributed_environment)
|
init_distributed_environment)
|
||||||
from vllm.utils import update_environment_variables
|
from vllm.utils import update_environment_variables
|
||||||
|
|
||||||
|
from tests.e2e.conftest import cleanup_dist_env_and_memory
|
||||||
from vllm_ascend.distributed.device_communicators.pyhccl import \
|
from vllm_ascend.distributed.device_communicators.pyhccl import \
|
||||||
PyHcclCommunicator
|
PyHcclCommunicator
|
||||||
|
|
||||||
|
os.environ["TOKENIZERS_PARALLELISM"] = "true"
|
||||||
|
|
||||||
|
multiprocessing.set_start_method("spawn", force=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _worker_entry(env, fn):
|
||||||
|
# `multiprocessing.Process` cannot accept environment variables directly
|
||||||
|
# so we need to pass the environment variables as arguments
|
||||||
|
# and update the environment variables in the function
|
||||||
|
update_environment_variables(env)
|
||||||
|
|
||||||
|
rank = int(os.environ['RANK'])
|
||||||
|
local_rank = int(os.environ['LOCAL_RANK'])
|
||||||
|
word_size = int(os.environ['WORLD_SIZE'])
|
||||||
|
|
||||||
|
distributed_init_method = "tcp://localhost:12345"
|
||||||
|
|
||||||
|
device = torch.device(f"npu:{local_rank}")
|
||||||
|
torch.npu.set_device(device)
|
||||||
|
|
||||||
|
init_distributed_environment(
|
||||||
|
world_size=word_size,
|
||||||
|
rank=rank,
|
||||||
|
distributed_init_method=distributed_init_method,
|
||||||
|
local_rank=local_rank,
|
||||||
|
backend="hccl")
|
||||||
|
fn()
|
||||||
|
cleanup_dist_env_and_memory()
|
||||||
|
|
||||||
|
|
||||||
def distributed_run(fn, world_size):
|
def distributed_run(fn, world_size):
|
||||||
number_of_processes = world_size
|
number_of_processes = world_size
|
||||||
@@ -37,9 +67,7 @@ def distributed_run(fn, world_size):
|
|||||||
env['LOCAL_RANK'] = str(i)
|
env['LOCAL_RANK'] = str(i)
|
||||||
env['WORLD_SIZE'] = str(number_of_processes)
|
env['WORLD_SIZE'] = str(number_of_processes)
|
||||||
env['LOCAL_WORLD_SIZE'] = str(number_of_processes)
|
env['LOCAL_WORLD_SIZE'] = str(number_of_processes)
|
||||||
env['MASTER_ADDR'] = 'localhost'
|
p = multiprocessing.Process(target=_worker_entry, args=(env, fn))
|
||||||
env['MASTER_PORT'] = '12345'
|
|
||||||
p = multiprocessing.Process(target=fn, args=(env, ))
|
|
||||||
processes.append(p)
|
processes.append(p)
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
@@ -50,22 +78,6 @@ def distributed_run(fn, world_size):
|
|||||||
assert p.exitcode == 0
|
assert p.exitcode == 0
|
||||||
|
|
||||||
|
|
||||||
def worker_fn_wrapper(fn):
|
|
||||||
# `multiprocessing.Process` cannot accept environment variables directly
|
|
||||||
# so we need to pass the environment variables as arguments
|
|
||||||
# and update the environment variables in the function
|
|
||||||
def wrapped_fn(env):
|
|
||||||
update_environment_variables(env)
|
|
||||||
local_rank = os.environ['LOCAL_RANK']
|
|
||||||
device = torch.device(f"npu:{local_rank}")
|
|
||||||
torch.npu.set_device(device)
|
|
||||||
init_distributed_environment(backend="hccl")
|
|
||||||
fn()
|
|
||||||
|
|
||||||
return wrapped_fn
|
|
||||||
|
|
||||||
|
|
||||||
@worker_fn_wrapper
|
|
||||||
def worker_fn():
|
def worker_fn():
|
||||||
pynccl_comm = PyHcclCommunicator(get_world_group().cpu_group,
|
pynccl_comm = PyHcclCommunicator(get_world_group().cpu_group,
|
||||||
device=get_world_group().device)
|
device=get_world_group().device)
|
||||||
@@ -76,11 +88,10 @@ def worker_fn():
|
|||||||
assert torch.all(tensor == pynccl_comm.world_size).cpu().item()
|
assert torch.all(tensor == pynccl_comm.world_size).cpu().item()
|
||||||
|
|
||||||
|
|
||||||
# def test_pyhccl():
|
def test_pyhccl():
|
||||||
# distributed_run(worker_fn, 2)
|
distributed_run(worker_fn, 4)
|
||||||
|
|
||||||
|
|
||||||
@worker_fn_wrapper
|
|
||||||
def broadcast_worker_fn():
|
def broadcast_worker_fn():
|
||||||
# Test broadcast for every root rank.
|
# Test broadcast for every root rank.
|
||||||
# Essentially this is an all-gather operation.
|
# Essentially this is an all-gather operation.
|
||||||
@@ -106,5 +117,5 @@ def broadcast_worker_fn():
|
|||||||
assert torch.all(recv_tensors[i] == i).cpu().item()
|
assert torch.all(recv_tensors[i] == i).cpu().item()
|
||||||
|
|
||||||
|
|
||||||
# def test_pyhccl_broadcast():
|
def test_pyhccl_broadcast():
|
||||||
# distributed_run(broadcast_worker_fn, 4)
|
distributed_run(broadcast_worker_fn, 4)
|
||||||
|
|||||||
Reference in New Issue
Block a user