There is a lot of hack code for v0.11.0, which makes the code hard to
upgrade to newer vLLM versions. Since v0.11.2 will be released soon, let's
drop v0.11.0 support first; we'll then upgrade to v0.11.2.
- vLLM version: v0.11.0
- vLLM main:
2918c1b49c
Signed-off-by: wangxiyuan <wangxiyuan1007@gmail.com>
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
# Adapted from vllm-project/vllm/examples/offline_inference/data_parallel.py
#
"""
Usage:
Single node:
    Dense models:
        python examples/offline_data_parallel.py \
                --model="Qwen/Qwen2.5-0.5B-Instruct" \
                --dp-size=2 \
                --tp-size=2
    MOE models:
        python examples/offline_data_parallel.py \
                --model="ibm-research/PowerMoE-3b" \
                --dp-size=2 \
                --tp-size=2 \
                --enable-expert-parallel

Multi-node:
    Node 0 (assume the node has ip of 10.99.48.128):
        python examples/offline_data_parallel.py \
                --model="ibm-research/PowerMoE-3b" \
                --dp-size=2 \
                --tp-size=2 \
                --node-size=2 \
                --node-rank=0 \
                --enable-expert-parallel \
                --master-addr=10.99.48.128 \
                --master-port=13345
    Node 1:
        python examples/offline_data_parallel.py \
                --model="ibm-research/PowerMoE-3b" \
                --dp-size=2 \
                --tp-size=2 \
                --node-size=2 \
                --node-rank=1 \
                --enable-expert-parallel \
                --master-addr=10.99.48.128 \
                --master-port=13345
"""

import contextlib
import gc
import os
from time import sleep

import torch
from vllm import LLM, SamplingParams
from vllm.distributed.parallel_state import (  # noqa E402
    destroy_distributed_environment, destroy_model_parallel)
from vllm.utils.network_utils import get_open_port

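# Pull model weights from ModelScope rather than the Hugging Face Hub, and
# start worker processes with "spawn" so that they are not forked from a
# process that may already hold an initialized device context.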
os.environ["VLLM_USE_MODELSCOPE"] = "True"
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"


def parse_args():
    import argparse

    parser = argparse.ArgumentParser(description="Data Parallel Inference")
    parser.add_argument(
        "--model",
        type=str,
        default="ibm-research/PowerMoE-3b",
        help="Model name or path",
    )
    parser.add_argument("--dp-size",
                        type=int,
                        default=2,
                        help="Data parallel size")
    parser.add_argument("--tp-size",
                        type=int,
                        default=1,
                        help="Tensor parallel size")
    parser.add_argument("--node-size",
                        type=int,
                        default=1,
                        help="Total number of nodes")
    parser.add_argument("--node-rank",
                        type=int,
                        default=0,
                        help="Rank of the current node")
    parser.add_argument("--master-addr",
                        type=str,
                        default="",
                        help="Master node IP address")
    parser.add_argument("--master-port",
                        type=int,
                        default=0,
                        help="Master node port")
    parser.add_argument("--enforce-eager",
                        action="store_true",
                        help="Enforce eager mode execution.")
    parser.add_argument("--trust-remote-code",
                        action="store_true",
                        help="Trust remote code.")
    parser.add_argument("--enable-expert-parallel",
                        action="store_true",
                        help="Enable expert parallel, used in MOE models.")
    return parser.parse_args()


def cleanup_env_and_memory():
    destroy_model_parallel()
    destroy_distributed_environment()
    with contextlib.suppress(AssertionError):
        torch.distributed.destroy_process_group()
    gc.collect()
    torch.npu.empty_cache()
    torch.npu.reset_peak_memory_stats()


def main(
    model,
    dp_size,
    local_dp_rank,
    global_dp_rank,
    dp_master_ip,
    dp_master_port,
    GPUs_per_dp_rank,
    enable_expert_parallel,
    enforce_eager,
    trust_remote_code,
):
    # DP is only supported on the V1 engine
    os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
    os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
    os.environ["VLLM_DP_SIZE"] = str(dp_size)
    os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
    os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)
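    # Each engine process reads these VLLM_DP_* variables to join the same
    # data-parallel group, coordinated through the master IP and port.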

    # CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
    # engine processes.

    # Sample prompts.
    prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ] * 100

    # with DP, each rank should process different prompts.
    # usually all the DP ranks process a full dataset,
    # and each rank processes a different part of the dataset.
    floor = len(prompts) // dp_size
    remainder = len(prompts) % dp_size

    # Distribute prompts into even groups.
    def start(rank):
        return rank * floor + min(rank, remainder)
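    # For example, with the 400 prompts above and dp_size=2, floor is 200 and
    # remainder is 0, so rank 0 takes prompts[0:200] and rank 1 takes
    # prompts[200:400]; when there is a remainder, the lowest ranks each take
    # one extra prompt.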
    prompts = prompts[start(global_dp_rank):start(global_dp_rank + 1)]
    if len(prompts) == 0:
        # if any rank has no prompts to process,
        # we need to set a placeholder prompt
        prompts = ["Placeholder"]
    print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")

    # Create a sampling params object.
    # since we are doing data parallel, every rank can have different
    # sampling params. here we set different max_tokens for different
    # ranks for demonstration.
    sampling_params = SamplingParams(temperature=0.8,
                                     top_p=0.95,
                                     max_tokens=[16, 20][global_dp_rank % 2])

    # Create an LLM.
    llm = LLM(
        model=model,
        tensor_parallel_size=GPUs_per_dp_rank,
        enforce_eager=enforce_eager,
        enable_expert_parallel=enable_expert_parallel,
        trust_remote_code=trust_remote_code,
    )
    outputs = llm.generate(prompts, sampling_params)
    # Print the outputs.
    for i, output in enumerate(outputs):
        if i >= 5:
            # print only 5 outputs
            break
        prompt = output.prompt
        generated_text = output.outputs[0].text
        print(f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
              f"Generated text: {generated_text!r}")

    # Give engines time to pause their processing loops before exiting.
    sleep(5)
    del llm
    cleanup_env_and_memory()


if __name__ == "__main__":
    args = parse_args()

    dp_size = args.dp_size
    tp_size = args.tp_size
    node_size = args.node_size
    node_rank = args.node_rank

    if node_size == 1:
        dp_master_ip = "127.0.0.1"
        dp_master_port = get_open_port()
    else:
        dp_master_ip = args.master_addr
        dp_master_port = args.master_port

    assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
    dp_per_node = dp_size // node_size
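    # For example, with dp_size=4 and node_size=2, dp_per_node is 2: node 0
    # launches global DP ranks 0 and 1, node 1 launches global DP ranks 2 and
    # 3, and each node runs them as local ranks 0 and 1 on its own devices.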
    from multiprocessing import Process

    procs = []
    for local_dp_rank, global_dp_rank in enumerate(
            range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)):
        proc = Process(
            target=main,
            args=(
                args.model,
                dp_size,
                local_dp_rank,
                global_dp_rank,
                dp_master_ip,
                dp_master_port,
                tp_size,
                args.enable_expert_parallel,
                args.enforce_eager,
                args.trust_remote_code,
            ),
        )
        proc.start()
        procs.append(proc)
    exit_code = 0
    for proc in procs:
        proc.join(timeout=900)
        if proc.exitcode is None:
            print(
                f"Killing process {proc.pid} that didn't stop within 15 minutes."
            )
            proc.kill()
            exit_code = 1
        elif proc.exitcode:
            exit_code = proc.exitcode

    exit(exit_code)