diff --git a/arch/arch_config_schema.yaml b/arch/arch_config_schema.yaml index 93b34a1e..f5793b2b 100644 --- a/arch/arch_config_schema.yaml +++ b/arch/arch_config_schema.yaml @@ -23,7 +23,7 @@ properties: required: - id - url - agent_filters: + filters: type: array items: type: object diff --git a/arch/tools/cli/config_generator.py b/arch/tools/cli/config_generator.py index 1770ad83..3acdb932 100644 --- a/arch/tools/cli/config_generator.py +++ b/arch/tools/cli/config_generator.py @@ -101,8 +101,8 @@ def validate_and_render_schema(): # Process agents section and convert to endpoints agents = config_yaml.get("agents", []) - agent_filters = config_yaml.get("agent_filters", []) - agents_combined = agents + agent_filters + filters = config_yaml.get("filters", []) + agents_combined = agents + filters agent_id_keys = set() for agent in agents_combined: diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 2e8e0961..be82e952 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,8 +1,12 @@ use std::sync::Arc; +use std::time::{Instant, SystemTime}; use bytes::Bytes; +use common::consts::TRACE_PARENT_HEADER; +use common::traces::{SpanBuilder, SpanKind, parse_traceparent}; use hermesllm::apis::OpenAIMessage; use hermesllm::clients::SupportedAPIsFromClient; +use hermesllm::providers::request::ProviderRequest; use hermesllm::ProviderRequestType; use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; @@ -14,6 +18,7 @@ use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; use super::response_handler::ResponseHandler; use crate::router::llm_router::RouterService; +use crate::tracing::{OperationNameBuilder, operation_component, http}; /// Main errors for agent chat completions #[derive(Debug, thiserror::Error)] @@ -179,7 +184,7 @@ async fn handle_agent_chat( } }; - let message: Vec = client_request.get_message_history(); + let message: Vec = client_request.get_messages(); // let chat_completions_request: ChatCompletionsRequest = // serde_json::from_slice(&chat_request_bytes).map_err(|err| { @@ -193,7 +198,7 @@ async fn handle_agent_chat( // Extract trace parent for routing let trace_parent = request_headers .iter() - .find(|(key, _)| key.as_str() == "traceparent") + .find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER) .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); // Create agent map for pipeline processing and agent selection @@ -205,11 +210,15 @@ async fn handle_agent_chat( // Select appropriate agent using arch router llm model let selected_agent = agent_selector - .select_agent(&message, &listener, trace_parent) + .select_agent(&message, &listener, trace_parent.clone()) .await?; debug!("Processing agent pipeline: {}", selected_agent.id); + // Record the start time for agent span + let agent_start_time = SystemTime::now(); + let agent_start_instant = Instant::now(); + // Process the filter chain let chat_history = pipeline_processor .process_filter_chain( @@ -222,14 +231,14 @@ async fn handle_agent_chat( .await?; // Get terminal agent and send final response - let terminal_agent_name = selected_agent.id; + let terminal_agent_name = selected_agent.id.clone(); let terminal_agent = agent_map.get(&terminal_agent_name).unwrap(); debug!("Processing terminal agent: {}", terminal_agent_name); debug!("Terminal agent details: {:?}", terminal_agent); let llm_response = pipeline_processor - .invoke_terminal_agent( + .invoke_agent( &chat_history, client_request, terminal_agent, @@ -237,6 +246,47 @@ async fn handle_agent_chat( ) .await?; + // Record agent span after processing is complete + let agent_end_time = SystemTime::now(); + let agent_elapsed = agent_start_instant.elapsed(); + + // Build full path with /agents prefix + let full_path = format!("/agents{}", request_path); + + // Build operation name: POST {full_path} {agent_name} + let operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path(&full_path) + .with_target(&terminal_agent_name) + .build(); + + // Parse trace parent to get trace_id and parent_span_id + let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent { + parse_traceparent(tp) + } else { + (String::new(), None) + }; + + let mut span_builder = SpanBuilder::new(&operation_name) + .with_kind(SpanKind::Internal) + .with_start_time(agent_start_time) + .with_end_time(agent_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, full_path) + .with_attribute("agent.name", terminal_agent_name.clone()) + .with_attribute("duration_ms", format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0)); + + if !trace_id.is_empty() { + span_builder = span_builder.with_trace_id(trace_id); + } + if let Some(parent_id) = parent_span_id { + span_builder = span_builder.with_parent_span_id(parent_id); + } + + let span = span_builder.build(); + // Use plano(agent) as service name for the agent processing span + trace_collector.record_span(operation_component::AGENT, span); + // Create streaming response response_handler .create_streaming_response(llm_response) diff --git a/crates/brightstaff/src/handlers/jsonrpc.rs b/crates/brightstaff/src/handlers/jsonrpc.rs index 834fcb09..0f8b9373 100644 --- a/crates/brightstaff/src/handlers/jsonrpc.rs +++ b/crates/brightstaff/src/handlers/jsonrpc.rs @@ -1,6 +1,11 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; +pub const JSON_RPC_VERSION: &str = "2.0"; +pub const TOOL_CALL_METHOD : &str = "tools/call"; +pub const MCP_INITIALIZE: &str = "initialize"; +pub const MCP_INITIALIZE_NOTIFICATION: &str = "initialize/notification"; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum JsonRpcId { diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 7b95230a..ad2c3504 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use common::configuration::{Agent, AgentFilterChain}; -use common::consts::{ARCH_UPSTREAM_HOST_HEADER, ENVOY_RETRY_HEADER}; +use common::consts::{ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER}; use common::traces::{SpanBuilder, SpanKind}; use hermesllm::{ProviderRequest, ProviderRequestType}; use hermesllm::apis::openai::{Message}; @@ -10,7 +10,10 @@ use opentelemetry::trace::TraceContextExt; use tracing::{debug, info, warn}; use std::time::{Instant, SystemTime}; -use crate::handlers::jsonrpc::{JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse}; +use crate::tracing::operation_component::{self}; +use crate::tracing::{OperationNameBuilder, http}; + +use crate::handlers::jsonrpc::{JSON_RPC_VERSION, JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD}; use uuid::Uuid; /// Errors that can occur during pipeline processing @@ -107,12 +110,22 @@ impl PipelineProcessor { ) { let (trace_id, parent_span_id) = self.extract_trace_context(); - let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name)) - .with_kind(SpanKind::Internal) + // Build operation name: POST /agents/* {filter_name} + // Using generic path since we don't have access to specific endpoint here + let operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path("/agents/*") + .with_target(agent_name) + .build(); + + let mut span_builder = SpanBuilder::new(&operation_name) + .with_kind(SpanKind::Client) .with_start_time(start_time) .with_end_time(end_time) - .with_attribute("filter_name", agent_name.to_string()) - .with_attribute("tool_name", tool_name.to_string()) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, "/agents/*") + .with_attribute("filter.name", agent_name.to_string()) + .with_attribute("filter.tool_name", tool_name.to_string()) .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); if !trace_id.is_empty() { @@ -123,7 +136,8 @@ impl PipelineProcessor { } let span = span_builder.build(); - collector.record_span("brightstaff", span); + // Use plano(filter) as service name for filter execution spans + collector.record_span(operation_component::AGENT_FILTER, span); } /// Record a span for MCP protocol interactions @@ -139,10 +153,20 @@ impl PipelineProcessor { ) { let (trace_id, parent_span_id) = self.extract_trace_context(); - let mut span_builder = SpanBuilder::new(format!("mcp_{}", operation)) + // Build operation name: POST /mcp {agent_id} + let operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path("/mcp") + .with_operation(operation) + .with_target(agent_id) + .build(); + + let mut span_builder = SpanBuilder::new(&operation_name) .with_kind(SpanKind::Client) .with_start_time(start_time) .with_end_time(end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, &format!("/mcp ({})", operation.to_string())) .with_attribute("mcp.operation", operation.to_string()) .with_attribute("mcp.agent_id", agent_id.to_string()) .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); @@ -161,7 +185,8 @@ impl PipelineProcessor { } let span = span_builder.build(); - collector.record_span("brightstaff", span); + // MCP spans also use plano(filter) service name as they are part of filter operations + collector.record_span(operation_component::AGENT_FILTER, span); } /// Process the filter chain of agents (all except the terminal agent) @@ -336,20 +361,21 @@ impl PipelineProcessor { tool_name: &str, messages: &[Message], ) -> Result { - let arguments = serde_json::json!({ - "messages": messages - }); + let mut arguments = HashMap::new(); + arguments.insert( + "messages".to_string(), + serde_json::to_value(messages)?, + ); - let params = serde_json::json!({ - "name": tool_name, - "arguments": arguments - }); + let mut params = HashMap::new(); + params.insert("name".to_string(), serde_json::to_value(tool_name)?); + params.insert("arguments".to_string(), serde_json::to_value(arguments)?); Ok(JsonRpcRequest { - jsonrpc: "2.0".to_string(), + jsonrpc: JSON_RPC_VERSION.to_string(), id: JsonRpcId::String(Uuid::new_v4().to_string()), - method: "tools/call".to_string(), - params: Some(serde_json::from_value(params)?), + method: TOOL_CALL_METHOD.to_string(), + params: Some(params), }) } @@ -479,9 +505,9 @@ impl PipelineProcessor { /// Build an initialize JSON-RPC request fn build_initialize_request(&self) -> JsonRpcRequest { JsonRpcRequest { - jsonrpc: "2.0".to_string(), - id: JsonRpcId::Number(1), - method: "initialize".to_string(), + jsonrpc: JSON_RPC_VERSION.to_string(), + id: JsonRpcId::String(Uuid::new_v4().to_string()), + method: MCP_INITIALIZE.to_string(), params: Some({ let mut params = HashMap::new(); params.insert( @@ -492,7 +518,7 @@ impl PipelineProcessor { params.insert( "clientInfo".to_string(), serde_json::json!({ - "name": "brightstaff", + "name": BRIGHT_STAFF_SERVICE_NAME, "version": "1.0.0" }), ); @@ -509,8 +535,8 @@ impl PipelineProcessor { trace_collector: Option<&std::sync::Arc>, ) -> Result<(), PipelineError> { let initialized_notification = JsonRpcNotification { - jsonrpc: "2.0".to_string(), - method: "notifications/initialized".to_string(), + jsonrpc: JSON_RPC_VERSION.to_string(), + method: MCP_INITIALIZE_NOTIFICATION.to_string(), params: None, }; @@ -616,7 +642,7 @@ impl PipelineProcessor { } /// Send request to terminal agent and return the raw response for streaming - pub async fn invoke_terminal_agent( + pub async fn invoke_agent( &self, messages: &[Message], mut original_request: ProviderRequestType, @@ -786,7 +812,7 @@ mod tests { #[tokio::test] async fn test_execute_filter_mcp_error_flag() { let rpc_body = serde_json::json!({ - "jsonrpc": "2.0", + "jsonrpc": JSON_RPC_VERSION, "id": "1", "result": { "isError": true, diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index f14e3469..c516c536 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -60,18 +60,18 @@ async fn main() -> Result<(), Box> { let arch_config = Arc::new(config); - // combine agents and agent_filters into a single list of agents + // combine agents and filters into a single list of agents let all_agents: Vec = arch_config .agents .as_deref() .unwrap_or_default() .iter() - .chain(arch_config.agent_filters.as_deref().unwrap_or_default()) + .chain(arch_config.filters.as_deref().unwrap_or_default()) .cloned() .collect(); let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone())); - let agents_list = Arc::new(RwLock::new(Some(all_agents))); + let combined_agents_filters_list = Arc::new(RwLock::new(Some(all_agents))); let listeners = Arc::new(RwLock::new(arch_config.listeners.clone())); let llm_provider_url = env::var("LLM_PROVIDER_ENDPOINT").unwrap_or_else(|_| "http://localhost:12001".to_string()); @@ -125,7 +125,7 @@ async fn main() -> Result<(), Box> { let llm_provider_url = llm_provider_url.clone(); let llm_providers = llm_providers.clone(); - let agents_list = agents_list.clone(); + let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); let service = service_fn(move |req| { diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs index bd946aac..6a5dbc2a 100644 --- a/crates/brightstaff/src/tracing/constants.rs +++ b/crates/brightstaff/src/tracing/constants.rs @@ -203,6 +203,7 @@ pub mod operation_component { pub struct OperationNameBuilder { method: Option, path: Option, + operation: Option, target: Option, } @@ -212,6 +213,7 @@ impl OperationNameBuilder { Self { method: None, path: None, + operation: None, target: None, } } @@ -234,6 +236,15 @@ impl OperationNameBuilder { self } + /// Set the operation type (optional, for MCP operations) + /// + /// # Arguments + /// * `operation` - Operation type (e.g., "tool_call", "session_init", "notification") + pub fn with_operation(mut self, operation: impl Into) -> Self { + self.operation = Some(operation.into()); + self + } + /// Set the target (model name, agent name, or filter name) /// /// # Arguments @@ -246,7 +257,8 @@ impl OperationNameBuilder { /// Build the operation name string /// /// # Format - /// - With all components: `{method} {path} {target}` + /// - With all components: `{method} {path} ({operation}) {target}` + /// - Without operation: `{method} {path} {target}` /// - Without target: `{method} {path}` /// - Without path: `{method}` /// - Empty: returns empty string @@ -258,7 +270,11 @@ impl OperationNameBuilder { } if let Some(path) = self.path { - parts.push(path); + if let Some(operation) = self.operation { + parts.push(format!("{} ({})", path, operation)); + } else { + parts.push(path); + } } if let Some(target) = self.target { diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 4bfb0460..eeee78dd 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -60,7 +60,7 @@ pub struct Configuration { pub mode: Option, pub routing: Option, pub agents: Option>, - pub agent_filters: Option>, + pub filters: Option>, pub listeners: Vec, } diff --git a/crates/common/src/consts.rs b/crates/common/src/consts.rs index 12d35ab4..ce34fad5 100644 --- a/crates/common/src/consts.rs +++ b/crates/common/src/consts.rs @@ -32,3 +32,4 @@ pub const OTEL_COLLECTOR_HTTP: &str = "opentelemetry_collector_http"; pub const OTEL_POST_PATH: &str = "/v1/traces"; pub const LLM_ROUTE_HEADER: &str = "x-arch-llm-route"; pub const ENVOY_RETRY_HEADER: &str = "x-envoy-max-retries"; +pub const BRIGHT_STAFF_SERVICE_NAME : &str = "brightstaff"; diff --git a/crates/hermesllm/src/apis/amazon_bedrock.rs b/crates/hermesllm/src/apis/amazon_bedrock.rs index 7b4a511f..3c953850 100644 --- a/crates/hermesllm/src/apis/amazon_bedrock.rs +++ b/crates/hermesllm/src/apis/amazon_bedrock.rs @@ -233,6 +233,104 @@ impl ProviderRequest for ConverseRequest { fn get_temperature(&self) -> Option { self.inference_config.as_ref()?.temperature } + + fn get_messages(&self) -> Vec { + use crate::apis::openai::{Message, MessageContent, Role}; + + let mut openai_messages = Vec::new(); + + // Add system messages if present + if let Some(system) = &self.system { + for sys_block in system { + match sys_block { + SystemContentBlock::Text { text } => { + openai_messages.push(Message { + role: Role::System, + content: MessageContent::Text(text.clone()), + name: None, + tool_calls: None, + tool_call_id: None, + }); + } + _ => {} // Skip other system content types + } + } + } + + // Convert conversation messages + if let Some(messages) = &self.messages { + for msg in messages { + let role = match msg.role { + ConversationRole::User => Role::User, + ConversationRole::Assistant => Role::Assistant, + }; + + // Extract text from content blocks + let content = msg.content.iter() + .filter_map(|block| { + if let ContentBlock::Text { text } = block { + Some(text.clone()) + } else { + None + } + }) + .collect::>() + .join("\n"); + + openai_messages.push(Message { + role, + content: MessageContent::Text(content), + name: None, + tool_calls: None, + tool_call_id: None, + }); + } + } + + openai_messages + } + + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { + // Convert OpenAI messages to Bedrock format + use crate::apis::amazon_bedrock::{ContentBlock, ConversationRole, SystemContentBlock}; + + let mut system_blocks = Vec::new(); + let mut bedrock_messages = Vec::new(); + + for msg in messages { + match msg.role { + crate::apis::openai::Role::System => { + if let crate::apis::openai::MessageContent::Text(text) = &msg.content { + system_blocks.push(SystemContentBlock::Text { text: text.clone() }); + } + } + crate::apis::openai::Role::User | crate::apis::openai::Role::Assistant => { + let role = match msg.role { + crate::apis::openai::Role::User => ConversationRole::User, + crate::apis::openai::Role::Assistant => ConversationRole::Assistant, + _ => continue, + }; + + let content = if let crate::apis::openai::MessageContent::Text(text) = &msg.content { + vec![ContentBlock::Text { text: text.clone() }] + } else { + vec![] + }; + + bedrock_messages.push(crate::apis::amazon_bedrock::Message { + role, + content, + }); + } + _ => {} + } + } + + if !system_blocks.is_empty() { + self.system = Some(system_blocks); + } + self.messages = Some(bedrock_messages); + } } // ============================================================================ diff --git a/crates/hermesllm/src/apis/anthropic.rs b/crates/hermesllm/src/apis/anthropic.rs index 06d632d9..20f422e2 100644 --- a/crates/hermesllm/src/apis/anthropic.rs +++ b/crates/hermesllm/src/apis/anthropic.rs @@ -541,6 +541,65 @@ impl ProviderRequest for MessagesRequest { fn get_temperature(&self) -> Option { self.temperature } + + fn get_messages(&self) -> Vec { + use crate::apis::openai::Message; + + let mut openai_messages = Vec::new(); + + // Add system prompt as system message if present + if let Some(system) = &self.system { + openai_messages.push(system.clone().into()); + } + + // Convert each Anthropic message to OpenAI format + for msg in &self.messages { + if let Ok(converted_msgs) = TryInto::>::try_into(msg.clone()) { + openai_messages.extend(converted_msgs); + } + } + + openai_messages + } + + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { + // Convert OpenAI messages to Anthropic format + // Separate system messages from regular messages + let mut system_messages = Vec::new(); + let mut regular_messages = Vec::new(); + + for msg in messages { + if msg.role == crate::apis::openai::Role::System { + system_messages.push(msg.clone()); + } else { + regular_messages.push(msg.clone()); + } + } + + // Set system prompt if there are system messages + if !system_messages.is_empty() { + // Combine all system messages into one + let system_text = system_messages.iter() + .filter_map(|msg| { + if let crate::apis::openai::MessageContent::Text(text) = &msg.content { + Some(text.as_str()) + } else { + None + } + }) + .collect::>() + .join("\n"); + + self.system = Some(crate::apis::anthropic::MessagesSystemPrompt::Single(system_text)); + } + + // Convert regular messages + self.messages = regular_messages.iter() + .filter_map(|msg| { + msg.clone().try_into().ok() + }) + .collect(); + } } impl MessagesResponse { diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index 4e006c3a..79154d39 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -735,6 +735,14 @@ impl ProviderRequest for ChatCompletionsRequest { fn get_temperature(&self) -> Option { self.temperature } + + fn get_messages(&self) -> Vec { + self.messages.clone() + } + + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { + self.messages = messages.to_vec(); + } } /// Implementation of ProviderResponse for ChatCompletionsResponse diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 33dea44a..6afe9f09 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -1134,6 +1134,140 @@ impl ProviderRequest for ResponsesAPIRequest { fn get_temperature(&self) -> Option { self.temperature } + + fn get_messages(&self) -> Vec { + use crate::apis::openai::{Message, MessageContent, Role}; + + let mut openai_messages = Vec::new(); + + // Add instructions as system message if present + if let Some(instructions) = &self.instructions { + openai_messages.push(Message { + role: Role::System, + content: MessageContent::Text(instructions.clone()), + name: None, + tool_calls: None, + tool_call_id: None, + }); + } + + // Convert input to messages + match &self.input { + InputParam::Text(text) => { + openai_messages.push(Message { + role: Role::User, + content: MessageContent::Text(text.clone()), + name: None, + tool_calls: None, + tool_call_id: None, + }); + } + InputParam::Items(items) => { + for item in items { + match item { + InputItem::Message(msg) => { + // Convert message role + let role = match msg.role { + MessageRole::User => Role::User, + MessageRole::Assistant => Role::Assistant, + MessageRole::System => Role::System, + MessageRole::Developer => Role::System, // Map developer to system + }; + + // Extract text from message content + let content = match &msg.content { + crate::apis::openai_responses::MessageContent::Text(text) => text.clone(), + crate::apis::openai_responses::MessageContent::Items(items) => { + items.iter() + .filter_map(|c| { + if let InputContent::InputText { text } = c { + Some(text.clone()) + } else { + None + } + }) + .collect::>() + .join("\n") + } + }; + + openai_messages.push(Message { + role, + content: MessageContent::Text(content), + name: None, + tool_calls: None, + tool_call_id: None, + }); + } + // Skip other input item types for now + InputItem::ItemReference { .. } | InputItem::FunctionCallOutput { .. } => { + // These are not yet supported in agent framework + } + } + } + } + } + + openai_messages + } + + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { + // For ResponsesAPI, we need to convert messages back to input format + // Extract system messages as instructions + let system_text = messages.iter() + .filter(|msg| msg.role == crate::apis::openai::Role::System) + .filter_map(|msg| { + if let crate::apis::openai::MessageContent::Text(text) = &msg.content { + Some(text.as_str()) + } else { + None + } + }) + .collect::>() + .join("\n"); + + if !system_text.is_empty() { + self.instructions = Some(system_text); + } + + // Convert user/assistant messages to InputParam + // For simplicity, we'll use the last user message as the input + // or combine all non-system messages + let input_messages: Vec<_> = messages.iter() + .filter(|msg| msg.role != crate::apis::openai::Role::System) + .collect(); + + if !input_messages.is_empty() { + // If there's only one message, use Text format + if input_messages.len() == 1 { + if let crate::apis::openai::MessageContent::Text(text) = &input_messages[0].content { + self.input = crate::apis::openai_responses::InputParam::Text(text.clone()); + } + } else { + // Multiple messages - combine them as text for now + // A more sophisticated approach would use InputParam::Items + let combined_text = input_messages.iter() + .filter_map(|msg| { + if let crate::apis::openai::MessageContent::Text(text) = &msg.content { + Some(format!("{}: {}", + match msg.role { + crate::apis::openai::Role::User => "User", + crate::apis::openai::Role::Assistant => "Assistant", + _ => "Unknown", + }, + text + )) + } else { + None + } + }) + .collect::>() + .join("\n"); + + self.input = crate::apis::openai_responses::InputParam::Text(combined_text); + } + } + } } // ============================================================================ diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index f8d26f68..7cd951c3 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -47,360 +47,26 @@ pub trait ProviderRequest: Send + Sync { fn remove_metadata_key(&mut self, key: &str) -> bool; fn get_temperature(&self) -> Option; + + /// Get message history as OpenAI Message format + /// This is useful for processing chat history across different provider formats + fn get_messages(&self) -> Vec; + + /// Set message history from OpenAI Message format + /// This converts OpenAI messages to the appropriate format for each provider type + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]); } impl ProviderRequestType { - /// Get message history as OpenAI Message format - /// This is useful for processing chat history across different provider formats - pub fn get_message_history(&self) -> Vec { - use crate::apis::openai::{Message, MessageContent, Role}; - - match self { - Self::ChatCompletionsRequest(r) => r.messages.clone(), - Self::MessagesRequest(r) => { - // Convert Anthropic messages to OpenAI format - let mut openai_messages = Vec::new(); - - // Add system prompt as system message if present - if let Some(system) = &r.system { - openai_messages.push(system.clone().into()); - } - - // Convert each Anthropic message to OpenAI format - for msg in &r.messages { - if let Ok(converted_msgs) = TryInto::>::try_into(msg.clone()) { - openai_messages.extend(converted_msgs); - } - } - - openai_messages - } - Self::BedrockConverse(r) => { - // Convert Bedrock messages to OpenAI format - let mut openai_messages = Vec::new(); - - // Add system messages if present - if let Some(system) = &r.system { - for sys_block in system { - match sys_block { - crate::apis::amazon_bedrock::SystemContentBlock::Text { text } => { - openai_messages.push(Message { - role: Role::System, - content: MessageContent::Text(text.clone()), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - _ => {} // Skip other system content types - } - } - } - - // Convert conversation messages - if let Some(messages) = &r.messages { - for msg in messages { - let role = match msg.role { - crate::apis::amazon_bedrock::ConversationRole::User => Role::User, - crate::apis::amazon_bedrock::ConversationRole::Assistant => Role::Assistant, - }; - - // Extract text from content blocks - let content = msg.content.iter() - .filter_map(|block| { - if let crate::apis::amazon_bedrock::ContentBlock::Text { text } = block { - Some(text.clone()) - } else { - None - } - }) - .collect::>() - .join("\n"); - - openai_messages.push(Message { - role, - content: MessageContent::Text(content), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - } - - openai_messages - } - Self::BedrockConverseStream(r) => { - // Same as BedrockConverse - let mut openai_messages = Vec::new(); - - if let Some(system) = &r.system { - for sys_block in system { - match sys_block { - crate::apis::amazon_bedrock::SystemContentBlock::Text { text } => { - openai_messages.push(Message { - role: Role::System, - content: MessageContent::Text(text.clone()), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - _ => {} // Skip other system content types - } - } - } - - if let Some(messages) = &r.messages { - for msg in messages { - let role = match msg.role { - crate::apis::amazon_bedrock::ConversationRole::User => Role::User, - crate::apis::amazon_bedrock::ConversationRole::Assistant => Role::Assistant, - }; - - let content = msg.content.iter() - .filter_map(|block| { - if let crate::apis::amazon_bedrock::ContentBlock::Text { text } = block { - Some(text.clone()) - } else { - None - } - }) - .collect::>() - .join("\n"); - - openai_messages.push(Message { - role, - content: MessageContent::Text(content), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - } - - openai_messages - } - Self::ResponsesAPIRequest(r) => { - // Convert ResponsesAPIRequest input to a user message - let mut openai_messages = Vec::new(); - - // Add instructions as system message if present - if let Some(instructions) = &r.instructions { - openai_messages.push(Message { - role: Role::System, - content: MessageContent::Text(instructions.clone()), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - - // Convert input to messages - use crate::apis::openai_responses::{InputParam, InputItem}; - match &r.input { - InputParam::Text(text) => { - openai_messages.push(Message { - role: Role::User, - content: MessageContent::Text(text.clone()), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - InputParam::Items(items) => { - for item in items { - match item { - InputItem::Message(msg) => { - // Convert message role - let role = match msg.role { - crate::apis::openai_responses::MessageRole::User => Role::User, - crate::apis::openai_responses::MessageRole::Assistant => Role::Assistant, - crate::apis::openai_responses::MessageRole::System => Role::System, - crate::apis::openai_responses::MessageRole::Developer => Role::System, // Map developer to system - }; - - // Extract text from message content - let content = match &msg.content { - crate::apis::openai_responses::MessageContent::Text(text) => text.clone(), - crate::apis::openai_responses::MessageContent::Items(items) => { - items.iter() - .filter_map(|c| { - if let crate::apis::openai_responses::InputContent::InputText { text } = c { - Some(text.clone()) - } else { - None - } - }) - .collect::>() - .join("\n") - } - }; - - openai_messages.push(Message { - role, - content: MessageContent::Text(content), - name: None, - tool_calls: None, - tool_call_id: None, - }); - } - // Skip other input item types for now - InputItem::ItemReference { .. } | InputItem::FunctionCallOutput { .. } => { - // These are not yet supported in agent framework - } - } - } - } - } - - openai_messages - } - } - } - /// Set message history from OpenAI Message format /// This converts OpenAI messages to the appropriate format for each provider type pub fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { match self { - Self::ChatCompletionsRequest(r) => { - r.messages = messages.to_vec(); - } - Self::MessagesRequest(r) => { - // Convert OpenAI messages to Anthropic format - // Separate system messages from regular messages - let mut system_messages = Vec::new(); - let mut regular_messages = Vec::new(); - - for msg in messages { - if msg.role == crate::apis::openai::Role::System { - system_messages.push(msg.clone()); - } else { - regular_messages.push(msg.clone()); - } - } - - // Set system prompt if there are system messages - if !system_messages.is_empty() { - // Combine all system messages into one - let system_text = system_messages.iter() - .filter_map(|msg| { - if let crate::apis::openai::MessageContent::Text(text) = &msg.content { - Some(text.as_str()) - } else { - None - } - }) - .collect::>() - .join("\n"); - - r.system = Some(crate::apis::anthropic::MessagesSystemPrompt::Single(system_text)); - } - - // Convert regular messages - r.messages = regular_messages.iter() - .filter_map(|msg| { - msg.clone().try_into().ok() - }) - .collect(); - } - Self::BedrockConverse(r) | Self::BedrockConverseStream(r) => { - // Convert OpenAI messages to Bedrock format - use crate::apis::amazon_bedrock::{ContentBlock, ConversationRole, SystemContentBlock}; - - let mut system_blocks = Vec::new(); - let mut bedrock_messages = Vec::new(); - - for msg in messages { - match msg.role { - crate::apis::openai::Role::System => { - if let crate::apis::openai::MessageContent::Text(text) = &msg.content { - system_blocks.push(SystemContentBlock::Text { text: text.clone() }); - } - } - crate::apis::openai::Role::User | crate::apis::openai::Role::Assistant => { - let role = match msg.role { - crate::apis::openai::Role::User => ConversationRole::User, - crate::apis::openai::Role::Assistant => ConversationRole::Assistant, - _ => continue, - }; - - let content = if let crate::apis::openai::MessageContent::Text(text) = &msg.content { - vec![ContentBlock::Text { text: text.clone() }] - } else { - vec![] - }; - - bedrock_messages.push(crate::apis::amazon_bedrock::Message { - role, - content, - }); - } - _ => {} - } - } - - if !system_blocks.is_empty() { - r.system = Some(system_blocks); - } - r.messages = Some(bedrock_messages); - } - Self::ResponsesAPIRequest(r) => { - // For ResponsesAPI, we need to convert messages back to input format - // Extract system messages as instructions - let system_text = messages.iter() - .filter(|msg| msg.role == crate::apis::openai::Role::System) - .filter_map(|msg| { - if let crate::apis::openai::MessageContent::Text(text) = &msg.content { - Some(text.as_str()) - } else { - None - } - }) - .collect::>() - .join("\n"); - - if !system_text.is_empty() { - r.instructions = Some(system_text); - } - - // Convert user/assistant messages to InputParam - // For simplicity, we'll use the last user message as the input - // or combine all non-system messages - let input_messages: Vec<_> = messages.iter() - .filter(|msg| msg.role != crate::apis::openai::Role::System) - .collect(); - - if !input_messages.is_empty() { - // If there's only one message, use Text format - if input_messages.len() == 1 { - if let crate::apis::openai::MessageContent::Text(text) = &input_messages[0].content { - r.input = crate::apis::openai_responses::InputParam::Text(text.clone()); - } - } else { - // Multiple messages - combine them as text for now - // A more sophisticated approach would use InputParam::Items - let combined_text = input_messages.iter() - .filter_map(|msg| { - if let crate::apis::openai::MessageContent::Text(text) = &msg.content { - Some(format!("{}: {}", - match msg.role { - crate::apis::openai::Role::User => "User", - crate::apis::openai::Role::Assistant => "Assistant", - _ => "Unknown", - }, - text - )) - } else { - None - } - }) - .collect::>() - .join("\n"); - - r.input = crate::apis::openai_responses::InputParam::Text(combined_text); - } - } - } + Self::ChatCompletionsRequest(r) => r.set_messages(messages), + Self::MessagesRequest(r) => r.set_messages(messages), + Self::BedrockConverse(r) => r.set_messages(messages), + Self::BedrockConverseStream(r) => r.set_messages(messages), + Self::ResponsesAPIRequest(r) => r.set_messages(messages), } } } @@ -505,6 +171,26 @@ impl ProviderRequest for ProviderRequestType { Self::ResponsesAPIRequest(r) => r.get_temperature(), } } + + fn get_messages(&self) -> Vec { + match self { + Self::ChatCompletionsRequest(r) => r.get_messages(), + Self::MessagesRequest(r) => r.get_messages(), + Self::BedrockConverse(r) => r.get_messages(), + Self::BedrockConverseStream(r) => r.get_messages(), + Self::ResponsesAPIRequest(r) => r.get_messages(), + } + } + + fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) { + match self { + Self::ChatCompletionsRequest(r) => r.set_messages(messages), + Self::MessagesRequest(r) => r.set_messages(messages), + Self::BedrockConverse(r) => r.set_messages(messages), + Self::BedrockConverseStream(r) => r.set_messages(messages), + Self::ResponsesAPIRequest(r) => r.set_messages(messages), + } + } } /// Parse the client API from a byte slice. @@ -1317,7 +1003,7 @@ mod tests { }; let provider_req = ProviderRequestType::ChatCompletionsRequest(chat_req); - let messages = provider_req.get_message_history(); + let messages = provider_req.get_messages(); assert_eq!(messages.len(), 2); assert_eq!(messages[0].role, Role::System); @@ -1356,7 +1042,7 @@ mod tests { }; let provider_req = ProviderRequestType::MessagesRequest(anthropic_req); - let messages = provider_req.get_message_history(); + let messages = provider_req.get_messages(); // Should have system message + user message assert_eq!(messages.len(), 2); @@ -1404,7 +1090,7 @@ mod tests { }; let provider_req = ProviderRequestType::ResponsesAPIRequest(responses_req); - let messages = provider_req.get_message_history(); + let messages = provider_req.get_messages(); // Should have system message (instructions) + user message (input) assert_eq!(messages.len(), 2); diff --git a/demos/use_cases/rag_agent/README.md b/demos/use_cases/mcp_filter/README.md similarity index 99% rename from demos/use_cases/rag_agent/README.md rename to demos/use_cases/mcp_filter/README.md index 9f83dea8..a524c1b4 100644 --- a/demos/use_cases/rag_agent/README.md +++ b/demos/use_cases/mcp_filter/README.md @@ -58,7 +58,7 @@ curl -X POST http://localhost:8001/v1/chat/completions \ The `arch_config.yaml` defines how agents are connected: ```yaml -agent_filters: +filters: - id: query_rewriter url: mcp://host.docker.internal:10500 tool: rewrite_query_with_archgw # MCP tool name diff --git a/demos/use_cases/rag_agent/arch_config.yaml b/demos/use_cases/mcp_filter/arch_config.yaml similarity index 98% rename from demos/use_cases/rag_agent/arch_config.yaml rename to demos/use_cases/mcp_filter/arch_config.yaml index d74fb39d..e5aacc03 100644 --- a/demos/use_cases/rag_agent/arch_config.yaml +++ b/demos/use_cases/mcp_filter/arch_config.yaml @@ -4,7 +4,7 @@ agents: - id: rag_agent url: http://host.docker.internal:10505 -agent_filters: +filters: - id: query_rewriter url: http://host.docker.internal:10501 # type: mcp # default is mcp diff --git a/demos/use_cases/rag_agent/docker-compose.yaml b/demos/use_cases/mcp_filter/docker-compose.yaml similarity index 100% rename from demos/use_cases/rag_agent/docker-compose.yaml rename to demos/use_cases/mcp_filter/docker-compose.yaml diff --git a/demos/use_cases/rag_agent/mcp_query.rest b/demos/use_cases/mcp_filter/mcp_query.rest similarity index 100% rename from demos/use_cases/rag_agent/mcp_query.rest rename to demos/use_cases/mcp_filter/mcp_query.rest diff --git a/demos/use_cases/rag_agent/pyproject.toml b/demos/use_cases/mcp_filter/pyproject.toml similarity index 100% rename from demos/use_cases/rag_agent/pyproject.toml rename to demos/use_cases/mcp_filter/pyproject.toml diff --git a/demos/use_cases/rag_agent/sample_queries.md b/demos/use_cases/mcp_filter/sample_queries.md similarity index 100% rename from demos/use_cases/rag_agent/sample_queries.md rename to demos/use_cases/mcp_filter/sample_queries.md diff --git a/demos/use_cases/rag_agent/src/rag_agent/__init__.py b/demos/use_cases/mcp_filter/src/rag_agent/__init__.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/__init__.py rename to demos/use_cases/mcp_filter/src/rag_agent/__init__.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/__main__.py b/demos/use_cases/mcp_filter/src/rag_agent/__main__.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/__main__.py rename to demos/use_cases/mcp_filter/src/rag_agent/__main__.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/api.py b/demos/use_cases/mcp_filter/src/rag_agent/api.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/api.py rename to demos/use_cases/mcp_filter/src/rag_agent/api.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/context_builder.py b/demos/use_cases/mcp_filter/src/rag_agent/context_builder.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/context_builder.py rename to demos/use_cases/mcp_filter/src/rag_agent/context_builder.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/query_rewriter.py b/demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/query_rewriter.py rename to demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/rag_agent.py b/demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/rag_agent.py rename to demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py diff --git a/demos/use_cases/rag_agent/src/rag_agent/sample_knowledge_base.csv b/demos/use_cases/mcp_filter/src/rag_agent/sample_knowledge_base.csv similarity index 100% rename from demos/use_cases/rag_agent/src/rag_agent/sample_knowledge_base.csv rename to demos/use_cases/mcp_filter/src/rag_agent/sample_knowledge_base.csv diff --git a/demos/use_cases/rag_agent/start_agents.sh b/demos/use_cases/mcp_filter/start_agents.sh similarity index 100% rename from demos/use_cases/rag_agent/start_agents.sh rename to demos/use_cases/mcp_filter/start_agents.sh diff --git a/demos/use_cases/rag_agent/test.rest b/demos/use_cases/mcp_filter/test.rest similarity index 100% rename from demos/use_cases/rag_agent/test.rest rename to demos/use_cases/mcp_filter/test.rest diff --git a/demos/use_cases/rag_agent/uv.lock b/demos/use_cases/mcp_filter/uv.lock similarity index 100% rename from demos/use_cases/rag_agent/uv.lock rename to demos/use_cases/mcp_filter/uv.lock