From cef91b1ed70ee2eb46989cc810097afb0dc9e032 Mon Sep 17 00:00:00 2001 From: fzyzcjy <5236035+fzyzcjy@users.noreply.github.com> Date: Thu, 8 May 2025 16:03:08 +0800 Subject: [PATCH] [PD] Add control to slow down a server (#5572) --- python/sglang/srt/entrypoints/http_server.py | 14 ++++++++++++++ python/sglang/srt/managers/io_struct.py | 10 ++++++++++ python/sglang/srt/managers/scheduler.py | 16 ++++++++++++++++ python/sglang/srt/managers/tokenizer_manager.py | 17 +++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/python/sglang/srt/entrypoints/http_server.py b/python/sglang/srt/entrypoints/http_server.py index 30f4f2305..1ac29e8d5 100644 --- a/python/sglang/srt/entrypoints/http_server.py +++ b/python/sglang/srt/entrypoints/http_server.py @@ -62,6 +62,7 @@ from sglang.srt.managers.io_struct import ( ResumeMemoryOccupationReqInput, SeparateReasoningReqInput, SetInternalStateReq, + SlowDownReqInput, UpdateWeightFromDiskReqInput, UpdateWeightsFromDistributedReqInput, UpdateWeightsFromTensorReqInput, @@ -494,6 +495,19 @@ async def resume_memory_occupation( return _create_error_response(e) +@app.api_route("/slow_down", methods=["GET", "POST"]) +async def slow_down(obj: SlowDownReqInput, request: Request): + """Slow down the system deliberately. Only for testing. Example scenario: + when we want to test performance of D in large-scale PD disaggregation and have no enough nodes for P, + we can use this to slow down D to let it have enough running sequences, and then disable slowdown + to let it run in full batch size. + """ + try: + await _global_state.tokenizer_manager.slow_down(obj, request) + except Exception as e: + return _create_error_response(e) + + @app.api_route("/open_session", methods=["GET", "POST"]) async def open_session(obj: OpenSessionReqInput, request: Request): """Open a session, and return its unique session id.""" diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py index 174656b2d..76e57177e 100644 --- a/python/sglang/srt/managers/io_struct.py +++ b/python/sglang/srt/managers/io_struct.py @@ -790,6 +790,16 @@ class ResumeMemoryOccupationReqOutput: pass +@dataclass +class SlowDownReqInput: + forward_sleep_time: Optional[float] + + +@dataclass +class SlowDownReqOutput: + pass + + @dataclass class AbortReq: # The request id diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index b69bcd140..82f8c9ad8 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -87,6 +87,8 @@ from sglang.srt.managers.io_struct import ( RpcReqOutput, SetInternalStateReq, SetInternalStateReqOutput, + SlowDownReqInput, + SlowDownReqOutput, TokenizedEmbeddingReqInput, TokenizedGenerateReqInput, UpdateWeightFromDiskReqInput, @@ -417,6 +419,8 @@ class Scheduler( self.profiler_id: Optional[str] = None self.profiler_target_forward_ct: Optional[int] = None + self.forward_sleep_time = None + # Init metrics stats self.init_metrics() @@ -439,6 +443,7 @@ class Scheduler( (GetWeightsByNameReqInput, self.get_weights_by_name), (ReleaseMemoryOccupationReqInput, self.release_memory_occupation), (ResumeMemoryOccupationReqInput, self.resume_memory_occupation), + (SlowDownReqInput, self.slow_down), (ProfileReq, self.profile), (GetInternalStateReq, self.get_internal_state), (SetInternalStateReq, self.set_internal_state), @@ -1526,6 +1531,10 @@ class Scheduler( ): self.stop_profile() + if self.forward_sleep_time is not None: + logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s") + time.sleep(self.forward_sleep_time) + # Run forward if self.is_generation: if self.spec_algorithm.is_none(): @@ -2001,6 +2010,13 @@ class Scheduler( del self.stashed_model_static_state return ResumeMemoryOccupationReqOutput() + def slow_down(self, recv_req: SlowDownReqInput): + t = recv_req.forward_sleep_time + if t is not None and t <= 0: + t = None + self.forward_sleep_time = t + return SlowDownReqOutput() + def profile(self, recv_req: ProfileReq): if recv_req.type == ProfileReqType.START_PROFILE: return self.start_profile( diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index 9db63c881..f5f0f4187 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -90,6 +90,8 @@ from sglang.srt.managers.io_struct import ( ResumeMemoryOccupationReqInput, ResumeMemoryOccupationReqOutput, SessionParams, + SlowDownReqInput, + SlowDownReqOutput, TokenizedEmbeddingReqInput, TokenizedGenerateReqInput, UpdateWeightFromDiskReqInput, @@ -259,6 +261,9 @@ class TokenizerManager: self.resume_memory_occupation_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) + self.slow_down_communicator = _Communicator( + self.send_to_scheduler, server_args.dp_size + ) self.flush_cache_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) @@ -312,6 +317,10 @@ class TokenizerManager: ResumeMemoryOccupationReqOutput, self.resume_memory_occupation_communicator.handle_recv, ), + ( + SlowDownReqOutput, + self.slow_down_communicator.handle_recv, + ), ( FlushCacheReqOutput, self.flush_cache_communicator.handle_recv, @@ -870,6 +879,14 @@ class TokenizerManager: self.auto_create_handle_loop() await self.resume_memory_occupation_communicator(obj) + async def slow_down( + self, + obj: SlowDownReqInput, + request: Optional[fastapi.Request] = None, + ): + self.auto_create_handle_loop() + await self.slow_down_communicator(obj) + async def open_session( self, obj: OpenSessionReqInput, request: Optional[fastapi.Request] = None ):