[CI] fix port conflicts (#5789)
@@ -129,7 +129,7 @@ def init_process_hf(
    hf_instruct_params = []
    hf_base_params = []

    print("get parameter in hf instruct model and base model")
    print("[hf] get parameter in hf instruct model and base model")
    for parameter_name in checking_parameters:
        hf_instruct_params.append(
            hf_instruct_model.get_parameter(parameter_name)[:truncate_size]
@@ -152,10 +152,12 @@ def init_process_hf(
    param_queue.put(("hf_base_params", hf_base_params))

    # Init weight update group for rank 0 (the training engine in RLHF).
    print(f"rank {rank} world_size: {world_size} init custom process group")
    port = 60000 + int(os.environ.get("CUDA_VISIBLE_DEVICES", "0")[0]) * 100
    init_method = f"tcp://localhost:{port}"
    print(f"[hf] {rank=} {world_size=} init custom process group. {init_method=}")
    group = init_custom_process_group(
        backend="nccl",
        init_method="tcp://localhost:65500",
        init_method=init_method,
        world_size=world_size,
        rank=rank,
        group_name="test_parameter_update_group",
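The two lines added above are the core of the conflict fix: instead of the fixed rendezvous address tcp://localhost:65500, each CI job derives its port from the first GPU index in CUDA_VISIBLE_DEVICES, so jobs sharing a machine no longer collide. A minimal standalone sketch of that arithmetic (the helper name and example value are illustrative, not part of the commit):

import os

def derive_rendezvous_port(base: int = 60000) -> int:
    # Same formula as the diff: take the first character of
    # CUDA_VISIBLE_DEVICES (assumes single-digit GPU indices) and
    # spread concurrent jobs across 60000, 60100, ..., 60900.
    first_gpu = int(os.environ.get("CUDA_VISIBLE_DEVICES", "0")[0])
    return base + first_gpu * 100

# Illustrative only: a job pinned to GPUs "2,3" rendezvouses on port 60200.
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
print(f"tcp://localhost:{derive_rendezvous_port()}")  # tcp://localhost:60200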
@@ -184,7 +186,7 @@ def init_process_hf(

    # Measure the latency of broadcasting/weights update.
    broadcast_time = time_end_broadcast - time_begin_broadcast
    print(f"rank {rank} broadcast parameter time: {broadcast_time:.3f}s")
    print(f"[hf] {rank=} {broadcast_time=:.3f}s")
    param_queue.put(("broadcast_time", broadcast_time))

    # Delete the huggingface models to free up memory.
@@ -210,17 +212,21 @@ def init_process_sgl(
    torch.cuda.synchronize()
    base_gpu_id = 1 if rank == 1 else 1 + tp_size
    if backend == "Engine":
        print(f"[sgl] rank {rank} init engine")
        engine = sgl.Engine(
            model_path=model_name,
            random_seed=42,
            base_gpu_id=base_gpu_id,
            tp_size=tp_size,
            cuda_graph_max_bs=2,
        )
    else:
        if rank == 1:
            url = DEFAULT_URL_FOR_TEST
        else:
            url = DEFAULT_URL_FOR_TEST.replace("2157", "2159")
            host, port = DEFAULT_URL_FOR_TEST.split(":")
            url = ":".join(host, str(int(port) + 10000))

        print(f"[sgl] rank {rank} init server on url: {url}")
        process = popen_launch_server(
            model_name,
            url,
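The else branch now derives the second server's URL from DEFAULT_URL_FOR_TEST instead of a hard-coded string replacement. As shown in the hunk, ":".join(host, str(int(port) + 10000)) would raise a TypeError at runtime, since str.join accepts a single iterable; the sketch below captures the apparent intent under that assumption (helper name and example URL are illustrative):

# Illustrative sketch, not the repository's code: bump the port of a
# base URL by a fixed offset so the second test server does not clash.
def offset_url_port(url: str, offset: int = 10000) -> str:
    # rsplit on the last ":" so a scheme prefix like "http://" survives.
    host, port = url.rsplit(":", 1)
    return f"{host}:{int(port) + offset}"

print(offset_url_port("http://127.0.0.1:2157"))  # http://127.0.0.1:12157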
@@ -230,13 +236,11 @@ def init_process_sgl(
                str(base_gpu_id),
                "--tp-size",
                str(tp_size),
                "--cuda-graph-max-bs",
                2,
            ),
        )
    torch.cuda.synchronize()
    if backend == "Engine":
        print(f"rank {rank} init engine")
    else:
        print(f"rank {rank} init server on url: {url}")

    # Get weights of instruct model, i.e. pre-training weights.
    instruct_params = []
@@ -252,11 +256,13 @@ def init_process_sgl(

    param_queue.put((f"sgl_dp_{rank}_instruct_params", instruct_params))

    port = 60000 + int(os.environ.get("CUDA_VISIBLE_DEVICES", "0")[0]) * 100

    # Init weight update group with the training engine.
    if backend == "Engine":
        engine.init_weights_update_group(
            master_address="localhost",
            master_port="65500",
            master_port=str(port),
            rank_offset=base_gpu_id,
            world_size=world_size,
            group_name="test_parameter_update_group",
@@ -267,7 +273,7 @@ def init_process_sgl(
            f"{url}/init_weights_update_group",
            json={
                "master_address": "localhost",
                "master_port": "65500",
                "master_port": str(port),
                "rank_offset": base_gpu_id,
                "world_size": world_size,
                "group_name": "test_parameter_update_group",
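Both update paths now advertise the same derived port: the Engine branch passes master_port=str(port) to engine.init_weights_update_group, and the HTTP branch posts the identical fields to {url}/init_weights_update_group. For the process group to form, this value has to match the port computed in init_process_hf from the same CUDA_VISIBLE_DEVICES. A hedged sketch of how the two branches could share one payload (the wrapper and variable names are placeholders; only fields visible in the hunks are included):

import requests

def announce_update_group(engine, url, port, base_gpu_id, world_size):
    # Placeholder wrapper: both branches send the same fields, so the
    # training process and the inference engines agree on master_port.
    payload = {
        "master_address": "localhost",
        "master_port": str(port),
        "rank_offset": base_gpu_id,
        "world_size": world_size,
        "group_name": "test_parameter_update_group",
    }
    if engine is not None:
        engine.init_weights_update_group(**payload)
    else:
        requests.post(f"{url}/init_weights_update_group", json=payload)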
@@ -311,7 +317,7 @@ def init_process_sgl(
    # Measure the latency of broadcast/weights update.
    update_time = time_end_update - time_begin_update
    print(
        f"fully update model_name {model_name} rank {rank} parameter from distributed time: {update_time:.3f}s"
        f"[sgl] fully update model_name {model_name} rank {rank} parameter from distributed time: {update_time:.3f}s"
    )
    param_queue.put((f"update_sgl_dp_{rank}_time", update_time))