From 654225a524ba29f493abc0f0d41394a249a1ff88 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Tue, 6 Jan 2026 21:07:56 -0800 Subject: [PATCH] PR comments to make ObservableStreamProcessor accept optonal Vec --- crates/brightstaff/src/handlers/llm.rs | 8 ++------ crates/brightstaff/src/handlers/utils.rs | 10 +++------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index b3320cc3..dfc1fe4f 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -290,18 +290,14 @@ pub async fn llm_chat( .await; // Create base processor for metrics and tracing - let mut base_processor = ObservableStreamProcessor::new( + let base_processor = ObservableStreamProcessor::new( trace_collector, operation_component::LLM, llm_span, request_start_time, + (!messages_for_signals.is_empty()).then_some(messages_for_signals), ); - // Add messages for signal analysis if available - if !messages_for_signals.is_empty() { - base_processor = base_processor.with_messages(messages_for_signals); - } - // === v1/responses state management: Wrap with ResponsesStateProcessor === // Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured) let streaming_response = if let (true, false, Some(state_store)) = ( diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index c57c59d7..53cd72fa 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -51,11 +51,13 @@ impl ObservableStreamProcessor { /// * `service_name` - The service name for this span (e.g., "archgw(llm)") /// * `span` - The span to finalize after streaming completes /// * `start_time` - When the request started (for duration calculation) + /// * `messages` - Optional conversation messages for signal analysis pub fn new( collector: Arc, service_name: impl Into, span: Span, start_time: Instant, + messages: Option>, ) -> Self { Self { collector, @@ -65,15 +67,9 @@ impl ObservableStreamProcessor { chunk_count: 0, start_time, time_to_first_token: None, - messages: None, + messages, } } - - /// Set the conversation messages for signal analysis - pub fn with_messages(mut self, messages: Vec) -> Self { - self.messages = Some(messages); - self - } } impl StreamProcessor for ObservableStreamProcessor {