[router] grpc router regular mode import cleanup (#10963)
This commit is contained in:
@@ -12,19 +12,25 @@ use axum::{
|
|||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use crate::config::types::RetryConfig;
|
use crate::config::types::RetryConfig;
|
||||||
use crate::core::{WorkerRegistry, WorkerType};
|
use crate::core::{ConnectionMode, Worker, WorkerRegistry, WorkerType};
|
||||||
use crate::grpc_client::{proto, SglangSchedulerClient};
|
use crate::grpc_client::{proto, SglangSchedulerClient};
|
||||||
use crate::metrics::RouterMetrics;
|
use crate::metrics::RouterMetrics;
|
||||||
use crate::policies::PolicyRegistry;
|
use crate::policies::PolicyRegistry;
|
||||||
|
use crate::protocols::spec::ChatMessage;
|
||||||
use crate::protocols::spec::{ChatCompletionRequest, StringOrArray};
|
use crate::protocols::spec::{ChatCompletionRequest, StringOrArray};
|
||||||
|
use crate::protocols::spec::{
|
||||||
|
CompletionRequest, EmbeddingRequest, GenerateRequest, RerankRequest, ResponsesGetParams,
|
||||||
|
ResponsesRequest, Tool, ToolChoice,
|
||||||
|
};
|
||||||
use crate::reasoning_parser::ParserFactory;
|
use crate::reasoning_parser::ParserFactory;
|
||||||
use crate::routers::RouterTrait;
|
use crate::routers::RouterTrait;
|
||||||
use crate::tokenizer::traits::Tokenizer;
|
use crate::server::AppContext;
|
||||||
use crate::tool_parser::ParserRegistry;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::tokenizer::chat_template::{ChatTemplateContentFormat, ChatTemplateParams};
|
use crate::tokenizer::chat_template::{ChatTemplateContentFormat, ChatTemplateParams};
|
||||||
|
use crate::tokenizer::traits::Tokenizer;
|
||||||
|
use crate::tokenizer::HuggingFaceTokenizer;
|
||||||
|
use crate::tool_parser::ParserRegistry;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
// Data structures for processing
|
// Data structures for processing
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -49,7 +55,7 @@ pub struct GrpcRouter {
|
|||||||
|
|
||||||
impl GrpcRouter {
|
impl GrpcRouter {
|
||||||
/// Create a new gRPC router
|
/// Create a new gRPC router
|
||||||
pub async fn new(ctx: &Arc<crate::server::AppContext>) -> Result<Self, String> {
|
pub async fn new(ctx: &Arc<AppContext>) -> Result<Self, String> {
|
||||||
// Extract necessary components from context
|
// Extract necessary components from context
|
||||||
let tokenizer = ctx
|
let tokenizer = ctx
|
||||||
.tokenizer
|
.tokenizer
|
||||||
@@ -71,7 +77,7 @@ impl GrpcRouter {
|
|||||||
let workers = worker_registry.get_workers_filtered(
|
let workers = worker_registry.get_workers_filtered(
|
||||||
None,
|
None,
|
||||||
Some(WorkerType::Regular),
|
Some(WorkerType::Regular),
|
||||||
Some(crate::core::ConnectionMode::Grpc { port: None }),
|
Some(ConnectionMode::Grpc { port: None }),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -207,17 +213,17 @@ impl GrpcRouter {
|
|||||||
&self,
|
&self,
|
||||||
model_id: Option<&str>,
|
model_id: Option<&str>,
|
||||||
text: Option<&str>,
|
text: Option<&str>,
|
||||||
) -> Option<Arc<dyn crate::core::Worker>> {
|
) -> Option<Arc<dyn Worker>> {
|
||||||
// Get workers for the specified model, filtered by connection mode
|
// Get workers for the specified model, filtered by connection mode
|
||||||
let workers = self.worker_registry.get_workers_filtered(
|
let workers = self.worker_registry.get_workers_filtered(
|
||||||
model_id,
|
model_id,
|
||||||
Some(WorkerType::Regular),
|
Some(WorkerType::Regular),
|
||||||
Some(crate::core::ConnectionMode::Grpc { port: None }),
|
Some(ConnectionMode::Grpc { port: None }),
|
||||||
false, // get all workers, we'll filter by is_available() next
|
false, // get all workers, we'll filter by is_available() next
|
||||||
);
|
);
|
||||||
|
|
||||||
// Filter by availability (health + circuit breaker)
|
// Filter by availability (health + circuit breaker)
|
||||||
let available: Vec<Arc<dyn crate::core::Worker>> = workers
|
let available: Vec<Arc<dyn Worker>> = workers
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|w| w.is_available())
|
.filter(|w| w.is_available())
|
||||||
.cloned()
|
.cloned()
|
||||||
@@ -244,10 +250,10 @@ impl GrpcRouter {
|
|||||||
request: &ChatCompletionRequest,
|
request: &ChatCompletionRequest,
|
||||||
) -> Result<ProcessedMessages, String> {
|
) -> Result<ProcessedMessages, String> {
|
||||||
// Use the tokenizer's chat template - we require HuggingFace tokenizer for gRPC
|
// Use the tokenizer's chat template - we require HuggingFace tokenizer for gRPC
|
||||||
let formatted_text = if let Some(hf_tokenizer) =
|
let formatted_text = if let Some(hf_tokenizer) = self
|
||||||
self.tokenizer
|
.tokenizer
|
||||||
.as_any()
|
.as_any()
|
||||||
.downcast_ref::<crate::tokenizer::HuggingFaceTokenizer>()
|
.downcast_ref::<HuggingFaceTokenizer>()
|
||||||
{
|
{
|
||||||
// Get content format and transform messages accordingly
|
// Get content format and transform messages accordingly
|
||||||
let content_format = hf_tokenizer.chat_template_content_format();
|
let content_format = hf_tokenizer.chat_template_content_format();
|
||||||
@@ -350,9 +356,9 @@ impl GrpcRouter {
|
|||||||
|
|
||||||
/// Process messages based on content format for ANY message type
|
/// Process messages based on content format for ANY message type
|
||||||
fn process_content_format(
|
fn process_content_format(
|
||||||
messages: &[crate::protocols::spec::ChatMessage],
|
messages: &[ChatMessage],
|
||||||
content_format: crate::tokenizer::chat_template::ChatTemplateContentFormat,
|
content_format: ChatTemplateContentFormat,
|
||||||
) -> Result<Vec<serde_json::Value>, String> {
|
) -> Result<Vec<Value>, String> {
|
||||||
messages
|
messages
|
||||||
.iter()
|
.iter()
|
||||||
.map(|message| {
|
.map(|message| {
|
||||||
@@ -422,7 +428,7 @@ impl GrpcRouter {
|
|||||||
|
|
||||||
/// Process tool call arguments in messages
|
/// Process tool call arguments in messages
|
||||||
/// Per Transformers docs, tool call arguments in assistant messages should be dicts
|
/// Per Transformers docs, tool call arguments in assistant messages should be dicts
|
||||||
fn process_tool_call_arguments(messages: &mut [serde_json::Value]) -> Result<(), String> {
|
fn process_tool_call_arguments(messages: &mut [Value]) -> Result<(), String> {
|
||||||
for msg in messages {
|
for msg in messages {
|
||||||
// Early return if not assistant message
|
// Early return if not assistant message
|
||||||
let role = msg.get("role").and_then(|v| v.as_str());
|
let role = msg.get("role").and_then(|v| v.as_str());
|
||||||
@@ -466,8 +472,8 @@ impl GrpcRouter {
|
|||||||
/// Generate tool constraints for structured generation
|
/// Generate tool constraints for structured generation
|
||||||
fn generate_tool_constraints(
|
fn generate_tool_constraints(
|
||||||
&self,
|
&self,
|
||||||
_tools: &[crate::protocols::spec::Tool],
|
_tools: &[Tool],
|
||||||
_tool_choice: &Option<crate::protocols::spec::ToolChoice>,
|
_tool_choice: &Option<ToolChoice>,
|
||||||
model: &str,
|
model: &str,
|
||||||
) -> Option<(String, String)> {
|
) -> Option<(String, String)> {
|
||||||
let _parser = self.tool_parser_registry.get_parser(model)?;
|
let _parser = self.tool_parser_registry.get_parser(model)?;
|
||||||
@@ -541,7 +547,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_generate(
|
async fn route_generate(
|
||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_body: &crate::protocols::spec::GenerateRequest,
|
_body: &GenerateRequest,
|
||||||
_model_id: Option<&str>,
|
_model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
@@ -550,7 +556,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_chat(
|
async fn route_chat(
|
||||||
&self,
|
&self,
|
||||||
headers: Option<&HeaderMap>,
|
headers: Option<&HeaderMap>,
|
||||||
body: &crate::protocols::spec::ChatCompletionRequest,
|
body: &ChatCompletionRequest,
|
||||||
model_id: Option<&str>,
|
model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
self.route_chat_impl(headers, body, model_id).await
|
self.route_chat_impl(headers, body, model_id).await
|
||||||
@@ -559,7 +565,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_completion(
|
async fn route_completion(
|
||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_body: &crate::protocols::spec::CompletionRequest,
|
_body: &CompletionRequest,
|
||||||
_model_id: Option<&str>,
|
_model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
@@ -568,7 +574,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_responses(
|
async fn route_responses(
|
||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_body: &crate::protocols::spec::ResponsesRequest,
|
_body: &ResponsesRequest,
|
||||||
_model_id: Option<&str>,
|
_model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
@@ -578,7 +584,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_response_id: &str,
|
_response_id: &str,
|
||||||
_params: &crate::protocols::spec::ResponsesGetParams,
|
_params: &ResponsesGetParams,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
}
|
}
|
||||||
@@ -590,7 +596,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_embeddings(
|
async fn route_embeddings(
|
||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_body: &crate::protocols::spec::EmbeddingRequest,
|
_body: &EmbeddingRequest,
|
||||||
_model_id: Option<&str>,
|
_model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
@@ -599,7 +605,7 @@ impl RouterTrait for GrpcRouter {
|
|||||||
async fn route_rerank(
|
async fn route_rerank(
|
||||||
&self,
|
&self,
|
||||||
_headers: Option<&HeaderMap>,
|
_headers: Option<&HeaderMap>,
|
||||||
_body: &crate::protocols::spec::RerankRequest,
|
_body: &RerankRequest,
|
||||||
_model_id: Option<&str>,
|
_model_id: Option<&str>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
(StatusCode::NOT_IMPLEMENTED).into_response()
|
(StatusCode::NOT_IMPLEMENTED).into_response()
|
||||||
|
|||||||
Reference in New Issue
Block a user