[UT] refactor test_expert_load_balancer and fix broken CI (#1293)

Refactor test_expert_load_balancer to keep the unit-test code style consistent.

This PR also fixes the breaking change from
https://github.com/vllm-project/vllm/pull/16188/files#diff-e2942ece30a5c580437694ffb964bfc664b510c59244c08e5921b8f5cefb4280

This is just a quick fix. We'll add embedding support on V1 later.
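For context, the compatibility pattern applied throughout the tests and the model runner below is roughly the following sketch. The `optional_output_kwargs` helper is only illustrative and not part of this diff; `vllm_version_is` comes from `vllm_ascend.utils`, and the `ModelRunnerOutput` fields mirror the ones the updated tests pass:

```python
from vllm.v1.outputs import ModelRunnerOutput
from vllm_ascend.utils import vllm_version_is


def optional_output_kwargs() -> dict:
    # vLLM releases newer than 0.9.1 added a `pooler_output` field to
    # ModelRunnerOutput (and `pooling_params` to Request); 0.9.1 does not
    # accept the keyword, so it is only passed on newer versions.
    return {} if vllm_version_is("0.9.1") else {"pooler_output": []}


# Illustrative usage mirroring the test updates below.
output = ModelRunnerOutput(
    req_ids=["req-0"],
    req_id_to_index={"req-0": 0},
    sampled_token_ids=[[1]],
    spec_token_ids=None,
    logprobs=None,
    prompt_logprobs_dict={},
    **optional_output_kwargs(),
)
```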

Closes: https://github.com/vllm-project/vllm-ascend/issues/1299

Signed-off-by: wangxiyuan <wangxiyuan1007@gmail.com>
Author: wangxiyuan
Date: 2025-06-20 01:02:52 +08:00
Committed by: GitHub
Parent: ebb2a70dbb
Commit: b350edae9a
4 changed files with 205 additions and 140 deletions

View File

@@ -98,11 +98,7 @@ def create_scheduler(
)
kv_cache_config = KVCacheConfig(
num_blocks=num_blocks, # A large number of blocks to hold all requests
**({
"tensors": {}
} if vllm_version_is("0.9.0") else {
"kv_cache_tensors": []
}),
kv_cache_tensors=[],
kv_cache_groups=[
KVCacheGroupSpec(['layer'],
FullAttentionSpec(block_size, 1, 1, torch.float32,
@@ -145,8 +141,8 @@ def create_requests(num_requests: int,
multi_modal_hashes=None,
eos_token_id=EOS_TOKEN_ID,
**({
"arrival_time": 0.0
} if vllm_version_is("0.9.0") else {}),
"pooling_params": None
} if not vllm_version_is("0.9.1") else {}),
)
requests.append(request)
return requests
@@ -262,7 +258,9 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(output, model_runner_output)
# Schedule the next step. All three requests are running.
@@ -286,7 +284,10 @@ def test_schedule_concurrent_partial_requests(enable_prefix_caching: bool):
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(output1, model_runner_output)
output2 = scheduler.schedule()
assert len(scheduler.running) == 3
@@ -337,7 +338,10 @@ def test_stop_via_update_from_output():
11]], # First request hits EOS, second continues
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -385,7 +389,10 @@ def test_stop_via_update_from_output():
[13, 14]], # First request hits stop token
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -432,7 +439,10 @@ def test_stop_via_update_from_output():
[13]], # First request exceeds max_tokens
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -474,7 +484,10 @@ def test_stop_via_update_from_output():
sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -524,7 +537,10 @@ def test_schedule_concurrent_batches(enable_prefix_caching: Optional[bool],
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output0, model_runner_output)
# Schedule the next step.
@@ -541,7 +557,10 @@ def test_schedule_concurrent_batches(enable_prefix_caching: Optional[bool],
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output1, model_runner_output)
@@ -565,8 +584,6 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
1. Speculated tokens get scheduled correctly
2. Spec decoding stats properly count number of draft and accepted tokens
"""
if vllm_version_is("0.9.0"):
return
num_spec_tokens = max(1, max(len(t) for t in spec_tokens))
scheduler = create_scheduler(num_speculative_tokens=num_spec_tokens)
requests = create_requests(num_requests=len(spec_tokens), num_tokens=1)
@@ -593,7 +610,10 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
spec_token_ids=spec_tokens,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
engine_core_outputs = scheduler.update_from_output(output,
model_runner_output)
@@ -632,7 +652,10 @@ def test_schedule_spec_decoding_stats(spec_tokens, output_tokens, expected):
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
engine_core_outputs = scheduler.update_from_output(output,
model_runner_output)
@@ -727,7 +750,9 @@ def make_output(scheduler: AscendScheduler):
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={},
)
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
def assert_scheduler_empty(scheduler: AscendScheduler):
@@ -744,11 +769,10 @@ def assert_scheduler_empty(scheduler: AscendScheduler):
assert len(scheduler.encoder_cache_manager.cached) == 0
# KVCache Manager.
if not vllm_version_is("0.9.0"):
assert len(scheduler.kv_cache_manager.coordinator.
single_type_managers[0].req_to_blocks) == 0
assert len(scheduler.kv_cache_manager.coordinator.
single_type_managers[0].num_cached_block) == 0
assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
req_to_blocks) == 0
assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
num_cached_block) == 0
assert len(scheduler.kv_cache_manager.req_to_block_hashes) == 0
num_free_blocks = (
scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks)
@@ -789,4 +813,4 @@ def test_memory_leak():
scheduler.update_from_output(scheduler_output, model_runner_output)
# Confirm no memory leak.
assert_scheduler_empty(scheduler)
assert_scheduler_empty(scheduler)

View File

@@ -31,6 +31,7 @@ from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager
from vllm_ascend.core.scheduler import AscendScheduler
from vllm_ascend.utils import vllm_version_is
EOS_TOKEN_ID = 50256
@@ -130,6 +131,9 @@ def create_requests(num_requests: int,
multi_modal_placeholders=mm_position,
multi_modal_hashes=None,
eos_token_id=EOS_TOKEN_ID,
**({
"pooling_params": None
} if not vllm_version_is("0.9.1") else {}),
)
requests.append(request)
return requests
@@ -237,7 +241,10 @@ def test_stop_via_update_from_output():
11]], # First request hits EOS, second continues
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -283,7 +290,10 @@ def test_stop_via_update_from_output():
[13, 14]], # First request hits stop token
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -328,7 +338,10 @@ def test_stop_via_update_from_output():
[13]], # First request exceeds max_tokens
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)
@@ -369,7 +382,10 @@ def test_stop_via_update_from_output():
sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
spec_token_ids=None,
logprobs=None,
prompt_logprobs_dict={})
prompt_logprobs_dict={},
**({
"pooler_output": []
} if not vllm_version_is("0.9.1") else {}))
scheduler.update_from_output(scheduler_output, model_output)

View File

@@ -3,9 +3,10 @@
import vllm_ascend.patch.worker.patch_common.patch_utils # type: ignore[import] # isort: skip # noqa
import json
import unittest
from typing import List, TypedDict
from unittest import mock
import pytest
import torch
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
@@ -46,101 +47,101 @@ MOCK_DATA: MockData = {
}
@pytest.fixture
def mock_expert_load_balancer(tmp_path):
json_file = tmp_path / "expert_map.json"
with open(json_file, 'w') as f:
json.dump(MOCK_DATA, f)
class TestExpertLoadBalancer(unittest.TestCase):
return ExpertLoadBalancer(str(json_file), global_expert_num=8)
def setUp(self):
json_file = "expert_map.json"
with open(json_file, 'w') as f:
json.dump(MOCK_DATA, f)
self.expert_load_balancer = ExpertLoadBalancer(json_file,
global_expert_num=8)
def test_init(mock_expert_load_balancer):
assert isinstance(mock_expert_load_balancer.expert_map_tensor,
torch.Tensor)
assert mock_expert_load_balancer.layers_num == MOCK_DATA["moe_layer_count"]
assert mock_expert_load_balancer.ranks_num == MOCK_DATA["layer_list"][0][
"device_count"]
def test_init(self):
self.assertIsInstance(self.expert_load_balancer.expert_map_tensor,
torch.Tensor)
self.assertEqual(self.expert_load_balancer.layers_num,
MOCK_DATA["moe_layer_count"])
self.assertEqual(self.expert_load_balancer.ranks_num,
MOCK_DATA["layer_list"][0]["device_count"])
def test_generate_index_dicts(mock_expert_load_balancer):
tensor_2d = torch.tensor([[7, 2, 0, 3, 5], [6, 1, 4, 7, 2]])
result = mock_expert_load_balancer.generate_index_dicts(tensor_2d)
expected_result = [{
7: 0,
2: 1,
0: 2,
3: 3,
5: 4
}, {
6: 5,
1: 6,
4: 7,
7: 8,
2: 9
}]
assert result == expected_result
def test_generate_index_dicts(self):
tensor_2d = torch.tensor([[7, 2, 0, 3, 5], [6, 1, 4, 7, 2]])
result = self.expert_load_balancer.generate_index_dicts(tensor_2d)
expected_result = [{
7: 0,
2: 1,
0: 2,
3: 3,
5: 4
}, {
6: 5,
1: 6,
4: 7,
7: 8,
2: 9
}]
self.assertEqual(result, expected_result)
def test_generate_expert_placement_map(self):
expert_placement_map = self.expert_load_balancer.generate_expert_placement_map(
)
self.assertEqual(expert_placement_map.shape,
(self.expert_load_balancer.layers_num,
self.expert_load_balancer.ranks_num, 8))
self.assertTrue(torch.all(expert_placement_map >= -1))
def test_generate_expert_placement_map(mock_expert_load_balancer):
expert_placement_map = mock_expert_load_balancer.generate_expert_placement_map(
)
assert expert_placement_map.shape == (mock_expert_load_balancer.layers_num,
mock_expert_load_balancer.ranks_num,
8)
assert torch.all(expert_placement_map >= -1)
def test_generate_log2phy_expert_map(self):
layer_id = 0
log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map(
layer_id)
self.assertEqual(log2phy_map.shape,
(self.expert_load_balancer.ranks_num, 8))
self.assertTrue(torch.all(log2phy_map >= -1))
@mock.patch("torch_npu.npu._lazy_init")
@mock.patch("torch.npu.current_device", return_value="cpu")
def test_get_rank_placement_map(self, mock_current_device, mock_lazy_init):
layer_id = 0
rank_id = 0
rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
self.assertEqual(rank_local_expert_num, 5)
expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
dtype=torch.int32).to(
rank_expert_map.device)
self.assertTrue(rank_expert_map.equal(expected_tensor))
def test_generate_log2phy_expert_map(mock_expert_load_balancer):
layer_id = 0
log2phy_map = mock_expert_load_balancer.generate_log2phy_expert_map(
layer_id)
assert log2phy_map.shape == (mock_expert_load_balancer.ranks_num, 8)
assert torch.all(log2phy_map >= -1)
rank_id = 1
rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
dtype=torch.int32).to(
rank_expert_map.device)
self.assertTrue(rank_expert_map.equal(expected_tensor))
def test_get_rank_log2phy_map(self):
layer_id = 0
rank_id = 0
log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
dtype=torch.int32).to(
log2phy_map.device)
self.assertTrue(log2phy_map.equal(expected_tensor))
def test_get_rank_placement_map(mock_expert_load_balancer, mocker):
mocker.patch("torch_npu.npu._lazy_init")
mocker.patch('torch.npu.current_device', return_value='cpu')
layer_id = 0
rank_id = 0
rank_local_expert_num, rank_expert_map = mock_expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
assert rank_local_expert_num == 5
expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
dtype=torch.int32).to(
rank_expert_map.device)
assert rank_expert_map.equal(expected_tensor)
rank_id = 1
log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
dtype=torch.int32).to(
log2phy_map.device)
self.assertTrue(log2phy_map.equal(expected_tensor))
rank_id = 1
rank_local_expert_num, rank_expert_map = mock_expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
dtype=torch.int32).to(
rank_expert_map.device)
assert rank_expert_map.equal(expected_tensor)
def test_get_rank_log2phy_map(mock_expert_load_balancer):
layer_id = 0
rank_id = 0
log2phy_map = mock_expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
dtype=torch.int32).to(log2phy_map.device)
assert log2phy_map.equal(expected_tensor)
rank_id = 1
log2phy_map = mock_expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
dtype=torch.int32).to(log2phy_map.device)
assert log2phy_map.equal(expected_tensor)
def test_get_global_redundant_expert_num(mock_expert_load_balancer):
redundant_expert_num = mock_expert_load_balancer.get_global_redundant_expert_num(
)
expected_redundant_expert_num = len(MOCK_DATA["layer_list"][0]["device_list"][0]["device_expert"]) * \
MOCK_DATA["layer_list"][0]["device_count"] - 8
assert redundant_expert_num == expected_redundant_expert_num
def test_get_global_redundant_expert_num(self):
redundant_expert_num = self.expert_load_balancer.get_global_redundant_expert_num(
)
expected_redundant_expert_num = len(MOCK_DATA["layer_list"][0]["device_list"][0]["device_expert"]) * \
MOCK_DATA["layer_list"][0]["device_count"] - 8
self.assertEqual(redundant_expert_num, expected_redundant_expert_num)

View File

@@ -74,7 +74,7 @@ from vllm_ascend.attention.attention_v1 import AscendAttentionState
from vllm_ascend.attention.mla_v1 import CommonAttentionMetadata
from vllm_ascend.platform import NPUPlatform
from vllm_ascend.sample.rejection_sampler import AscendRejectionSampler
from vllm_ascend.utils import ProfileExecuteDuration
from vllm_ascend.utils import ProfileExecuteDuration, vllm_version_is
from vllm_ascend.worker.mtp_proposer_v1 import MtpProposer
if TYPE_CHECKING:
@@ -420,19 +420,33 @@ class NPUModelRunner(LoRAModelRunnerMixin):
generator.manual_seed(sampling_params.seed)
else:
generator = None
self.requests[req_id] = CachedRequestState(
req_id=req_id,
prompt_token_ids=new_req_data.prompt_token_ids,
mm_inputs=new_req_data.mm_inputs,
mm_positions=new_req_data.mm_positions,
sampling_params=sampling_params,
generator=generator,
block_ids=new_req_data.block_ids,
num_computed_tokens=new_req_data.num_computed_tokens,
output_token_ids=[],
lora_request=new_req_data.lora_request,
)
if vllm_version_is("0.9.1"):
self.requests[req_id] = CachedRequestState(
req_id=req_id,
prompt_token_ids=new_req_data.prompt_token_ids,
mm_inputs=new_req_data.mm_inputs,
mm_positions=new_req_data.mm_positions,
sampling_params=sampling_params,
generator=generator,
block_ids=new_req_data.block_ids,
num_computed_tokens=new_req_data.num_computed_tokens,
output_token_ids=[],
lora_request=new_req_data.lora_request,
)
else:
self.requests[req_id] = CachedRequestState(
req_id=req_id,
prompt_token_ids=new_req_data.prompt_token_ids,
mm_inputs=new_req_data.mm_inputs,
mm_positions=new_req_data.mm_positions,
sampling_params=sampling_params,
pooling_params=None,
generator=generator,
block_ids=new_req_data.block_ids,
num_computed_tokens=new_req_data.num_computed_tokens,
output_token_ids=[],
lora_request=new_req_data.lora_request,
)
# Only relevant for models using M-RoPE (e.g, Qwen2-VL)
if self.uses_mrope:
@@ -1305,15 +1319,25 @@ class NPUModelRunner(LoRAModelRunnerMixin):
hidden_states,
attn_metadata,
)
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
spec_token_ids=spec_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict={},
)
if vllm_version_is("0.9.1"):
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
spec_token_ids=spec_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict={},
)
else:
model_runner_output = ModelRunnerOutput(
req_ids=self.input_batch.req_ids,
req_id_to_index=self.input_batch.req_id_to_index,
sampled_token_ids=valid_sampled_token_ids,
spec_token_ids=spec_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict={},
pooler_output=[],
)
durations = ProfileExecuteDuration().pop_captured_sync()
if durations: