add jemalloc and /debug/memstats endpoint for OOM diagnosis (#885)
Some checks are pending
CI / pre-commit (push) Waiting to run
CI / plano-tools-tests (push) Waiting to run
CI / native-smoke-test (push) Waiting to run
CI / docker-build (push) Waiting to run
CI / validate-config (push) Waiting to run
CI / security-scan (push) Blocked by required conditions
CI / test-prompt-gateway (push) Blocked by required conditions
CI / test-model-alias-routing (push) Blocked by required conditions
CI / test-responses-api-with-state (push) Blocked by required conditions
CI / e2e-plano-tests (3.10) (push) Blocked by required conditions
CI / e2e-plano-tests (3.11) (push) Blocked by required conditions
CI / e2e-plano-tests (3.12) (push) Blocked by required conditions
CI / e2e-plano-tests (3.13) (push) Blocked by required conditions
CI / e2e-plano-tests (3.14) (push) Blocked by required conditions
CI / e2e-demo-preference (push) Blocked by required conditions
CI / e2e-demo-currency (push) Blocked by required conditions
Publish docker image (latest) / build-arm64 (push) Waiting to run
Publish docker image (latest) / build-amd64 (push) Waiting to run
Publish docker image (latest) / create-manifest (push) Blocked by required conditions
Build and Deploy Documentation / build (push) Waiting to run

This commit is contained in:
Adil Hafeez 2026-04-23 13:59:12 -07:00 committed by GitHub
parent c8079ac971
commit aa726b1bba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 371 additions and 0 deletions

39
crates/Cargo.lock generated
View file

@ -366,6 +366,8 @@ dependencies = [
"serde_yaml", "serde_yaml",
"strsim", "strsim",
"thiserror 2.0.18", "thiserror 2.0.18",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"time", "time",
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",
@ -2323,6 +2325,12 @@ dependencies = [
"windows-link", "windows-link",
] ]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.3.2" version = "2.3.2"
@ -3566,6 +3574,37 @@ dependencies = [
"rustc-hash 1.1.0", "rustc-hash 1.1.0",
] ]
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c"
dependencies = [
"libc",
"paste",
"tikv-jemalloc-sys",
]
[[package]]
name = "tikv-jemalloc-sys"
version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "tikv-jemallocator"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a"
dependencies = [
"libc",
"tikv-jemalloc-sys",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.47" version = "0.3.47"

View file

@ -3,6 +3,10 @@ name = "brightstaff"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[features]
default = ["jemalloc"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
[[bin]] [[bin]]
name = "brightstaff" name = "brightstaff"
path = "src/main.rs" path = "src/main.rs"
@ -47,6 +51,8 @@ serde_with = "3.13.0"
strsim = "0.11" strsim = "0.11"
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
thiserror = "2.0.12" thiserror = "2.0.12"
tikv-jemallocator = { version = "0.6", optional = true }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"], optional = true }
tokio = { version = "1.44.2", features = ["full"] } tokio = { version = "1.44.2", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] } tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
tokio-stream = "0.1" tokio-stream = "0.1"

View file

@ -0,0 +1,53 @@
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::{Response, StatusCode};
use super::full;
// JSON payload returned by `/debug/memstats`.
#[derive(serde::Serialize)]
struct MemStats {
    // Bytes currently allocated by the application (jemalloc `stats.allocated`).
    allocated_bytes: usize,
    // Bytes in physically resident pages (jemalloc `stats.resident`).
    resident_bytes: usize,
    // Set when stats are unavailable (feature disabled or epoch advance failed);
    // omitted from the JSON on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
/// Returns jemalloc memory statistics as JSON (handler for `GET /debug/memstats`).
///
/// Falls back to a stub payload (zeros plus an `error` field) when the
/// jemalloc feature is disabled or the stats cannot be read; the endpoint
/// always answers 200 so it is safe to poll during OOM diagnosis.
pub async fn memstats() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
    let stats = get_jemalloc_stats();
    // MemStats is two integers and an optional string, so serialization
    // cannot fail; a panic here would indicate a bug in MemStats itself.
    let json = serde_json::to_string(&stats).expect("MemStats serialization is infallible");
    Ok(Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(full(json))
        .expect("response built from static status/header parts is always valid"))
}
// Reads allocator counters via the jemalloc control interface.
#[cfg(feature = "jemalloc")]
fn get_jemalloc_stats() -> MemStats {
    use tikv_jemalloc_ctl::{epoch, stats};

    // jemalloc caches its statistics per epoch; advancing the epoch
    // refreshes the counters before they are read.
    match epoch::advance() {
        Ok(_) => MemStats {
            allocated_bytes: stats::allocated::read().unwrap_or(0),
            resident_bytes: stats::resident::read().unwrap_or(0),
            error: None,
        },
        Err(e) => MemStats {
            allocated_bytes: 0,
            resident_bytes: 0,
            error: Some(format!("failed to advance jemalloc epoch: {e}")),
        },
    }
}
// Stub used when the crate is built without the jemalloc feature:
// reports zeros and flags the condition in the `error` field.
#[cfg(not(feature = "jemalloc"))]
fn get_jemalloc_stats() -> MemStats {
    let error = Some(String::from("jemalloc feature not enabled"));
    MemStats {
        allocated_bytes: 0,
        resident_bytes: 0,
        error,
    }
}

View file

@ -1,4 +1,5 @@
pub mod agents; pub mod agents;
pub mod debug;
pub mod function_calling; pub mod function_calling;
pub mod llm; pub mod llm;
pub mod models; pub mod models;

View file

@ -1,5 +1,10 @@
#[cfg(feature = "jemalloc")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use brightstaff::app_state::AppState; use brightstaff::app_state::AppState;
use brightstaff::handlers::agents::orchestrator::agent_chat; use brightstaff::handlers::agents::orchestrator::agent_chat;
use brightstaff::handlers::debug;
use brightstaff::handlers::empty; use brightstaff::handlers::empty;
use brightstaff::handlers::function_calling::function_calling_chat_handler; use brightstaff::handlers::function_calling::function_calling_chat_handler;
use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::llm::llm_chat;
@ -513,6 +518,7 @@ async fn dispatch(
Ok(list_models(Arc::clone(&state.llm_providers)).await) Ok(list_models(Arc::clone(&state.llm_providers)).await)
} }
(&Method::OPTIONS, "/v1/models" | "/agents/v1/models") => cors_preflight(), (&Method::OPTIONS, "/v1/models" | "/agents/v1/models") => cors_preflight(),
(&Method::GET, "/debug/memstats") => debug::memstats().await,
_ => { _ => {
debug!(method = %req.method(), path = %path, "no route found"); debug!(method = %req.method(), path = %path, "no route found");
let mut not_found = Response::new(empty()); let mut not_found = Response::new(empty());

View file

@ -3,3 +3,5 @@ pub mod model_metrics;
pub mod orchestrator; pub mod orchestrator;
pub mod orchestrator_model; pub mod orchestrator_model;
pub mod orchestrator_model_v1; pub mod orchestrator_model_v1;
#[cfg(test)]
mod stress_tests;

View file

@ -0,0 +1,264 @@
#[cfg(test)]
mod tests {
use crate::router::orchestrator::OrchestratorService;
use crate::session_cache::memory::MemorySessionCache;
use common::configuration::{SelectionPolicy, SelectionPreference, TopLevelRoutingPreference};
use hermesllm::apis::openai::{Message, MessageContent, Role};
use std::sync::Arc;
// Builds `n` alternating user/assistant messages with realistic-length text,
// used as synthetic chat history for the stress tests.
fn make_messages(n: usize) -> Vec<Message> {
    let mut messages = Vec::with_capacity(n);
    for i in 0..n {
        let role = if i % 2 == 0 { Role::User } else { Role::Assistant };
        let text = format!(
            "This is message number {i} with some padding text to make it realistic."
        );
        messages.push(Message {
            role,
            content: Some(MessageContent::Text(text)),
            name: None,
            tool_calls: None,
            tool_call_id: None,
        });
    }
    messages
}
// Builds the two routing preferences used by the stress tests
// (code generation and summarization), each with no selection preference.
fn make_routing_prefs() -> Vec<TopLevelRoutingPreference> {
    // Local constructor to avoid repeating the struct literal per preference.
    let pref = |name: &str, description: &str, models: &[&str]| TopLevelRoutingPreference {
        name: name.to_string(),
        description: description.to_string(),
        models: models.iter().map(|m| m.to_string()).collect(),
        selection_policy: SelectionPolicy {
            prefer: SelectionPreference::None,
        },
    };
    vec![
        pref(
            "code_generation",
            "Code generation and debugging tasks",
            &["openai/gpt-4o", "openai/gpt-4o-mini"],
        ),
        pref(
            "summarization",
            "Summarizing documents and text",
            &["anthropic/claude-3-sonnet", "openai/gpt-4o-mini"],
        ),
    ]
}
/// Stress test: exercise the full routing code path N times using a mock
/// HTTP server and measure jemalloc allocated bytes before/after.
///
/// This catches:
/// - Memory leaks in generate_request / parse_response
/// - Leaks in reqwest connection handling
/// - String accumulation in the orchestrator model
/// - Fragmentation (jemalloc allocated vs resident)
///
/// NOTE(review): without the `jemalloc` feature, `get_allocated()` returns 0,
/// so growth is 0 and the leak assertion is vacuous — confirm CI enables it.
#[tokio::test]
async fn stress_test_routing_determine_route() {
    // Mock upstream chat-completions endpoint so determine_route performs
    // real HTTP I/O without a live LLM backend.
    let mut server = mockito::Server::new_async().await;
    let router_url = format!("{}/v1/chat/completions", server.url());
    let mock_response = serde_json::json!({
        "id": "chatcmpl-mock",
        "object": "chat.completion",
        "created": 1234567890,
        "model": "plano-orchestrator",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "{\"route\": \"code_generation\"}"
            },
            "finish_reason": "stop"
        }],
        "usage": {"prompt_tokens": 100, "completion_tokens": 10, "total_tokens": 110}
    });
    let _mock = server
        .mock("POST", "/v1/chat/completions")
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(mock_response.to_string())
        .expect_at_least(1)
        .create_async()
        .await;
    let prefs = make_routing_prefs();
    let session_cache = Arc::new(MemorySessionCache::new(1000));
    let orchestrator_service = Arc::new(OrchestratorService::with_routing(
        router_url,
        "Plano-Orchestrator".to_string(),
        "plano-orchestrator".to_string(),
        Some(prefs.clone()),
        None,
        None,
        session_cache,
        None,
        2048,
    ));
    // Warm up: a few requests to stabilize allocator state (connection pool,
    // lazily-initialized caches) before taking the baseline measurement.
    for _ in 0..10 {
        let msgs = make_messages(5);
        let _ = orchestrator_service
            .determine_route(&msgs, None, "warmup")
            .await;
    }
    // Snapshot memory after warmup
    let baseline = get_allocated();
    let num_iterations = 2000;
    for i in 0..num_iterations {
        // Vary message count and periodically pass inline preferences to
        // exercise both routing code paths.
        let msgs = make_messages(5 + (i % 10));
        let inline = if i % 3 == 0 {
            Some(make_routing_prefs())
        } else {
            None
        };
        let _ = orchestrator_service
            .determine_route(&msgs, inline, &format!("req-{i}"))
            .await;
    }
    let after = get_allocated();
    let growth = after.saturating_sub(baseline);
    let growth_mb = growth as f64 / (1024.0 * 1024.0);
    // num_iterations is a non-zero constant, so dividing directly is safe;
    // the former `if num_iterations > 0` guard was dead code.
    let per_request = growth / num_iterations;
    eprintln!("=== Routing Stress Test Results ===");
    eprintln!(" Iterations: {num_iterations}");
    eprintln!(" Baseline alloc: {} bytes", baseline);
    eprintln!(" Final alloc: {} bytes", after);
    eprintln!(" Growth: {} bytes ({growth_mb:.2} MB)", growth);
    eprintln!(" Per-request: {} bytes", per_request);
    // Allow up to 256 bytes per request of retained growth (connection pool, etc.)
    // A true leak would show thousands of bytes per request.
    assert!(
        per_request < 256,
        "Possible memory leak: {per_request} bytes/request retained after {num_iterations} iterations"
    );
}
/// Stress test with high concurrency: many parallel determine_route calls.
#[tokio::test]
async fn stress_test_routing_concurrent() {
    // Mock upstream chat-completions endpoint shared by all spawned tasks.
    let mut server = mockito::Server::new_async().await;
    let router_url = format!("{}/v1/chat/completions", server.url());
    let mock_response = serde_json::json!({
        "id": "chatcmpl-mock",
        "object": "chat.completion",
        "created": 1234567890,
        "model": "plano-orchestrator",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "{\"route\": \"summarization\"}"
            },
            "finish_reason": "stop"
        }],
        "usage": {"prompt_tokens": 100, "completion_tokens": 10, "total_tokens": 110}
    });
    let _mock = server
        .mock("POST", "/v1/chat/completions")
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(mock_response.to_string())
        .expect_at_least(1)
        .create_async()
        .await;
    let prefs = make_routing_prefs();
    let session_cache = Arc::new(MemorySessionCache::new(1000));
    let orchestrator_service = Arc::new(OrchestratorService::with_routing(
        router_url,
        "Plano-Orchestrator".to_string(),
        "plano-orchestrator".to_string(),
        Some(prefs),
        None,
        None,
        session_cache,
        None,
        2048,
    ));
    // Warm up so connection pools / lazy state don't count toward growth.
    for _ in 0..20 {
        let msgs = make_messages(3);
        let _ = orchestrator_service
            .determine_route(&msgs, None, "warmup")
            .await;
    }
    // Baseline snapshot: only growth after this point is attributed to load.
    let baseline = get_allocated();
    let concurrency = 50;
    let requests_per_task = 100;
    let total = concurrency * requests_per_task;
    let mut handles = vec![];
    for t in 0..concurrency {
        let svc = Arc::clone(&orchestrator_service);
        let handle = tokio::spawn(async move {
            for r in 0..requests_per_task {
                // Vary message counts slightly so payload sizes differ.
                let msgs = make_messages(3 + (r % 8));
                let _ = svc
                    .determine_route(&msgs, None, &format!("req-{t}-{r}"))
                    .await;
            }
        });
        handles.push(handle);
    }
    // Join every task; a panic inside a task surfaces here via unwrap.
    for h in handles {
        h.await.unwrap();
    }
    let after = get_allocated();
    let growth = after.saturating_sub(baseline);
    // `total` is a non-zero constant (50 * 100), so this division is safe.
    let per_request = growth / total;
    eprintln!("=== Concurrent Routing Stress Test Results ===");
    eprintln!(" Tasks: {concurrency} x {requests_per_task} = {total}");
    eprintln!(" Baseline: {} bytes", baseline);
    eprintln!(" Final: {} bytes", after);
    eprintln!(
        " Growth: {} bytes ({:.2} MB)",
        growth,
        growth as f64 / 1_048_576.0
    );
    eprintln!(" Per-request: {} bytes", per_request);
    // NOTE(review): without the jemalloc feature get_allocated() returns 0,
    // making growth 0 and this assertion vacuous — confirm CI enables it.
    assert!(
        per_request < 512,
        "Possible memory leak under concurrency: {per_request} bytes/request retained after {total} requests"
    );
}
// Reads jemalloc's `stats.allocated` counter for the measurement snapshots.
#[cfg(feature = "jemalloc")]
fn get_allocated() -> usize {
    // jemalloc caches statistics per epoch; advance to refresh them.
    tikv_jemalloc_ctl::epoch::advance().expect("failed to advance jemalloc epoch");
    // Fail loudly instead of silently returning 0: a 0 reading would make
    // the leak assertions in the tests above pass vacuously.
    tikv_jemalloc_ctl::stats::allocated::read().expect("failed to read jemalloc allocated stat")
}
// Fallback when jemalloc is disabled: no allocator stats are available, so
// report 0 — the leak assertions become vacuous in this configuration.
#[cfg(not(feature = "jemalloc"))]
fn get_allocated() -> usize {
    0
}
}