[Docs]: Fix Multi-User Port Allocation Conflicts (#3601)
Co-authored-by: zhaochenyang20 <zhaochen20@outlook.com> Co-authored-by: simveit <simp.veitner@gmail.com>
This commit is contained in:
@@ -5,12 +5,15 @@ import importlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import urllib.request
|
||||
import weakref
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from io import BytesIO
|
||||
from json import dumps
|
||||
@@ -21,6 +24,8 @@ import requests
|
||||
from IPython.display import HTML, display
|
||||
from tqdm import tqdm
|
||||
|
||||
from sglang.srt.utils import kill_process_tree
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -306,27 +311,12 @@ def download_and_cache_file(url: str, filename: Optional[str] = None):
|
||||
return filename
|
||||
|
||||
|
||||
import fcntl
|
||||
|
||||
|
||||
def is_in_ci():
    """Return the result of ``sglang.test.test_utils.is_in_ci()``.

    The import is performed inside the function, so ``sglang.test.test_utils``
    is only loaded when this wrapper is actually called.
    """
    # Alias the imported callable so it does not shadow this wrapper's name.
    from sglang.test.test_utils import is_in_ci as _delegate

    return _delegate()
|
||||
|
||||
|
||||
LOCKFILE = os.path.expanduser("~/.sglang_port_lock")
|
||||
PORT_REGISTRY = os.path.expanduser("~/.sglang_port_registry.json")
|
||||
|
||||
if not os.path.exists(LOCKFILE):
|
||||
with open(LOCKFILE, "w") as f:
|
||||
pass
|
||||
|
||||
if not os.path.exists(PORT_REGISTRY):
|
||||
with open(PORT_REGISTRY, "w") as f:
|
||||
json.dump([], f)
|
||||
|
||||
|
||||
def print_highlight(html_content: str):
|
||||
if is_in_ci():
|
||||
html_content = str(html_content).replace("\n", "<br>")
|
||||
@@ -335,55 +325,44 @@ def print_highlight(html_content: str):
|
||||
print(html_content)
|
||||
|
||||
|
||||
def init_port_registry():
|
||||
"""Initialize the port registry file if it doesn't exist."""
|
||||
if not os.path.exists(PORT_REGISTRY):
|
||||
with open(PORT_REGISTRY, "w") as f:
|
||||
json.dump([], f)
|
||||
process_socket_map = weakref.WeakKeyDictionary()
|
||||
|
||||
|
||||
def reserve_port(host, start=30000, end=40000):
    """
    Reserve an available port by trying to bind a socket.

    Every port in ``range(start, end)`` is tried in shuffled order; the first
    one that can be bound on ``host`` wins.

    Args:
        host: Address the lock socket is bound to.
        start: First candidate port (inclusive).
        end: End of the candidate range (exclusive).

    Returns:
        A tuple (port, lock_socket) where `lock_socket` is kept open to hold
        the lock. The caller is responsible for closing it.

    Raises:
        RuntimeError: If no port in the range could be bound (including an
            empty range).
    """
    candidates = list(range(start, end))
    random.shuffle(candidates)

    for port in candidates:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # NOTE(review): SO_REUSEADDR is presumably set so the launched
            # server can bind the same port while this socket is still open
            # -- confirm against the server's own socket options.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Attempt to bind to the port on the requested host.
            sock.bind((host, port))
        except OSError:
            # Configuring or binding failed: close the socket so it does not
            # leak, then try the next candidate.
            sock.close()
            continue
        return port, sock

    raise RuntimeError("No free port available.")
|
||||
|
||||
|
||||
def release_port(lock_socket):
    """
    Release the reserved port by closing the lock socket.

    Args:
        lock_socket: The socket that holds the port reservation. ``None`` is
            accepted and treated as "nothing to release".

    Closing is best-effort: any error raised by ``close()`` is reported on
    stdout and not propagated to the caller.
    """
    if lock_socket is None:
        return
    try:
        lock_socket.close()
    except Exception as e:
        print(f"Error closing socket: {e}")
|
||||
|
||||
|
||||
def execute_shell_command(command: str) -> subprocess.Popen:
    """
    Launch ``command`` as a child process and hand back its ``Popen`` handle.

    The command string is split on whitespace (no shell is involved, and
    quoting is not interpreted). The child's stderr is redirected to its
    stdout, and the process is opened in text mode.
    """
    # Backslash-newline continuations become a space first; any backslash
    # still left afterwards is turned into a space as well.
    flattened = command.replace("\\\n", " ")
    flattened = flattened.replace("\\", " ")
    argv = flattened.split()
    return subprocess.Popen(argv, stderr=subprocess.STDOUT, text=True)
|
||||
@@ -395,21 +374,28 @@ def launch_server_cmd(command: str, host: str = "0.0.0.0", port: int = None):
|
||||
If no port is specified, a free port is reserved.
|
||||
"""
|
||||
if port is None:
|
||||
port = reserve_port()
|
||||
port, lock_socket = reserve_port(host)
|
||||
else:
|
||||
lock_socket = None
|
||||
|
||||
full_command = f"{command} --port {port}"
|
||||
process = execute_shell_command(full_command)
|
||||
|
||||
if lock_socket is not None:
|
||||
process_socket_map[process] = lock_socket
|
||||
|
||||
return process, port
|
||||
|
||||
|
||||
def terminate_process(process):
    """
    Terminate the process and automatically release the reserved port.

    Args:
        process: The process handle whose ``pid`` is passed to
            ``kill_process_tree``. If a lock socket was registered for it in
            ``process_socket_map``, that socket is removed from the map and
            released.
    """
    from sglang.srt.utils import kill_process_tree

    try:
        kill_process_tree(process.pid)
    finally:
        # Release the reservation even when killing the process raises, so
        # the lock socket is not left open.
        lock_socket = process_socket_map.pop(process, None)
        if lock_socket is not None:
            release_port(lock_socket)
|
||||
|
||||
|
||||
def wait_for_server(base_url: str, timeout: int = None) -> None:
|
||||
|
||||
Reference in New Issue
Block a user