diff --git a/python/sglang/srt/layers/attention/triton_ops/rocm_mla_decode_rope.py b/python/sglang/srt/layers/attention/triton_ops/rocm_mla_decode_rope.py new file mode 100644 index 000000000..218244501 --- /dev/null +++ b/python/sglang/srt/layers/attention/triton_ops/rocm_mla_decode_rope.py @@ -0,0 +1,446 @@ +# Copyright 2023-2024 SGLang Team +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Memory-efficient attention for decoding. +It supports page size = 1. +""" + +# Adapted from +# https://github.com/ModelTC/lightllm/blob/96353e868a840db4d103138caf15ed9dbea8c186/lightllm/models/deepseek2/triton_kernel/gqa_flash_decoding_stage1.py +# https://github.com/ModelTC/lightllm/blob/96353e868a840db4d103138caf15ed9dbea8c186/lightllm/models/deepseek2/triton_kernel/gqa_flash_decoding_stage2.py + +import argparse +import logging +import sys + +import pytest +import torch +import triton +import triton.language as tl + +from sglang.srt.layers.attention.triton_ops.decode_attention import ( + _decode_softmax_reducev_fwd, +) +from sglang.srt.layers.rotary_embedding import DeepseekScalingRotaryEmbedding + + +def is_hip(): + return triton.runtime.driver.active.get_current_target().backend == "hip" + + +is_hip_ = is_hip() + + +@triton.jit +def tanh(x): + # Tanh is just a scaled sigmoid + return 2 * tl.sigmoid(2 * x) - 1 + + +@triton.jit +def _fwd_grouped_kernel_stage1_rope( + Q, # Holds [Q_NOPE; Q_PE], b x h x (d+r) + K_Buffer, # Holds [KV; K_PE], b*s x (c+r) + V_buffer, # Holds [KV], b*s x (c) + cos_sin_cache, # max_seq_len x (rotary_dim * 2) + positions, # sequence positions + sm_scale, + kv_indptr, + kv_indices, + Att_Out, # b x h x NUM_KV_SPLITS x (kv_lora_rank + 1) + k_pe_t_out, + stride_qb, + stride_qh, + stride_buf_kbs, + stride_buf_vbs, + stride_mid_ob, + stride_mid_oh, + stride_mid_os, + stride_kpe_tokens_out_b, + stride_cos_sin_cache_s, + stride_positions_b, + rotary_dim: tl.constexpr, + kv_lora_rank: tl.constexpr, + qk_rope_head_dim: tl.constexpr, + kv_group_num: tl.constexpr, + q_head_num: tl.constexpr, + BLOCK_C: tl.constexpr, + BLOCK_R: tl.constexpr, + BLOCK_N: tl.constexpr, + BLOCK_H: tl.constexpr, + NUM_KV_SPLITS: tl.constexpr, + logit_cap: tl.constexpr, + USE_ROPE: tl.constexpr, + IS_NEOX_STYLE: tl.constexpr, +): + + cur_batch = tl.program_id(0) + cur_head_id = tl.program_id(1) + split_kv_id = tl.program_id(2) + + if BLOCK_H < kv_group_num: + VALID_BLOCK_H: tl.constexpr = BLOCK_H + else: + VALID_BLOCK_H: tl.constexpr = kv_group_num + cur_head = cur_head_id * VALID_BLOCK_H + tl.arange(0, BLOCK_H) + mask_h = cur_head < (cur_head_id + 1) * VALID_BLOCK_H + mask_h = mask_h & (cur_head < q_head_num) + + offs_c = tl.arange(0, BLOCK_C) + offs_qk_r = tl.arange(kv_lora_rank, kv_lora_rank + BLOCK_R) # to get the k_pe + + off_q_pe = ( + cur_batch * stride_qb + cur_head[:, None] * stride_qh + offs_qk_r[None, :] + ) + offs_q = cur_batch * stride_qb + cur_head[:, None] * stride_qh + offs_c[None, :] + + mask_c = offs_c < kv_lora_rank + mask_qk_r = offs_qk_r < (kv_lora_rank + qk_rope_head_dim) + + cur_batch_kv_start_idx = tl.load(kv_indptr + cur_batch) + cur_batch_seq_len = tl.load(kv_indptr + cur_batch + 1) - cur_batch_kv_start_idx + + q = tl.load(Q + offs_q, mask=(mask_h[:, None]) & (mask_c[None, :]), other=0.0) + q_pe = tl.load( + Q + off_q_pe, mask=(mask_h[:, None]) & (mask_qk_r[None, :]), other=0.0 + ) + + kv_len_per_split = tl.cdiv(cur_batch_seq_len, NUM_KV_SPLITS) + split_kv_start = kv_len_per_split * split_kv_id + split_kv_end = tl.minimum(split_kv_start + kv_len_per_split, cur_batch_seq_len) + + # apply rotary embedding for q_pe, and k_pe (last token per batch of K_PE) + LAST_SPLIT = split_kv_end == cur_batch_seq_len + k_pe_last_token = tl.zeros([BLOCK_R], dtype=q.dtype) + + if USE_ROPE: + if IS_NEOX_STYLE: + # [BLOCK_ROTARY // 2, BLOCK_ROTARY // 2 + 1, BLOCK_ROTARY // 2 + 2, ..., 0, 1, 2, ..., BLOCK_ROTARY // 2 - 1, pass:] + offs_qk_rot_r = kv_lora_rank + ( + (tl.arange(0, BLOCK_R) + (rotary_dim // 2)) % rotary_dim + ) + # Which elements to flip + mask_rotate = tl.arange(0, BLOCK_R) < (rotary_dim // 2) + # [0 , 1, 2, ..., rotary_dim // 2 - 1, 0 , 1, 2, ..., rotary_dim // 2 - 1] + offs_rotary = tl.arange(0, BLOCK_R) % (rotary_dim // 2) + else: + # [1, 0, 3, 2, 5, 4, ..., BLOCK_R, BLOCK_R - 1] + offs_qk_rot_r = ( + kv_lora_rank + + (((tl.arange(0, BLOCK_R) + 1) % 2) * 2) + - 1 + + tl.arange(0, BLOCK_R) + ) + mask_rotate = tl.arange(0, BLOCK_R) % 2 < 1 + # [0, 0, 1, 1, ..., rotary_dim // 2 - 1, rotary_dim // 2 - 1] + offs_rotary = tl.arange(0, BLOCK_R) // 2 + + if qk_rope_head_dim > rotary_dim: + offs_qk_rot_r = tl.where( + tl.arange(0, BLOCK_R) < rotary_dim, offs_qk_rot_r, tl.arange(0, BLOCK_R) + ) + offs_rotary = tl.where( + tl.arange(0, BLOCK_R) < rotary_dim, offs_rotary, tl.arange(0, BLOCK_R) + ) + + mask_rotary = tl.arange(0, BLOCK_R) < rotary_dim + + pos = tl.load(positions + cur_batch * stride_positions_b) + cos = tl.load( + cos_sin_cache + pos * stride_cos_sin_cache_s + offs_rotary, + mask=mask_rotary, + other=1.0, + ) + sin = tl.load( + cos_sin_cache + + pos * stride_cos_sin_cache_s + + offs_rotary + + rotary_dim // 2, + mask_rotary, + other=0.0, + ) + + off_q_pe_rot = ( + cur_batch * stride_qb + + cur_head[:, None] * stride_qh + + offs_qk_rot_r[None, :] + ) + mask_qk_rot_r = offs_qk_rot_r < (kv_lora_rank + qk_rope_head_dim) + + # 0, 2, 4,.... 1, 3, 5... + q_pe_rot = tl.load( + Q + off_q_pe_rot, + mask=(mask_h[:, None]) & (mask_qk_rot_r[None, :]), + other=0.0, + ) + q_pe_rot = tl.where(mask_rotate[None, :], -q_pe_rot, q_pe_rot) + + q_pe = q_pe * cos + q_pe_rot * sin + + # we only apply to the last token in the K_PE + if LAST_SPLIT: + # debug assert + if (cur_batch == 0 and cur_head == 0) and split_kv_id < NUM_KV_SPLITS - 1: + tl.device_assert(False, "Only last split should compute k_pe") + + kv_loc = tl.load( + kv_indices + cur_batch_kv_start_idx + cur_batch_seq_len - 1 + ) + offs_buf_k_pe_last_token = kv_loc * stride_buf_kbs + offs_qk_r + offs_buf_k_pe_rot_last_token = kv_loc * stride_buf_kbs + offs_qk_rot_r + k_pe_last_token = tl.load(K_Buffer + offs_buf_k_pe_last_token) + + k_pe_rot_last_token = tl.load(K_Buffer + offs_buf_k_pe_rot_last_token) + k_pe_rot_last_token = tl.where( + mask_rotate, -k_pe_rot_last_token, k_pe_rot_last_token + ) + + k_pe_last_token = k_pe_last_token * cos + k_pe_rot_last_token * sin + + e_max = tl.zeros([BLOCK_H], dtype=tl.float32) - float("inf") + e_sum = tl.zeros([BLOCK_H], dtype=tl.float32) + acc = tl.zeros([BLOCK_H, BLOCK_C], dtype=tl.float32) + + if split_kv_end > split_kv_start: + for start_n in range(split_kv_start, split_kv_end, BLOCK_N): + offs_n = start_n + tl.arange(0, BLOCK_N) + kv_loc = tl.load( + kv_indices + cur_batch_kv_start_idx + offs_n, + mask=offs_n < split_kv_end, + other=0, + ) + + offs_buf_kv = kv_loc[None, :] * stride_buf_kbs + offs_c[:, None] + offs_buf_k_pe = kv_loc[None, :] * stride_buf_kbs + offs_qk_r[:, None] + + k_pe = tl.load( + K_Buffer + offs_buf_k_pe, + mask=(offs_n[None, :] < split_kv_end) & (mask_qk_r[:, None]), + other=0.0, + ) # positional embedding part of keys + + if USE_ROPE and start_n >= cur_batch_seq_len - BLOCK_N: + k_pe = tl.where( + offs_n[None, :] != (split_kv_end - 1), + k_pe, + k_pe_last_token[:, None], + ) + + # (16, 64) x (64, 32) + # dot product of rope parts + qk = tl.dot(q_pe, k_pe.to(q_pe.dtype)) + + kv = tl.load( + K_Buffer + offs_buf_kv, + mask=(offs_n[None, :] < split_kv_end) & (mask_c[:, None]), + other=0.0, + ) # the shared latent tensor for keys and values + + # (16, 512) x (512, 32) + # dot product of nope parts + qk += tl.dot(q, kv) + + qk *= sm_scale + + if logit_cap > 0: + qk = logit_cap * tanh(qk / logit_cap) + + qk = tl.where( + mask_h[:, None] & (offs_n[None, :] < split_kv_end), qk, float("-inf") + ) + + offs_buf_v = kv_loc[:, None] * stride_buf_vbs + offs_c[None, :] + v = tl.load( + V_buffer + offs_buf_v, + mask=(offs_n[:, None] < split_kv_end) & (mask_c[None, :]), + other=0.0, + ) + + n_e_max = tl.maximum(tl.max(qk, 1), e_max) + re_scale = tl.exp(e_max - n_e_max) + p = tl.exp(qk - n_e_max[:, None]) + acc *= re_scale[:, None] + # (16, 32) x (32, 512) + acc += tl.dot(p.to(v.dtype), v) + + e_sum = e_sum * re_scale + tl.sum(p, 1) + e_max = n_e_max + + offs_mid_o = ( + cur_batch * stride_mid_ob + + cur_head[:, None] * stride_mid_oh + + split_kv_id * stride_mid_os + + offs_c[None, :] + ) + + if USE_ROPE: + if LAST_SPLIT: + k_pe_last_token_ptrs = ( + k_pe_t_out + + cur_batch * stride_kpe_tokens_out_b + + tl.arange(0, BLOCK_R) + ) + tl.store(k_pe_last_token_ptrs, k_pe_last_token, mask=mask_qk_r) + + tl.store( + Att_Out + offs_mid_o, + acc / e_sum[:, None], + mask=(mask_h[:, None]) & (mask_c[None, :]), + ) + + offs_mid_o_1 = ( + cur_batch * stride_mid_ob + + cur_head * stride_mid_oh + + split_kv_id * stride_mid_os + + kv_lora_rank + ) + + tl.store( + Att_Out + offs_mid_o_1, + e_max + tl.log(e_sum), + mask=mask_h, + ) + + +# TODO rope offset +def _decode_grouped_att_m_fwd_rope( + q, + k_buffer, + v_buffer, + att_out, + k_pe_tokens_out, + kv_lora_rank, # c + cos_sin_cache, + positions, + rotary_dim, + kv_indptr, + kv_indices, + num_kv_splits, + sm_scale, + logit_cap, + use_rope, + is_neox_style=True, +): + if use_rope: + assert ( + k_pe_tokens_out is not None + ), "We must output the k_pe tokens with rope applied if rope fusion enabled." + + BLOCK = 32 + + # # [TODO] work around shmem limit on MI3xx + # if is_hip_ and kv_lora_rank >= 576: + # BLOCK = 16 + + qk_rope_head_dim = k_buffer.shape[-1] - kv_lora_rank + batch, head_num = kv_indptr.shape[0] - 1, q.shape[1] + kv_group_num = q.shape[1] // k_buffer.shape[1] + + BLOCK_C = triton.next_power_of_2(kv_lora_rank) + BLOCK_R = triton.next_power_of_2(qk_rope_head_dim) + + BLOCK_H = 16 + NUM_KV_SPLITS = num_kv_splits + grid = ( + batch, + triton.cdiv(head_num, min(BLOCK_H, kv_group_num)), + NUM_KV_SPLITS, + ) + + extra_kargs = {} + num_stages = 2 + if is_hip_: + # https://rocm.docs.amd.com/en/docs-6.2.0/how-to/llm-fine-tuning-optimization/optimizing-triton-kernel.html + # https://github.com/triton-lang/triton/blob/main/third_party/amd/backend/compiler.py + extra_kargs = {"waves_per_eu": 1, "matrix_instr_nonkdim": 16, "kpack": 2} + num_stages = 1 + + _fwd_grouped_kernel_stage1_rope[grid]( + q, + k_buffer, + v_buffer, + cos_sin_cache, + positions, + sm_scale, + kv_indptr, + kv_indices, + att_out, + k_pe_tokens_out, + q.stride(0), + q.stride(1), + k_buffer.stride(0), + v_buffer.stride(0), + att_out.stride(0), + att_out.stride(1), + att_out.stride(2), + k_pe_tokens_out.stride(0) if use_rope else 0, + cos_sin_cache.stride(0) if use_rope else 0, + positions.stride(0) if use_rope else 0, + rotary_dim, + kv_lora_rank, + qk_rope_head_dim, + kv_group_num=kv_group_num, + q_head_num=head_num, + BLOCK_C=BLOCK_C, + BLOCK_R=BLOCK_R, + BLOCK_N=BLOCK, + BLOCK_H=BLOCK_H, + NUM_KV_SPLITS=NUM_KV_SPLITS, + logit_cap=logit_cap, + USE_ROPE=use_rope, + IS_NEOX_STYLE=is_neox_style, + num_warps=4, + num_stages=num_stages, + **extra_kargs + ) + + +def decode_attention_fwd_grouped_rope( + q, + k_buffer, + v_buffer, + o, + kv_indptr, + kv_indices, + k_pe_tokens, + kv_lora_rank, + rotary_dim, + cos_sin_cache, + positions, + attn_logits, + num_kv_splits, + sm_scale, + logit_cap=0.0, + use_rope=False, + is_neox_style=False, +): + _decode_grouped_att_m_fwd_rope( + q, + k_buffer, + v_buffer, + attn_logits, + k_pe_tokens, + kv_lora_rank, + cos_sin_cache, + positions, + rotary_dim, + kv_indptr, + kv_indices, + num_kv_splits, + sm_scale, + logit_cap, + use_rope, + is_neox_style, + ) + _decode_softmax_reducev_fwd(attn_logits, q, o, v_buffer, kv_indptr, num_kv_splits) diff --git a/python/sglang/srt/models/deepseek_v2.py b/python/sglang/srt/models/deepseek_v2.py old mode 100644 new mode 100755 index 6a6b5f387..0de01f3f9 --- a/python/sglang/srt/models/deepseek_v2.py +++ b/python/sglang/srt/models/deepseek_v2.py @@ -16,6 +16,7 @@ # https://github.com/vllm-project/vllm/blob/fb6af8bc086328ca6659e72d11ffd4309ce4de22/vllm/model_executor/models/deepseek_v2.py """Inference-only DeepseekV2 model.""" +import os from typing import Any, Dict, Iterable, Optional, Tuple import torch @@ -31,6 +32,9 @@ from sglang.srt.distributed import ( tensor_model_parallel_all_reduce, ) from sglang.srt.layers.activation import SiluAndMul +from sglang.srt.layers.attention.triton_ops.rocm_mla_decode_rope import ( + decode_attention_fwd_grouped_rope, +) from sglang.srt.layers.layernorm import RMSNorm from sglang.srt.layers.linear import ( ColumnParallelLinear, @@ -533,7 +537,18 @@ class DeepseekV2AttentionMLA(nn.Module): if no_absorb(): return self.forward_normal(positions, hidden_states, forward_batch) else: - return self.forward_absorb(positions, hidden_states, forward_batch) + if is_hip_: + if ( + os.getenv("SGLANG_ROCM_FUSED_DECODE_MLA") == "1" + and forward_batch.forward_mode.is_decode() + ): + return self.forward_absorb_fused_mla_rope( + positions, hidden_states, forward_batch + ) + else: + return self.forward_absorb(positions, hidden_states, forward_batch) + else: + return self.forward_absorb(positions, hidden_states, forward_batch) def forward_normal( self, @@ -652,6 +667,149 @@ class DeepseekV2AttentionMLA(nn.Module): return output + def forward_absorb_fused_mla_rope( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + forward_batch: ForwardBatch, + ) -> torch.Tensor: + enable_rope_fusion = ( + os.getenv("SGLANG_FUSED_MLA_ENABLE_ROPE_FUSION", "1") == "1" + ) + q_len = hidden_states.shape[0] + q_input = hidden_states.new_empty( + q_len, self.num_local_heads, self.kv_lora_rank + self.qk_rope_head_dim + ) + if self.q_lora_rank is not None: + q = self.q_a_proj(hidden_states)[0] + q = self.q_a_layernorm(q) + q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim) + else: + q = self.q_proj(hidden_states)[0].view( + -1, self.num_local_heads, self.qk_head_dim + ) + q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1) + + if self.w_kc.dtype == torch.float8_e4m3fnuz: + # TODO(kernel): add bmm_fp8 for torch.float8_e4m3fnuz + q_nope_out = torch.bmm( + q_nope.to(torch.bfloat16).transpose(0, 1), + self.w_kc.to(torch.bfloat16) * self.w_scale, + ) + elif self.w_kc.dtype == torch.float8_e4m3fn: + q_nope_val, q_nope_scale = input_to_float8( + q_nope.transpose(0, 1), torch.float8_e4m3fn + ) + q_nope_out = bmm_fp8( + q_nope_val, self.w_kc, q_nope_scale, self.w_scale, torch.bfloat16 + ) + else: + q_nope_out = torch.bmm(q_nope.transpose(0, 1), self.w_kc) + q_input[..., : self.kv_lora_rank] = q_nope_out.transpose(0, 1) + + latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0] + v_input = latent_cache[..., : self.kv_lora_rank] + v_input = self.kv_a_layernorm(v_input.contiguous()).unsqueeze(1) + k_input = latent_cache.unsqueeze(1) + k_input[..., : self.kv_lora_rank] = v_input + + if not enable_rope_fusion: + k_pe = k_input[..., self.kv_lora_rank :] + q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe) + q_input[..., self.kv_lora_rank :] = q_pe + k_input[..., self.kv_lora_rank :] = k_pe + k_pe_output = None + else: + k_pe_output = torch.empty_like(k_input[..., self.kv_lora_rank :]) + + q_input[..., self.kv_lora_rank :] = q_pe + + # attn_output = self.attn_mqa(q_input, k_input, v_input, forward_batch) + # Use Fused ROPE with use_rope=OFF. + attn_output = torch.empty( + (q_len, self.num_local_heads, self.kv_lora_rank), + dtype=q.dtype, + device=q.device, + ) + attn_logits, _, kv_indptr, kv_indices, _, _, _ = ( + forward_batch.attn_backend.forward_metadata + ) + cos_sin_cache = self.rotary_emb.cos_sin_cache + num_kv_split = forward_batch.attn_backend.num_kv_splits + sm_scale = self.attn_mqa.scaling + if attn_logits is None: + attn_logits = torch.empty( + ( + forward_batch.batch_size, + self.num_local_heads, + num_kv_split, + self.kv_lora_rank + 1, + ), + dtype=torch.float32, + device=q.device, + ) + + # save current latent cache. + forward_batch.token_to_kv_pool.set_kv_buffer( + self.attn_mqa, forward_batch.out_cache_loc, k_input, None + ) + key_cache_buf = forward_batch.token_to_kv_pool.get_key_buffer( + self.attn_mqa.layer_id + ) + val_cache_buf = key_cache_buf[..., : self.kv_lora_rank] + + decode_attention_fwd_grouped_rope( + q_input, + key_cache_buf, + val_cache_buf, + attn_output, + kv_indptr, + kv_indices, + k_pe_output, + self.kv_lora_rank, + self.rotary_emb.rotary_dim, + cos_sin_cache, + positions, + attn_logits, + num_kv_split, + sm_scale, + logit_cap=self.attn_mqa.logit_cap, + use_rope=enable_rope_fusion, + is_neox_style=self.rotary_emb.is_neox_style, + ) + + if enable_rope_fusion: + k_input[..., self.kv_lora_rank :] = k_pe_output + forward_batch.token_to_kv_pool.set_kv_buffer( + self.attn_mqa, forward_batch.out_cache_loc, k_input, None + ) + + attn_output = attn_output.view(-1, self.num_local_heads, self.kv_lora_rank) + + if self.w_vc.dtype == torch.float8_e4m3fnuz: + # TODO(kernel): add bmm_fp8 for torch.float8_e4m3fnuz + attn_bmm_output = torch.bmm( + attn_output.to(torch.bfloat16).transpose(0, 1), + self.w_vc.to(torch.bfloat16) * self.w_scale, + ) + elif self.w_vc.dtype == torch.float8_e4m3fn: + attn_output_val, attn_output_scale = input_to_float8( + attn_output.transpose(0, 1), torch.float8_e4m3fn + ) + attn_bmm_output = bmm_fp8( + attn_output_val, + self.w_vc, + attn_output_scale, + self.w_scale, + torch.bfloat16, + ) + else: + attn_bmm_output = torch.bmm(attn_output.transpose(0, 1), self.w_vc) + attn_output = attn_bmm_output.transpose(0, 1).flatten(1, 2) + output, _ = self.o_proj(attn_output) + + return output + def all_gather( input_tensor: torch.Tensor, forward_batch: ForwardBatch, rank, world_size, group diff --git a/test/srt/test_triton_attention_rocm_mla.py b/test/srt/test_triton_attention_rocm_mla.py new file mode 100644 index 000000000..c2a11f979 --- /dev/null +++ b/test/srt/test_triton_attention_rocm_mla.py @@ -0,0 +1,258 @@ +import random +import unittest + +import torch + +from sglang.srt.layers.attention.triton_ops.decode_attention import ( + decode_attention_fwd_grouped, +) +from sglang.srt.layers.attention.triton_ops.rocm_mla_decode_rope import ( + decode_attention_fwd_grouped_rope, +) +from sglang.srt.layers.rotary_embedding import DeepseekScalingRotaryEmbedding + + +class TestTritonAttentionMLA(unittest.TestCase): + + def _set_all_seeds(self, seed): + """Set all random seeds for reproducibility.""" + random.seed(seed) + torch.manual_seed(seed) + torch.cuda.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False + + def setUp(self): + # Set seeds before each test method + self._set_all_seeds(42) + + def preprocess_kv_cache(self, kv_cache, kv_lora_rank): + latent_cache = kv_cache + v_input = latent_cache[..., :kv_lora_rank] + v_input = v_input.contiguous().unsqueeze(1) + k_input = latent_cache.unsqueeze(1) + k_input[..., :kv_lora_rank] = v_input + + return k_input, v_input + + def input_helper( + self, + B, + H, + S, + kv_lora_rank, + rotary_dim, + qk_rope_head_dim, + num_kv_splits, + dtype, + device, + rope_base=10, + rope_max_seq_len=16384, + rope_scaling=1.0, + is_neox_style=False, + ): + q = torch.randn( + B, H, kv_lora_rank + qk_rope_head_dim, device=device, dtype=dtype + ) + kv_cache = torch.randn( + B * S, kv_lora_rank + qk_rope_head_dim, dtype=dtype, device=device + ) + kv_indptr = torch.arange(B + 1, device=device) * S + kv_indices = torch.arange(B * S, device=device) + attn_logits = torch.empty( + B, H, num_kv_splits, kv_lora_rank + 1, dtype=dtype, device=device + ) + rotary_emb = DeepseekScalingRotaryEmbedding( + qk_rope_head_dim, + rotary_dim, + rope_max_seq_len, + rope_base, + is_neox_style, + rope_scaling, + q.dtype, + device="cpu", + ).cuda() + positions = torch.tensor([S], device=device).unsqueeze(0).repeat(B, 1) + + return kv_indptr, kv_indices, q, kv_cache, attn_logits, rotary_emb, positions + + def ref_compute_full_fwd( + self, + q, + k_input, + v_input, + kv_lora_rank, + kv_indptr, + kv_indices, + num_kv_splits, + sm_scale, + logit_cap, + rotary_emb, + positions, + use_rope, + device="cuda", + ): + + B, H = q.shape[0], q.shape[1] + S = kv_indptr[1].item() + qk_rope_head_dim = k_input.shape[-1] - kv_lora_rank + + q_input = torch.empty(B, H, kv_lora_rank + qk_rope_head_dim, dtype=q.dtype).to( + device + ) + q_nope_out, q_pe = q.split([kv_lora_rank, qk_rope_head_dim], dim=-1) + k_pe_t = k_input.view(B, 1, S, -1)[:, :, -1:, kv_lora_rank:] + + if use_rope: + q_pe, k_pe_t = rotary_emb(positions, q_pe.unsqueeze(2), k_pe_t) + q_pe = q_pe.squeeze() + + k_input.view(B, 1, S, -1)[:, :, -1:, kv_lora_rank:] = k_pe_t + + q_input[..., :kv_lora_rank] = q_nope_out + q_input[..., kv_lora_rank:] = q_pe + + B, H = q_input.shape[0], q_input.shape[1] + kv_lora_rank = v_input.shape[-1] + device = q_input.device + + attn_logits = torch.empty( + B, H, num_kv_splits, kv_lora_rank + 1, dtype=q_input.dtype, device=device + ) + o = torch.empty(B, H, kv_lora_rank, dtype=q_input.dtype, device=device) + + decode_attention_fwd_grouped( + q_input, + k_input, + v_input, + o, + kv_indptr, + kv_indices, + attn_logits, + num_kv_splits, + sm_scale, + logit_cap, + ) + + return attn_logits, o, k_pe_t.squeeze() + + def _test_rocm_fused_mla_kernel( + self, + B, + H, + S, + kv_lora_rank, + qk_rope_head_dim, + rotary_dim, + dtype, + use_rope, + is_neox_style, + num_kv_splits=2, + sm_scale=1.0, + logit_cap=0.0, + device="cuda", + ): + kv_indptr, kv_indices, q, kv_cache, attn_logits, rotary_emb, positions = ( + self.input_helper( + B, + H, + S, + kv_lora_rank, + rotary_dim, + qk_rope_head_dim, + num_kv_splits, + dtype, + device=device, + is_neox_style=is_neox_style, + ) + ) + + k_input, v_input = self.preprocess_kv_cache(kv_cache, kv_lora_rank) + k_pe_tokens = torch.empty( + B, qk_rope_head_dim, dtype=kv_cache.dtype, device=device + ) + tri_o = torch.empty(B, H, kv_lora_rank, dtype=kv_cache.dtype, device=device) + + decode_attention_fwd_grouped_rope( + q, + k_input, + v_input, + tri_o, + kv_indptr, + kv_indices, + k_pe_tokens if use_rope else None, + kv_lora_rank, + rotary_dim if use_rope else None, + rotary_emb.cos_sin_cache if use_rope else None, + positions if use_rope else None, + attn_logits, + num_kv_splits, + sm_scale, + logit_cap, + use_rope, + is_neox_style, + ) + + tri_logits = attn_logits + + # reference + ref_logits, ref_o, ref_k_pe_tokens = self.ref_compute_full_fwd( + q, + k_input, + v_input, + kv_lora_rank, + kv_indptr, + kv_indices, + num_kv_splits, + sm_scale, + logit_cap, + rotary_emb, + positions, + use_rope, + device="cuda", + ) + + if use_rope: + torch.testing.assert_close( + ref_k_pe_tokens, k_pe_tokens.squeeze(), atol=1e-2, rtol=1e-2 + ) + torch.testing.assert_close(ref_logits, tri_logits, atol=1e-2, rtol=1e-2) + torch.testing.assert_close(ref_o, tri_o, atol=1e-2, rtol=1e-2) + + def test_grouped_rocm_fused_mla(self): + configs = [ + (1, 128, 2048, 512, 64, 64), + (1, 128, 2048, 512, 128, 64), + (1, 128, 2048, 512, 127, 64), + (1, 128, 2050, 512, 127, 64), + (1, 128, 2050, 512, 128, 64), + (8, 128, 2048, 512, 64, 64), + (8, 128, 2048, 512, 128, 64), + (8, 128, 2048, 512, 127, 64), + (8, 128, 2050, 512, 127, 64), + (8, 128, 2050, 512, 128, 64), + ] + dtypes = [torch.bfloat16, torch.float32] + use_rope_list = [True, False] + is_neox_style_list = [True, False] + + for B, H, S, kv_lora_rank, qk_rope_head_dim, rotary_dim in configs: + for dtype in dtypes: + for use_rope in use_rope_list: + for is_neox_style in is_neox_style_list: + self._test_rocm_fused_mla_kernel( + B, + H, + S, + kv_lora_rank, + qk_rope_head_dim, + rotary_dim, + dtype, + use_rope, + is_neox_style, + ) + + +if __name__ == "__main__": + unittest.main()