diff --git a/python/sglang/srt/disaggregation/prefill.py b/python/sglang/srt/disaggregation/prefill.py index 25ab54bb8..865955be5 100644 --- a/python/sglang/srt/disaggregation/prefill.py +++ b/python/sglang/srt/disaggregation/prefill.py @@ -10,9 +10,9 @@ Life cycle of a request in the prefill server 2. Waiting Queue a. Use PrefillAdder to pop requests b. Run forward - c. Add the request to Infight Queue + c. Add the request to Inflight Queue -3. Infight Queue +3. Inflight Queue a. Poll (non-blocking) the sender of the request b. Once the transfer has finished, return the request """ @@ -162,7 +162,7 @@ class SchedulerDisaggregationPrefillMixin: self: Scheduler, batch: ScheduleBatch, result: GenerationBatchResult ) -> None: """ - Transfer kv for prefill completed requests and add it into disagg_prefill_infight_queue + Transfer kv for prefill completed requests and add it into disagg_prefill_inflight_queue Adapted from process_batch_result_prefill """ @@ -175,7 +175,7 @@ class SchedulerDisaggregationPrefillMixin: req.output_ids.append(next_token_id) self.tree_cache.cache_unfinished_req(req) # update the tree and lock self.send_kv_chunk(req, token_id=next_token_id) - self.disagg_prefill_infight_queue.append(req) + self.disagg_prefill_inflight_queue.append(req) else: # being chunked reqs' prefill is not finished req.is_chunked -= 1 @@ -187,22 +187,22 @@ class SchedulerDisaggregationPrefillMixin: self.current_stream.synchronize() batch.next_batch_sampling_info.sampling_info_done.set() - def process_disagg_prefill_infight_queue(self: Scheduler) -> None: + def process_disagg_prefill_inflight_queue(self: Scheduler) -> None: """ Poll the requests in the middle of transfer. If done, return the request. """ - assert len(self.disagg_prefill_infight_queue) > 0 + assert len(self.disagg_prefill_inflight_queue) > 0 done_reqs = [] polls = poll_and_all_reduce( - [req.disagg_kv_sender for req in self.disagg_prefill_infight_queue], + [req.disagg_kv_sender for req in self.disagg_prefill_inflight_queue], self.tp_worker.get_tp_cpu_group(), ) undone_reqs: List[Req] = [] - # Check .poll() for the reqs in disagg_prefill_infight_queue. If Success, respond to the client and remove it from the queue - for req, poll in zip(self.disagg_prefill_infight_queue, polls): + # Check .poll() for the reqs in disagg_prefill_inflight_queue. If Success, respond to the client and remove it from the queue + for req, poll in zip(self.disagg_prefill_inflight_queue, polls): if poll in [KVPoll.WaitingForInput, KVPoll.Transferring]: undone_reqs.append(req) elif poll == KVPoll.Success: # transfer done @@ -215,7 +215,7 @@ class SchedulerDisaggregationPrefillMixin: # Stream requests which have finished transfer self.stream_output(done_reqs, False, None) - self.disagg_prefill_infight_queue = undone_reqs + self.disagg_prefill_inflight_queue = undone_reqs def process_prefill_chunk(self: Scheduler) -> None: if self.last_batch and self.last_batch.forward_mode.is_extend(): diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 2bba79770..ae469ae2a 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -594,7 +594,7 @@ class Scheduler( gloo_group=self.tp_worker.get_attention_tp_cpu_group(), ) # The prefill requests that are in the middle of kv sending - self.disagg_prefill_infight_queue: List[Req] = [] + self.disagg_prefill_inflight_queue: List[Req] = [] @DynamicGradMode() def event_loop_normal(self): @@ -674,10 +674,10 @@ class Scheduler( result = self.run_batch(batch) self.process_batch_result_disagg_prefill(batch, result) - if len(self.disagg_prefill_infight_queue) > 0: - self.process_disagg_prefill_infight_queue() + if len(self.disagg_prefill_inflight_queue) > 0: + self.process_disagg_prefill_inflight_queue() - if batch is None and len(self.disagg_prefill_infight_queue) == 0: + if batch is None and len(self.disagg_prefill_inflight_queue) == 0: self.check_memory() self.new_token_ratio = self.init_new_token_ratio