PR comments to make ObservableStreamProcessor accept optonal Vec<Messagges>

This commit is contained in:
Salman Paracha 2026-01-06 21:07:56 -08:00
parent 056d346ab4
commit 654225a524
2 changed files with 5 additions and 13 deletions

View file

@ -290,18 +290,14 @@ pub async fn llm_chat(
.await;
// Create base processor for metrics and tracing
let mut base_processor = ObservableStreamProcessor::new(
let base_processor = ObservableStreamProcessor::new(
trace_collector,
operation_component::LLM,
llm_span,
request_start_time,
(!messages_for_signals.is_empty()).then_some(messages_for_signals),
);
// Add messages for signal analysis if available
if !messages_for_signals.is_empty() {
base_processor = base_processor.with_messages(messages_for_signals);
}
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
let streaming_response = if let (true, false, Some(state_store)) = (

View file

@ -51,11 +51,13 @@ impl ObservableStreamProcessor {
/// * `service_name` - The service name for this span (e.g., "archgw(llm)")
/// * `span` - The span to finalize after streaming completes
/// * `start_time` - When the request started (for duration calculation)
/// * `messages` - Optional conversation messages for signal analysis
pub fn new(
collector: Arc<TraceCollector>,
service_name: impl Into<String>,
span: Span,
start_time: Instant,
messages: Option<Vec<Message>>,
) -> Self {
Self {
collector,
@ -65,15 +67,9 @@ impl ObservableStreamProcessor {
chunk_count: 0,
start_time,
time_to_first_token: None,
messages: None,
messages,
}
}
/// Set the conversation messages for signal analysis
pub fn with_messages(mut self, messages: Vec<Message>) -> Self {
self.messages = Some(messages);
self
}
}
impl StreamProcessor for ObservableStreamProcessor {