[7/n] decouple quantization impl from vllm dependency - gguf kernel (#11019)
This commit is contained in:
160
sgl-kernel/tests/test_gguf.py
Normal file
160
sgl-kernel/tests/test_gguf.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import torch
|
||||
from gguf import GGMLQuantizationType, GGUFReader, ReaderTensor, dequantize
|
||||
from huggingface_hub import snapshot_download
|
||||
from sgl_kernel import (
|
||||
ggml_dequantize,
|
||||
ggml_moe_a8,
|
||||
ggml_moe_a8_vec,
|
||||
ggml_moe_get_block_size,
|
||||
ggml_mul_mat_a8,
|
||||
ggml_mul_mat_vec_a8,
|
||||
)
|
||||
|
||||
GGUF_SAMPLE = snapshot_download("Isotr0py/test-gguf-sample")
|
||||
GGUF_SAMPLE_MOE = snapshot_download("SzymonOzog/test-gguf-moe-sample")
|
||||
|
||||
|
||||
def get_gguf_sample_tensors(
|
||||
hidden_size: int, quant_type: GGMLQuantizationType
|
||||
) -> list[ReaderTensor]:
|
||||
sample_dir = GGUF_SAMPLE
|
||||
filename = f"Quant_{quant_type.name}_{hidden_size}.gguf"
|
||||
sample_file = Path(sample_dir) / filename
|
||||
return GGUFReader(sample_file).tensors
|
||||
|
||||
|
||||
def get_gguf_MoE_tensors(
|
||||
hidden_size: int, quant_type: GGMLQuantizationType
|
||||
) -> list[ReaderTensor]:
|
||||
sample_dir = GGUF_SAMPLE_MOE
|
||||
filename = f"Quant_{quant_type.name}_{hidden_size}.gguf"
|
||||
sample_file = Path(sample_dir) / filename
|
||||
return GGUFReader(sample_file).tensors
|
||||
|
||||
|
||||
DTYPES = [torch.bfloat16] # [torch.half, torch.bfloat16, torch.float32]
|
||||
# Hidden_size for testing, must match the sample file in HF repo,
|
||||
# we have `hidden_size = 256, 1024` for test in HF repo currently.
|
||||
HIDDEN_SIZES = [256, 1024]
|
||||
NUM_TOKENS = [7, 2050] # Arbitrary values for testing
|
||||
SEEDS = [0]
|
||||
QUANT_TYPES = [
|
||||
# i-matrix
|
||||
GGMLQuantizationType.IQ1_M,
|
||||
GGMLQuantizationType.IQ1_S,
|
||||
GGMLQuantizationType.IQ2_S,
|
||||
GGMLQuantizationType.IQ2_XS,
|
||||
GGMLQuantizationType.IQ3_S,
|
||||
GGMLQuantizationType.IQ3_XXS,
|
||||
GGMLQuantizationType.IQ4_NL,
|
||||
GGMLQuantizationType.IQ4_XS,
|
||||
# k-quants
|
||||
GGMLQuantizationType.Q2_K,
|
||||
GGMLQuantizationType.Q3_K,
|
||||
GGMLQuantizationType.Q4_K,
|
||||
GGMLQuantizationType.Q5_K,
|
||||
GGMLQuantizationType.Q6_K,
|
||||
# standard quantization
|
||||
GGMLQuantizationType.Q4_0,
|
||||
GGMLQuantizationType.Q5_0,
|
||||
GGMLQuantizationType.Q8_0,
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("hidden_size", HIDDEN_SIZES)
|
||||
@pytest.mark.parametrize("dtype", DTYPES)
|
||||
@pytest.mark.parametrize("quant_type", QUANT_TYPES)
|
||||
@torch.inference_mode()
|
||||
def test_dequantize(
|
||||
hidden_size: int, dtype: torch.dtype, quant_type: GGMLQuantizationType
|
||||
):
|
||||
tensors = get_gguf_sample_tensors(hidden_size, quant_type)
|
||||
for tensor in tensors:
|
||||
shape_str = tensor.name.split("_")[-1]
|
||||
shape = map(int, shape_str.split("x"))
|
||||
|
||||
ref_output = torch.tensor(
|
||||
dequantize(tensor.data, quant_type), device="cuda"
|
||||
).to(dtype)
|
||||
output = ggml_dequantize(
|
||||
torch.tensor(tensor.data, device="cuda"), quant_type, *list(shape), dtype
|
||||
)
|
||||
|
||||
torch.testing.assert_close(output, ref_output, atol=1e-2, rtol=4e-2)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("hidden_size", HIDDEN_SIZES)
|
||||
@pytest.mark.parametrize("dtype", DTYPES)
|
||||
@pytest.mark.parametrize("quant_type", QUANT_TYPES)
|
||||
@torch.inference_mode()
|
||||
def test_mmvq(hidden_size: int, dtype: torch.dtype, quant_type: GGMLQuantizationType):
|
||||
|
||||
tensors = get_gguf_sample_tensors(hidden_size, quant_type)
|
||||
x = torch.rand((1, hidden_size), dtype=dtype, device="cuda")
|
||||
for tensor in tensors:
|
||||
weight = torch.tensor(dequantize(tensor.data, quant_type), device="cuda").to(
|
||||
dtype
|
||||
)
|
||||
ref_output = x @ weight.T
|
||||
|
||||
qweight = torch.tensor(tensor.data, device="cuda")
|
||||
output = ggml_mul_mat_vec_a8(qweight, x, quant_type, qweight.shape[0]).to(dtype)
|
||||
|
||||
torch.testing.assert_close(output, ref_output, atol=1, rtol=1e-1)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("num_tokens", NUM_TOKENS)
|
||||
@pytest.mark.parametrize("hidden_size", HIDDEN_SIZES)
|
||||
@pytest.mark.parametrize("dtype", DTYPES)
|
||||
@pytest.mark.parametrize(
|
||||
"quant_type",
|
||||
[
|
||||
# k-quants
|
||||
GGMLQuantizationType.Q2_K,
|
||||
GGMLQuantizationType.Q3_K,
|
||||
GGMLQuantizationType.Q4_K,
|
||||
GGMLQuantizationType.Q5_K,
|
||||
GGMLQuantizationType.Q6_K,
|
||||
# standard quants
|
||||
GGMLQuantizationType.Q4_0,
|
||||
GGMLQuantizationType.Q5_0,
|
||||
GGMLQuantizationType.Q8_0,
|
||||
],
|
||||
)
|
||||
@torch.inference_mode()
|
||||
def test_mmq(
|
||||
num_tokens: int,
|
||||
hidden_size: int,
|
||||
dtype: torch.dtype,
|
||||
quant_type: GGMLQuantizationType,
|
||||
):
|
||||
|
||||
tensors = get_gguf_sample_tensors(hidden_size, quant_type)
|
||||
x = torch.rand((num_tokens, hidden_size), dtype=dtype, device="cuda")
|
||||
for tensor in tensors:
|
||||
weight = torch.tensor(dequantize(tensor.data, quant_type), device="cuda").to(
|
||||
dtype
|
||||
)
|
||||
ref_output = x @ weight.T
|
||||
|
||||
qweight = torch.tensor(tensor.data, device="cuda")
|
||||
output = ggml_mul_mat_a8(qweight, x, quant_type, qweight.shape[0])
|
||||
atols = {torch.half: 1, torch.bfloat16: 1.5, torch.float: 1.2}
|
||||
# test matrix has inputs centered around 0 and lower precision from
|
||||
# bfloat16 tends to accumulate and can greatly inflate rtol
|
||||
# since outputs are also very close to 0
|
||||
rtols = {torch.half: 1e-1, torch.bfloat16: 1e4, torch.float: 2e1}
|
||||
torch.testing.assert_close(
|
||||
output, ref_output, atol=atols[dtype], rtol=rtols[dtype]
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
@@ -4,7 +4,14 @@ import pytest
|
||||
import torch
|
||||
import triton
|
||||
import triton.language as tl
|
||||
from sgl_kernel import moe_align_block_size
|
||||
from sgl_kernel import moe_align_block_size, moe_sum
|
||||
|
||||
|
||||
def is_hip() -> bool:
|
||||
return torch.version.hip is not None
|
||||
|
||||
|
||||
_is_hip = is_hip()
|
||||
|
||||
|
||||
def ceil_div(a, b):
|
||||
@@ -246,5 +253,20 @@ def test_moe_align_block_size_compare_implementations(
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("m", [1, 33, 64, 222])
|
||||
@pytest.mark.parametrize("topk", [2, 6])
|
||||
@pytest.mark.parametrize("k", [128, 511, 1024])
|
||||
@pytest.mark.parametrize("dtype", [torch.float32, torch.float16, torch.bfloat16])
|
||||
@pytest.mark.skipif(_is_hip, reason="Skip for AMD GPU")
|
||||
def test_moe_sum(m: int, topk: int, k: int, dtype: torch.dtype):
|
||||
input = torch.randn((m, topk, k), device="cuda", dtype=dtype)
|
||||
actual = torch.empty((m, k), device="cuda", dtype=dtype)
|
||||
|
||||
expected = input.sum(dim=1)
|
||||
moe_sum(input, actual)
|
||||
|
||||
torch.testing.assert_close(actual, expected, atol=2e-2, rtol=0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
|
||||
Reference in New Issue
Block a user