[router] address worker load tracking consistency (#9523)

Co-authored-by: fzyzcjy <5236035+fzyzcjy@users.noreply.github.com>
This commit is contained in:
Simo Lin
2025-08-26 06:40:51 -07:00
committed by GitHub
parent 0936c766ed
commit 3578eb1e9b
3 changed files with 95 additions and 2 deletions

View File

@@ -1243,10 +1243,19 @@ impl PDRouter {
let decode_workers = self.decode_workers.clone();
tokio::spawn(async move {
// Use a flag to track whether stream completed successfully
let mut stream_completed = false;
futures_util::pin_mut!(stream);
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
// Check for stream end marker to decrement load early
let is_done = chunk
.as_ref()
.windows(12)
.any(|window| window == b"data: [DONE]");
let result = if return_logprob && prefill_logprobs.is_some() {
// Try to merge logprobs
Self::merge_streaming_logprobs(prefill_logprobs.clone(), &chunk)
@@ -1258,6 +1267,12 @@ impl PDRouter {
if tx.send(Ok(result)).is_err() {
break;
}
// If we see the done marker, decrement load immediately
if is_done {
stream_completed = true;
break;
}
}
Err(e) => {
if let Some(ref url) = decode_url {
@@ -1270,20 +1285,30 @@ impl PDRouter {
}
}
// Decrement load after streaming is complete
// Always decrement load after streaming (either completes or errors)
// Find and decrement prefill worker
if let Ok(prefill_workers_guard) = prefill_workers.read() {
for worker in prefill_workers_guard.iter() {
if worker.url() == prefill_url.as_str() {
worker.decrement_load();
debug!(
"Decremented load for prefill worker: {} (stream_completed: {})",
prefill_url, stream_completed
);
break;
}
}
}
// Find and decrement decode worker
if let Ok(decode_workers_guard) = decode_workers.read() {
for worker in decode_workers_guard.iter() {
if worker.url() == decode_url_str.as_str() {
worker.decrement_load();
debug!(
"Decremented load for decode worker: {} (stream_completed: {})",
decode_url_str, stream_completed
);
break;
}
}

View File

@@ -490,6 +490,13 @@ impl Router {
false
};
// Keep a clone for potential cleanup on retry
let worker_for_cleanup = if load_incremented {
Some(worker.clone_worker())
} else {
None
};
let response = self
.send_typed_request(
headers,
@@ -502,6 +509,19 @@ impl Router {
.await;
worker.record_outcome(response.status().is_success());
// For retryable failures, we need to decrement load since send_typed_request
// won't have done it (it only decrements on success or non-retryable failures)
if is_retryable_status(response.status()) && load_incremented {
if let Some(cleanup_worker) = worker_for_cleanup {
cleanup_worker.decrement_load();
RouterMetrics::set_running_requests(
cleanup_worker.url(),
cleanup_worker.load(),
);
}
}
response
},
// should_retry predicate
@@ -657,13 +677,25 @@ impl Router {
response
}
Err(e) => {
// IMPORTANT: Decrement load on error before returning
if load_incremented {
if let Ok(workers_guard) = self.workers.read() {
if let Some(worker) =
workers_guard.iter().find(|w| w.url() == worker_url)
{
worker.decrement_load();
RouterMetrics::set_running_requests(worker_url, worker.load());
}
}
}
let error_msg = format!("Failed to get response body: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, error_msg).into_response()
}
};
// Decrement load counter for non-streaming requests if it was incremented
if load_incremented && !is_stream {
if load_incremented {
if let Ok(workers_guard) = self.workers.read() {
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
worker.decrement_load();