[Misc] add service discovery for sgl router

This commit is contained in:
Simo Lin
2025-04-29 10:21:19 -07:00
committed by GitHub
parent 91dda4cd06
commit 1468769bde
11 changed files with 1242 additions and 45 deletions

View File

@@ -2,7 +2,7 @@ import argparse
import dataclasses
import logging
import sys
from typing import List, Optional
from typing import Dict, List, Optional
from sglang_router import Router
from sglang_router_rs import PolicyType
@@ -43,6 +43,11 @@ class RouterArgs:
max_payload_size: int = 4 * 1024 * 1024 # 4MB
verbose: bool = False
log_dir: Optional[str] = None
# Service discovery configuration
service_discovery: bool = False
selector: Dict[str, str] = dataclasses.field(default_factory=dict)
service_discovery_port: int = 80
service_discovery_namespace: Optional[str] = None
@staticmethod
def add_cli_args(
@@ -149,6 +154,28 @@ class RouterArgs:
default=None,
help="Directory to store log files. If not specified, logs are only output to console.",
)
parser.add_argument(
f"--{prefix}service-discovery",
action="store_true",
help="Enable Kubernetes service discovery",
)
parser.add_argument(
f"--{prefix}selector",
type=str,
nargs="+",
help="Label selector for Kubernetes service discovery (format: key1=value1 key2=value2)",
)
parser.add_argument(
f"--{prefix}service-discovery-port",
type=int,
default=RouterArgs.service_discovery_port,
help="Port to use for discovered worker pods",
)
parser.add_argument(
f"--{prefix}service-discovery-namespace",
type=str,
help="Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions)",
)
@classmethod
def from_cli_args(
@@ -182,8 +209,26 @@ class RouterArgs:
max_payload_size=getattr(args, f"{prefix}max_payload_size"),
verbose=getattr(args, f"{prefix}verbose", False),
log_dir=getattr(args, f"{prefix}log_dir", None),
service_discovery=getattr(args, f"{prefix}service_discovery", False),
selector=cls._parse_selector(getattr(args, f"{prefix}selector", None)),
service_discovery_port=getattr(args, f"{prefix}service_discovery_port"),
service_discovery_namespace=getattr(
args, f"{prefix}service_discovery_namespace", None
),
)
@staticmethod
def _parse_selector(selector_list):
if not selector_list:
return {}
selector = {}
for item in selector_list:
if "=" in item:
key, value = item.split("=", 1)
selector[key] = value
return selector
def policy_from_str(policy_str: str) -> PolicyType:
"""Convert policy string to PolicyType enum."""
@@ -229,6 +274,10 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
max_payload_size=router_args.max_payload_size,
verbose=router_args.verbose,
log_dir=router_args.log_dir,
service_discovery=router_args.service_discovery,
selector=router_args.selector,
service_discovery_port=router_args.service_discovery_port,
service_discovery_namespace=router_args.service_discovery_namespace,
)
router.start()

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Dict, List, Optional
from sglang_router_rs import PolicyType
from sglang_router_rs import Router as _Router
@@ -32,6 +32,14 @@ class Router:
max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24
verbose: Enable verbose logging. Default: False
log_dir: Directory to store log files. If None, logs are only output to console. Default: None
service_discovery: Enable Kubernetes service discovery. When enabled, the router will
automatically discover worker pods based on the selector. Default: False
selector: Dictionary mapping of label keys to values for Kubernetes pod selection.
Example: {"app": "sglang-worker"}. Default: {}
service_discovery_port: Port to use for service discovery. The router will generate
worker URLs using this port. Default: 80
service_discovery_namespace: Kubernetes namespace to watch for pods. If not provided,
watches pods across all namespaces (requires cluster-wide permissions). Default: None
"""
def __init__(
@@ -50,7 +58,14 @@ class Router:
max_payload_size: int = 4 * 1024 * 1024, # 4MB
verbose: bool = False,
log_dir: Optional[str] = None,
service_discovery: bool = False,
selector: Dict[str, str] = None,
service_discovery_port: int = 80,
service_discovery_namespace: Optional[str] = None,
):
if selector is None:
selector = {}
self._router = _Router(
worker_urls=worker_urls,
policy=policy,
@@ -66,6 +81,10 @@ class Router:
max_payload_size=max_payload_size,
verbose=verbose,
log_dir=log_dir,
service_discovery=service_discovery,
selector=selector,
service_discovery_port=service_discovery_port,
service_discovery_namespace=service_discovery_namespace,
)
def start(self) -> None: