[Fix] Reduce busy polling when scheduler is idle (#6026)
Commit bd7cfbd2f8 (parent 4b9971e401), committed via GitHub.
@@ -550,6 +550,7 @@ class SchedulerDisaggregationDecodeMixin:
|
||||
# When the server is idle, do self-check and re-init some states
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
|
||||
@@ -628,6 +629,7 @@ class SchedulerDisaggregationDecodeMixin:
|
||||
# When the server is idle, do self-check and re-init some states
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
self.last_batch_in_queue = last_batch_in_queue
|
||||
|
||||
@@ -242,6 +242,7 @@ class SchedulerDisaggregationPrefillMixin:
|
||||
if batch is None and len(self.disagg_prefill_inflight_queue) == 0:
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
# HACK (byronhsu): reset the batch_is_full flag because we never enter update_running_batch which resets it
|
||||
@@ -294,6 +295,7 @@ class SchedulerDisaggregationPrefillMixin:
|
||||
if batch is None and len(self.disagg_prefill_inflight_queue) == 0:
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
# HACK (byronhsu): reset the batch_is_full flag because we never enter update_running_batch which resets it
|
||||
|
||||
@@ -179,6 +179,27 @@ class EmbeddingBatchResult:
|
||||
bid: int
|
||||
|
||||
|
||||
class IdleSleeper:
    """
    Reduce busy polling when the scheduler has nothing to do.

    In setups which have long inactivity periods it is desirable to reduce
    system power consumption when sglang does nothing. This would lead not only
    to power savings, but also to more CPU thermal headroom when a request
    eventually comes. This is important in cases when multiple GPUs are connected
    as each GPU would otherwise pin one thread at 100% CPU usage.

    The simplest solution is to use zmq.Poller on all sockets that may receive
    data that needs handling immediately.
    """

    def __init__(self, sockets, timeout_ms: int = 1000):
        """
        Args:
            sockets: zmq sockets whose readability should wake the sleeper.
            timeout_ms: maximum duration of a single sleep, in milliseconds
                (defaults to the previous hard-coded 1000 ms).
        """
        self.timeout_ms = timeout_ms
        self.poller = zmq.Poller()
        for s in sockets:
            self.poller.register(s, zmq.POLLIN)

    def maybe_sleep(self):
        """Block until a registered socket is readable or the timeout expires.

        The poll result is intentionally discarded: the caller's event loop
        re-checks its receive queues itself, so waking up is all that matters.
        """
        self.poller.poll(self.timeout_ms)
|
||||
|
||||
|
||||
class Scheduler(
|
||||
SchedulerOutputProcessorMixin,
|
||||
SchedulerDisaggregationDecodeMixin,
|
||||
@@ -228,6 +249,8 @@ class Scheduler(
|
||||
|
||||
# Init inter-process communication
|
||||
context = zmq.Context(2)
|
||||
self.idle_sleeper = None
|
||||
|
||||
if self.pp_rank == 0 and self.attn_tp_rank == 0:
|
||||
self.recv_from_tokenizer = get_zmq_socket(
|
||||
context, zmq.PULL, port_args.scheduler_input_ipc_name, False
|
||||
@@ -250,6 +273,13 @@ class Scheduler(
|
||||
self.recv_from_rpc = get_zmq_socket(
|
||||
context, zmq.DEALER, port_args.rpc_ipc_name, False
|
||||
)
|
||||
if self.server_args.sleep_on_idle:
|
||||
self.idle_sleeper = IdleSleeper(
|
||||
[
|
||||
self.recv_from_tokenizer,
|
||||
self.recv_from_rpc,
|
||||
]
|
||||
)
|
||||
else:
|
||||
self.recv_from_tokenizer = None
|
||||
self.recv_from_rpc = None
|
||||
@@ -478,6 +508,10 @@ class Scheduler(
|
||||
)
|
||||
self.init_disaggregation()
|
||||
|
||||
def maybe_sleep_on_idle(self):
    """Yield the CPU while idle by delegating to the IdleSleeper, if configured."""
    sleeper = self.idle_sleeper
    if sleeper is None:
        # --sleep-on-idle not enabled; keep the scheduler loop spinning as before.
        return
    sleeper.maybe_sleep()
|
||||
|
||||
def init_tokenizer(self):
|
||||
server_args = self.server_args
|
||||
|
||||
@@ -667,6 +701,7 @@ class Scheduler(
|
||||
# When the server is idle, do self-check and re-init some states
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
|
||||
@@ -711,6 +746,7 @@ class Scheduler(
|
||||
# When the server is idle, do self-check and re-init some states
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
self.last_batch = batch
|
||||
|
||||
@@ -816,6 +852,7 @@ class Scheduler(
|
||||
if server_is_idle:
|
||||
self.check_memory()
|
||||
self.new_token_ratio = self.init_new_token_ratio
|
||||
self.maybe_sleep_on_idle()
|
||||
|
||||
def recv_requests(self) -> List[Req]:
|
||||
"""Receive results at tp_rank = 0 and broadcast it to all other TP ranks."""
|
||||
|
||||
@@ -90,6 +90,7 @@ class ServerArgs:
|
||||
download_dir: Optional[str] = None
|
||||
base_gpu_id: int = 0
|
||||
gpu_id_step: int = 1
|
||||
sleep_on_idle: bool = False
|
||||
|
||||
# Logging
|
||||
log_level: str = "info"
|
||||
@@ -844,6 +845,11 @@ class ServerArgs:
|
||||
default=ServerArgs.gpu_id_step,
|
||||
help="The delta between consecutive GPU IDs that are used. For example, setting it to 2 will use GPU 0,2,4,...",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--sleep-on-idle",
|
||||
action="store_true",
|
||||
help="Reduce CPU usage when sglang is idle.",
|
||||
)
|
||||
|
||||
# Logging
|
||||
parser.add_argument(
|
||||
|
||||
Reference in New Issue
Block a user