diff --git a/sgl-router/Cargo.lock b/sgl-router/Cargo.lock index 9820fced5..c5dff9ab1 100644 --- a/sgl-router/Cargo.lock +++ b/sgl-router/Cargo.lock @@ -30,7 +30,7 @@ dependencies = [ "actix-service", "actix-utils", "ahash", - "base64", + "base64 0.22.1", "bitflags", "brotli", "bytes", @@ -65,7 +65,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -182,7 +182,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -237,6 +237,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -301,6 +307,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -313,6 +330,17 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -328,6 +356,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -424,6 +458,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -459,7 +494,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -550,6 +585,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.87", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.87", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -573,6 +643,17 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.18" @@ -583,7 +664,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.87", ] [[package]] @@ -604,9 +685,21 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] +[[package]] +name = "dyn-clone" +version = 
"1.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c7a8fb8a9fbf66c1f703fe16184d10ca0ee9d23be5b4436400408ba54a95005" + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -701,6 +794,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -717,6 +825,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -731,7 +850,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -752,6 +871,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -833,6 +953,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hashbrown" @@ -852,6 +976,15 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "http" version = "0.2.12" @@ -874,6 +1007,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -893,10 +1037,16 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body", + "http-body 1.0.1", "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "httparse" version = "1.9.5" @@ -915,6 +1065,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + 
"want", +] + [[package]] name = "hyper" version = "1.5.0" @@ -926,7 +1099,7 @@ dependencies = [ "futures-util", "h2 0.4.6", "http 1.1.0", - "http-body", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -935,6 +1108,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "rustls-native-certs", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.3" @@ -943,15 +1132,27 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper", + "hyper 1.5.0", "hyper-util", - "rustls", + "rustls 0.23.18", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.32", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -960,7 +1161,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -978,8 +1179,8 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body", - "hyper", + "http-body 1.0.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -1125,9 +1326,15 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.0.3" @@ -1171,6 +1378,15 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.10.1" @@ -1207,6 +1423,149 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec9ad60d674508f3ca8f380a928cfe7b096bc729c4e2dbfe3852bc45da3ab30b" +dependencies = [ + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "jsonpath-rust" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96acbc6188d3bd83519d053efec756aa4419de62ec47be7f28dec297f7dc9eb0" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "k8s-openapi" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "550f99d93aa4c2b25de527bce492d772caf5e21d7ac9bd4b508ba781c8d91e30" +dependencies = [ + "base64 0.21.7", + "chrono", + "serde", + "serde-value", + "serde_json", +] + +[[package]] +name = "kube" +version = "0.88.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "462fe330a0617b276ec864c2255810adcdf519ecb6844253c54074b2086a97bc" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "0.88.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "7fe0d65dd6f3adba29cfb84f19dfe55449c7f6c35425f9d8294bec40313e0b64" +dependencies = [ + "base64 0.21.7", + "bytes", + "chrono", + "either", + "futures", + "home", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper-rustls 0.24.2", + "hyper-timeout", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "pin-project", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "0.88.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6b42844e9172f631b8263ea9ce003b9251da13beb1401580937ad206dd82f4c" +dependencies = [ + "chrono", + "form_urlencoded", + "http 0.2.12", + "json-patch", + "k8s-openapi", + "once_cell", + "schemars", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "kube-derive" +version = "0.88.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5b5a111ee287bd237b8190b8c39543ea9fd22f79e9c32a36c24e08234bcda22" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.87", +] + +[[package]] +name = "kube-runtime" +version = "0.88.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bc06275064c81056fbb28ea876b3fb339d970e8132282119359afca0835c0ea" +dependencies = [ + "ahash", + "async-trait", + "backoff", + "derivative", + "futures", + "hashbrown 0.14.5", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "smallvec", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -1402,7 +1761,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -1423,6 +1782,15 @@ dependencies = [ 
"vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -1458,12 +1826,87 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pem" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "percent-encoding" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "198db74531d58c70a361c42201efde7e2591e976d518caf7662a47dc5720e7b6" +dependencies = [ + "memchr", + "thiserror 2.0.12", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d725d9cfd79e87dccc9341a2ef39d1b6f6353d68c4b33c177febbe1a402c97c5" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db7d01726be8ab66ab32f9df467ae8b1148906685bbe75c82d1e65d7f5b3f841" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "pest_meta" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9f832470494906d1fca5329f8ab5791cc60beb230c74815dff541cbd2b5ca0" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + 
+[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "pin-project-lite" version = "0.2.15" @@ -1559,7 +2002,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -1572,7 +2015,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -1679,7 +2122,7 @@ version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "encoding_rs", "futures-channel", @@ -1687,10 +2130,10 @@ dependencies = [ "futures-util", "h2 0.4.6", "http 1.1.0", - "http-body", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.5.0", + "hyper-rustls 0.27.3", "hyper-tls", "hyper-util", "ipnet", @@ -1701,7 +2144,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", + "rustls-pemfile 2.2.0", "serde", "serde_json", "serde_urlencoded", @@ -1762,6 +2205,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.18" @@ -1770,11 +2225,32 @@ checksum = 
"9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "once_cell", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -1790,6 +2266,16 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -1816,12 +2302,56 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.87", +] + [[package]] name = "scopeguard" version = "1.2.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -1860,6 +2390,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.214" @@ -1868,7 +2408,18 @@ checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", +] + +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", ] [[package]] @@ -1895,6 +2446,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sglang_router_rs" version = "0.0.0" @@ -1905,8 +2469,11 @@ dependencies = [ "clap", "dashmap", "env_logger", + "futures", 
"futures-util", "http 1.1.0", + "k8s-openapi", + "kube", "log", "pyo3", "rand", @@ -1931,6 +2498,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2004,6 +2582,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.87" @@ -2032,7 +2621,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -2081,7 +2670,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -2092,7 +2690,18 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" 
+dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", ] [[package]] @@ -2163,6 +2772,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -2173,13 +2792,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls", + "rustls 0.23.18", "rustls-pki-types", "tokio", ] @@ -2194,9 +2823,54 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "base64 0.21.7", + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "http-range-header", + "mime", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" 
+version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -2222,7 +2896,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" dependencies = [ "crossbeam-channel", - "thiserror", + "thiserror 1.0.69", "time", "tracing-subscriber", ] @@ -2235,7 +2909,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -2303,6 +2977,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "unicode-ident" version = "1.0.13" @@ -2315,6 +2995,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" @@ -2405,7 +3091,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.87", "wasm-bindgen-shared", ] @@ -2439,7 +3125,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2648,7 +3334,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies 
= [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", "synstructure", ] @@ -2670,7 +3356,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] @@ -2690,7 +3376,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", "synstructure", ] @@ -2719,7 +3405,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.87", ] [[package]] diff --git a/sgl-router/Cargo.toml b/sgl-router/Cargo.toml index 6ad8cb08d..65f951954 100644 --- a/sgl-router/Cargo.toml +++ b/sgl-router/Cargo.toml @@ -30,7 +30,9 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "chrono"] } tracing-log = "0.2" tracing-appender = "0.2.3" - +kube = { version = "0.88.1", features = ["runtime", "derive"] } +k8s-openapi = { version = "0.21.0", features = ["v1_29"] } +futures = "0.3" [profile.release] lto = "thin" codegen-units = 1 diff --git a/sgl-router/README.md b/sgl-router/README.md index 7e6cd8ed5..4ec152705 100644 --- a/sgl-router/README.md +++ b/sgl-router/README.md @@ -81,6 +81,41 @@ router = Router( Use the `--verbose` flag with the CLI for more detailed logs. +### Kubernetes Service Discovery + +SGL Router supports automatic service discovery for worker nodes in Kubernetes environments. 
When enabled, the router will automatically: + +- Discover and add worker pods with matching labels +- Remove unhealthy or deleted worker pods +- Dynamically adjust the worker pool based on pod health and availability + +#### Command Line Usage + +```bash +python -m sglang_router.launch_router \ + --service-discovery \ + --selector app=sglang-worker role=inference \ + --service-discovery-port 8000 \ + --service-discovery-namespace default +``` + +#### Service Discovery Arguments + +- `--service-discovery`: Enable the Kubernetes service discovery feature +- `--selector`: One or more label key-value pairs for pod selection (format: key1=value1 key2=value2) +- `--service-discovery-port`: Port to use when generating worker URLs (default: 80) +- `--service-discovery-namespace`: Optional. Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions) + +#### RBAC Requirements + +When using service discovery, you must configure proper Kubernetes RBAC permissions: + +- **If using namespace-scoped discovery** (with `--service-discovery-namespace`): + Set up a ServiceAccount, Role, and RoleBinding with permissions to list/watch pods in that namespace + +- **If watching all namespaces** (without specifying a namespace): + Set up a ServiceAccount, ClusterRole, and ClusterRoleBinding with permissions to list/watch pods at the cluster level + ### Troubleshooting 1. If rust analyzer is not working in VSCode, set `rust-analyzer.linkedProjects` to the absolute path of `Cargo.toml` in your repo.
For example: diff --git a/sgl-router/py_src/sglang_router/launch_router.py b/sgl-router/py_src/sglang_router/launch_router.py index d66acd78a..4826c4465 100644 --- a/sgl-router/py_src/sglang_router/launch_router.py +++ b/sgl-router/py_src/sglang_router/launch_router.py @@ -2,7 +2,7 @@ import argparse import dataclasses import logging import sys -from typing import List, Optional +from typing import Dict, List, Optional from sglang_router import Router from sglang_router_rs import PolicyType @@ -43,6 +43,11 @@ class RouterArgs: max_payload_size: int = 4 * 1024 * 1024 # 4MB verbose: bool = False log_dir: Optional[str] = None + # Service discovery configuration + service_discovery: bool = False + selector: Dict[str, str] = dataclasses.field(default_factory=dict) + service_discovery_port: int = 80 + service_discovery_namespace: Optional[str] = None @staticmethod def add_cli_args( @@ -149,6 +154,28 @@ class RouterArgs: default=None, help="Directory to store log files. If not specified, logs are only output to console.", ) + parser.add_argument( + f"--{prefix}service-discovery", + action="store_true", + help="Enable Kubernetes service discovery", + ) + parser.add_argument( + f"--{prefix}selector", + type=str, + nargs="+", + help="Label selector for Kubernetes service discovery (format: key1=value1 key2=value2)", + ) + parser.add_argument( + f"--{prefix}service-discovery-port", + type=int, + default=RouterArgs.service_discovery_port, + help="Port to use for discovered worker pods", + ) + parser.add_argument( + f"--{prefix}service-discovery-namespace", + type=str, + help="Kubernetes namespace to watch for pods. 
If not provided, watches all namespaces (requires cluster-wide permissions)", + ) @classmethod def from_cli_args( @@ -182,8 +209,26 @@ class RouterArgs: max_payload_size=getattr(args, f"{prefix}max_payload_size"), verbose=getattr(args, f"{prefix}verbose", False), log_dir=getattr(args, f"{prefix}log_dir", None), + service_discovery=getattr(args, f"{prefix}service_discovery", False), + selector=cls._parse_selector(getattr(args, f"{prefix}selector", None)), + service_discovery_port=getattr(args, f"{prefix}service_discovery_port"), + service_discovery_namespace=getattr( + args, f"{prefix}service_discovery_namespace", None + ), ) + @staticmethod + def _parse_selector(selector_list): + if not selector_list: + return {} + + selector = {} + for item in selector_list: + if "=" in item: + key, value = item.split("=", 1) + selector[key] = value + return selector + def policy_from_str(policy_str: str) -> PolicyType: """Convert policy string to PolicyType enum.""" @@ -229,6 +274,10 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: max_payload_size=router_args.max_payload_size, verbose=router_args.verbose, log_dir=router_args.log_dir, + service_discovery=router_args.service_discovery, + selector=router_args.selector, + service_discovery_port=router_args.service_discovery_port, + service_discovery_namespace=router_args.service_discovery_namespace, ) router.start() diff --git a/sgl-router/py_src/sglang_router/router.py b/sgl-router/py_src/sglang_router/router.py index 1c1eee450..3490a2574 100644 --- a/sgl-router/py_src/sglang_router/router.py +++ b/sgl-router/py_src/sglang_router/router.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Dict, List, Optional from sglang_router_rs import PolicyType from sglang_router_rs import Router as _Router @@ -32,6 +32,14 @@ class Router: max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24 verbose: Enable verbose logging. 
Default: False log_dir: Directory to store log files. If None, logs are only output to console. Default: None + service_discovery: Enable Kubernetes service discovery. When enabled, the router will + automatically discover worker pods based on the selector. Default: False + selector: Dictionary mapping of label keys to values for Kubernetes pod selection. + Example: {"app": "sglang-worker"}. Default: {} + service_discovery_port: Port to use for service discovery. The router will generate + worker URLs using this port. Default: 80 + service_discovery_namespace: Kubernetes namespace to watch for pods. If not provided, + watches pods across all namespaces (requires cluster-wide permissions). Default: None """ def __init__( @@ -50,7 +58,14 @@ class Router: max_payload_size: int = 4 * 1024 * 1024, # 4MB verbose: bool = False, log_dir: Optional[str] = None, + service_discovery: bool = False, + selector: Dict[str, str] = None, + service_discovery_port: int = 80, + service_discovery_namespace: Optional[str] = None, ): + if selector is None: + selector = {} + self._router = _Router( worker_urls=worker_urls, policy=policy, @@ -66,6 +81,10 @@ class Router: max_payload_size=max_payload_size, verbose=verbose, log_dir=log_dir, + service_discovery=service_discovery, + selector=selector, + service_discovery_port=service_discovery_port, + service_discovery_namespace=service_discovery_namespace, ) def start(self) -> None: diff --git a/sgl-router/py_test/test_launch_router.py b/sgl-router/py_test/test_launch_router.py index e35532678..c6f0444f4 100644 --- a/sgl-router/py_test/test_launch_router.py +++ b/sgl-router/py_test/test_launch_router.py @@ -38,6 +38,10 @@ class TestLaunchRouter(unittest.TestCase): max_payload_size=4 * 1024 * 1024, # 4MB verbose=False, log_dir=None, + service_discovery=False, + selector=None, + service_discovery_port=80, + service_discovery_namespace=None, ) def create_router_args(self, **kwargs): @@ -79,6 +83,23 @@ class TestLaunchRouter(unittest.TestCase): 
    def test_launch_router_with_service_discovery(self):
        # Test router startup with service discovery enabled and a single
        # label selector ("app=test-worker"); no namespace is specified and
        # the static worker list is empty.
        args = self.create_router_args(
            worker_urls=[], service_discovery=True, selector=["app=test-worker"]
        )
        self.run_router_process(args)
""" _, host, port = base_url.split(":") host = host[2:] @@ -65,6 +73,20 @@ def popen_launch_router( if max_payload_size is not None: command.extend(["--router-max-payload-size", str(max_payload_size)]) + if service_discovery: + command.append("--router-service-discovery") + + if selector: + command.extend(["--router-selector"] + selector) + + if service_discovery_port != 80: + command.extend(["--router-service-discovery-port", str(service_discovery_port)]) + + if service_discovery_namespace: + command.extend( + ["--router-service-discovery-namespace", service_discovery_namespace] + ) + if log_dir is not None: command.extend(["--log-dir", log_dir]) diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 2b570b9ee..4223a53fe 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -1,7 +1,9 @@ use pyo3::prelude::*; pub mod logging; +use std::collections::HashMap; pub mod router; pub mod server; +pub mod service_discovery; pub mod tree; #[pyclass(eq)] @@ -29,6 +31,10 @@ struct Router { max_payload_size: usize, verbose: bool, log_dir: Option, + service_discovery: bool, + selector: HashMap, + service_discovery_port: u16, + service_discovery_namespace: Option, } #[pymethods] @@ -49,6 +55,10 @@ impl Router { max_payload_size = 4 * 1024 * 1024, verbose = false, log_dir = None, + service_discovery = false, + selector = HashMap::new(), + service_discovery_port = 80, + service_discovery_namespace = None ))] fn new( worker_urls: Vec, @@ -65,6 +75,10 @@ impl Router { max_payload_size: usize, verbose: bool, log_dir: Option, + service_discovery: bool, + selector: HashMap, + service_discovery_port: u16, + service_discovery_namespace: Option, ) -> PyResult { Ok(Router { host, @@ -81,6 +95,10 @@ impl Router { max_payload_size, verbose, log_dir, + service_discovery, + selector, + service_discovery_port, + service_discovery_namespace, }) } @@ -105,6 +123,19 @@ impl Router { }, }; + // Create service discovery config if enabled + let service_discovery_config = if 
self.service_discovery { + Some(service_discovery::ServiceDiscoveryConfig { + enabled: true, + selector: self.selector.clone(), + check_interval: std::time::Duration::from_secs(60), + port: self.service_discovery_port, + namespace: self.service_discovery_namespace.clone(), + }) + } else { + None + }; + actix_web::rt::System::new().block_on(async move { server::startup(server::ServerConfig { host: self.host.clone(), @@ -114,6 +145,7 @@ impl Router { verbose: self.verbose, max_payload_size: self.max_payload_size, log_dir: self.log_dir.clone(), + service_discovery_config, }) .await .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; diff --git a/sgl-router/src/router.rs b/sgl-router/src/router.rs index e722732c7..73160e4d2 100644 --- a/sgl-router/src/router.rs +++ b/sgl-router/src/router.rs @@ -240,6 +240,15 @@ impl Router { }) } + /// Get a reference to the worker URLs shared across threads + pub fn get_worker_urls(&self) -> Arc>> { + match self { + Router::RoundRobin { worker_urls, .. } => Arc::clone(worker_urls), + Router::Random { worker_urls, .. } => Arc::clone(worker_urls), + Router::CacheAware { worker_urls, .. 
} => Arc::clone(worker_urls), + } + } + fn wait_for_healthy_workers( worker_urls: &[String], timeout_secs: u64, diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index 1eec4a0d2..5993e2f48 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -1,26 +1,30 @@ use crate::logging::{self, LoggingConfig}; use crate::router::PolicyConfig; use crate::router::Router; +use crate::service_discovery::{start_service_discovery, ServiceDiscoveryConfig}; use actix_web::{ error, get, post, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, }; use bytes::Bytes; use futures_util::StreamExt; +use reqwest::Client; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; -use tracing::{info, Level}; +use tokio::spawn; +use tracing::{error, info, warn, Level}; #[derive(Debug)] pub struct AppState { router: Router, - client: reqwest::Client, + client: Client, } impl AppState { pub fn new( worker_urls: Vec, - client: reqwest::Client, + client: Client, policy_config: PolicyConfig, ) -> Result { // Create router based on policy @@ -149,6 +153,7 @@ pub struct ServerConfig { pub verbose: bool, pub max_payload_size: usize, pub log_dir: Option, + pub service_discovery_config: Option, } pub async fn startup(config: ServerConfig) -> std::io::Result<()> { @@ -180,7 +185,15 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { config.max_payload_size / (1024 * 1024) ); - let client = reqwest::Client::builder() + // Log service discovery status + if let Some(service_discovery_config) = &config.service_discovery_config { + info!("🚧 Service discovery enabled"); + info!("🚧 Selector: {:?}", service_discovery_config.selector); + } else { + info!("🚧 Service discovery disabled"); + } + + let client = Client::builder() .pool_idle_timeout(Some(Duration::from_secs(50))) .build() .expect("Failed to create HTTP client"); @@ -194,6 +207,30 @@ pub async fn startup(config: 
/// Configuration for Kubernetes-based worker discovery.
#[derive(Debug, Clone)]
pub struct ServiceDiscoveryConfig {
    /// Master switch; `start_service_discovery` returns an error when `false`.
    pub enabled: bool,
    /// Label key/value pairs that a pod must all carry to be treated as a worker.
    pub selector: HashMap<String, String>,
    /// Pause before the pod watcher is restarted after its stream ends.
    pub check_interval: Duration,
    /// Port used when building a worker URL from a pod's IP address.
    pub port: u16,
    /// Namespace to watch; `None` watches pods in all namespaces.
    pub namespace: Option<String>,
}
all namespaces + } + } +} + +/// Represents a Kubernetes pod's information used for worker management +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PodInfo { + pub name: String, + pub ip: String, + pub status: String, + pub is_ready: bool, +} + +impl PodInfo { + pub fn from_pod(pod: &Pod) -> Option { + let name = pod.metadata.name.clone()?; + let status = pod.status.clone()?; + let pod_ip = status.pod_ip?; + + let is_ready = if let Some(conditions) = &status.conditions { + conditions + .iter() + .any(|condition| condition.type_ == "Ready" && condition.status == "True") + } else { + false + }; + + let pod_status = status.phase.unwrap_or_else(|| "Unknown".to_string()); + + Some(PodInfo { + name, + ip: pod_ip, + status: pod_status, + is_ready, + }) + } + + /// Returns true if the pod is in a state where it can accept traffic + pub fn is_healthy(&self) -> bool { + self.is_ready && self.status == "Running" + } + + /// Generates a worker URL for this pod + pub fn worker_url(&self, port: u16) -> String { + format!("http://{}:{}", self.ip, port) + } +} + +pub async fn start_service_discovery( + config: ServiceDiscoveryConfig, + worker_urls: Arc>>, +) -> Result, kube::Error> { + // Don't initialize anything if service discovery is disabled + if !config.enabled { + // Return a generic error when service discovery is disabled + return Err(kube::Error::Api(kube::error::ErrorResponse { + status: "Disabled".to_string(), + message: "Service discovery is disabled".to_string(), + reason: "ConfigurationError".to_string(), + code: 400, + })); + } + + // Initialize Kubernetes client + let client = Client::try_default().await?; + + // Construct label selector string from map + let label_selector = config + .selector + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(","); + + info!( + "Starting Kubernetes service discovery with selector: {}", + label_selector + ); + + // Create the task that will run in the background + let handle = task::spawn(async 
move { + // We'll track pods we've already added to avoid duplicates + let tracked_pods = Arc::new(Mutex::new(HashSet::new())); + + // Create a watcher for pods + let pods: Api = if let Some(namespace) = &config.namespace { + Api::namespaced(client, namespace) + } else { + Api::all(client) + }; + + info!("Kubernetes service discovery initialized successfully"); + + // Create an Arc for the selector map + let selector = Arc::new(config.selector); + let port = config.port; + + loop { + // Create a watcher with the proper parameters according to the kube-rs API + let watcher_config = Config::default(); + let watcher_stream = watcher(pods.clone(), watcher_config).applied_objects(); + + // Clone Arcs for the closures + let selector_clone = Arc::clone(&selector); + let tracked_pods_clone = Arc::clone(&tracked_pods); + let worker_urls_clone = Arc::clone(&worker_urls); + + // Apply label selector filter separately since we can't do it directly with the watcher anymore + let filtered_stream = watcher_stream.filter_map(move |obj_res| { + let selector_inner = Arc::clone(&selector_clone); + + async move { + match obj_res { + Ok(pod) => { + // Only process pods matching our label selector + if pod.metadata.labels.as_ref().map_or(false, |labels| { + // Check if the pod has all the labels from our selector + selector_inner.iter().all(|(k, v)| { + labels.get(k).map_or(false, |label_value| label_value == v) + }) + }) { + Some(Ok(pod)) + } else { + None + } + } + Err(e) => Some(Err(e)), + } + } + }); + + // Clone again for the next closure + let tracked_pods_clone2 = Arc::clone(&tracked_pods_clone); + let worker_urls_clone2 = Arc::clone(&worker_urls_clone); + + match filtered_stream + .try_for_each(move |pod| { + let tracked_pods_inner = Arc::clone(&tracked_pods_clone2); + let worker_urls_inner = Arc::clone(&worker_urls_clone2); + + async move { + if let Some(pod_info) = PodInfo::from_pod(&pod) { + if pod.metadata.deletion_timestamp.is_some() { + handle_pod_deletion( + &pod_info, + 
tracked_pods_inner, + worker_urls_inner, + port, + ) + .await; + } else { + handle_pod_event( + &pod_info, + tracked_pods_inner, + worker_urls_inner, + port, + ) + .await; + } + } + Ok(()) + } + }) + .await + { + Ok(_) => {} + Err(err) => { + error!("Error in Kubernetes watcher: {}", err); + // Wait a bit before retrying + time::sleep(Duration::from_secs(5)).await; + } + } + + // If the watcher exits for some reason, wait a bit before restarting + warn!( + "Kubernetes watcher exited, restarting in {} seconds", + config.check_interval.as_secs() + ); + time::sleep(config.check_interval).await; + } + }); + + Ok(handle) +} + +async fn handle_pod_event( + pod_info: &PodInfo, + tracked_pods: Arc>>, + worker_urls: Arc>>, + port: u16, +) { + let worker_url = pod_info.worker_url(port); + + // Check if pod is already tracked + let already_tracked = { + let tracker = tracked_pods.lock().unwrap(); + tracker.contains(pod_info) + }; + + // If pod is healthy and not already tracked, add it + if pod_info.is_healthy() { + if !already_tracked { + info!( + "Adding healthy pod {} ({}) as worker", + pod_info.name, pod_info.ip + ); + + // Add URL to worker list + let mut urls = worker_urls.write().unwrap(); + if !urls.contains(&worker_url) { + urls.push(worker_url.clone()); + info!("Added new worker URL: {}", worker_url); + } + + // Track this pod + let mut tracker = tracked_pods.lock().unwrap(); + tracker.insert(pod_info.clone()); + } + } else if already_tracked { + // If pod was healthy before but not anymore, remove it + handle_pod_deletion(pod_info, tracked_pods, worker_urls, port).await; + } +} + +async fn handle_pod_deletion( + pod_info: &PodInfo, + tracked_pods: Arc>>, + worker_urls: Arc>>, + port: u16, +) { + let worker_url = pod_info.worker_url(port); + + // Remove the pod from our tracking + let was_tracked = { + let mut tracker = tracked_pods.lock().unwrap(); + tracker.remove(pod_info) + }; + + if was_tracked { + info!( + "Removing pod {} ({}) from workers", + pod_info.name, 
pod_info.ip + ); + + // Remove URL from worker list + let mut urls = worker_urls.write().unwrap(); + if let Some(idx) = urls.iter().position(|url| url == &worker_url) { + urls.remove(idx); + info!("Removed worker URL: {}", worker_url); + } + } +}