[Hicache]: Add E2E CI For 3FS-KVStore (#10131)

This commit is contained in:
hzh0425
2025-09-08 16:54:50 +08:00
committed by GitHub
parent 78f139812a
commit ec99668ab7
7 changed files with 417 additions and 214 deletions

View File

@@ -0,0 +1,164 @@
import logging
import os
import threading
from abc import ABC, abstractmethod
from typing import List
import torch
class Hf3fsClient(ABC):
    """Abstract base class defining the contract for HF3FS storage clients.

    Concrete subclasses provide the actual storage backend (e.g. a real
    usrbio-based client or a plain-file mock used in CI).
    """

    @abstractmethod
    def __init__(self, path: str, size: int, bytes_per_page: int, entries: int):
        """Set up the client.

        Args:
            path: Backing file path for the storage.
            size: Total capacity of the storage file, in bytes.
            bytes_per_page: Size of a single page, in bytes.
            entries: Number of entries used for batch operations.
        """
        pass

    @abstractmethod
    def batch_read(self, offsets: List[int], tensors: List[torch.Tensor]) -> List[int]:
        """Read a batch of pages from storage into the given tensors."""
        pass

    @abstractmethod
    def batch_write(self, offsets: List[int], tensors: List[torch.Tensor]) -> List[int]:
        """Write a batch of tensors to storage at the given offsets."""
        pass

    @abstractmethod
    def check(self, offsets: List[int], tensors: List[torch.Tensor]) -> None:
        """Validate the parameters of a batch operation."""
        pass

    @abstractmethod
    def get_size(self) -> int:
        """Return the total storage size in bytes."""
        pass

    @abstractmethod
    def close(self) -> None:
        """Close the client and release its resources."""
        pass

    @abstractmethod
    def flush(self) -> None:
        """Persist buffered data to disk."""
        pass
logger = logging.getLogger(__name__)
class Hf3fsMockClient(Hf3fsClient):
    """Mock implementation of Hf3fsClient for CI testing purposes.

    Emulates the usrbio-backed client with a plain local file accessed
    through `os`-level syscalls, so the HF3FS backend can be exercised
    without a real 3FS deployment.
    """

    def __init__(self, path: str, size: int, bytes_per_page: int, entries: int):
        """Initialize mock HF3FS client.

        Args:
            path: Backing file path for the mock storage.
            size: Total size of the storage file, in bytes.
            bytes_per_page: Bytes per page.
            entries: Number of entries for batch operations.
        """
        self.path = path
        self.size = size
        self.bytes_per_page = bytes_per_page
        self.entries = entries

        # Create the parent directory only when the path actually has one;
        # os.makedirs("") raises FileNotFoundError for a bare filename.
        parent = os.path.dirname(self.path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Create and pre-size the backing file.
        self.file = os.open(self.path, os.O_RDWR | os.O_CREAT)
        os.ftruncate(self.file, size)

        logger.info(
            f"Hf3fsMockClient initialized: path={path}, size={size}, "
            f"bytes_per_page={bytes_per_page}, entries={entries}"
        )

    def batch_read(self, offsets: List[int], tensors: List[torch.Tensor]) -> List[int]:
        """Batch read from mock storage.

        Args:
            offsets: Byte offsets into the backing file, one per tensor.
            tensors: Destination tensors; filled in place on success.

        Returns:
            Bytes read for each entry (0 on error, short count on short read).
        """
        self.check(offsets, tensors)

        results = []
        for offset, tensor in zip(offsets, tensors):
            size = tensor.numel() * tensor.itemsize
            try:
                os.lseek(self.file, offset, os.SEEK_SET)
                bytes_read = os.read(self.file, size)
                if len(bytes_read) == size:
                    # Copy through a writable buffer: torch.frombuffer on an
                    # immutable `bytes` object warns about non-writable memory.
                    bytes_tensor = torch.frombuffer(
                        bytearray(bytes_read), dtype=torch.uint8
                    )
                    typed_tensor = bytes_tensor.view(tensor.dtype).view(tensor.shape)
                    tensor.copy_(typed_tensor)
                    results.append(size)
                else:
                    logger.warning(
                        f"Short read: expected {size}, got {len(bytes_read)}"
                    )
                    results.append(len(bytes_read))
            except Exception as e:
                logger.error(f"Error reading from offset {offset}: {e}")
                results.append(0)

        return results

    def batch_write(self, offsets: List[int], tensors: List[torch.Tensor]) -> List[int]:
        """Batch write to mock storage.

        Args:
            offsets: Byte offsets into the backing file, one per tensor.
            tensors: Source tensors to persist.

        Returns:
            Bytes written for each entry (0 on error, short count on short write).
        """
        self.check(offsets, tensors)

        results = []
        for offset, tensor in zip(offsets, tensors):
            size = tensor.numel() * tensor.itemsize
            try:
                # Reinterpret the tensor's storage as raw bytes before writing.
                tensor_bytes = tensor.contiguous().view(torch.uint8).flatten()
                data = tensor_bytes.numpy().tobytes()
                os.lseek(self.file, offset, os.SEEK_SET)
                bytes_written = os.write(self.file, data)
                if bytes_written == size:
                    results.append(size)
                else:
                    logger.warning(f"Short write: expected {size}, got {bytes_written}")
                    results.append(bytes_written)
            except Exception as e:
                logger.error(f"Error writing to offset {offset}: {e}")
                results.append(0)

        return results

    def check(self, offsets: List[int], tensors: List[torch.Tensor]) -> None:
        """Validate batch operation parameters (no-op in the mock)."""
        pass

    def get_size(self) -> int:
        """Get total storage size in bytes."""
        return self.size

    def close(self) -> None:
        """Close the mock client and cleanup resources (idempotent)."""
        try:
            if hasattr(self, "file") and self.file >= 0:
                os.close(self.file)
                self.file = -1  # Mark as closed
                logger.info(f"Hf3fsMockClient closed: {self.path}")
        except Exception as e:
            logger.error(f"Error closing Hf3fsMockClient: {e}")

    def flush(self) -> None:
        """Flush data to disk; a no-op once the client has been closed."""
        try:
            # Guard against fsync on a closed (or never-opened) descriptor.
            if hasattr(self, "file") and self.file >= 0:
                os.fsync(self.file)
        except Exception as e:
            logger.error(f"Error flushing Hf3fsMockClient: {e}")

View File

@@ -9,6 +9,8 @@ from typing import List
import torch
from torch.utils.cpp_extension import load
from sglang.srt.mem_cache.storage.hf3fs.hf3fs_client import Hf3fsClient
root = Path(__file__).parent.resolve()
hf3fs_utils = load(name="hf3fs_utils", sources=[f"{root}/hf3fs_utils.cpp"])
@@ -51,7 +53,9 @@ def wsynchronized():
return _decorator
class Hf3fsClient:
class Hf3fsUsrBioClient(Hf3fsClient):
"""HF3FS client implementation using usrbio."""
def __init__(self, path: str, size: int, bytes_per_page: int, entries: int):
if not HF3FS_AVAILABLE:
raise ImportError(

View File

@@ -13,7 +13,7 @@ from typing import Any, List, Optional, Tuple
import torch
from sglang.srt.mem_cache.hicache_storage import HiCacheStorage, HiCacheStorageConfig
from sglang.srt.mem_cache.storage.hf3fs.client_hf3fs import Hf3fsClient
from sglang.srt.mem_cache.storage.hf3fs.hf3fs_client import Hf3fsClient
from sglang.srt.metrics.collector import StorageMetrics
logger = logging.getLogger(__name__)
@@ -114,6 +114,33 @@ def synchronized():
return _decorator
def create_hf3fs_client(
    path: str, size: int, bytes_per_page: int, entries: int, use_mock: bool = False
) -> Hf3fsClient:
    """Factory function to create appropriate HF3FS client.

    Args:
        path: File path for storage
        size: Total size of storage file
        bytes_per_page: Bytes per page
        entries: Number of entries for batch operations
        use_mock: Whether to use mock client instead of real usrbio client

    Returns:
        An ``Hf3fsClient`` instance: ``Hf3fsMockClient`` when ``use_mock``
        is True (CI without a real 3FS deployment), otherwise
        ``Hf3fsUsrBioClient``.
    """
    if use_mock:
        # Imported lazily so the real backend is never required in CI runs.
        from sglang.srt.mem_cache.storage.hf3fs.hf3fs_client import Hf3fsMockClient

        logger.info("Using Hf3fsMockClient for testing")
        return Hf3fsMockClient(path, size, bytes_per_page, entries)
    else:
        # Imported lazily because the usrbio bindings may be absent on
        # machines that only ever use the mock client.
        from sglang.srt.mem_cache.storage.hf3fs.hf3fs_usrbio_client import (
            Hf3fsUsrBioClient,
        )

        return Hf3fsUsrBioClient(path, size, bytes_per_page, entries)
class HiCacheHF3FS(HiCacheStorage):
"""HiCache backend that stores KV cache pages in HF3FS files."""
@@ -131,6 +158,7 @@ class HiCacheHF3FS(HiCacheStorage):
metadata_client: Hf3fsMetadataInterface,
is_mla_model: bool = False,
is_page_first_layout: bool = False,
use_mock_client: bool = False,
):
self.rank = rank
self.file_path = file_path
@@ -159,8 +187,12 @@ class HiCacheHF3FS(HiCacheStorage):
self.ac = AtomicCounter(self.numjobs)
self.clients = [
Hf3fsClient(
self.file_path, self.file_size, self.bytes_per_page, self.entries
create_hf3fs_client(
self.file_path,
self.file_size,
self.bytes_per_page,
self.entries,
use_mock_client,
)
for _ in range(numjobs)
]
@@ -202,14 +234,24 @@ class HiCacheHF3FS(HiCacheStorage):
Hf3fsLocalMetadataClient,
)
use_mock_client = False
if storage_config is not None:
rank, is_mla_model, is_page_first_layout = (
storage_config.tp_rank,
storage_config.is_mla_model,
storage_config.is_page_first_layout,
)
if storage_config.extra_config is not None:
use_mock_client = storage_config.extra_config.get(
"use_mock_hf3fs_client", False
)
else:
rank, is_mla_model, is_page_first_layout = 0, False, False
rank, is_mla_model, is_page_first_layout = (
0,
False,
False,
)
mla_unsupported_msg = f"MLA model is not supported without global metadata server, please refer to https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/mem_cache/storage/hf3fs/docs/deploy_sglang_3fs_multinode.md"
@@ -228,6 +270,7 @@ class HiCacheHF3FS(HiCacheStorage):
dtype=dtype,
metadata_client=Hf3fsLocalMetadataClient(),
is_page_first_layout=is_page_first_layout,
use_mock_client=use_mock_client,
)
try:
@@ -277,6 +320,7 @@ class HiCacheHF3FS(HiCacheStorage):
metadata_client=metadata_client,
is_mla_model=is_mla_model,
is_page_first_layout=is_page_first_layout,
use_mock_client=use_mock_client,
)
def get(