diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 4ca39964..39261d67 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -366,6 +366,8 @@ dependencies = [ "serde_yaml", "strsim", "thiserror 2.0.18", + "tikv-jemalloc-ctl", + "tikv-jemallocator", "time", "tokio", "tokio-postgres", @@ -2323,6 +2325,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "percent-encoding" version = "2.3.2" @@ -3566,6 +3574,37 @@ dependencies = [ "rustc-hash 1.1.0", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.47" diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 0de89a72..d2635963 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -3,6 +3,10 @@ name = "brightstaff" version = "0.1.0" edition = "2021" +[features] +default = ["jemalloc"] +jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] + [[bin]] name = "brightstaff" path = "src/main.rs" @@ -47,6 +51,8 @@ serde_with = "3.13.0" strsim = "0.11" 
serde_yaml = "0.9.34" thiserror = "2.0.12" +tikv-jemallocator = { version = "0.6", optional = true } +tikv-jemalloc-ctl = { version = "0.6", features = ["stats"], optional = true } tokio = { version = "1.44.2", features = ["full"] } tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] } tokio-stream = "0.1" diff --git a/crates/brightstaff/src/handlers/debug.rs b/crates/brightstaff/src/handlers/debug.rs new file mode 100644 index 00000000..58fbecd2 --- /dev/null +++ b/crates/brightstaff/src/handlers/debug.rs @@ -0,0 +1,53 @@ +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use hyper::{Response, StatusCode}; + +use super::full; + +#[derive(serde::Serialize)] +struct MemStats { + allocated_bytes: usize, + resident_bytes: usize, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option<String>, +} + +/// Returns jemalloc memory statistics as JSON. +/// Falls back to a stub when the jemalloc feature is disabled. +pub async fn memstats() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { + let stats = get_jemalloc_stats(); + let json = serde_json::to_string(&stats).unwrap(); + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(full(json)) + .unwrap()) +} + +#[cfg(feature = "jemalloc")] +fn get_jemalloc_stats() -> MemStats { + use tikv_jemalloc_ctl::{epoch, stats}; + + if let Err(e) = epoch::advance() { + return MemStats { + allocated_bytes: 0, + resident_bytes: 0, + error: Some(format!("failed to advance jemalloc epoch: {e}")), + }; + } + + MemStats { + allocated_bytes: stats::allocated::read().unwrap_or(0), + resident_bytes: stats::resident::read().unwrap_or(0), + error: None, + } +} + +#[cfg(not(feature = "jemalloc"))] +fn get_jemalloc_stats() -> MemStats { + MemStats { + allocated_bytes: 0, + resident_bytes: 0, + error: Some("jemalloc feature not enabled".to_string()), + } +} diff --git a/crates/brightstaff/src/handlers/mod.rs b/crates/brightstaff/src/handlers/mod.rs index 485a0438..4e851264 --- 
a/crates/brightstaff/src/handlers/mod.rs +++ b/crates/brightstaff/src/handlers/mod.rs @@ -1,4 +1,5 @@ pub mod agents; +pub mod debug; pub mod function_calling; pub mod llm; pub mod models; diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 2d610d6f..b1e17e42 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -1,5 +1,10 @@ +#[cfg(feature = "jemalloc")] +#[global_allocator] +static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + use brightstaff::app_state::AppState; use brightstaff::handlers::agents::orchestrator::agent_chat; +use brightstaff::handlers::debug; use brightstaff::handlers::empty; use brightstaff::handlers::function_calling::function_calling_chat_handler; use brightstaff::handlers::llm::llm_chat; @@ -513,6 +518,7 @@ async fn dispatch( Ok(list_models(Arc::clone(&state.llm_providers)).await) } (&Method::OPTIONS, "/v1/models" | "/agents/v1/models") => cors_preflight(), + (&Method::GET, "/debug/memstats") => debug::memstats().await, _ => { debug!(method = %req.method(), path = %path, "no route found"); let mut not_found = Response::new(empty()); diff --git a/crates/brightstaff/src/router/mod.rs b/crates/brightstaff/src/router/mod.rs index 2ef0d11a..0f48c090 100644 --- a/crates/brightstaff/src/router/mod.rs +++ b/crates/brightstaff/src/router/mod.rs @@ -3,3 +3,5 @@ pub mod model_metrics; pub mod orchestrator; pub mod orchestrator_model; pub mod orchestrator_model_v1; +#[cfg(test)] +mod stress_tests; diff --git a/crates/brightstaff/src/router/stress_tests.rs b/crates/brightstaff/src/router/stress_tests.rs new file mode 100644 index 00000000..63c4112f --- /dev/null +++ b/crates/brightstaff/src/router/stress_tests.rs @@ -0,0 +1,264 @@ +#[cfg(test)] +mod tests { + use crate::router::orchestrator::OrchestratorService; + use crate::session_cache::memory::MemorySessionCache; + use common::configuration::{SelectionPolicy, SelectionPreference, TopLevelRoutingPreference}; + use 
hermesllm::apis::openai::{Message, MessageContent, Role}; + use std::sync::Arc; + + fn make_messages(n: usize) -> Vec<Message> { + (0..n) + .map(|i| Message { + role: if i % 2 == 0 { + Role::User + } else { + Role::Assistant + }, + content: Some(MessageContent::Text(format!( + "This is message number {i} with some padding text to make it realistic." + ))), + name: None, + tool_calls: None, + tool_call_id: None, + }) + .collect() + } + + fn make_routing_prefs() -> Vec<TopLevelRoutingPreference> { + vec![ + TopLevelRoutingPreference { + name: "code_generation".to_string(), + description: "Code generation and debugging tasks".to_string(), + models: vec![ + "openai/gpt-4o".to_string(), + "openai/gpt-4o-mini".to_string(), + ], + selection_policy: SelectionPolicy { + prefer: SelectionPreference::None, + }, + }, + TopLevelRoutingPreference { + name: "summarization".to_string(), + description: "Summarizing documents and text".to_string(), + models: vec![ + "anthropic/claude-3-sonnet".to_string(), + "openai/gpt-4o-mini".to_string(), + ], + selection_policy: SelectionPolicy { + prefer: SelectionPreference::None, + }, + }, + ] + } + + /// Stress test: exercise the full routing code path N times using a mock + /// HTTP server and measure jemalloc allocated bytes before/after. 
+ /// + /// This catches: + /// - Memory leaks in generate_request / parse_response + /// - Leaks in reqwest connection handling + /// - String accumulation in the orchestrator model + /// - Fragmentation (jemalloc allocated vs resident) + #[tokio::test] + async fn stress_test_routing_determine_route() { + let mut server = mockito::Server::new_async().await; + let router_url = format!("{}/v1/chat/completions", server.url()); + + let mock_response = serde_json::json!({ + "id": "chatcmpl-mock", + "object": "chat.completion", + "created": 1234567890, + "model": "plano-orchestrator", + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": "{\"route\": \"code_generation\"}" + }, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 100, "completion_tokens": 10, "total_tokens": 110} + }); + + let _mock = server + .mock("POST", "/v1/chat/completions") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(mock_response.to_string()) + .expect_at_least(1) + .create_async() + .await; + + let prefs = make_routing_prefs(); + let session_cache = Arc::new(MemorySessionCache::new(1000)); + let orchestrator_service = Arc::new(OrchestratorService::with_routing( + router_url, + "Plano-Orchestrator".to_string(), + "plano-orchestrator".to_string(), + Some(prefs.clone()), + None, + None, + session_cache, + None, + 2048, + )); + + // Warm up: a few requests to stabilize allocator state + for _ in 0..10 { + let msgs = make_messages(5); + let _ = orchestrator_service + .determine_route(&msgs, None, "warmup") + .await; + } + + // Snapshot memory after warmup + let baseline = get_allocated(); + + let num_iterations = 2000; + + for i in 0..num_iterations { + let msgs = make_messages(5 + (i % 10)); + let inline = if i % 3 == 0 { + Some(make_routing_prefs()) + } else { + None + }; + let _ = orchestrator_service + .determine_route(&msgs, inline, &format!("req-{i}")) + .await; + } + + let after = get_allocated(); + + let growth = 
after.saturating_sub(baseline); + let growth_mb = growth as f64 / (1024.0 * 1024.0); + let per_request = if num_iterations > 0 { + growth / num_iterations + } else { + 0 + }; + + eprintln!("=== Routing Stress Test Results ==="); + eprintln!(" Iterations: {num_iterations}"); + eprintln!(" Baseline alloc: {} bytes", baseline); + eprintln!(" Final alloc: {} bytes", after); + eprintln!(" Growth: {} bytes ({growth_mb:.2} MB)", growth); + eprintln!(" Per-request: {} bytes", per_request); + + // Allow up to 256 bytes per request of retained growth (connection pool, etc.) + // A true leak would show thousands of bytes per request. + assert!( + per_request < 256, + "Possible memory leak: {per_request} bytes/request retained after {num_iterations} iterations" + ); + } + + /// Stress test with high concurrency: many parallel determine_route calls. + #[tokio::test] + async fn stress_test_routing_concurrent() { + let mut server = mockito::Server::new_async().await; + let router_url = format!("{}/v1/chat/completions", server.url()); + + let mock_response = serde_json::json!({ + "id": "chatcmpl-mock", + "object": "chat.completion", + "created": 1234567890, + "model": "plano-orchestrator", + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": "{\"route\": \"summarization\"}" + }, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 100, "completion_tokens": 10, "total_tokens": 110} + }); + + let _mock = server + .mock("POST", "/v1/chat/completions") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(mock_response.to_string()) + .expect_at_least(1) + .create_async() + .await; + + let prefs = make_routing_prefs(); + let session_cache = Arc::new(MemorySessionCache::new(1000)); + let orchestrator_service = Arc::new(OrchestratorService::with_routing( + router_url, + "Plano-Orchestrator".to_string(), + "plano-orchestrator".to_string(), + Some(prefs), + None, + None, + session_cache, + None, + 2048, + )); + + // Warm up 
+ for _ in 0..20 { + let msgs = make_messages(3); + let _ = orchestrator_service + .determine_route(&msgs, None, "warmup") + .await; + } + + let baseline = get_allocated(); + + let concurrency = 50; + let requests_per_task = 100; + let total = concurrency * requests_per_task; + + let mut handles = vec![]; + for t in 0..concurrency { + let svc = Arc::clone(&orchestrator_service); + let handle = tokio::spawn(async move { + for r in 0..requests_per_task { + let msgs = make_messages(3 + (r % 8)); + let _ = svc + .determine_route(&msgs, None, &format!("req-{t}-{r}")) + .await; + } + }); + handles.push(handle); + } + + for h in handles { + h.await.unwrap(); + } + + let after = get_allocated(); + let growth = after.saturating_sub(baseline); + let per_request = growth / total; + + eprintln!("=== Concurrent Routing Stress Test Results ==="); + eprintln!(" Tasks: {concurrency} x {requests_per_task} = {total}"); + eprintln!(" Baseline: {} bytes", baseline); + eprintln!(" Final: {} bytes", after); + eprintln!( + " Growth: {} bytes ({:.2} MB)", + growth, + growth as f64 / 1_048_576.0 + ); + eprintln!(" Per-request: {} bytes", per_request); + + assert!( + per_request < 512, + "Possible memory leak under concurrency: {per_request} bytes/request retained after {total} requests" + ); + } + + #[cfg(feature = "jemalloc")] + fn get_allocated() -> usize { + tikv_jemalloc_ctl::epoch::advance().unwrap(); + tikv_jemalloc_ctl::stats::allocated::read().unwrap_or(0) + } + + #[cfg(not(feature = "jemalloc"))] + fn get_allocated() -> usize { + 0 + } +}