diff --git a/tests/e2e/singlecard/spec_decode/test_mtp_eagle_correctness.py b/tests/e2e/singlecard/spec_decode/test_mtp_eagle_correctness.py
index c07ce0e8..38859138 100644
--- a/tests/e2e/singlecard/spec_decode/test_mtp_eagle_correctness.py
+++ b/tests/e2e/singlecard/spec_decode/test_mtp_eagle_correctness.py
@@ -23,6 +23,7 @@ from __future__ import annotations
 
 import os
+from typing import Union
 
 import pytest
 from vllm import SamplingParams
@@ -124,11 +125,11 @@ def test_deepseek_mtp_correctness(model_name: str, num_speculative_tokens: int,
 @pytest.mark.parametrize("method", ["eagle", "eagle3"])
 @pytest.mark.parametrize("disable_padded_drafter_batch", [True, False])
 @pytest.mark.parametrize("async_scheduling", [True, False])
-def test_llama_qwen3_eagle_correctness(model_name: str, model_name_main: str,
-                                       num_speculative_tokens: int,
-                                       method: str,
-                                       disable_padded_drafter_batch: bool,
-                                       async_scheduling: bool):
+@pytest.mark.parametrize("draft_tensor_parallel_size", [None, 1])
+def test_llama_qwen3_eagle_correctness(
+        model_name: str, model_name_main: str, num_speculative_tokens: int,
+        method: str, disable_padded_drafter_batch: bool,
+        async_scheduling: bool, draft_tensor_parallel_size: Union[None, int]):
 
     example_prompts = [
         "Hello, my name is",
@@ -163,6 +164,8 @@ def test_llama_qwen3_eagle_correctness(model_name: str, model_name_main: str,
                 "method": method,
                 "model": model_name,
                 "num_speculative_tokens": num_speculative_tokens,
+                "draft_tensor_parallel_size":
+                draft_tensor_parallel_size,
                 "max_model_len": 128,
                 "draft_vocab_size": 128256,
             },
diff --git a/tests/e2e/singlecard/spec_decode/test_v1_spec_decode.py b/tests/e2e/singlecard/spec_decode/test_v1_spec_decode.py
index 9d3b2111..13e17ae4 100644
--- a/tests/e2e/singlecard/spec_decode/test_v1_spec_decode.py
+++ b/tests/e2e/singlecard/spec_decode/test_v1_spec_decode.py
@@ -4,7 +4,7 @@ from __future__ import annotations
 import math
 import os
 import random
-from typing import Any
+from typing import Any, Union
 
 import pytest
 from transformers import AutoTokenizer
@@ -267,9 +267,11 @@ def test_suffix_acceptance(
 
 
 @pytest.mark.parametrize("use_eagle3", [True], ids=["eagle3"])
+@pytest.mark.parametrize("draft_tensor_parallel_size", [None, 1])
 def test_eagle_logprobs(
     model_name: str,
     use_eagle3: bool,
+    draft_tensor_parallel_size: Union[None, int],
 ):
     prompt = {"role": "user", "content": "Hello world " * 10}
     sampling_params = SamplingParams(temperature=0,
@@ -296,6 +298,7 @@ def test_eagle_logprobs(
             "method": "eagle3" if use_eagle3 else "eagle",
             "model": spec_model_name,
             "num_speculative_tokens": 2,
+            "draft_tensor_parallel_size": draft_tensor_parallel_size,
             "max_model_len": 128,
         },
         max_model_len=128,
@@ -321,11 +324,13 @@ def test_eagle_logprobs(
 
 @pytest.mark.parametrize("method", MODELS.keys())
 @pytest.mark.parametrize("num_speculative_tokens", [3])
+@pytest.mark.parametrize("draft_tensor_parallel_size", [None, 1])
 @pytest.mark.parametrize("disable_padded_drafter_batch", [True, False])
 @pytest.mark.parametrize("async_scheduling", [True, False])
 def test_llama_qwen_eagle_acceptance(
     method: str,
     num_speculative_tokens: int,
+    draft_tensor_parallel_size: Union[None, int],
     disable_padded_drafter_batch: bool,
     async_scheduling: bool,
 ):
@@ -376,6 +381,7 @@ def test_llama_qwen_eagle_acceptance(
     speculative_config = {
         "method": method,
         "num_speculative_tokens": num_speculative_tokens,
+        "draft_tensor_parallel_size": draft_tensor_parallel_size,
         "disable_padded_drafter_batch": disable_padded_drafter_batch,
         "model": spec_model_name,
     }
diff --git a/tests/ut/spec_decode/test_eagle_proposer.py b/tests/ut/spec_decode/test_eagle_proposer.py
index b6b96a71..fda58137 100644
--- a/tests/ut/spec_decode/test_eagle_proposer.py
+++ b/tests/ut/spec_decode/test_eagle_proposer.py
@@ -27,6 +27,8 @@ class TestEagleProposerInitialization(TestBase):
         self.vllm_config.model_config.dtype = torch.float16
         self.vllm_config.model_config.max_model_len = 2048
         self.vllm_config.model_config.uses_mrope = False
+        self.vllm_config.parallel_config.tensor_parallel_size = 1
+        self.vllm_config.speculative_config.draft_tensor_parallel_size = 1
         self.vllm_config.speculative_config.num_speculative_tokens = 2
         self.vllm_config.speculative_config.speculative_token_tree = str([
             (i + 1) * (0, ) for i in range(2)
@@ -115,6 +117,8 @@ class TestEagleProposerLoadModel(TestBase):
         self.vllm_config.model_config.dtype = torch.float16
         self.vllm_config.model_config.max_model_len = 2048
         self.vllm_config.model_config.uses_mrope = False
+        self.vllm_config.parallel_config.tensor_parallel_size = 1
+        self.vllm_config.speculative_config.draft_tensor_parallel_size = 1
         self.vllm_config.speculative_config.num_speculative_tokens = 2
         self.vllm_config.speculative_config.speculative_token_tree = str([
             (i + 1) * (0, ) for i in range(2)
@@ -256,6 +260,8 @@ class TestEagleProposerDummyRun(TestBase):
         self.vllm_config.model_config.max_model_len = 2048
         self.vllm_config.model_config.uses_mrope = False
         self.vllm_config.model_config.use_mla = False
+        self.vllm_config.parallel_config.tensor_parallel_size = 1
+        self.vllm_config.speculative_config.draft_tensor_parallel_size = 1
         self.vllm_config.speculative_config.speculative_token_tree = str([
             (i + 1) * (0, ) for i in range(4)
         ])
@@ -370,6 +376,8 @@ class TestEagleProposerHelperMethods(TestBase):
         self.vllm_config.model_config.dtype = torch.float16
         self.vllm_config.model_config.max_model_len = 2048
         self.vllm_config.model_config.uses_mrope = False
+        self.vllm_config.parallel_config.tensor_parallel_size = 1
+        self.vllm_config.speculative_config.draft_tensor_parallel_size = 1
         self.vllm_config.speculative_config.num_speculative_tokens = 2
         self.vllm_config.speculative_config.speculative_token_tree = str([
             (i + 1) * (0, ) for i in range(2)
diff --git a/tests/ut/spec_decode/test_mtp_proposer.py b/tests/ut/spec_decode/test_mtp_proposer.py
index 324ae321..d2bb0533 100644
--- a/tests/ut/spec_decode/test_mtp_proposer.py
+++ b/tests/ut/spec_decode/test_mtp_proposer.py
@@ -42,6 +42,9 @@ class TestMtpProposer:
         config.model_config.max_model_len = 2048
         config.model_config.uses_mrope = False
         config.model_config.hf_text_config = None
+        config.model_config.hf_config = None
+        config.parallel_config.tensor_parallel_size = 1
+        config.speculative_config.draft_tensor_parallel_size = 1
 
         config.load_config = None
diff --git a/vllm_ascend/spec_decode/eagle_proposer.py b/vllm_ascend/spec_decode/eagle_proposer.py
index d5d4afaf..7844d183 100644
--- a/vllm_ascend/spec_decode/eagle_proposer.py
+++ b/vllm_ascend/spec_decode/eagle_proposer.py
@@ -115,6 +115,27 @@ class EagleProposer(VllmEagleProposer):
         self.use_sparse = hasattr(vllm_config.model_config.hf_text_config,
                                   "index_topk")
 
+        # NOTE:
+        # `draft_tensor_parallel_size` does not take effect for Eagle:
+        # in practice the draft model uses the same TP size as the target
+        # model, so this patch sets the draft model's TP to 1 separately.
+        # Due to the `_verify_and_get_draft_tp` check in vLLM,
+        # `draft_tensor_parallel_size` here can only be 1
+        # or the same as the target model's TP size.
+        # TODO(zhaomingyu13): If we need to support a draft model TP that is
+        # neither 1 nor equal to the target model's, this part should be rewritten.
+        if (vllm_config.parallel_config.tensor_parallel_size
+                != self.speculative_config.draft_tensor_parallel_size):
+            tp_group = init_model_parallel_group(
+                [[get_world_group().rank]],
+                get_world_group().rank,
+                torch.distributed.get_backend(get_world_group().device_group),
+                use_message_queue_broadcaster=True,
+                group_name="tp",
+            )
+            self.tp_group_context = patch_tensor_parallel_group(tp_group)
+        else:
+            self.tp_group_context = nullcontext()
 
         self.use_cuda_graph = (self.runner._use_aclgraph()
                                and not self.speculative_config.enforce_eager
diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py
index f0076c76..135b9a9d 100644
--- a/vllm_ascend/worker/model_runner_v1.py
+++ b/vllm_ascend/worker/model_runner_v1.py
@@ -170,6 +170,10 @@ def graph_capture(device: torch.device):
         yield graph_capture_context
 
 
+def get_tp_context(drafter):
+    return getattr(drafter, "tp_group_context", nullcontext())
+
+
 class ExecuteModelState(NamedTuple):
     """Ephemeral cached state transferred between execute_model()
     and sample_tokens(), after execute_model() returns None."""
@@ -2339,7 +2343,8 @@ class NPUModelRunner(GPUModelRunner):
             model_register(self.model, self.model_config)
         if self.drafter:
             logger.info("Loading drafter model...")
-            self.drafter.load_model(self.model)
+            with get_tp_context(self.drafter):
+                self.drafter.load_model(self.model)
             if self.use_aux_hidden_state_outputs:
                 self.model.set_aux_hidden_state_layers(
                     self.model.get_eagle3_aux_hidden_state_layers())
@@ -2715,11 +2720,15 @@ class NPUModelRunner(GPUModelRunner):
         kernel_block_sizes = []
         for kv_cache_group_id, kv_cache_group in enumerate(
                 kv_cache_config.kv_cache_groups):
-
-            if isinstance(kv_cache_group.kv_cache_spec,
-                          EncoderOnlyAttentionSpec):
+            kv_cache_spec = kv_cache_group.kv_cache_spec
+            if isinstance(kv_cache_spec, UniformTypeKVCacheSpecs):
+                # All layers in the UniformTypeKVCacheSpecs have the same type.
+                # Pick an arbitrary one to dispatch.
+                kv_cache_spec = next(
+                    iter(kv_cache_spec.kv_cache_specs.values()))
+            if isinstance(kv_cache_spec, EncoderOnlyAttentionSpec):
                 continue
-            elif isinstance(kv_cache_group.kv_cache_spec, AttentionSpec):
+            elif isinstance(kv_cache_spec, AttentionSpec):
                 # This is an attention backend that supports virtual
                 # block splitting. Get the supported block sizes from
                 # the backend.
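
Note (not part of the patch): the sketch below shows how the `draft_tensor_parallel_size` field, which the tests above add to `speculative_config`, is meant to be exercised from user code. It is a minimal illustration under stated assumptions: the model paths and prompt are placeholders, and it relies on the standard vLLM `LLM`/`SamplingParams` entry points that these tests already use, with `speculative_config` passed as a dict exactly as in the test files.

    from vllm import LLM, SamplingParams

    llm = LLM(
        model="path/to/target-model",         # placeholder target model
        tensor_parallel_size=2,               # target-model TP size
        max_model_len=128,
        speculative_config={
            "method": "eagle3",
            "model": "path/to/eagle3-draft",  # placeholder draft model
            "num_speculative_tokens": 2,
            # New knob covered by the tests above. Per the NOTE in
            # eagle_proposer.py it may only be 1 or equal to the target TP;
            # when it differs from the target TP, EagleProposer patches in a
            # single-rank TP group so the draft model loads and runs with tp=1.
            "draft_tensor_parallel_size": 1,
            "max_model_len": 128,
        },
    )

    outputs = llm.generate(["Hello, my name is"],
                           SamplingParams(temperature=0, max_tokens=32))
    print(outputs[0].outputs[0].text)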