[Nightly] Optimize nightly CI (#4509)
### What this PR does / why we need it? 1. Optimize the multi-node waiting logic. 2. Remove the `tee` pipeline for logs, which can lead to a hang issue. ### How was this patch tested? - vLLM version: v0.12.0 - vLLM main: https://github.com/vllm-project/vllm/commit/v0.12.0 Signed-off-by: wangli <wangli858794774@gmail.com>
This commit is contained in:
@@ -20,6 +20,7 @@
|
||||
import contextlib
|
||||
import gc
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
@@ -35,6 +36,7 @@ import requests
|
||||
import torch
|
||||
from modelscope import snapshot_download # type: ignore[import-untyped]
|
||||
from PIL import Image
|
||||
from requests.exceptions import RequestException
|
||||
from torch import nn
|
||||
from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer,
|
||||
BatchEncoding, BatchFeature)
|
||||
@@ -70,6 +72,7 @@ _PromptMultiModalInput = Union[List[_M], List[List[_M]]]
|
||||
PromptImageInput = _PromptMultiModalInput[Image.Image]
|
||||
PromptAudioInput = _PromptMultiModalInput[Tuple[np.ndarray, int]]
|
||||
PromptVideoInput = _PromptMultiModalInput[np.ndarray]
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_TEST_DIR = os.path.dirname(__file__)
|
||||
|
||||
@@ -161,22 +164,17 @@ class RemoteOpenAIServer:
|
||||
max_wait_seconds = max_wait_seconds or 1800
|
||||
if self.disaggregated_prefill:
|
||||
assert proxy_port is not None, "for disaggregated_prefill, proxy port must be provided"
|
||||
self._wait_for_server_pd(proxy_port=proxy_port,
|
||||
timeout=max_wait_seconds)
|
||||
self._wait_for_server_pd(timeout=max_wait_seconds)
|
||||
else:
|
||||
self._wait_for_server(url=self.url_for("health"),
|
||||
timeout=max_wait_seconds)
|
||||
self._wait_for_multiple_servers(
|
||||
[(self.host, self.url_for("health"))],
|
||||
timeout=max_wait_seconds)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.proc.terminate()
|
||||
try:
|
||||
self.proc.wait(8)
|
||||
except subprocess.TimeoutExpired:
|
||||
# force kill if needed
|
||||
self.proc.kill()
|
||||
self._terminate_server()
|
||||
|
||||
def _poll(self) -> Optional[int]:
|
||||
"""Subclasses override this method to customize process polling"""
|
||||
@@ -201,48 +199,95 @@ class RemoteOpenAIServer:
|
||||
finally:
|
||||
if isinstance(client, httpx.Client):
|
||||
client.close()
|
||||
self._terminate_server()
|
||||
|
||||
def _wait_for_server_pd(self, proxy_port: int, timeout: float):
|
||||
def _wait_for_server_pd(self, timeout: float):
|
||||
# Wait for all api_server nodes ready
|
||||
assert self.nodes_info is not None, "cluster info must be provided"
|
||||
for node_info in self.nodes_info:
|
||||
if node_info.headless:
|
||||
continue
|
||||
proxy_port = self.proxy_port
|
||||
|
||||
url_health = f"http://{node_info.ip}:{node_info.server_port}/health"
|
||||
self._wait_for_server(url=url_health, timeout=timeout)
|
||||
def url_health(ip: str, port: int) -> str:
|
||||
return f"http://{ip}:{port}/health"
|
||||
|
||||
targets = [(node_info.ip,
|
||||
url_health(node_info.ip, node_info.server_port))
|
||||
for node_info in self.nodes_info if not node_info.headless]
|
||||
|
||||
# Wait for proxy ready
|
||||
master_node = self.nodes_info[0]
|
||||
url_proxy = f"http://{master_node.ip}:{proxy_port}/healthcheck"
|
||||
self._wait_for_server(url=url_proxy, timeout=timeout)
|
||||
|
||||
def _wait_for_server(self, *, url: str, timeout: float):
|
||||
# run health check
|
||||
# Wait for master node proxy first
|
||||
self._wait_for_multiple_servers([(master_node.ip, url_proxy)],
|
||||
timeout=timeout)
|
||||
|
||||
# Then wait for all api_server nodes
|
||||
self._wait_for_multiple_servers(targets=targets, timeout=timeout)
|
||||
|
||||
def _wait_for_multiple_servers(self, targets, timeout: float):
|
||||
"""
|
||||
targets: List[(node_ip, url)]
|
||||
"""
|
||||
start = time.time()
|
||||
client = requests
|
||||
while True:
|
||||
try:
|
||||
if client.get(url).status_code == 200:
|
||||
break
|
||||
except Exception:
|
||||
# this exception can only be raised by requests.get,
|
||||
# which means the server is not ready yet.
|
||||
# the stack trace is not useful, so we suppress it
|
||||
# by using `raise from None`.
|
||||
result = self._poll()
|
||||
if result is not None and result != 0:
|
||||
raise RuntimeError("Server exited unexpectedly.") from None
|
||||
|
||||
time.sleep(5)
|
||||
if time.time() - start > timeout:
|
||||
raise RuntimeError(
|
||||
"Server failed to start in time.") from None
|
||||
# track readiness
|
||||
ready = {node_ip: False for node_ip, _ in targets}
|
||||
|
||||
# polling loop
|
||||
while True:
|
||||
all_ready = True
|
||||
|
||||
for node_ip, url in targets:
|
||||
if ready[node_ip]:
|
||||
continue # already ready
|
||||
|
||||
try:
|
||||
resp = client.get(url)
|
||||
if resp.status_code == 200:
|
||||
ready[node_ip] = True
|
||||
logger.info(f"[READY] Node {node_ip} is ready.")
|
||||
else:
|
||||
all_ready = False
|
||||
logger.info(f"[WAIT] {url}: HTTP {resp.status_code}")
|
||||
except RequestException:
|
||||
all_ready = False
|
||||
logger.info(f"[WAIT] {url}: connection failed")
|
||||
|
||||
# underlying process died?
|
||||
result = self._poll()
|
||||
if result is not None and result != 0:
|
||||
raise RuntimeError(
|
||||
f"Server at {node_ip} exited unexpectedly."
|
||||
) from None
|
||||
|
||||
# if all nodes ready, exit
|
||||
if all_ready:
|
||||
break
|
||||
|
||||
# check timeout
|
||||
if time.time() - start > timeout:
|
||||
not_ready_nodes = [n for n, ok in ready.items() if not ok]
|
||||
self._terminate_server()
|
||||
raise RuntimeError(
|
||||
f"Timeout: these nodes did not become ready: {not_ready_nodes}"
|
||||
) from None
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
@property
|
||||
def url_root(self) -> str:
|
||||
return f"http://{self.host}:{self.port}"
|
||||
|
||||
def _terminate_server(self) -> None:
|
||||
"""Subclasses override this method to customize server process termination"""
|
||||
self.proc.terminate()
|
||||
try:
|
||||
self.proc.wait(8)
|
||||
except subprocess.TimeoutExpired:
|
||||
# force kill if needed
|
||||
self.proc.kill()
|
||||
|
||||
def url_for(self, *parts: str) -> str:
|
||||
return self.url_root + "/" + "/".join(parts)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user