### What this PR does / why we need it? avoid mrope fusion op when running qwen25vl on x86 machine --------- Signed-off-by: 李少鹏 <lishaopeng21@huawei.com>
470 lines
20 KiB
Python
470 lines
20 KiB
Python
import math
|
|
import unittest
|
|
from unittest.mock import MagicMock, PropertyMock, patch
|
|
|
|
import torch
|
|
from transformers.configuration_utils import PretrainedConfig
|
|
from vllm.config import ModelConfig, VllmConfig
|
|
from vllm.model_executor.layers.rotary_embedding import (
|
|
DeepseekScalingRotaryEmbedding, MRotaryEmbedding, RotaryEmbedding)
|
|
from vllm.platforms import CpuArchEnum
|
|
|
|
from tests.ut.base import TestBase
|
|
from vllm_ascend.ascend_forward_context import set_ascend_forward_context
|
|
from vllm_ascend.ops.rotary_embedding import _custom_rotary_embedding_enabled
|
|
|
|
MODEL = "Qwen3-0.6B"
|
|
MODEL_VL = "Qwen/Qwen2.5-VL-3B-Instruct"
|
|
MAX_NUM_BATCHED_TOKEND = 10000
|
|
|
|
|
|
class TestCustomRotaryEmbeddingEnabled(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# Common setup for tests
|
|
self.positions = torch.tensor([1, 2, 3])
|
|
self.query = torch.randn(3, 4, dtype=torch.float16)
|
|
self.key = torch.randn(3, 4, dtype=torch.float16)
|
|
self.head_size = 32
|
|
self.cos_sin_cache = torch.randn(3, 4)
|
|
|
|
# Mock self object for rope_forward_oot
|
|
self.mock_self = MagicMock()
|
|
self.mock_self.head_size = self.head_size
|
|
self.mock_self.cos_sin_cache = self.cos_sin_cache
|
|
self.mock_self.is_neox_style = True
|
|
self.mock_self.forward_native.return_value = (self.query, self.key)
|
|
|
|
def test_custom_rotary_embedding_enabled(self):
|
|
# Test when all conditions are True
|
|
with patch('vllm_ascend.ops.rotary_embedding.enable_custom_op',
|
|
return_value=True):
|
|
result = _custom_rotary_embedding_enabled(self.query, True,
|
|
self.head_size)
|
|
self.assertTrue(result)
|
|
|
|
# Test when dtype is not float16
|
|
with patch('vllm_ascend.ops.rotary_embedding.enable_custom_op',
|
|
return_value=True):
|
|
query = self.query.to(torch.float32)
|
|
result = _custom_rotary_embedding_enabled(query, True,
|
|
self.head_size)
|
|
self.assertFalse(result)
|
|
|
|
# Test when neox_style is False
|
|
with patch('vllm_ascend.ops.rotary_embedding.enable_custom_op',
|
|
return_value=True):
|
|
result = _custom_rotary_embedding_enabled(self.query, False,
|
|
self.head_size)
|
|
self.assertFalse(result)
|
|
|
|
# Test when head_size is not divisible by 32
|
|
with patch('vllm_ascend.ops.rotary_embedding.enable_custom_op',
|
|
return_value=True):
|
|
result = _custom_rotary_embedding_enabled(self.query, True,
|
|
self.head_size + 1)
|
|
self.assertFalse(result)
|
|
|
|
# Test when custom op is disabled
|
|
with patch('vllm_ascend.ops.rotary_embedding.enable_custom_op',
|
|
return_value=False):
|
|
result = _custom_rotary_embedding_enabled(self.query, True,
|
|
self.head_size)
|
|
self.assertFalse(result)
|
|
|
|
|
|
class TestAscendRotaryEmbedding(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# Common setup for tests
|
|
self.positions = torch.tensor([1, 2, 3])
|
|
self.query = torch.randn(3, 1, 32, dtype=torch.float16)
|
|
self.key = torch.randn(3, 1, 32, dtype=torch.float16)
|
|
self.head_size = 32
|
|
self.rotary_dim = self.head_size
|
|
self.max_position = 16
|
|
self.rope_theta = 10000
|
|
self.is_neox_style = True
|
|
self.cos_sin_cache = torch.randn(3, 1, 32)
|
|
self.layer = RotaryEmbedding(self.head_size, self.rotary_dim,
|
|
self.max_position, self.rope_theta,
|
|
self.is_neox_style, torch.float16)
|
|
|
|
# Mock self object for rope_forward_oot
|
|
self.mock_self = MagicMock()
|
|
self.mock_self.head_size = self.head_size
|
|
self.mock_self.cos_sin_cache = self.cos_sin_cache
|
|
self.mock_self.is_neox_style = self.is_neox_style
|
|
|
|
@patch('torch.ops._C_ascend')
|
|
@patch('vllm_ascend.ops.rotary_embedding.is_310p', return_value=False)
|
|
@patch('vllm_ascend.ops.rotary_embedding._custom_rotary_embedding_enabled',
|
|
return_value=True)
|
|
@patch('torch.ops._npu_rotary_embedding')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_rope_forward_oot_custom_kernel(self, mock_rotary_embedding,
|
|
mock_custom_enabled, mock_is_310p,
|
|
mock__c):
|
|
mock_config = MagicMock()
|
|
mock_config.torchair_graph_config.enabled = False
|
|
|
|
# Setup mock for custom kernel path
|
|
|
|
mock__c.rotary_embedding.return_value = self.query, self.key
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL,
|
|
tokenizer=MODEL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward(self.positions, self.query,
|
|
self.key)
|
|
|
|
mock__c.rotary_embedding.assert_called_once()
|
|
self.assertEqual(result_q.shape, self.query.shape)
|
|
self.assertEqual(result_k.shape, self.key.shape)
|
|
|
|
@patch('vllm_ascend.ops.rotary_embedding._custom_rotary_embedding_enabled',
|
|
return_value=False)
|
|
@patch('torch_npu._npu_rotary_embedding')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_rope_forward_oot_contiguous(self, mock_npu_rotary,
|
|
mock_custom_enabled):
|
|
mock_config = MagicMock()
|
|
mock_config.torchair_graph_config.enabled = False
|
|
|
|
# Test contiguous path when custom is disabled
|
|
non_contig_query = self.query.transpose(0, 1)
|
|
non_contig_key = self.key.transpose(0, 1)
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL,
|
|
tokenizer=MODEL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward(self.positions,
|
|
non_contig_query,
|
|
non_contig_key)
|
|
|
|
mock_npu_rotary.assert_called_once()
|
|
self.assertEqual(result_q.shape, non_contig_query.shape)
|
|
self.assertEqual(result_k.shape, non_contig_key.shape)
|
|
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_rope_forward_oot_with_offsets(self):
|
|
mock_config = MagicMock()
|
|
mock_config.torchair_graph_config.enabled = False
|
|
|
|
# Test that NotImplementedError is raised when offsets is provided
|
|
offsets = torch.tensor([1, 2, 3])
|
|
with self.assertRaises(NotImplementedError):
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL,
|
|
tokenizer=MODEL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
self.layer.forward(self.positions, self.query, self.key,
|
|
offsets)
|
|
|
|
@patch('vllm_ascend.ops.rotary_embedding._custom_rotary_embedding_enabled',
|
|
return_value=False)
|
|
@patch('torch_npu._npu_rotary_embedding')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_rope_forward_oot_neox_style_override(self, mock_npu_rotary,
|
|
mock_custom_enabled):
|
|
mock_config = MagicMock()
|
|
mock_config.torchair_graph_config.enabled = False
|
|
|
|
# Test neox_style override
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL,
|
|
tokenizer=MODEL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward(
|
|
self.positions,
|
|
self.query,
|
|
self.key,
|
|
is_neox_style_override=False)
|
|
# Check that neox_style=False was passed to the NPU function
|
|
args, kwargs = mock_npu_rotary.call_args
|
|
self.assertFalse(args[-1])
|
|
|
|
@patch('vllm_ascend.ops.rotary_embedding._custom_rotary_embedding_enabled',
|
|
return_value=False)
|
|
@patch('torch_npu._npu_rotary_embedding')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_rope_forward_oot_rotary_dim_less_than_head_size(
|
|
self, mock_npu_rotary, mock_custom_enabled):
|
|
mock_config = MagicMock()
|
|
mock_config.torchair_graph_config.enabled = False
|
|
|
|
# test case when rotary_dim < head_size
|
|
org_rotary_dim = self.layer.rotary_dim
|
|
self.layer.rotary_dim = self.layer.head_size // 2
|
|
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL,
|
|
tokenizer=MODEL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward(self.positions, self.query,
|
|
self.key)
|
|
|
|
mock_npu_rotary.assert_called_once()
|
|
self.assertEqual(result_q.shape, self.query.shape)
|
|
self.assertEqual(result_k.shape, self.key.shape)
|
|
|
|
# restore rotary_dim
|
|
self.layer.rotary_dim = org_rotary_dim
|
|
|
|
|
|
class MockRopeModule:
|
|
|
|
def __init__(self, max_seq_len=2048, is_neox_style=True):
|
|
self.max_seq_len = max_seq_len
|
|
self.is_neox_style = is_neox_style
|
|
self.cos_cached = None
|
|
self.sin_cached = None
|
|
self.rotary_dim = 1
|
|
self.base = 1
|
|
|
|
|
|
class TestAscendDeepseekScalingRotaryEmbedding(TestBase):
|
|
|
|
def setUp(self):
|
|
# Common setup for tests
|
|
self.positions = torch.tensor([1, 2, 3])
|
|
self.query = torch.randn(3, 1, 32, dtype=torch.float16)
|
|
self.key = torch.randn(3, 1, 32, dtype=torch.float16)
|
|
self.head_size = 32
|
|
self.rotary_dim = self.head_size
|
|
self.max_position = 16
|
|
self.rope_theta = 10000
|
|
self.is_neox_style = True
|
|
self.scaling_factor = 1
|
|
self.layer = None
|
|
|
|
def _create_layer(self):
|
|
self.layer = DeepseekScalingRotaryEmbedding(
|
|
self.head_size, self.rotary_dim, self.max_position,
|
|
self.rope_theta, self.is_neox_style, self.scaling_factor,
|
|
torch.float16)
|
|
return self.layer
|
|
|
|
@patch("vllm.platforms.current_platform.device_type",
|
|
new=torch.device("cpu"))
|
|
@patch("vllm_ascend.ops.rotary_embedding.NPUPlatform",
|
|
new_callable=PropertyMock)
|
|
def test_native_rope_deepseek_forward_base(self, mock_npuplatform):
|
|
mock_npuplatform.device_type = torch.device("cpu")
|
|
self.layer = self._create_layer()
|
|
with patch("vllm_ascend.ops.rotary_embedding._rope_forward_oot",
|
|
return_value=(self.query,
|
|
self.key)) as mock_rope_forward_oot:
|
|
q_pe, k_pe = self.layer.forward(self.positions, self.query,
|
|
self.key)
|
|
mock_rope_forward_oot.assert_called_once()
|
|
assert q_pe.shape == self.query.shape
|
|
assert k_pe.shape == self.key.shape
|
|
|
|
@patch('vllm_ascend.ops.rotary_embedding._rope_forward_oot')
|
|
@patch("vllm.platforms.current_platform.device_type",
|
|
new=torch.device("cpu"))
|
|
@patch("vllm_ascend.ops.rotary_embedding.NPUPlatform",
|
|
new_callable=PropertyMock)
|
|
def test_native_rope_deepseek_forward_key_reshaping(
|
|
self, mock_npuplatform, mock_rope_forward_oot):
|
|
mock_npuplatform.device_type = torch.device("cpu")
|
|
self.layer = self._create_layer()
|
|
|
|
key = torch.randn(1, 32)
|
|
|
|
mock_rope_forward_oot.return_value = (self.query, key)
|
|
|
|
q_pe, k_pe = self.layer.forward(self.positions, self.query, key)
|
|
mock_rope_forward_oot.assert_called_once()
|
|
assert q_pe.shape == self.query.shape
|
|
assert k_pe.shape == key.shape
|
|
|
|
@patch('vllm_ascend.ops.rotary_embedding._rope_forward_oot')
|
|
@patch("vllm.platforms.current_platform.device_type",
|
|
new=torch.device("cpu"))
|
|
@patch("vllm_ascend.ops.rotary_embedding.NPUPlatform",
|
|
new_callable=PropertyMock)
|
|
def test_native_rope_deepseek_forward_non_neox_style(
|
|
self, mock_npuplatform, mock_rope_forward_oot):
|
|
mock_npuplatform.device_type = torch.device("cpu")
|
|
self.layer = self._create_layer()
|
|
|
|
mock_rope_forward_oot.return_value = (self.query, self.key)
|
|
|
|
q_pe, k_pe = self.layer.forward(self.positions, self.query, self.key)
|
|
|
|
mock_rope_forward_oot.assert_called_once()
|
|
assert q_pe.shape == self.query.shape
|
|
assert k_pe.shape == self.key.shape
|
|
|
|
@patch("vllm.platforms.current_platform.device_type",
|
|
new=torch.device("cpu"))
|
|
@patch("vllm_ascend.ops.rotary_embedding.NPUPlatform",
|
|
new_callable=PropertyMock)
|
|
def test_basic_case(self, mock_npuplatform):
|
|
# Test with standard values
|
|
mock_npuplatform.device_type = torch.device("cpu")
|
|
self.layer = self._create_layer()
|
|
num_rotations = 100
|
|
dim = 512
|
|
base = 10000
|
|
max_position_embeddings = 2048
|
|
|
|
result = self.layer._yarn_find_correction_dim(num_rotations, dim, base,
|
|
max_position_embeddings)
|
|
|
|
# Calculate expected value manually
|
|
expected = (dim * torch.log(
|
|
torch.tensor(max_position_embeddings) /
|
|
(num_rotations * 2 * torch.pi))) / (2 *
|
|
torch.log(torch.tensor(base)))
|
|
|
|
self.assertTrue(torch.allclose(result, expected))
|
|
|
|
@patch("vllm.platforms.current_platform.device_type",
|
|
new=torch.device("cpu"))
|
|
@patch("vllm_ascend.ops.rotary_embedding.NPUPlatform",
|
|
new_callable=PropertyMock)
|
|
def test_yarn_get_mscale(self, mock_npuplatform):
|
|
mock_npuplatform.device_type = torch.device("cpu")
|
|
self.layer = self._create_layer()
|
|
|
|
# test_scale_less_than_or_equal_1
|
|
self.assertEqual(self.layer._yarn_get_mscale(scale=0.5), 1.0)
|
|
self.assertEqual(self.layer._yarn_get_mscale(scale=1.0), 1.0)
|
|
self.assertEqual(self.layer._yarn_get_mscale(scale=0.999), 1.0)
|
|
|
|
# test_scale_greater_than_1:
|
|
test_cases = [(2.0, 1.0, 1.0 + 0.1 * math.log(2.0)),
|
|
(10.0, 1.0, 1.0 + 0.1 * math.log(10.0)),
|
|
(5.0, 2.0, 1.0 + 0.2 * math.log(5.0)),
|
|
(math.e, 1.0, 1.0 + 0.1)]
|
|
|
|
for scale, mscale, expected in test_cases:
|
|
result = self.layer._yarn_get_mscale(scale, mscale)
|
|
self.assertAlmostEqual(
|
|
result,
|
|
expected,
|
|
places=6,
|
|
msg=f"Failed for scale={scale}, mscale={mscale}")
|
|
|
|
|
|
class TestAscendMRotaryEmbedding(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# Common setup for tests
|
|
self.number_tokens = 3
|
|
self.num_head = 8
|
|
self.num_kvhead = 8
|
|
self.head_size = 128
|
|
self.max_position_embeddings = 128000
|
|
self.is_neox_style = True
|
|
self.rope_theta = 1000000.0
|
|
self.positions_1d = torch.tensor([1, 2, 3])
|
|
self.positions_2d = torch.randint(1, 10, (3, self.number_tokens))
|
|
|
|
self.query = torch.randn(
|
|
(self.number_tokens, self.num_head * self.head_size),
|
|
dtype=torch.bfloat16)
|
|
self.key = torch.randn(
|
|
(self.number_tokens, self.num_kvhead * self.head_size),
|
|
dtype=torch.bfloat16)
|
|
|
|
# Qwen2.5-VL mrope section case
|
|
self.mrope_section = [16, 24, 24]
|
|
|
|
self.layer = MRotaryEmbedding(self.head_size,
|
|
self.head_size,
|
|
self.max_position_embeddings,
|
|
base=self.rope_theta,
|
|
is_neox_style=self.is_neox_style,
|
|
dtype=torch.bfloat16,
|
|
mrope_section=self.mrope_section)
|
|
|
|
self.mock_config = MagicMock()
|
|
self.mock_config.torchair_graph_config.enabled = False
|
|
|
|
def _create_vllm_config(self):
|
|
vllm_config = VllmConfig()
|
|
model_config = ModelConfig(MODEL_VL,
|
|
tokenizer=MODEL_VL,
|
|
max_model_len=MAX_NUM_BATCHED_TOKEND)
|
|
model_config.hf_config = PretrainedConfig()
|
|
vllm_config.model_config = model_config
|
|
return vllm_config
|
|
|
|
@patch('torch_npu.npu_mrope')
|
|
@patch('vllm_ascend.platform.NPUPlatform.get_cpu_architecture')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_forward_oot_1d_positions(self, mock_cpu_arc, mock_npu_mrope):
|
|
mock_cpu_arc.return_value = CpuArchEnum.ARM
|
|
|
|
mock_npu_mrope.return_value = (torch.zeros_like(self.query),
|
|
torch.zeros_like(self.key))
|
|
|
|
vllm_config = self._create_vllm_config()
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward_oot(
|
|
self.positions_1d, self.query, self.key)
|
|
|
|
mock_npu_mrope.assert_called_once()
|
|
self.assertFalse(torch.isnan(result_q).any().item())
|
|
self.assertFalse(torch.isnan(result_k).any().item())
|
|
self.assertEqual(result_q.shape, self.query.shape)
|
|
|
|
@patch('torch_npu.npu_mrope')
|
|
@patch('vllm_ascend.platform.NPUPlatform.get_cpu_architecture')
|
|
@patch('vllm.config.ModelConfig.__post_init__', MagicMock())
|
|
@patch('vllm.config.VllmConfig.__post_init__', MagicMock())
|
|
@patch('vllm.distributed.parallel_state._DP', MagicMock(world_size=1))
|
|
@patch('vllm.distributed.parallel_state._TP', MagicMock(world_size=1))
|
|
def test_forward_oot_2d_positions(self, mock_cpu_arc, mock_npu_mrope):
|
|
mock_cpu_arc.return_value = CpuArchEnum.ARM
|
|
|
|
mock_npu_mrope.return_value = (torch.zeros_like(self.query),
|
|
torch.zeros_like(self.key))
|
|
|
|
vllm_config = self._create_vllm_config()
|
|
with set_ascend_forward_context(None, vllm_config):
|
|
result_q, result_k = self.layer.forward_oot(
|
|
self.positions_2d, self.query, self.key)
|
|
|
|
mock_npu_mrope.assert_called_once()
|
|
self.assertFalse(torch.isnan(result_q).any().item())
|
|
self.assertFalse(torch.isnan(result_k).any().item())
|
|
self.assertEqual(result_q.shape, self.query.shape)
|