421 lines
17 KiB
Python
421 lines
17 KiB
Python
################################################################################
|
|
# Copyright(c)2020-2025 Shanghai Biren Technology Co., Ltd. All rights reserved.
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
################################################################################
|
|
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
|
|
# This file is a pure Python wrapper for the SCCL library.
|
|
# The main purpose is to use SCCL combined with CUDA graph.
|
|
# Before writing this script, we tried the following approach:
|
|
# 1. We tried to use `cupy`, it calls SCCL correctly, but `cupy` itself
|
|
# often gets stuck when initializing the SCCL communicator.
|
|
# 2. We tried to use `torch.distributed`, but `torch.distributed.all_reduce`
|
|
# makes many other CUDA API calls that are not allowed during
|
|
# capturing the CUDA graph. For further details, please check
|
|
# https://discuss.pytorch.org/t/pytorch-cudagraph-with-nccl-operation-failed/ .
|
|
#
|
|
# Another rejected idea is to write a C/C++ binding for SCCL. It is usually
|
|
# doable, but we often encounter issues related to SCCL versions, and need
|
|
# to switch between different versions of SCCL. See
|
|
# https://github.com/NVIDIA/nccl/issues/1234 for more details.
|
|
# A C/C++ binding is not flexible enough to handle this. It requires
|
|
# recompilation of the code every time we want to switch between different
|
|
# versions. This current implementation, with a **pure** Python wrapper, is
|
|
# more flexible. We can easily switch between different versions of SCCL by
|
|
# changing the environment variable `VLLM_SCCL_SO_PATH`, or the `so_file`
|
|
# variable in the code.
|
|
|
|
import ctypes
|
|
import platform
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
import torch
|
|
from torch.distributed import ReduceOp
|
|
|
|
from vllm.logger import logger
|
|
from vllm_br import envs
|
|
|
|
# === export types and functions from nccl to Python ===
|
|
# for the original nccl definition, please check
|
|
# https://github.com/NVIDIA/nccl/blob/master/src/nccl.h.in
|
|
|
|
# C `int` result code; used below as the `restype` of the wrapped SCCL
# functions. `SUCCL_CHECK` treats any non-zero value as an error.
succlResult_t = ctypes.c_int
|
|
# Communicator handle, represented as an untyped C pointer (`void*`).
succlComm_t = ctypes.c_void_p
|
|
|
|
|
|
class succlUniqueId(ctypes.Structure):
    """ctypes mirror of the SCCL unique-id struct: a 128-byte opaque blob.

    The single field `internal` is an array of 128 `c_byte`; the wrapper
    only ever fills it through the library or by copying raw bytes into it.
    """

    _fields_ = [
        ("internal", ctypes.c_byte * 128),
    ]
|
|
|
|
|
|
# Stream handle, represented as an untyped C pointer (`void*`).
suStream_t = ctypes.c_void_p
|
|
# Type used for the send/receive buffer arguments of the collectives below
# (an untyped C pointer).
buffer_type = ctypes.c_void_p
|
|
|
|
# C `int` carrying an element-type code (see `succlDataTypeEnum`).
succlDataType_t = ctypes.c_int
|
|
|
|
|
|
class succlDataTypeEnum:
    """Integer codes passed to SCCL as `succlDataType_t`.

    Several names are aliases of the same code (`succlChar`/`succlInt8`,
    `succlInt`/`succlInt32`, `succlFloat`/`succlFloat32`,
    `succlDouble`/`succlFloat64`). `succlNumTypes` is the number of
    distinct codes, not a data type itself.
    """

    succlInt8 = 0
    succlChar = 0
    succlUint8 = 1
    succlInt16 = 2
    succlUint16 = 3
    succlInt32 = 4
    succlInt = 4
    succlUint32 = 5
    succlInt64 = 6
    succlUint64 = 7
    succlBfloat16 = 8
    succlFloat32 = 9
    succlFloat = 9
    succlFloat64 = 10
    succlDouble = 10
    succlNumTypes = 11

    @classmethod
    def from_torch(cls, dtype: torch.dtype) -> int:
        """Translate a torch dtype into the matching SCCL data-type code.

        Args:
            dtype: the element type of the tensor being communicated.

        Returns:
            The integer code to pass as `succlDataType_t`.

        Raises:
            ValueError: if `dtype` has no SCCL counterpart in this table.
        """
        torch_to_succl = {
            torch.int8: cls.succlInt8,
            torch.uint8: cls.succlUint8,
            # The enum declares a 16-bit integer code, so expose it too.
            torch.int16: cls.succlInt16,
            torch.int32: cls.succlInt32,
            torch.int64: cls.succlInt64,
            # NOTE(review): this enum has no float16 code, so float16 is
            # sent with the bfloat16 code (kept for backward compatibility).
            # Both are 2 bytes wide, so pure data movement is presumably
            # unaffected, but arithmetic reductions would interpret the
            # bits as bfloat16 -- confirm against the SCCL header.
            torch.float16: cls.succlBfloat16,
            torch.float32: cls.succlFloat32,
            torch.float64: cls.succlFloat64,
            torch.bfloat16: cls.succlBfloat16,
        }
        try:
            return torch_to_succl[dtype]
        except (KeyError, TypeError):
            # TypeError covers unhashable arguments, which previously also
            # ended up in the ValueError branch.
            raise ValueError(f"Unsupported dtype: {dtype}") from None
|
|
|
|
|
|
# C `int` carrying a reduction-operator code (see `succlRedOpTypeEnum`).
succlRedOp_t = ctypes.c_int
|
|
|
|
|
|
class succlRedOpTypeEnum:
    """Integer codes passed to SCCL as `succlRedOp_t`.

    `succlNumOps` is the number of operator codes, not an operator.
    """

    succlSum = 0
    succlProd = 1
    succlMax = 2
    succlMin = 3
    succlAvg = 4
    succlNumOps = 5

    @classmethod
    def from_torch(cls, op: ReduceOp) -> int:
        """Translate a `torch.distributed.ReduceOp` into its SCCL code.

        Raises:
            ValueError: if `op` is none of SUM, PRODUCT, MAX, MIN or AVG.
        """
        # Checked in order with `==`; the torch attribute is looked up only
        # when its turn comes, exactly like a chain of `if` statements.
        lookup = (
            ("SUM", cls.succlSum),
            ("PRODUCT", cls.succlProd),
            ("MAX", cls.succlMax),
            ("MIN", cls.succlMin),
            ("AVG", cls.succlAvg),
        )
        for torch_name, code in lookup:
            if op == getattr(ReduceOp, torch_name):
                return code
        raise ValueError(f"Unsupported op: {op}")
|
|
|
|
|
|
@dataclass
class Function:
    """Signature record for one C symbol to be bound through ctypes."""

    # Symbol name looked up on the loaded shared library.
    name: str
    # Value assigned to the ctypes function's `restype`.
    restype: Any
    # Value assigned to the ctypes function's `argtypes`.
    argtypes: list[Any]
|
|
|
|
|
|
class SCCLLibrary:
    """Pure-Python (ctypes) binding for the SCCL shared library.

    On construction the shared object is loaded with `ctypes.CDLL` and every
    entry of `exported_functions` gets its `restype`/`argtypes` assigned.
    Both the loaded library and the resulting name -> function dictionary
    are cached per library path at class level, so repeated instantiation
    with the same path reuses them.

    The `succl*` methods are thin wrappers: they forward their arguments to
    the C function and pass the returned status through `SUCCL_CHECK`, which
    raises `RuntimeError` on any non-zero status.
    """

    # NOTE(review): for succlCommInitRank and all communication calls the
    # declared `argtypes` carry one extra trailing `void*` that is not part
    # of the prototype quoted in the comment above each entry; the wrappers
    # below always pass `None` for it. Confirm its meaning against the SCCL
    # header.
    exported_functions = [
        # const char* succlGetErrorString(succlResult_t result)
        Function("succlGetErrorString", ctypes.c_char_p, [succlResult_t]),
        # succlResult_t succlGetVersion(int *version);
        Function("succlGetVersion", succlResult_t,
                 [ctypes.POINTER(ctypes.c_int)]),
        # succlResult_t succlGetUniqueId(succlUniqueId* uniqueId);
        Function("succlGetUniqueId", succlResult_t,
                 [ctypes.POINTER(succlUniqueId)]),
        # succlResult_t succlCommInitRank(
        #   succlComm_t* comm, int nranks, succlUniqueId commId, int rank);
        # note that succlComm_t is a pointer type, so the first argument
        # is a pointer to a pointer
        Function("succlCommInitRank", succlResult_t, [
            ctypes.POINTER(succlComm_t), ctypes.c_int, succlUniqueId,
            ctypes.c_int, ctypes.c_void_p
        ]),
        # succlResult_t succlAllReduce(
        #   const void* sendbuff, void* recvbuff, size_t count,
        #   succlDataType_t datatype, succlRedOp_t op, succlComm_t comm,
        #   suStream_t stream);
        # note that suStream_t is a pointer type
        Function("succlAllReduce", succlResult_t, [
            buffer_type, buffer_type, ctypes.c_size_t, succlDataType_t,
            succlRedOp_t, succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # succlResult_t succlReduce(
        #   const void* sendbuff, void* recvbuff, size_t count,
        #   succlDataType_t datatype, succlRedOp_t op, int root,
        #   succlComm_t comm, suStream_t stream);
        # note that suStream_t is a pointer type
        Function("succlReduce", succlResult_t, [
            buffer_type, buffer_type, ctypes.c_size_t, succlDataType_t,
            succlRedOp_t, ctypes.c_int, succlComm_t, suStream_t,
            ctypes.c_void_p
        ]),

        # succlResult_t succlAllGather(
        #   const void* sendbuff, void* recvbuff, size_t count,
        #   succlDataType_t datatype, succlComm_t comm,
        #   suStream_t stream);
        # note that suStream_t is a pointer type
        Function("succlAllGather", succlResult_t, [
            buffer_type, buffer_type, ctypes.c_size_t, succlDataType_t,
            succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # succlResult_t succlReduceScatter(
        #   const void* sendbuff, void* recvbuff, size_t count,
        #   succlDataType_t datatype, succlRedOp_t op, succlComm_t comm,
        #   suStream_t stream);
        # note that suStream_t is a pointer type
        Function("succlReduceScatter", succlResult_t, [
            buffer_type, buffer_type, ctypes.c_size_t, succlDataType_t,
            succlRedOp_t, succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # succlResult_t succlSend(
        #   const void* sendbuff, size_t count, succlDataType_t datatype,
        #   int dest, succlComm_t comm, suStream_t stream);
        Function("succlSend", succlResult_t, [
            buffer_type, ctypes.c_size_t, succlDataType_t, ctypes.c_int,
            succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # succlResult_t succlRecv(
        #   void* recvbuff, size_t count, succlDataType_t datatype,
        #   int src, succlComm_t comm, suStream_t stream);
        Function("succlRecv", succlResult_t, [
            buffer_type, ctypes.c_size_t, succlDataType_t, ctypes.c_int,
            succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # succlResult_t succlBroadcast(
        #   const void* sendbuff, void* recvbuff, size_t count,
        #   succlDataType_t datatype, int root, succlComm_t comm,
        #   suStream_t stream);
        Function("succlBroadcast", succlResult_t, [
            buffer_type, buffer_type, ctypes.c_size_t, succlDataType_t,
            ctypes.c_int, succlComm_t, suStream_t, ctypes.c_void_p
        ]),

        # be cautious! this is a collective call, it will block until all
        # processes in the communicator have called this function.
        # because Python object destruction can happen in random order,
        # it is better not to call it at all.
        # succlResult_t succlCommDestroy(succlComm_t comm);
        Function("succlCommDestroy", succlResult_t, [succlComm_t]),
        # succlResult_t succlGroupStart();
        Function("succlGroupStart", succlResult_t, []),
        # succlResult_t succlGroupEnd();
        Function("succlGroupEnd", succlResult_t, []),
    ]

    # class attribute to store the mapping from the path to the library
    # to avoid loading the same library multiple times
    path_to_library_cache: dict[str, Any] = {}

    # class attribute to store the mapping from library path
    # to the corresponding dictionary of bound functions
    path_to_dict_mapping: dict[str, dict[str, Any]] = {}

    def __init__(self, so_file: Optional[str] = None):
        """Load the SCCL shared library and bind its exported functions.

        Args:
            so_file: path of the shared object. When falsy, the path is
                obtained from `find_sccl_library()`.

        Raises:
            Exception: whatever `ctypes.CDLL` raises when the library cannot
                be loaded; it is logged and re-raised unchanged.
            AttributeError: if a function listed in `exported_functions` is
                missing from the loaded library.
        """
        so_file = so_file or find_sccl_library()

        try:
            # Consult the library cache itself (not the function-dict cache)
            # so a library that was loaded before is never loaded twice.
            if so_file not in SCCLLibrary.path_to_library_cache:
                SCCLLibrary.path_to_library_cache[so_file] = ctypes.CDLL(
                    so_file)
            self.lib = SCCLLibrary.path_to_library_cache[so_file]
        except Exception:
            logger.error(
                "Failed to load SCCL library from %s. "
                "It is expected if you are not running on NVIDIA/AMD GPUs. "
                "Otherwise, the sccl library might not exist, be corrupted "
                "or it does not support the current platform %s. "
                "If you already have the library, please set the "
                "environment variable VLLM_SCCL_SO_PATH"
                " to point to the correct sccl library path.", so_file,
                platform.platform())
            # Bare `raise` keeps the original traceback intact.
            raise

        if so_file not in SCCLLibrary.path_to_dict_mapping:
            _funcs: dict[str, Any] = {}
            for func in SCCLLibrary.exported_functions:
                f = getattr(self.lib, func.name)
                f.restype = func.restype
                f.argtypes = func.argtypes
                _funcs[func.name] = f
            SCCLLibrary.path_to_dict_mapping[so_file] = _funcs
        self._funcs = SCCLLibrary.path_to_dict_mapping[so_file]

    def succlGetErrorString(self, result: succlResult_t) -> str:
        """Return the library's text for status code `result`."""
        message = self._funcs["succlGetErrorString"](result)
        if message is None:
            # ctypes turns a NULL `char*` return into None; report the raw
            # code instead of failing with AttributeError on `.decode`.
            return f"unknown error (code {result})"
        return message.decode("utf-8")

    def SUCCL_CHECK(self, result: succlResult_t) -> None:
        """Raise `RuntimeError` if `result` is a non-zero status code."""
        if result != 0:
            error_str = self.succlGetErrorString(result)
            raise RuntimeError(f"SCCL error: {error_str}")

    def succlGetVersion(self) -> str:
        """Return the library version as a "major.minor.patch" string.

        Assumes the integer encoding `major * 10000 + minor * 100 + patch`
        (e.g. 21903 --> "2.19.3"). Zero components are kept, so 22000 yields
        "2.20.0" rather than a string with an empty field.
        """
        version = ctypes.c_int()
        self.SUCCL_CHECK(self._funcs["succlGetVersion"](ctypes.byref(version)))
        major, remainder = divmod(version.value, 10000)
        minor, patch = divmod(remainder, 100)
        return f"{major}.{minor}.{patch}"

    def succlGetUniqueId(self) -> succlUniqueId:
        """Ask the library for a fresh unique id and return it."""
        unique_id = succlUniqueId()
        self.SUCCL_CHECK(self._funcs["succlGetUniqueId"](
            ctypes.byref(unique_id)))
        return unique_id

    def unique_id_from_bytes(self, data: bytes) -> succlUniqueId:
        """Build a `succlUniqueId` from exactly 128 raw bytes.

        Raises:
            ValueError: if `data` is not 128 bytes long.
        """
        if len(data) != 128:
            raise ValueError(
                f"Expected 128 bytes for succlUniqueId, got {len(data)} bytes")
        unique_id = succlUniqueId()
        ctypes.memmove(ctypes.addressof(unique_id.internal), data, 128)
        return unique_id

    def succlCommInitRank(self, world_size: int, unique_id: succlUniqueId,
                          rank: int) -> succlComm_t:
        """Create and return the communicator handle for `rank`."""
        comm = succlComm_t()
        result = self._funcs["succlCommInitRank"](ctypes.byref(comm),
                                                  world_size, unique_id, rank,
                                                  None)
        self.SUCCL_CHECK(result)
        return comm

    def succlAllReduce(self, sendbuff: buffer_type, recvbuff: buffer_type,
                       count: int, datatype: int, op: int, comm: succlComm_t,
                       stream: suStream_t) -> None:
        """Call `succlAllReduce` and raise on a non-zero status."""
        # `datatype` is a `succlDataType_t` and `op` a `succlRedOp_t`; both
        # are aliases of `ctypes.c_int`, and ctypes converts a plain Python
        # int to `c_int` automatically.
        self.SUCCL_CHECK(self._funcs["succlAllReduce"](sendbuff, recvbuff,
                                                       count, datatype, op,
                                                       comm, stream, None))

    def succlReduce(self, sendbuff: buffer_type, recvbuff: buffer_type,
                    count: int, datatype: int, op: int, root: int,
                    comm: succlComm_t, stream: suStream_t) -> None:
        """Call `succlReduce` and raise on a non-zero status."""
        # plain ints for `datatype`/`op`/`root` are converted to `c_int` by
        # ctypes automatically
        self.SUCCL_CHECK(self._funcs["succlReduce"](sendbuff, recvbuff, count,
                                                    datatype, op, root, comm,
                                                    stream, None))

    def succlReduceScatter(self, sendbuff: buffer_type, recvbuff: buffer_type,
                           count: int, datatype: int, op: int,
                           comm: succlComm_t, stream: suStream_t) -> None:
        """Call `succlReduceScatter` and raise on a non-zero status."""
        # plain ints for `datatype`/`op` are converted to `c_int` by ctypes
        # automatically
        self.SUCCL_CHECK(self._funcs["succlReduceScatter"](sendbuff, recvbuff,
                                                           count, datatype, op,
                                                           comm, stream, None))

    def succlAllGather(self, sendbuff: buffer_type, recvbuff: buffer_type,
                       count: int, datatype: int, comm: succlComm_t,
                       stream: suStream_t) -> None:
        """Call `succlAllGather` and raise on a non-zero status."""
        # a plain int for `datatype` is converted to `c_int` by ctypes
        # automatically
        self.SUCCL_CHECK(self._funcs["succlAllGather"](sendbuff, recvbuff,
                                                       count, datatype, comm,
                                                       stream, None))

    def succlSend(self, sendbuff: buffer_type, count: int, datatype: int,
                  dest: int, comm: succlComm_t, stream: suStream_t) -> None:
        """Call `succlSend` and raise on a non-zero status."""
        self.SUCCL_CHECK(self._funcs["succlSend"](sendbuff, count, datatype,
                                                  dest, comm, stream, None))

    def succlRecv(self, recvbuff: buffer_type, count: int, datatype: int,
                  src: int, comm: succlComm_t, stream: suStream_t) -> None:
        """Call `succlRecv` and raise on a non-zero status."""
        self.SUCCL_CHECK(self._funcs["succlRecv"](recvbuff, count, datatype,
                                                  src, comm, stream, None))

    def succlBroadcast(self, sendbuff: buffer_type, recvbuff: buffer_type,
                       count: int, datatype: int, root: int, comm: succlComm_t,
                       stream: suStream_t) -> None:
        """Call `succlBroadcast` and raise on a non-zero status."""
        self.SUCCL_CHECK(self._funcs["succlBroadcast"](sendbuff, recvbuff,
                                                       count, datatype, root,
                                                       comm, stream, None))

    def succlCommDestroy(self, comm: succlComm_t) -> None:
        """Call `succlCommDestroy`; see the caution in `exported_functions`."""
        self.SUCCL_CHECK(self._funcs["succlCommDestroy"](comm))

    def succlGroupStart(self) -> None:
        """Call `succlGroupStart` and raise on a non-zero status."""
        self.SUCCL_CHECK(self._funcs["succlGroupStart"]())

    def succlGroupEnd(self) -> None:
        """Call `succlGroupEnd` and raise on a non-zero status."""
        self.SUCCL_CHECK(self._funcs["succlGroupEnd"]())
|
|
|
|
|
|
def find_sccl_library() -> str:
    """Return the SCCL shared-library path taken from `VLLM_SCCL_SO_PATH`.

    The path is read from `envs.VLLM_SCCL_SO_PATH`. There is no fallback
    search: when the variable is unset or empty the function fails instead
    of guessing a library name.

    Returns:
        The configured library path, unchanged.

    Raises:
        ValueError: if `VLLM_SCCL_SO_PATH` is not set.
    """
    so_file = envs.VLLM_SCCL_SO_PATH
    if not so_file:
        raise ValueError(
            "SCCL lib file not found. Please set the environment variable "
            "VLLM_SCCL_SO_PATH to the path of the SCCL shared library.")

    logger.info("Found sccl from environment variable VLLM_SCCL_SO_PATH=%s",
                so_file)
    return so_file
|
|
|
|
|
|
# Public names of this module.
__all__ = [
    "SCCLLibrary",
    "succlDataTypeEnum",
    "succlRedOpTypeEnum",
    "succlUniqueId",
    "succlComm_t",
    "suStream_t",
    "buffer_type",
]
|