### What this PR does / why we need it?
1. [Refact] Refact MLA/SFA weight prefetch to consist with moe weight
prefetch
2. Remove duplicated o_proj weight prefetch in forward for MLA/SFA
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
1) Performance result:
Perf test data:
*) MLA:
| | 1st test | 2nd test | Output Token Throughput(Avg) | Performance
improvement percentage |
| --- | --- | --- | --- | --- |
| o_proj duplicate prefetch | 11.9669 token/s | 12.0287 token/s |
11.9978 |
| o_proj no duplicate prefetch | 12.5594 token/s | 12.6216 token/s |
12.5905 | 4.94%| |
single layer performace improve: 5%~8%
*) SFA:
| | 1st test | 2nd test | Output Token Throughput(Avg) | Performance
improvement percentage |
| --- | --- | --- | --- | --- |
| o_proj duplicate prefetch | 13.0523 token/s | 13.1084 token/s |
13.08035 | |
| o_proj no duplicate prefetch | 13.9844 token/s | 14.1678 token/s |
14.0761 | 7.6% |
- vLLM version: v0.15.0
- vLLM main:
d7e17aaacd
---------
Signed-off-by: leo-pony <nengjunma@outlook.com>
165 lines
5.8 KiB
Python
165 lines
5.8 KiB
Python
import os
|
|
import unittest
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import torch
|
|
from vllm import config
|
|
|
|
from tests.ut.base import TestBase
|
|
from vllm_ascend import ascend_config
|
|
from vllm_ascend.distributed import parallel_state
|
|
from vllm_ascend.ops.linear import (AscendMergedColumnParallelLinear,
|
|
AscendReplicatedLinear,
|
|
AscendRowParallelLinear,
|
|
AscendUnquantizedLinearMethod)
|
|
|
|
|
|
class BaseLinearTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.mock_group = mock.MagicMock()
|
|
self.mock_group.world_size = 2
|
|
self.mock_group.rank_in_group = 0
|
|
|
|
parallel_state._MLP_TP = self.mock_group
|
|
parallel_state._OTP = self.mock_group
|
|
|
|
self.mock_ascend_config = MagicMock()
|
|
self.mock_ascend_config.finegrained_tp_config.oproj_tensor_parallel_size = 2
|
|
self.mock_ascend_config.finegrained_tp_config.mlp_tensor_parallel_size = 2
|
|
|
|
self.patches = [
|
|
patch("vllm_ascend.ascend_config.get_ascend_config",
|
|
return_value=self.mock_ascend_config),
|
|
patch("vllm_ascend.distributed.parallel_state.get_otp_group",
|
|
return_value=self.mock_group),
|
|
patch("vllm_ascend.distributed.parallel_state.get_mlp_tp_group",
|
|
return_value=self.mock_group),
|
|
patch("vllm_ascend.ops.linear_op.get_tp_group",
|
|
return_value=self.mock_group),
|
|
patch(
|
|
"vllm.distributed.parallel_state.get_tp_group",
|
|
return_value=self.mock_group,
|
|
),
|
|
patch("vllm_ascend.utils.mlp_tp_enable", return_value=True),
|
|
patch("vllm_ascend.utils.oproj_tp_enable", return_value=True)
|
|
]
|
|
|
|
for p in self.patches:
|
|
p.start()
|
|
|
|
def tearDown(self):
|
|
for p in self.patches:
|
|
p.stop()
|
|
|
|
|
|
class TestAscendUnquantizedLinearMethod(TestBase):
|
|
|
|
def setUp(self):
|
|
self.method = AscendUnquantizedLinearMethod()
|
|
self.layer = mock.MagicMock()
|
|
mock_dtype = mock.PropertyMock(return_value=torch.float16)
|
|
type(self.layer.weight.data).dtype = mock_dtype
|
|
|
|
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"})
|
|
@mock.patch("torch_npu.npu_format_cast")
|
|
def test_process_weights_after_loading_with_nz0(self, mock_format_cast):
|
|
self.method.process_weights_after_loading(self.layer)
|
|
mock_format_cast.assert_not_called()
|
|
|
|
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "1"})
|
|
@mock.patch("torch_npu.npu_format_cast")
|
|
def test_process_weights_after_loading_with_nz1(self, mock_format_cast):
|
|
self.method.process_weights_after_loading(self.layer)
|
|
mock_format_cast.assert_not_called()
|
|
|
|
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "2"})
|
|
@mock.patch("torch_npu.npu_format_cast")
|
|
def test_process_weights_after_loading_with_nz2(self, mock_format_cast):
|
|
self.method.process_weights_after_loading(self.layer)
|
|
mock_format_cast.assert_called_once()
|
|
|
|
|
|
class TestAscendRowParallelLinear(BaseLinearTest):
|
|
|
|
@patch("vllm_ascend.ops.linear_op.get_weight_prefetch_method",
|
|
return_value=MagicMock())
|
|
def test_mlp_optimize(self, mock_get_weight_prefetch_method):
|
|
|
|
ascend_config._ASCEND_CONFIG = MagicMock()
|
|
ascend_config._ASCEND_CONFIG.recompute_scheduler_enable = False
|
|
ascend_config._ASCEND_CONFIG.finegrained_tp_config.mlp_tensor_parallel_size = 2
|
|
ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False
|
|
|
|
linear = AscendRowParallelLinear(
|
|
input_size=16,
|
|
output_size=8,
|
|
prefix="down_proj",
|
|
)
|
|
self.assertEqual(linear.custom_op.comm_group, parallel_state._MLP_TP)
|
|
|
|
input_tensor = torch.randn(16, 8)
|
|
linear(input_tensor)
|
|
|
|
@patch("vllm_ascend.ops.linear_op.get_weight_prefetch_method",
|
|
return_value=MagicMock())
|
|
def test_oproj_tp(self, mock_get_weight_prefetch_method):
|
|
|
|
config._current_vllm_config = MagicMock()
|
|
|
|
ascend_config._ASCEND_CONFIG = MagicMock()
|
|
ascend_config._ASCEND_CONFIG.recompute_scheduler_enable = False
|
|
ascend_config._ASCEND_CONFIG.finegrained_tp_config.oproj_tensor_parallel_size = 2
|
|
ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False
|
|
|
|
linear = AscendRowParallelLinear(
|
|
input_size=16,
|
|
output_size=8,
|
|
prefix="o_proj",
|
|
)
|
|
self.assertEqual(linear.custom_op.comm_group, parallel_state._OTP)
|
|
|
|
input_tensor = torch.randn(16, 8)
|
|
linear(input_tensor)
|
|
|
|
|
|
class TestAscendMergedColumnParallelLinear(BaseLinearTest):
|
|
|
|
def test_merged_mlp_tp_init(self):
|
|
|
|
ascend_config._ASCEND_CONFIG = MagicMock()
|
|
ascend_config._ASCEND_CONFIG.recompute_scheduler_enable = False
|
|
ascend_config._ASCEND_CONFIG.finegrained_tp_config.mlp_tensor_parallel_size = 2
|
|
ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False
|
|
|
|
linear = AscendMergedColumnParallelLinear(
|
|
input_size=16,
|
|
output_sizes=[8, 8],
|
|
prefix="gate_up_proj",
|
|
)
|
|
self.assertEqual(linear.custom_op.comm_group, parallel_state._MLP_TP)
|
|
|
|
|
|
class TestAscendReplicatedLinear(BaseLinearTest):
|
|
|
|
def test_init_disable_tp(self):
|
|
linear = AscendReplicatedLinear(
|
|
input_size=16,
|
|
output_size=8,
|
|
)
|
|
self.assertTrue(
|
|
isinstance(linear.quant_method, AscendUnquantizedLinearMethod))
|
|
|
|
def test_init_without_disable_tp(self):
|
|
linear = AscendReplicatedLinear(
|
|
input_size=16,
|
|
output_size=8,
|
|
)
|
|
self.assertTrue(
|
|
isinstance(linear.quant_method, AscendUnquantizedLinearMethod))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|