[P/D] pd proxy support ipv6 (#4161)
### What this PR does / why we need it?
pd proxy support ipv6, mooncake connector check whether the IPv6 address
is used and notify the user.
- vLLM version: v0.11.0
- vLLM main:
2918c1b49c
---------
Signed-off-by: liziyu <liziyu16@huawei.com>
This commit is contained in:
@@ -84,7 +84,7 @@ Compile and install
|
|||||||
```shell
|
```shell
|
||||||
mkdir build
|
mkdir build
|
||||||
cd build
|
cd build
|
||||||
cmake .. -USE_ASCEND_DIRECT=ON
|
cmake .. -DUSE_ASCEND_DIRECT=ON
|
||||||
make -j
|
make -j
|
||||||
make install
|
make install
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ import argparse
|
|||||||
import asyncio
|
import asyncio
|
||||||
import functools
|
import functools
|
||||||
import heapq
|
import heapq
|
||||||
|
import ipaddress
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
@@ -115,7 +116,13 @@ class ServerState:
|
|||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.url = f'http://{host}:{port}/v1'
|
ip = ipaddress.ip_address(self.host)
|
||||||
|
if isinstance(ip, ipaddress.IPv4Address):
|
||||||
|
self.url = f'http://{host}:{port}/v1'
|
||||||
|
elif isinstance(ip, ipaddress.IPv6Address):
|
||||||
|
self.url = f'http://[{host}]:{port}/v1'
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f"Invild host IP address {ip}")
|
||||||
self.client = httpx.AsyncClient(timeout=None,
|
self.client = httpx.AsyncClient(timeout=None,
|
||||||
base_url=self.url,
|
base_url=self.url,
|
||||||
limits=httpx.Limits(
|
limits=httpx.Limits(
|
||||||
@@ -356,6 +363,9 @@ async def send_request_to_service(client: httpx.AsyncClient,
|
|||||||
req_data = req_data.copy()
|
req_data = req_data.copy()
|
||||||
req_data["stream"] = False
|
req_data["stream"] = False
|
||||||
req_data["max_tokens"] = 1
|
req_data["max_tokens"] = 1
|
||||||
|
req_data["min_tokens"] = 1
|
||||||
|
if "max_completion_tokens" in req_data:
|
||||||
|
req_data["max_completion_tokens"] = 1
|
||||||
if "stream_options" in req_data:
|
if "stream_options" in req_data:
|
||||||
del req_data["stream_options"]
|
del req_data["stream_options"]
|
||||||
headers = {
|
headers = {
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ import argparse
|
|||||||
import asyncio
|
import asyncio
|
||||||
import functools
|
import functools
|
||||||
import heapq
|
import heapq
|
||||||
|
import ipaddress
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
@@ -117,7 +118,13 @@ class ServerState:
|
|||||||
def __init__(self, host, port):
|
def __init__(self, host, port):
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.url = f'http://{host}:{port}/v1'
|
ip = ipaddress.ip_address(self.host)
|
||||||
|
if isinstance(ip, ipaddress.IPv4Address):
|
||||||
|
self.url = f'http://{host}:{port}/v1'
|
||||||
|
elif isinstance(ip, ipaddress.IPv6Address):
|
||||||
|
self.url = f'http://[{host}]:{port}/v1'
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f"Invild host IP address {ip}")
|
||||||
self.client = httpx.AsyncClient(timeout=None,
|
self.client = httpx.AsyncClient(timeout=None,
|
||||||
base_url=self.url,
|
base_url=self.url,
|
||||||
limits=httpx.Limits(
|
limits=httpx.Limits(
|
||||||
@@ -366,6 +373,8 @@ async def send_request_to_service(client: httpx.AsyncClient,
|
|||||||
req_data["stream"] = False
|
req_data["stream"] = False
|
||||||
req_data["max_tokens"] = 1
|
req_data["max_tokens"] = 1
|
||||||
req_data["min_tokens"] = 1
|
req_data["min_tokens"] = 1
|
||||||
|
if "max_completion_tokens" in req_data:
|
||||||
|
req_data["max_completion_tokens"] = 1
|
||||||
if "stream_options" in req_data:
|
if "stream_options" in req_data:
|
||||||
del req_data["stream_options"]
|
del req_data["stream_options"]
|
||||||
headers = {
|
headers = {
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import ipaddress
|
||||||
import threading
|
import threading
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -8,6 +9,15 @@ _global_te_lock = threading.Lock()
|
|||||||
|
|
||||||
|
|
||||||
def get_global_te(hostname: str, device_name: Optional[str]):
|
def get_global_te(hostname: str, device_name: Optional[str]):
|
||||||
|
try:
|
||||||
|
ip = ipaddress.ip_address(hostname)
|
||||||
|
if isinstance(ip, ipaddress.IPv6Address):
|
||||||
|
raise RuntimeError(
|
||||||
|
"The backend if mooncake's Ascend Direct Xfer Library currcenly dose not support IPv6."
|
||||||
|
)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
global _global_te
|
global _global_te
|
||||||
if _global_te is None:
|
if _global_te is None:
|
||||||
with _global_te_lock:
|
with _global_te_lock:
|
||||||
|
|||||||
@@ -165,11 +165,6 @@ env_variables: Dict[str, Callable[[], Any]] = {
|
|||||||
# Whether to enable msMonitor tool to monitor the performance of vllm-ascend.
|
# Whether to enable msMonitor tool to monitor the performance of vllm-ascend.
|
||||||
"MSMONITOR_USE_DAEMON":
|
"MSMONITOR_USE_DAEMON":
|
||||||
lambda: bool(int(os.getenv("MSMONITOR_USE_DAEMON", '0'))),
|
lambda: bool(int(os.getenv("MSMONITOR_USE_DAEMON", '0'))),
|
||||||
# Timeout (in seconds) for delayed KVCache block release. In the prefill
|
|
||||||
# node, if a request is marked for delayed KV block release and the blocks
|
|
||||||
# are not freed within this timeout, they will be forcibly released.
|
|
||||||
"VLLM_ASCEND_KVCACHE_DELAY_FREE_TIMEOUT":
|
|
||||||
lambda: int(os.getenv("VLLM_ASCEND_KVCACHE_DELAY_FREE_TIMEOUT", 250)),
|
|
||||||
"VLLM_ASCEND_ENABLE_MLAPO":
|
"VLLM_ASCEND_ENABLE_MLAPO":
|
||||||
lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_MLAPO", '0'))),
|
lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_MLAPO", '0'))),
|
||||||
# Whether to enable transpose weight and cast format to FRACTAL_NZ.
|
# Whether to enable transpose weight and cast format to FRACTAL_NZ.
|
||||||
|
|||||||
Reference in New Issue
Block a user