[router] fix p and d worker filtering and bootstrap port handling (#11729)
This commit is contained in:
@@ -247,6 +247,20 @@ pub enum ConnectionMode {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ConnectionMode {
|
||||||
|
/// Check if this connection mode matches another, with special handling for gRPC
|
||||||
|
/// This allows matching any gRPC connection regardless of port when comparing
|
||||||
|
/// Grpc { port: None } as a wildcard
|
||||||
|
pub fn matches(&self, filter: &ConnectionMode) -> bool {
|
||||||
|
match (self, filter) {
|
||||||
|
(ConnectionMode::Http, ConnectionMode::Http) => true,
|
||||||
|
(ConnectionMode::Grpc { .. }, ConnectionMode::Grpc { port: None }) => true,
|
||||||
|
(ConnectionMode::Grpc { port: p1 }, ConnectionMode::Grpc { port: p2 }) => p1 == p2,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl fmt::Display for ConnectionMode {
|
impl fmt::Display for ConnectionMode {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
|||||||
@@ -308,9 +308,9 @@ impl WorkerRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check connection_mode if specified
|
// Check connection_mode if specified (using matches for flexible gRPC matching)
|
||||||
if let Some(ref conn) = connection_mode {
|
if let Some(ref conn) = connection_mode {
|
||||||
if w.connection_mode() != *conn {
|
if !w.connection_mode().matches(conn) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -340,45 +340,32 @@ impl WorkerSelectionStage {
|
|||||||
model_id: Option<&str>,
|
model_id: Option<&str>,
|
||||||
text: Option<&str>,
|
text: Option<&str>,
|
||||||
) -> Option<(Arc<dyn Worker>, Arc<dyn Worker>)> {
|
) -> Option<(Arc<dyn Worker>, Arc<dyn Worker>)> {
|
||||||
// Get prefill workers - use None for WorkerType filter to get all types,
|
|
||||||
// then filter manually (since Prefill is a struct variant)
|
|
||||||
let all_workers = self.worker_registry.get_workers_filtered(
|
let all_workers = self.worker_registry.get_workers_filtered(
|
||||||
model_id,
|
model_id,
|
||||||
None, // Get all types
|
None,
|
||||||
Some(ConnectionMode::Grpc { port: None }),
|
Some(ConnectionMode::Grpc { port: None }), // Match any gRPC worker
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
|
|
||||||
let prefill_workers: Vec<_> = all_workers
|
let (available_prefill, available_decode): (Vec<_>, Vec<_>) =
|
||||||
.iter()
|
all_workers
|
||||||
.filter(|w| matches!(w.metadata().worker_type, WorkerType::Prefill { .. }))
|
.into_iter()
|
||||||
.cloned()
|
.fold((Vec::new(), Vec::new()), |mut acc, w| {
|
||||||
.collect();
|
if w.is_available() {
|
||||||
|
match w.metadata().worker_type {
|
||||||
let available_prefill: Vec<_> = prefill_workers
|
WorkerType::Prefill { .. } => acc.0.push(w),
|
||||||
.iter()
|
WorkerType::Decode => acc.1.push(w),
|
||||||
.filter(|w| w.is_available())
|
_ => {}
|
||||||
.cloned()
|
}
|
||||||
.collect();
|
}
|
||||||
|
acc
|
||||||
|
});
|
||||||
|
|
||||||
if available_prefill.is_empty() {
|
if available_prefill.is_empty() {
|
||||||
warn!("No available prefill workers");
|
warn!("No available prefill workers");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get decode workers from the same all_workers list
|
|
||||||
let decode_workers: Vec<_> = all_workers
|
|
||||||
.iter()
|
|
||||||
.filter(|w| matches!(w.metadata().worker_type, WorkerType::Decode))
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let available_decode: Vec<_> = decode_workers
|
|
||||||
.iter()
|
|
||||||
.filter(|w| w.is_available())
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if available_decode.is_empty() {
|
if available_decode.is_empty() {
|
||||||
warn!("No available decode workers");
|
warn!("No available decode workers");
|
||||||
return None;
|
return None;
|
||||||
|
|||||||
Reference in New Issue
Block a user