diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 1fb5f973..b93a838e 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -4,9 +4,7 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; use brightstaff::app_state::AppState; use brightstaff::handlers::agents::orchestrator::agent_chat; -use brightstaff::handlers::claude_cli::{ - self, ClaudeCliConfig, SessionManager, SessionManagerConfig, -}; +use brightstaff::handlers::claude_cli::{self, SessionManager, SessionManagerConfig}; use brightstaff::handlers::debug; use brightstaff::handlers::empty; use brightstaff::handlers::function_calling::function_calling_chat_handler; @@ -586,6 +584,11 @@ async fn run_server(state: Arc) -> Result<(), Box Option<(std::net::SocketAddr, SessionManagerConfig)> { let addr_str = env::var("CLAUDE_CLI_LISTEN_ADDR").ok()?; let addr: std::net::SocketAddr = match addr_str.parse() { @@ -599,35 +602,33 @@ fn claude_cli_config_from_env() -> Option<(std::net::SocketAddr, SessionManagerC return None; } }; - let binary = env::var("CLAUDE_CLI_BIN").unwrap_or_else(|_| "claude".to_string()); - let permission_mode = - env::var("CLAUDE_CLI_PERMISSION_MODE").unwrap_or_else(|_| "bypassPermissions".to_string()); - let session_ttl = env::var("CLAUDE_CLI_SESSION_TTL_SECS") + + let mut cfg = SessionManagerConfig::default(); + if let Ok(s) = env::var("CLAUDE_CLI_BIN") { + cfg.process.binary = s; + } + if let Ok(s) = env::var("CLAUDE_CLI_PERMISSION_MODE") { + cfg.process.permission_mode = s; + } + if let Some(secs) = env::var("CLAUDE_CLI_SESSION_TTL_SECS") .ok() .and_then(|s| s.parse::().ok()) - .map(Duration::from_secs) - .unwrap_or_else(|| Duration::from_secs(600)); - let watchdog = env::var("CLAUDE_CLI_WATCHDOG_SECS") + { + cfg.process.session_ttl = Duration::from_secs(secs); + } + if let Some(secs) = env::var("CLAUDE_CLI_WATCHDOG_SECS") .ok() .and_then(|s| s.parse::().ok()) - .map(Duration::from_secs) - .unwrap_or_else(|| Duration::from_secs(120)); - let max_sessions = env::var("CLAUDE_CLI_MAX_SESSIONS") + { + cfg.process.watchdog = Duration::from_secs(secs); + } + if let Some(n) = env::var("CLAUDE_CLI_MAX_SESSIONS") .ok() .and_then(|s| s.parse::().ok()) - .unwrap_or(claude_cli::session::DEFAULT_MAX_SESSIONS); - Some(( - addr, - SessionManagerConfig { - max_sessions, - process: ClaudeCliConfig { - binary, - permission_mode, - session_ttl, - watchdog, - }, - }, - )) + { + cfg.max_sessions = n; + } + Some((addr, cfg)) } // --------------------------------------------------------------------------- diff --git a/crates/brightstaff/tests/claude_cli_bridge.rs b/crates/brightstaff/tests/claude_cli_bridge.rs index 6cf97258..4db35e2a 100644 --- a/crates/brightstaff/tests/claude_cli_bridge.rs +++ b/crates/brightstaff/tests/claude_cli_bridge.rs @@ -84,6 +84,20 @@ impl BridgeFixture { } } +/// Best-effort cleanup if a test panics before `stop().await`. We can't +/// `.await` from `Drop`, so we just abort the listener task; that's enough to +/// keep the runtime from leaking the spawned future. +impl Drop for BridgeFixture { + fn drop(&mut self) { + if let Some(tx) = self.shutdown.take() { + let _ = tx.send(()); + } + if let Some(h) = self.handle.take() { + h.abort(); + } + } +} + fn anthropic_request(stream: bool) -> Value { json!({ "model": "claude-cli/sonnet", diff --git a/crates/hermesllm/src/apis/claude_cli.rs b/crates/hermesllm/src/apis/claude_cli.rs index 0c107a88..2aa2a786 100644 --- a/crates/hermesllm/src/apis/claude_cli.rs +++ b/crates/hermesllm/src/apis/claude_cli.rs @@ -9,15 +9,15 @@ //! does the actual spawning and streaming. use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{json, Map, Value}; use serde_with::skip_serializing_none; use thiserror::Error; use uuid::Uuid; use crate::apis::anthropic::{ - MessagesContentBlock, MessagesContentDelta, MessagesMessage, MessagesMessageContent, - MessagesMessageDelta, MessagesRequest, MessagesResponse, MessagesRole, MessagesStopReason, - MessagesStreamEvent, MessagesStreamMessage, MessagesSystemPrompt, MessagesUsage, + MessagesContentBlock, MessagesContentDelta, MessagesMessageContent, MessagesMessageDelta, + MessagesRequest, MessagesResponse, MessagesRole, MessagesStopReason, MessagesStreamEvent, + MessagesStreamMessage, MessagesSystemPrompt, MessagesUsage, }; /// Errors produced by translation between Anthropic Messages and Claude Code @@ -208,7 +208,7 @@ pub fn messages_request_to_stdin_payload( role: "user", content, }, - session_id: session_id.map(|s| s.to_string()), + session_id: session_id.map(str::to_string), }); } Ok(out) @@ -292,10 +292,10 @@ where ClaudeCliEvent::StreamEvent { event } => match event { MessagesStreamEvent::MessageStart { message } => { if id.is_empty() { - id = message.id.clone(); + id.clone_from(&message.id); } if !message.model.is_empty() { - model_out = message.model.clone(); + model_out.clone_from(&message.model); } usage = message.usage.clone(); } @@ -337,7 +337,6 @@ where // clients but dropped from the non-streaming aggregate. _ => {} }, - MessagesStreamEvent::ContentBlockStop { .. } => {} MessagesStreamEvent::MessageDelta { delta, usage: msg_usage, @@ -351,7 +350,9 @@ where // The MessageDelta usage carries final output_tokens. usage.output_tokens = msg_usage.output_tokens; } - MessagesStreamEvent::MessageStop | MessagesStreamEvent::Ping => {} + MessagesStreamEvent::ContentBlockStop { .. } + | MessagesStreamEvent::MessageStop + | MessagesStreamEvent::Ping => {} }, ClaudeCliEvent::Assistant { message } => { last_assistant_message = Some(message); @@ -411,7 +412,7 @@ where BlockKind::ToolUse => { if let Some((tool_id, name, raw_input)) = tool_accum.remove(&idx) { let input_value = if raw_input.is_empty() { - Value::Object(Default::default()) + Value::Object(Map::default()) } else { serde_json::from_str(&raw_input) .unwrap_or_else(|_| Value::String(raw_input)) @@ -505,9 +506,10 @@ pub fn cli_error_to_anthropic_error_body(message: &str) -> Value { /// the CLI did not emit one (it usually does, but very small turns can skip /// straight to `assistant`/`result`). pub fn synthetic_message_start(model: &str, session_id: Option<&str>) -> MessagesStreamEvent { - let id = session_id - .map(|s| format!("msg_cli_{}", s)) - .unwrap_or_else(|| format!("msg_cli_{}", Uuid::new_v4().simple())); + let id = session_id.map_or_else( + || format!("msg_cli_{}", Uuid::new_v4().simple()), + |s| format!("msg_cli_{s}"), + ); MessagesStreamEvent::MessageStart { message: MessagesStreamMessage { id, @@ -537,11 +539,6 @@ pub fn parse_ndjson_line(line: &str) -> Option for ProviderId { "do_ai" => Ok(ProviderId::DigitalOcean), // alias "vercel" => Ok(ProviderId::Vercel), "openrouter" => Ok(ProviderId::OpenRouter), - "claude-cli" => Ok(ProviderId::ClaudeCli), - "claude_cli" => Ok(ProviderId::ClaudeCli), // alias + "claude-cli" | "claude_cli" => Ok(ProviderId::ClaudeCli), _ => Err(format!("Unknown provider: {}", value)), } } diff --git a/crates/hermesllm/tests/claude_cli_fixtures.rs b/crates/hermesllm/tests/claude_cli_fixtures.rs index 3ac335c4..2847a352 100644 --- a/crates/hermesllm/tests/claude_cli_fixtures.rs +++ b/crates/hermesllm/tests/claude_cli_fixtures.rs @@ -56,17 +56,16 @@ fn text_response_aggregates_into_messages_response() { )); let final_event = stream.last().unwrap(); assert!(matches!(final_event, MessagesStreamEvent::MessageStop)); - let text_deltas = stream + let text_deltas: String = stream .iter() .filter_map(|ev| match ev { MessagesStreamEvent::ContentBlockDelta { delta: MessagesContentDelta::TextDelta { text }, .. - } => Some(text.clone()), + } => Some(text.as_str()), _ => None, }) - .collect::>() - .join(""); + .collect(); assert_eq!(text_deltas, "Hello, world!"); }