refactor(claude-cli): make session bookkeeping sync, drop lock-across-await

- Convert ClaudeProcess::last_used from tokio::sync::Mutex<Instant> to
  std::sync::Mutex<Instant>: the critical section is one Copy read/write
  with no .await, so a sync mutex lets SessionManager iterate sessions
  without holding the map lock across an await point. Fixes the
  lock-across-await pattern in lru_session_id and evict_idle.
- Simplify SessionManager::get_or_spawn to a single map-lock acquisition
  on the fast path; only release the lock for the rare case where we
  need to await a victim shutdown before spawning.
- Replace the hand-rolled "deterministic UUID via DefaultHasher" with a
  real UUIDv5 over the OID namespace (uuid feature `v5`). Stable across
  Rust toolchain versions, unlike SipHash, and matches what the doc on
  the helper claimed all along.
- Introduce ProcessError::MissingStdio { which } so spawns where
  Stdio::piped() somehow returned None surface as their own programmer-
  error variant rather than masquerading as ExitedEarly.
- Delete the dead is_zero() helper.
This commit is contained in:
Spherrrical 2026-05-04 13:35:11 -07:00
parent 56006f0769
commit 53a23ec8f9
4 changed files with 127 additions and 114 deletions

1
crates/Cargo.lock generated
View file

@ -4106,6 +4106,7 @@ dependencies = [
"getrandom 0.4.2",
"js-sys",
"serde_core",
"sha1_smol",
"wasm-bindgen",
]

View file

@ -60,7 +60,7 @@ time = { version = "0.3", features = ["formatting", "macros"] }
tracing = "0.1"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
uuid = { version = "1.0", features = ["v4", "v5", "serde"] }
[dev-dependencies]
mockito = "1.0"

View file

@ -4,7 +4,7 @@
//! and stream-json lives in `hermesllm::apis::claude_cli`.
use std::process::Stdio;
use std::sync::Arc;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use hermesllm::apis::claude_cli::{parse_ndjson_line, ClaudeCliEvent, ClaudeCliInputEvent};
@ -56,6 +56,12 @@ pub enum ProcessError {
StdinWrite(#[source] std::io::Error),
#[error("claude process exited unexpectedly")]
ExitedEarly,
/// `Command::spawn` succeeded but a piped stdio handle was already taken
/// by the time we asked for it. Should be unreachable given we set
/// `Stdio::piped()` immediately before spawn; surfaced as its own variant
/// so callers can tell it apart from a real "exited early".
#[error("claude child is missing piped {which} after spawn")]
MissingStdio { which: &'static str },
#[error("claude watchdog fired after {0:?} of silence")]
WatchdogTimeout(Duration),
#[error("failed to serialize stdin payload: {0}")]
@ -94,8 +100,11 @@ pub struct ClaudeProcess {
event_rx: Arc<Mutex<mpsc::Receiver<ClaudeCliEvent>>>,
config: ClaudeCliConfig,
/// Last time a request was served on this session — used by the session
/// manager to enforce the idle TTL.
last_used: Mutex<Instant>,
/// manager to enforce the idle TTL. Held under a sync mutex because the
/// critical section is one read/write of a `Copy` value with no `.await`,
/// which keeps `SessionManager` callers from holding the session-map lock
/// across an async hop.
last_used: StdMutex<Instant>,
pub session_id: String,
}
@ -148,9 +157,18 @@ impl ClaudeProcess {
source: e,
})?;
let stdin = child.stdin.take().ok_or(ProcessError::ExitedEarly)?;
let stdout = child.stdout.take().ok_or(ProcessError::ExitedEarly)?;
let stderr = child.stderr.take().ok_or(ProcessError::ExitedEarly)?;
let stdin = child
.stdin
.take()
.ok_or(ProcessError::MissingStdio { which: "stdin" })?;
let stdout = child
.stdout
.take()
.ok_or(ProcessError::MissingStdio { which: "stdout" })?;
let stderr = child
.stderr
.take()
.ok_or(ProcessError::MissingStdio { which: "stderr" })?;
// Bounded channel — backpressure if the consumer is slow, but large
// enough that bursts of small text deltas do not block stdout drain.
@ -217,7 +235,7 @@ impl ClaudeProcess {
stdin: Mutex::new(Some(stdin)),
event_rx: Arc::new(Mutex::new(rx)),
config,
last_used: Mutex::new(Instant::now()),
last_used: StdMutex::new(Instant::now()),
session_id,
}))
}
@ -232,7 +250,10 @@ impl ClaudeProcess {
&self,
events: &[ClaudeCliInputEvent],
) -> Result<TurnStream, ProcessError> {
*self.last_used.lock().await = Instant::now();
// Sync lock + Copy value; never held across an `.await`.
if let Ok(mut last) = self.last_used.lock() {
*last = Instant::now();
}
// Claim the event receiver for the lifetime of this turn.
let rx_guard = Arc::clone(&self.event_rx)
@ -259,8 +280,17 @@ impl ClaudeProcess {
}
/// Most-recent activity timestamp; used by the session manager's reaper.
pub async fn last_used(&self) -> Instant {
*self.last_used.lock().await
/// Sync because the lock guards a single `Instant` with no `.await` in
/// the critical section — keeps callers from holding async locks across
/// an await point.
pub fn last_used(&self) -> Instant {
// Poisoning is impossible here (the only writer is `send_user_turn`
// which never panics while holding the lock), but if it ever happens
// we degrade gracefully rather than aborting.
self.last_used
.lock()
.map(|g| *g)
.unwrap_or_else(|p| *p.into_inner())
}
/// Forcefully terminate the child. Safe to call multiple times.

View file

@ -3,10 +3,8 @@
//! long-lived `ClaudeProcess`. Enforces an idle TTL and a hard cap on the
//! number of concurrent sessions.
use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use hermesllm::apis::anthropic::{
MessagesContentBlock, MessagesMessageContent, MessagesRequest, MessagesRole,
@ -75,15 +73,19 @@ impl SessionManager {
return uuid_from_seed(trimmed);
}
}
let mut hasher = DefaultHasher::new();
req.model.hash(&mut hasher);
// Build a deterministic seed from (model, system_prompt, first user
// message) so a retried conversation lands on the same session.
let mut seed = String::new();
seed.push_str(&req.model);
seed.push('\u{1f}');
if let Some(system) = &req.system {
system_text(system).hash(&mut hasher);
seed.push_str(&system_text(system));
}
seed.push('\u{1f}');
if let Some(first) = first_user_message_text(req) {
first.hash(&mut hasher);
seed.push_str(&first);
}
uuid_from_seed(&hasher.finish().to_string())
uuid_from_seed(&seed)
}
/// Get the existing session's process or spawn a new one.
@ -98,40 +100,61 @@ impl SessionManager {
// background task for the common one-developer-one-laptop deployment.
self.evict_idle().await;
{
let map = self.inner.lock().await;
if let Some(existing) = map.get(session_id) {
debug!(session = %session_id, "reusing claude-cli session");
return Ok(Arc::clone(existing));
}
}
// Single lock acquisition for the whole get-or-spawn path. `last_used`
// is now a sync mutex on `ClaudeProcess`, so iterating to find the
// LRU victim does not block other tasks across an `.await`.
let mut map = self.inner.lock().await;
if let Some(existing) = map.get(session_id) {
debug!(session = %session_id, "reusing claude-cli session");
return Ok(Arc::clone(existing));
}
if map.len() >= self.config.max_sessions {
// Evict the least-recently-used session to keep the cap honest.
if let Some(victim_key) = lru_session_id(&map).await {
if let Some(victim) = map.remove(&victim_key) {
info!(session = %victim_key, "evicting LRU claude-cli session to make room");
drop(map);
victim.shutdown().await;
map = self.inner.lock().await;
}
}
}
// If we are at the cap, take an LRU victim out of the map first so
// its slot is freed before we insert. We drop the lock for the
// shutdown await (killing a child can take a tick), accepting that
// the cap can drift by one if a concurrent task spawns in that
// window — the next reap will catch it.
let victim = if map.len() >= self.config.max_sessions {
let victim_key = lru_session_id(&map);
victim_key.and_then(|k| map.remove(&k).map(|v| (k, v)))
} else {
None
};
// Spawn outside of any lock if we have to wait on a victim shutdown.
let process = if let Some((victim_key, victim_proc)) = victim {
drop(map);
info!(session = %victim_key, "evicting LRU claude-cli session to make room");
victim_proc.shutdown().await;
let process = ClaudeProcess::spawn(
session_id.to_string(),
model,
system_prompt,
cwd,
self.config.process.clone(),
)
.await?;
self.inner
.lock()
.await
.insert(session_id.to_string(), Arc::clone(&process));
process
} else {
// No eviction needed — keep holding the map lock across spawn so
// we don't race with another caller resolving the same id.
let process = ClaudeProcess::spawn(
session_id.to_string(),
model,
system_prompt,
cwd,
self.config.process.clone(),
)
.await?;
map.insert(session_id.to_string(), Arc::clone(&process));
process
};
let process = ClaudeProcess::spawn(
session_id.to_string(),
model,
system_prompt,
cwd,
self.config.process.clone(),
)
.await?;
map.insert(session_id.to_string(), Arc::clone(&process));
Ok(process)
}
@ -152,23 +175,21 @@ impl SessionManager {
return;
}
let now = Instant::now();
let mut to_kill: Vec<(String, Arc<ClaudeProcess>)> = Vec::new();
{
let map = self.inner.lock().await;
for (k, v) in map.iter() {
if now.duration_since(v.last_used().await) > ttl {
to_kill.push((k.clone(), Arc::clone(v)));
}
}
}
if to_kill.is_empty() {
return;
}
let mut map = self.inner.lock().await;
for (k, _) in &to_kill {
map.remove(k);
}
drop(map);
// Collect victims under a single lock acquisition; `last_used()` is
// sync, so the iteration never crosses an `.await`.
let to_kill: Vec<(String, Arc<ClaudeProcess>)> = {
let mut map = self.inner.lock().await;
let keys: Vec<String> = map
.iter()
.filter(|(_, v)| now.duration_since(v.last_used()) > ttl)
.map(|(k, _)| k.clone())
.collect();
keys.into_iter()
.filter_map(|k| map.remove(&k).map(|v| (k, v)))
.collect()
};
for (k, proc) in to_kill {
info!(session = %k, "evicting idle claude-cli session");
proc.shutdown().await;
@ -176,16 +197,12 @@ impl SessionManager {
}
}
async fn lru_session_id(map: &HashMap<String, Arc<ClaudeProcess>>) -> Option<String> {
let mut oldest: Option<(String, Instant)> = None;
for (k, v) in map.iter() {
let used = v.last_used().await;
match &oldest {
Some((_, t)) if *t < used => {}
_ => oldest = Some((k.clone(), used)),
}
}
oldest.map(|(k, _)| k)
/// Pick the least-recently-used session id from the map. Sync because
/// `ClaudeProcess::last_used` is sync.
fn lru_session_id(map: &HashMap<String, Arc<ClaudeProcess>>) -> Option<String> {
map.iter()
.min_by_key(|(_, v)| v.last_used())
.map(|(k, _)| k.clone())
}
fn first_user_message_text(req: &MessagesRequest) -> Option<String> {
@ -222,47 +239,12 @@ fn system_text(system: &MessagesSystemPrompt) -> String {
}
}
/// Deterministic v5-style UUID derived from an arbitrary seed string. The
/// `claude` CLI requires `--session-id` to be a valid UUID; we use the DNS
/// namespace constant as a stable salt so the same conversation always maps
/// to the same id without us pulling in the v5 feature of the `uuid` crate.
/// Deterministic UUIDv5 derived from an arbitrary seed string. The `claude`
/// CLI requires `--session-id` to be a valid UUID; v5 (SHA-1 based) gives
/// us a stable mapping across Rust toolchain versions, unlike `DefaultHasher`.
/// We use the OID namespace because the seed isn't a DNS or URL name.
fn uuid_from_seed(seed: &str) -> String {
let mut hasher = DefaultHasher::new();
seed.hash(&mut hasher);
let h1 = hasher.finish();
let mut hasher2 = DefaultHasher::new();
h1.hash(&mut hasher2);
seed.hash(&mut hasher2);
let h2 = hasher2.finish();
let bytes = [
(h1 >> 56) as u8,
(h1 >> 48) as u8,
(h1 >> 40) as u8,
(h1 >> 32) as u8,
(h1 >> 24) as u8,
(h1 >> 16) as u8,
(h1 >> 8) as u8,
h1 as u8,
(h2 >> 56) as u8,
(h2 >> 48) as u8,
(h2 >> 40) as u8,
(h2 >> 32) as u8,
(h2 >> 24) as u8,
(h2 >> 16) as u8,
(h2 >> 8) as u8,
h2 as u8,
];
uuid::Builder::from_random_bytes(bytes)
.into_uuid()
.to_string()
}
/// `Duration::is_zero` shim — `Duration` exposes `is_zero` only on stable
/// 1.53+, but our MSRV already covers that. Re-exporting keeps call sites
/// terse if we ever need to swap implementations.
#[allow(dead_code)]
fn is_zero(d: Duration) -> bool {
d.is_zero()
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes()).to_string()
}
#[cfg(test)]