Support multistream of MLA vector operations (#1135)

### What this PR does / why we need it?
Move all vector operations to a secondary stream, with the expected
overlaping being:
```
              | q_rmsnorm |                  | kv_norm_rope_cache |       | q_rope |
| matmul W_DQ | matmul W_DKV | index | index |    matmul W_UQ     | split | matmul W_KV_T |
```

Currently, the `IndexByTensor` operators introduced by computation of
`cos` and `sin` can't be offloaded to the secondary stream due to a
known bug of graph fusion optimization pass. So we instead keep it in
the main stream, only requires it be computed before `matmul W_UQ` to
avoid hindering later overlapping. The problem may be solved by later
optimization (#993), which hoists the computation of `cos` and `sin` up
to the first layer.

### Does this PR introduce _any_ user-facing change?
Controlled by `torchair_graph_config.enable_multistream_mla`, defaulted
to False.

### How was this patch tested?
Tested on 1x16 910 node, with tailored 2 layer DSKv2.

Signed-off-by: sdmyzlp <lrwei2@petalmail.com>
This commit is contained in:
sdmyzlp
2025-06-12 21:42:09 +08:00
committed by GitHub
parent 55c0e68883
commit e72f94e38f
5 changed files with 56 additions and 19 deletions

View File

@@ -19,6 +19,7 @@ from vllm_ascend.multistream.base import MSAttentionMetadataSplitConfig
from vllm_ascend.multistream.context import get_multistream_comm_context
from vllm_ascend.multistream.ms_split import model_input_split_v1_mla_attn
from vllm_ascend.ops.attention import vanilla_chunked_prefill_mla
from vllm_ascend.utils import npu_stream_switch, npu_wait_tensor
if TYPE_CHECKING:
from vllm.v1.core.sched.output import SchedulerOutput
@@ -481,6 +482,9 @@ class AscendMLAImpl(MLAAttentionImpl):
ascend_config = get_ascend_config()
self.torchair_graph_enabled = ascend_config.torchair_graph_config.enabled
self.enable_kv_nz = ascend_config.torchair_graph_config.enable_kv_nz
self.enable_multistream_mla = \
ascend_config.torchair_graph_config.enable_multistream_mla
# Adapt torch air graph mode with spec decoding.
speculative_config = get_current_vllm_config().speculative_config
if speculative_config is not None:
@@ -664,17 +668,20 @@ class AscendMLAImpl(MLAAttentionImpl):
# npu_kv_rmsnorm_rope_cache needs [B, N, S, D]
kv = kv.view(B, N, S, self.kv_lora_rank + self.qk_rope_head_dim)
cache_mode = "PA_NZ" if self.enable_kv_nz else "PA"
k_pe, k_nope, _, _ = torch_npu.npu_kv_rmsnorm_rope_cache(
kv,
self.kv_a_layernorm.weight,
cos,
sin,
slots.to(torch.int64),
kv_cache[1],
kv_cache[0],
epsilon=self.kv_a_layernorm.variance_epsilon,
cache_mode=cache_mode,
)
with npu_stream_switch("mla_secondary",
0,
enabled=self.enable_multistream_mla):
k_pe, k_nope, _, _ = torch_npu.npu_kv_rmsnorm_rope_cache(
kv,
self.kv_a_layernorm.weight,
cos,
sin,
slots.to(torch.int64),
kv_cache[1],
kv_cache[0],
epsilon=self.kv_a_layernorm.variance_epsilon,
cache_mode=cache_mode,
)
return k_pe, k_nope
def exec_kv_prefill(
@@ -867,23 +874,38 @@ class AscendMLAImpl(MLAAttentionImpl):
if has_decode:
decode_k_nope = None
assert attn_metadata.decode is not None
decode_ql_nope, decode_q_pe = \
self._q_proj_and_k_up_proj(decode_hs_or_q_c)
if self.running_in_graph:
seq_len = self.rotary_emb.max_position_embeddings
cos = self.rotary_emb.cos_cached[:seq_len].to(
dtype=decode_q_pe.dtype)
dtype=decode_hs_or_q_c.dtype)
sin = self.rotary_emb.sin_cached[:seq_len].to(
dtype=decode_q_pe.dtype)
dtype=decode_hs_or_q_c.dtype)
cos = cos[attn_metadata.decode.input_positions]
sin = sin[attn_metadata.decode.input_positions]
cos = cos[:, None, None, :]
sin = sin[:, None, None, :]
decode_q_pe = self.rope_single(decode_q_pe, cos, sin)
# Without explicitly controlling the order, IndexByTensor operations
# would be placed after `matmul W_KV_T` hindering the overlapping of
# KvRmsNormRopeCache and SingleRope.
npu_wait_tensor(decode_hs_or_q_c,
cos,
enabled=self.enable_multistream_mla)
npu_wait_tensor(decode_hs_or_q_c,
sin,
enabled=self.enable_multistream_mla)
decode_ql_nope, decode_q_pe = \
self._q_proj_and_k_up_proj(decode_hs_or_q_c)
if self.running_in_graph:
decode_k_pe, decode_k_nope = self.exec_kv(
hidden_states_or_kv_c_normed, cos, sin, kv_cache,
attn_metadata.slot_mapping)
with npu_stream_switch("mla_secondary",
0,
enabled=self.enable_multistream_mla):
npu_wait_tensor(decode_q_pe,
decode_k_pe,
enabled=self.enable_multistream_mla)
decode_q_pe = self.rope_single(decode_q_pe, cos, sin)
else:
decode_q_pe[...], decode_k_pe[...] = self.rotary_emb(
attn_metadata.decode.input_positions,