diff --git a/crates/Cargo.lock b/crates/Cargo.lock index c5819de9..7001e810 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -4106,6 +4106,7 @@ dependencies = [ "getrandom 0.4.2", "js-sys", "serde_core", + "sha1_smol", "wasm-bindgen", ] diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 0b62c313..ac17c9a6 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -60,7 +60,7 @@ time = { version = "0.3", features = ["formatting", "macros"] } tracing = "0.1" tracing-opentelemetry = "0.32.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -uuid = { version = "1.0", features = ["v4", "serde"] } +uuid = { version = "1.0", features = ["v4", "v5", "serde"] } [dev-dependencies] mockito = "1.0" diff --git a/crates/brightstaff/src/handlers/claude_cli/process.rs b/crates/brightstaff/src/handlers/claude_cli/process.rs index 6a19943c..e93642fa 100644 --- a/crates/brightstaff/src/handlers/claude_cli/process.rs +++ b/crates/brightstaff/src/handlers/claude_cli/process.rs @@ -4,7 +4,7 @@ //! and stream-json lives in `hermesllm::apis::claude_cli`. use std::process::Stdio; -use std::sync::Arc; +use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; use hermesllm::apis::claude_cli::{parse_ndjson_line, ClaudeCliEvent, ClaudeCliInputEvent}; @@ -56,6 +56,12 @@ pub enum ProcessError { StdinWrite(#[source] std::io::Error), #[error("claude process exited unexpectedly")] ExitedEarly, + /// `Command::spawn` succeeded but a piped stdio handle was already taken + /// by the time we asked for it. Should be unreachable given we set + /// `Stdio::piped()` immediately before spawn; surfaced as its own variant + /// so callers can tell it apart from a real "exited early". + #[error("claude child is missing piped {which} after spawn")] + MissingStdio { which: &'static str }, #[error("claude watchdog fired after {0:?} of silence")] WatchdogTimeout(Duration), #[error("failed to serialize stdin payload: {0}")] @@ -94,8 +100,11 @@ pub struct ClaudeProcess { event_rx: Arc>>, config: ClaudeCliConfig, /// Last time a request was served on this session — used by the session - /// manager to enforce the idle TTL. - last_used: Mutex, + /// manager to enforce the idle TTL. Held under a sync mutex because the + /// critical section is one read/write of a `Copy` value with no `.await`, + /// which keeps `SessionManager` callers from holding the session-map lock + /// across an async hop. + last_used: StdMutex, pub session_id: String, } @@ -148,9 +157,18 @@ impl ClaudeProcess { source: e, })?; - let stdin = child.stdin.take().ok_or(ProcessError::ExitedEarly)?; - let stdout = child.stdout.take().ok_or(ProcessError::ExitedEarly)?; - let stderr = child.stderr.take().ok_or(ProcessError::ExitedEarly)?; + let stdin = child + .stdin + .take() + .ok_or(ProcessError::MissingStdio { which: "stdin" })?; + let stdout = child + .stdout + .take() + .ok_or(ProcessError::MissingStdio { which: "stdout" })?; + let stderr = child + .stderr + .take() + .ok_or(ProcessError::MissingStdio { which: "stderr" })?; // Bounded channel — backpressure if the consumer is slow, but large // enough that bursts of small text deltas do not block stdout drain. @@ -217,7 +235,7 @@ impl ClaudeProcess { stdin: Mutex::new(Some(stdin)), event_rx: Arc::new(Mutex::new(rx)), config, - last_used: Mutex::new(Instant::now()), + last_used: StdMutex::new(Instant::now()), session_id, })) } @@ -232,7 +250,10 @@ impl ClaudeProcess { &self, events: &[ClaudeCliInputEvent], ) -> Result { - *self.last_used.lock().await = Instant::now(); + // Sync lock + Copy value; never held across an `.await`. + if let Ok(mut last) = self.last_used.lock() { + *last = Instant::now(); + } // Claim the event receiver for the lifetime of this turn. let rx_guard = Arc::clone(&self.event_rx) @@ -259,8 +280,17 @@ impl ClaudeProcess { } /// Most-recent activity timestamp; used by the session manager's reaper. - pub async fn last_used(&self) -> Instant { - *self.last_used.lock().await + /// Sync because the lock guards a single `Instant` with no `.await` in + /// the critical section — keeps callers from holding async locks across + /// an await point. + pub fn last_used(&self) -> Instant { + // Poisoning is impossible here (the only writer is `send_user_turn` + // which never panics while holding the lock), but if it ever happens + // we degrade gracefully rather than aborting. + self.last_used + .lock() + .map(|g| *g) + .unwrap_or_else(|p| *p.into_inner()) } /// Forcefully terminate the child. Safe to call multiple times. diff --git a/crates/brightstaff/src/handlers/claude_cli/session.rs b/crates/brightstaff/src/handlers/claude_cli/session.rs index cd664cab..3b102345 100644 --- a/crates/brightstaff/src/handlers/claude_cli/session.rs +++ b/crates/brightstaff/src/handlers/claude_cli/session.rs @@ -3,10 +3,8 @@ //! long-lived `ClaudeProcess`. Enforces an idle TTL and a hard cap on the //! number of concurrent sessions. -use std::collections::{hash_map::DefaultHasher, HashMap}; -use std::hash::{Hash, Hasher}; +use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use hermesllm::apis::anthropic::{ MessagesContentBlock, MessagesMessageContent, MessagesRequest, MessagesRole, @@ -75,15 +73,19 @@ impl SessionManager { return uuid_from_seed(trimmed); } } - let mut hasher = DefaultHasher::new(); - req.model.hash(&mut hasher); + // Build a deterministic seed from (model, system_prompt, first user + // message) so a retried conversation lands on the same session. + let mut seed = String::new(); + seed.push_str(&req.model); + seed.push('\u{1f}'); if let Some(system) = &req.system { - system_text(system).hash(&mut hasher); + seed.push_str(&system_text(system)); } + seed.push('\u{1f}'); if let Some(first) = first_user_message_text(req) { - first.hash(&mut hasher); + seed.push_str(&first); } - uuid_from_seed(&hasher.finish().to_string()) + uuid_from_seed(&seed) } /// Get the existing session's process or spawn a new one. @@ -98,40 +100,61 @@ impl SessionManager { // background task for the common one-developer-one-laptop deployment. self.evict_idle().await; - { - let map = self.inner.lock().await; - if let Some(existing) = map.get(session_id) { - debug!(session = %session_id, "reusing claude-cli session"); - return Ok(Arc::clone(existing)); - } - } - + // Single lock acquisition for the whole get-or-spawn path. `last_used` + // is now a sync mutex on `ClaudeProcess`, so iterating to find the + // LRU victim does not block other tasks across an `.await`. let mut map = self.inner.lock().await; + if let Some(existing) = map.get(session_id) { + debug!(session = %session_id, "reusing claude-cli session"); return Ok(Arc::clone(existing)); } - if map.len() >= self.config.max_sessions { - // Evict the least-recently-used session to keep the cap honest. - if let Some(victim_key) = lru_session_id(&map).await { - if let Some(victim) = map.remove(&victim_key) { - info!(session = %victim_key, "evicting LRU claude-cli session to make room"); - drop(map); - victim.shutdown().await; - map = self.inner.lock().await; - } - } - } + // If we are at the cap, take an LRU victim out of the map first so + // its slot is freed before we insert. We drop the lock for the + // shutdown await (killing a child can take a tick), accepting that + // the cap can drift by one if a concurrent task spawns in that + // window — the next reap will catch it. + let victim = if map.len() >= self.config.max_sessions { + let victim_key = lru_session_id(&map); + victim_key.and_then(|k| map.remove(&k).map(|v| (k, v))) + } else { + None + }; + + // Spawn outside of any lock if we have to wait on a victim shutdown. + let process = if let Some((victim_key, victim_proc)) = victim { + drop(map); + info!(session = %victim_key, "evicting LRU claude-cli session to make room"); + victim_proc.shutdown().await; + let process = ClaudeProcess::spawn( + session_id.to_string(), + model, + system_prompt, + cwd, + self.config.process.clone(), + ) + .await?; + self.inner + .lock() + .await + .insert(session_id.to_string(), Arc::clone(&process)); + process + } else { + // No eviction needed — keep holding the map lock across spawn so + // we don't race with another caller resolving the same id. + let process = ClaudeProcess::spawn( + session_id.to_string(), + model, + system_prompt, + cwd, + self.config.process.clone(), + ) + .await?; + map.insert(session_id.to_string(), Arc::clone(&process)); + process + }; - let process = ClaudeProcess::spawn( - session_id.to_string(), - model, - system_prompt, - cwd, - self.config.process.clone(), - ) - .await?; - map.insert(session_id.to_string(), Arc::clone(&process)); Ok(process) } @@ -152,23 +175,21 @@ impl SessionManager { return; } let now = Instant::now(); - let mut to_kill: Vec<(String, Arc)> = Vec::new(); - { - let map = self.inner.lock().await; - for (k, v) in map.iter() { - if now.duration_since(v.last_used().await) > ttl { - to_kill.push((k.clone(), Arc::clone(v))); - } - } - } - if to_kill.is_empty() { - return; - } - let mut map = self.inner.lock().await; - for (k, _) in &to_kill { - map.remove(k); - } - drop(map); + + // Collect victims under a single lock acquisition; `last_used()` is + // sync, so the iteration never crosses an `.await`. + let to_kill: Vec<(String, Arc)> = { + let mut map = self.inner.lock().await; + let keys: Vec = map + .iter() + .filter(|(_, v)| now.duration_since(v.last_used()) > ttl) + .map(|(k, _)| k.clone()) + .collect(); + keys.into_iter() + .filter_map(|k| map.remove(&k).map(|v| (k, v))) + .collect() + }; + for (k, proc) in to_kill { info!(session = %k, "evicting idle claude-cli session"); proc.shutdown().await; @@ -176,16 +197,12 @@ impl SessionManager { } } -async fn lru_session_id(map: &HashMap>) -> Option { - let mut oldest: Option<(String, Instant)> = None; - for (k, v) in map.iter() { - let used = v.last_used().await; - match &oldest { - Some((_, t)) if *t < used => {} - _ => oldest = Some((k.clone(), used)), - } - } - oldest.map(|(k, _)| k) +/// Pick the least-recently-used session id from the map. Sync because +/// `ClaudeProcess::last_used` is sync. +fn lru_session_id(map: &HashMap>) -> Option { + map.iter() + .min_by_key(|(_, v)| v.last_used()) + .map(|(k, _)| k.clone()) } fn first_user_message_text(req: &MessagesRequest) -> Option { @@ -222,47 +239,12 @@ fn system_text(system: &MessagesSystemPrompt) -> String { } } -/// Deterministic v5-style UUID derived from an arbitrary seed string. The -/// `claude` CLI requires `--session-id` to be a valid UUID; we use the DNS -/// namespace constant as a stable salt so the same conversation always maps -/// to the same id without us pulling in the v5 feature of the `uuid` crate. +/// Deterministic UUIDv5 derived from an arbitrary seed string. The `claude` +/// CLI requires `--session-id` to be a valid UUID; v5 (SHA-1 based) gives +/// us a stable mapping across Rust toolchain versions, unlike `DefaultHasher`. +/// We use the OID namespace because the seed isn't a DNS or URL name. fn uuid_from_seed(seed: &str) -> String { - let mut hasher = DefaultHasher::new(); - seed.hash(&mut hasher); - let h1 = hasher.finish(); - let mut hasher2 = DefaultHasher::new(); - h1.hash(&mut hasher2); - seed.hash(&mut hasher2); - let h2 = hasher2.finish(); - let bytes = [ - (h1 >> 56) as u8, - (h1 >> 48) as u8, - (h1 >> 40) as u8, - (h1 >> 32) as u8, - (h1 >> 24) as u8, - (h1 >> 16) as u8, - (h1 >> 8) as u8, - h1 as u8, - (h2 >> 56) as u8, - (h2 >> 48) as u8, - (h2 >> 40) as u8, - (h2 >> 32) as u8, - (h2 >> 24) as u8, - (h2 >> 16) as u8, - (h2 >> 8) as u8, - h2 as u8, - ]; - uuid::Builder::from_random_bytes(bytes) - .into_uuid() - .to_string() -} - -/// `Duration::is_zero` shim — `Duration` exposes `is_zero` only on stable -/// 1.53+, but our MSRV already covers that. Re-exporting keeps call sites -/// terse if we ever need to swap implementations. -#[allow(dead_code)] -fn is_zero(d: Duration) -> bool { - d.is_zero() + uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes()).to_string() } #[cfg(test)]