address pr feedback

This commit is contained in:
Adil Hafeez 2025-12-17 14:03:48 -08:00
parent fc52274836
commit bb9503e873
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
30 changed files with 477 additions and 394 deletions

View file

@ -23,7 +23,7 @@ properties:
required:
- id
- url
agent_filters:
filters:
type: array
items:
type: object

View file

@ -101,8 +101,8 @@ def validate_and_render_schema():
# Process agents section and convert to endpoints
agents = config_yaml.get("agents", [])
agent_filters = config_yaml.get("agent_filters", [])
agents_combined = agents + agent_filters
filters = config_yaml.get("filters", [])
agents_combined = agents + filters
agent_id_keys = set()
for agent in agents_combined:

View file

@ -1,8 +1,12 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use bytes::Bytes;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{SpanBuilder, SpanKind, parse_traceparent};
use hermesllm::apis::OpenAIMessage;
use hermesllm::clients::SupportedAPIsFromClient;
use hermesllm::providers::request::ProviderRequest;
use hermesllm::ProviderRequestType;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
@ -14,6 +18,7 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::llm_router::RouterService;
use crate::tracing::{OperationNameBuilder, operation_component, http};
/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
@ -179,7 +184,7 @@ async fn handle_agent_chat(
}
};
let message: Vec<OpenAIMessage> = client_request.get_message_history();
let message: Vec<OpenAIMessage> = client_request.get_messages();
// let chat_completions_request: ChatCompletionsRequest =
// serde_json::from_slice(&chat_request_bytes).map_err(|err| {
@ -193,7 +198,7 @@ async fn handle_agent_chat(
// Extract trace parent for routing
let trace_parent = request_headers
.iter()
.find(|(key, _)| key.as_str() == "traceparent")
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
// Create agent map for pipeline processing and agent selection
@ -205,11 +210,15 @@ async fn handle_agent_chat(
// Select appropriate agent using arch router llm model
let selected_agent = agent_selector
.select_agent(&message, &listener, trace_parent)
.select_agent(&message, &listener, trace_parent.clone())
.await?;
debug!("Processing agent pipeline: {}", selected_agent.id);
// Record the start time for agent span
let agent_start_time = SystemTime::now();
let agent_start_instant = Instant::now();
// Process the filter chain
let chat_history = pipeline_processor
.process_filter_chain(
@ -222,14 +231,14 @@ async fn handle_agent_chat(
.await?;
// Get terminal agent and send final response
let terminal_agent_name = selected_agent.id;
let terminal_agent_name = selected_agent.id.clone();
let terminal_agent = agent_map.get(&terminal_agent_name).unwrap();
debug!("Processing terminal agent: {}", terminal_agent_name);
debug!("Terminal agent details: {:?}", terminal_agent);
let llm_response = pipeline_processor
.invoke_terminal_agent(
.invoke_agent(
&chat_history,
client_request,
terminal_agent,
@ -237,6 +246,47 @@ async fn handle_agent_chat(
)
.await?;
// Record agent span after processing is complete
let agent_end_time = SystemTime::now();
let agent_elapsed = agent_start_instant.elapsed();
// Build full path with /agents prefix
let full_path = format!("/agents{}", request_path);
// Build operation name: POST {full_path} {agent_name}
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path(&full_path)
.with_target(&terminal_agent_name)
.build();
// Parse trace parent to get trace_id and parent_span_id
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
parse_traceparent(tp)
} else {
(String::new(), None)
};
let mut span_builder = SpanBuilder::new(&operation_name)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", terminal_agent_name.clone())
.with_attribute("duration_ms", format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0));
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
// Use plano(agent) as service name for the agent processing span
trace_collector.record_span(operation_component::AGENT, span);
// Create streaming response
response_handler
.create_streaming_response(llm_response)

View file

@ -1,6 +1,11 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub const JSON_RPC_VERSION: &str = "2.0";
pub const TOOL_CALL_METHOD : &str = "tools/call";
pub const MCP_INITIALIZE: &str = "initialize";
pub const MCP_INITIALIZE_NOTIFICATION: &str = "initialize/notification";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum JsonRpcId {

View file

@ -1,7 +1,7 @@
use std::collections::HashMap;
use common::configuration::{Agent, AgentFilterChain};
use common::consts::{ARCH_UPSTREAM_HOST_HEADER, ENVOY_RETRY_HEADER};
use common::consts::{ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER};
use common::traces::{SpanBuilder, SpanKind};
use hermesllm::{ProviderRequest, ProviderRequestType};
use hermesllm::apis::openai::{Message};
@ -10,7 +10,10 @@ use opentelemetry::trace::TraceContextExt;
use tracing::{debug, info, warn};
use std::time::{Instant, SystemTime};
use crate::handlers::jsonrpc::{JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
use crate::tracing::operation_component::{self};
use crate::tracing::{OperationNameBuilder, http};
use crate::handlers::jsonrpc::{JSON_RPC_VERSION, JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD};
use uuid::Uuid;
/// Errors that can occur during pipeline processing
@ -107,12 +110,22 @@ impl PipelineProcessor {
) {
let (trace_id, parent_span_id) = self.extract_trace_context();
let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name))
.with_kind(SpanKind::Internal)
// Build operation name: POST /agents/* {filter_name}
// Using generic path since we don't have access to specific endpoint here
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path("/agents/*")
.with_target(agent_name)
.build();
let mut span_builder = SpanBuilder::new(&operation_name)
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute("filter_name", agent_name.to_string())
.with_attribute("tool_name", tool_name.to_string())
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/*")
.with_attribute("filter.name", agent_name.to_string())
.with_attribute("filter.tool_name", tool_name.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
if !trace_id.is_empty() {
@ -123,7 +136,8 @@ impl PipelineProcessor {
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
// Use plano(filter) as service name for filter execution spans
collector.record_span(operation_component::AGENT_FILTER, span);
}
/// Record a span for MCP protocol interactions
@ -139,10 +153,20 @@ impl PipelineProcessor {
) {
let (trace_id, parent_span_id) = self.extract_trace_context();
let mut span_builder = SpanBuilder::new(format!("mcp_{}", operation))
// Build operation name: POST /mcp {agent_id}
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path("/mcp")
.with_operation(operation)
.with_target(agent_id)
.build();
let mut span_builder = SpanBuilder::new(&operation_name)
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, &format!("/mcp ({})", operation.to_string()))
.with_attribute("mcp.operation", operation.to_string())
.with_attribute("mcp.agent_id", agent_id.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
@ -161,7 +185,8 @@ impl PipelineProcessor {
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
// MCP spans also use plano(filter) service name as they are part of filter operations
collector.record_span(operation_component::AGENT_FILTER, span);
}
/// Process the filter chain of agents (all except the terminal agent)
@ -336,20 +361,21 @@ impl PipelineProcessor {
tool_name: &str,
messages: &[Message],
) -> Result<JsonRpcRequest, PipelineError> {
let arguments = serde_json::json!({
"messages": messages
});
let mut arguments = HashMap::new();
arguments.insert(
"messages".to_string(),
serde_json::to_value(messages)?,
);
let params = serde_json::json!({
"name": tool_name,
"arguments": arguments
});
let mut params = HashMap::new();
params.insert("name".to_string(), serde_json::to_value(tool_name)?);
params.insert("arguments".to_string(), serde_json::to_value(arguments)?);
Ok(JsonRpcRequest {
jsonrpc: "2.0".to_string(),
jsonrpc: JSON_RPC_VERSION.to_string(),
id: JsonRpcId::String(Uuid::new_v4().to_string()),
method: "tools/call".to_string(),
params: Some(serde_json::from_value(params)?),
method: TOOL_CALL_METHOD.to_string(),
params: Some(params),
})
}
@ -479,9 +505,9 @@ impl PipelineProcessor {
/// Build an initialize JSON-RPC request
fn build_initialize_request(&self) -> JsonRpcRequest {
JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: JsonRpcId::Number(1),
method: "initialize".to_string(),
jsonrpc: JSON_RPC_VERSION.to_string(),
id: JsonRpcId::String(Uuid::new_v4().to_string()),
method: MCP_INITIALIZE.to_string(),
params: Some({
let mut params = HashMap::new();
params.insert(
@ -492,7 +518,7 @@ impl PipelineProcessor {
params.insert(
"clientInfo".to_string(),
serde_json::json!({
"name": "brightstaff",
"name": BRIGHT_STAFF_SERVICE_NAME,
"version": "1.0.0"
}),
);
@ -509,8 +535,8 @@ impl PipelineProcessor {
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
) -> Result<(), PipelineError> {
let initialized_notification = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/initialized".to_string(),
jsonrpc: JSON_RPC_VERSION.to_string(),
method: MCP_INITIALIZE_NOTIFICATION.to_string(),
params: None,
};
@ -616,7 +642,7 @@ impl PipelineProcessor {
}
/// Send request to terminal agent and return the raw response for streaming
pub async fn invoke_terminal_agent(
pub async fn invoke_agent(
&self,
messages: &[Message],
mut original_request: ProviderRequestType,
@ -786,7 +812,7 @@ mod tests {
#[tokio::test]
async fn test_execute_filter_mcp_error_flag() {
let rpc_body = serde_json::json!({
"jsonrpc": "2.0",
"jsonrpc": JSON_RPC_VERSION,
"id": "1",
"result": {
"isError": true,

View file

@ -60,18 +60,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let arch_config = Arc::new(config);
// combine agents and agent_filters into a single list of agents
// combine agents and filters into a single list of agents
let all_agents: Vec<Agent> = arch_config
.agents
.as_deref()
.unwrap_or_default()
.iter()
.chain(arch_config.agent_filters.as_deref().unwrap_or_default())
.chain(arch_config.filters.as_deref().unwrap_or_default())
.cloned()
.collect();
let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone()));
let agents_list = Arc::new(RwLock::new(Some(all_agents)));
let combined_agents_filters_list = Arc::new(RwLock::new(Some(all_agents)));
let listeners = Arc::new(RwLock::new(arch_config.listeners.clone()));
let llm_provider_url =
env::var("LLM_PROVIDER_ENDPOINT").unwrap_or_else(|_| "http://localhost:12001".to_string());
@ -125,7 +125,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_provider_url = llm_provider_url.clone();
let llm_providers = llm_providers.clone();
let agents_list = agents_list.clone();
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let service = service_fn(move |req| {

View file

@ -203,6 +203,7 @@ pub mod operation_component {
pub struct OperationNameBuilder {
method: Option<String>,
path: Option<String>,
operation: Option<String>,
target: Option<String>,
}
@ -212,6 +213,7 @@ impl OperationNameBuilder {
Self {
method: None,
path: None,
operation: None,
target: None,
}
}
@ -234,6 +236,15 @@ impl OperationNameBuilder {
self
}
/// Set the operation type (optional, for MCP operations)
///
/// # Arguments
/// * `operation` - Operation type (e.g., "tool_call", "session_init", "notification")
pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
self.operation = Some(operation.into());
self
}
/// Set the target (model name, agent name, or filter name)
///
/// # Arguments
@ -246,7 +257,8 @@ impl OperationNameBuilder {
/// Build the operation name string
///
/// # Format
/// - With all components: `{method} {path} {target}`
/// - With all components: `{method} {path} ({operation}) {target}`
/// - Without operation: `{method} {path} {target}`
/// - Without target: `{method} {path}`
/// - Without path: `{method}`
/// - Empty: returns empty string
@ -258,7 +270,11 @@ impl OperationNameBuilder {
}
if let Some(path) = self.path {
parts.push(path);
if let Some(operation) = self.operation {
parts.push(format!("{} ({})", path, operation));
} else {
parts.push(path);
}
}
if let Some(target) = self.target {

View file

@ -60,7 +60,7 @@ pub struct Configuration {
pub mode: Option<GatewayMode>,
pub routing: Option<Routing>,
pub agents: Option<Vec<Agent>>,
pub agent_filters: Option<Vec<Agent>>,
pub filters: Option<Vec<Agent>>,
pub listeners: Vec<Listener>,
}

View file

@ -32,3 +32,4 @@ pub const OTEL_COLLECTOR_HTTP: &str = "opentelemetry_collector_http";
pub const OTEL_POST_PATH: &str = "/v1/traces";
pub const LLM_ROUTE_HEADER: &str = "x-arch-llm-route";
pub const ENVOY_RETRY_HEADER: &str = "x-envoy-max-retries";
pub const BRIGHT_STAFF_SERVICE_NAME : &str = "brightstaff";

View file

@ -233,6 +233,104 @@ impl ProviderRequest for ConverseRequest {
fn get_temperature(&self) -> Option<f32> {
self.inference_config.as_ref()?.temperature
}
fn get_messages(&self) -> Vec<crate::apis::openai::Message> {
use crate::apis::openai::{Message, MessageContent, Role};
let mut openai_messages = Vec::new();
// Add system messages if present
if let Some(system) = &self.system {
for sys_block in system {
match sys_block {
SystemContentBlock::Text { text } => {
openai_messages.push(Message {
role: Role::System,
content: MessageContent::Text(text.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
_ => {} // Skip other system content types
}
}
}
// Convert conversation messages
if let Some(messages) = &self.messages {
for msg in messages {
let role = match msg.role {
ConversationRole::User => Role::User,
ConversationRole::Assistant => Role::Assistant,
};
// Extract text from content blocks
let content = msg.content.iter()
.filter_map(|block| {
if let ContentBlock::Text { text } = block {
Some(text.clone())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
openai_messages.push(Message {
role,
content: MessageContent::Text(content),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
}
openai_messages
}
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
// Convert OpenAI messages to Bedrock format
use crate::apis::amazon_bedrock::{ContentBlock, ConversationRole, SystemContentBlock};
let mut system_blocks = Vec::new();
let mut bedrock_messages = Vec::new();
for msg in messages {
match msg.role {
crate::apis::openai::Role::System => {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
system_blocks.push(SystemContentBlock::Text { text: text.clone() });
}
}
crate::apis::openai::Role::User | crate::apis::openai::Role::Assistant => {
let role = match msg.role {
crate::apis::openai::Role::User => ConversationRole::User,
crate::apis::openai::Role::Assistant => ConversationRole::Assistant,
_ => continue,
};
let content = if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
vec![ContentBlock::Text { text: text.clone() }]
} else {
vec![]
};
bedrock_messages.push(crate::apis::amazon_bedrock::Message {
role,
content,
});
}
_ => {}
}
}
if !system_blocks.is_empty() {
self.system = Some(system_blocks);
}
self.messages = Some(bedrock_messages);
}
}
// ============================================================================

View file

@ -541,6 +541,65 @@ impl ProviderRequest for MessagesRequest {
fn get_temperature(&self) -> Option<f32> {
self.temperature
}
fn get_messages(&self) -> Vec<crate::apis::openai::Message> {
use crate::apis::openai::Message;
let mut openai_messages = Vec::new();
// Add system prompt as system message if present
if let Some(system) = &self.system {
openai_messages.push(system.clone().into());
}
// Convert each Anthropic message to OpenAI format
for msg in &self.messages {
if let Ok(converted_msgs) = TryInto::<Vec<Message>>::try_into(msg.clone()) {
openai_messages.extend(converted_msgs);
}
}
openai_messages
}
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
// Convert OpenAI messages to Anthropic format
// Separate system messages from regular messages
let mut system_messages = Vec::new();
let mut regular_messages = Vec::new();
for msg in messages {
if msg.role == crate::apis::openai::Role::System {
system_messages.push(msg.clone());
} else {
regular_messages.push(msg.clone());
}
}
// Set system prompt if there are system messages
if !system_messages.is_empty() {
// Combine all system messages into one
let system_text = system_messages.iter()
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
self.system = Some(crate::apis::anthropic::MessagesSystemPrompt::Single(system_text));
}
// Convert regular messages
self.messages = regular_messages.iter()
.filter_map(|msg| {
msg.clone().try_into().ok()
})
.collect();
}
}
impl MessagesResponse {

View file

@ -735,6 +735,14 @@ impl ProviderRequest for ChatCompletionsRequest {
fn get_temperature(&self) -> Option<f32> {
self.temperature
}
fn get_messages(&self) -> Vec<crate::apis::openai::Message> {
self.messages.clone()
}
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
self.messages = messages.to_vec();
}
}
/// Implementation of ProviderResponse for ChatCompletionsResponse

View file

@ -1134,6 +1134,140 @@ impl ProviderRequest for ResponsesAPIRequest {
fn get_temperature(&self) -> Option<f32> {
self.temperature
}
fn get_messages(&self) -> Vec<crate::apis::openai::Message> {
use crate::apis::openai::{Message, MessageContent, Role};
let mut openai_messages = Vec::new();
// Add instructions as system message if present
if let Some(instructions) = &self.instructions {
openai_messages.push(Message {
role: Role::System,
content: MessageContent::Text(instructions.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
// Convert input to messages
match &self.input {
InputParam::Text(text) => {
openai_messages.push(Message {
role: Role::User,
content: MessageContent::Text(text.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
InputParam::Items(items) => {
for item in items {
match item {
InputItem::Message(msg) => {
// Convert message role
let role = match msg.role {
MessageRole::User => Role::User,
MessageRole::Assistant => Role::Assistant,
MessageRole::System => Role::System,
MessageRole::Developer => Role::System, // Map developer to system
};
// Extract text from message content
let content = match &msg.content {
crate::apis::openai_responses::MessageContent::Text(text) => text.clone(),
crate::apis::openai_responses::MessageContent::Items(items) => {
items.iter()
.filter_map(|c| {
if let InputContent::InputText { text } = c {
Some(text.clone())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n")
}
};
openai_messages.push(Message {
role,
content: MessageContent::Text(content),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
// Skip other input item types for now
InputItem::ItemReference { .. } | InputItem::FunctionCallOutput { .. } => {
// These are not yet supported in agent framework
}
}
}
}
}
openai_messages
}
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
// For ResponsesAPI, we need to convert messages back to input format
// Extract system messages as instructions
let system_text = messages.iter()
.filter(|msg| msg.role == crate::apis::openai::Role::System)
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
if !system_text.is_empty() {
self.instructions = Some(system_text);
}
// Convert user/assistant messages to InputParam
// For simplicity, we'll use the last user message as the input
// or combine all non-system messages
let input_messages: Vec<_> = messages.iter()
.filter(|msg| msg.role != crate::apis::openai::Role::System)
.collect();
if !input_messages.is_empty() {
// If there's only one message, use Text format
if input_messages.len() == 1 {
if let crate::apis::openai::MessageContent::Text(text) = &input_messages[0].content {
self.input = crate::apis::openai_responses::InputParam::Text(text.clone());
}
} else {
// Multiple messages - combine them as text for now
// A more sophisticated approach would use InputParam::Items
let combined_text = input_messages.iter()
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(format!("{}: {}",
match msg.role {
crate::apis::openai::Role::User => "User",
crate::apis::openai::Role::Assistant => "Assistant",
_ => "Unknown",
},
text
))
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
self.input = crate::apis::openai_responses::InputParam::Text(combined_text);
}
}
}
}
// ============================================================================

View file

@ -47,360 +47,26 @@ pub trait ProviderRequest: Send + Sync {
fn remove_metadata_key(&mut self, key: &str) -> bool;
fn get_temperature(&self) -> Option<f32>;
/// Get message history as OpenAI Message format
/// This is useful for processing chat history across different provider formats
fn get_messages(&self) -> Vec<crate::apis::openai::Message>;
/// Set message history from OpenAI Message format
/// This converts OpenAI messages to the appropriate format for each provider type
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]);
}
impl ProviderRequestType {
/// Get message history as OpenAI Message format
/// This is useful for processing chat history across different provider formats
pub fn get_message_history(&self) -> Vec<crate::apis::openai::Message> {
use crate::apis::openai::{Message, MessageContent, Role};
match self {
Self::ChatCompletionsRequest(r) => r.messages.clone(),
Self::MessagesRequest(r) => {
// Convert Anthropic messages to OpenAI format
let mut openai_messages = Vec::new();
// Add system prompt as system message if present
if let Some(system) = &r.system {
openai_messages.push(system.clone().into());
}
// Convert each Anthropic message to OpenAI format
for msg in &r.messages {
if let Ok(converted_msgs) = TryInto::<Vec<Message>>::try_into(msg.clone()) {
openai_messages.extend(converted_msgs);
}
}
openai_messages
}
Self::BedrockConverse(r) => {
// Convert Bedrock messages to OpenAI format
let mut openai_messages = Vec::new();
// Add system messages if present
if let Some(system) = &r.system {
for sys_block in system {
match sys_block {
crate::apis::amazon_bedrock::SystemContentBlock::Text { text } => {
openai_messages.push(Message {
role: Role::System,
content: MessageContent::Text(text.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
_ => {} // Skip other system content types
}
}
}
// Convert conversation messages
if let Some(messages) = &r.messages {
for msg in messages {
let role = match msg.role {
crate::apis::amazon_bedrock::ConversationRole::User => Role::User,
crate::apis::amazon_bedrock::ConversationRole::Assistant => Role::Assistant,
};
// Extract text from content blocks
let content = msg.content.iter()
.filter_map(|block| {
if let crate::apis::amazon_bedrock::ContentBlock::Text { text } = block {
Some(text.clone())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
openai_messages.push(Message {
role,
content: MessageContent::Text(content),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
}
openai_messages
}
Self::BedrockConverseStream(r) => {
// Same as BedrockConverse
let mut openai_messages = Vec::new();
if let Some(system) = &r.system {
for sys_block in system {
match sys_block {
crate::apis::amazon_bedrock::SystemContentBlock::Text { text } => {
openai_messages.push(Message {
role: Role::System,
content: MessageContent::Text(text.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
_ => {} // Skip other system content types
}
}
}
if let Some(messages) = &r.messages {
for msg in messages {
let role = match msg.role {
crate::apis::amazon_bedrock::ConversationRole::User => Role::User,
crate::apis::amazon_bedrock::ConversationRole::Assistant => Role::Assistant,
};
let content = msg.content.iter()
.filter_map(|block| {
if let crate::apis::amazon_bedrock::ContentBlock::Text { text } = block {
Some(text.clone())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
openai_messages.push(Message {
role,
content: MessageContent::Text(content),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
}
openai_messages
}
Self::ResponsesAPIRequest(r) => {
// Convert ResponsesAPIRequest input to a user message
let mut openai_messages = Vec::new();
// Add instructions as system message if present
if let Some(instructions) = &r.instructions {
openai_messages.push(Message {
role: Role::System,
content: MessageContent::Text(instructions.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
// Convert input to messages
use crate::apis::openai_responses::{InputParam, InputItem};
match &r.input {
InputParam::Text(text) => {
openai_messages.push(Message {
role: Role::User,
content: MessageContent::Text(text.clone()),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
InputParam::Items(items) => {
for item in items {
match item {
InputItem::Message(msg) => {
// Convert message role
let role = match msg.role {
crate::apis::openai_responses::MessageRole::User => Role::User,
crate::apis::openai_responses::MessageRole::Assistant => Role::Assistant,
crate::apis::openai_responses::MessageRole::System => Role::System,
crate::apis::openai_responses::MessageRole::Developer => Role::System, // Map developer to system
};
// Extract text from message content
let content = match &msg.content {
crate::apis::openai_responses::MessageContent::Text(text) => text.clone(),
crate::apis::openai_responses::MessageContent::Items(items) => {
items.iter()
.filter_map(|c| {
if let crate::apis::openai_responses::InputContent::InputText { text } = c {
Some(text.clone())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n")
}
};
openai_messages.push(Message {
role,
content: MessageContent::Text(content),
name: None,
tool_calls: None,
tool_call_id: None,
});
}
// Skip other input item types for now
InputItem::ItemReference { .. } | InputItem::FunctionCallOutput { .. } => {
// These are not yet supported in agent framework
}
}
}
}
}
openai_messages
}
}
}
/// Set message history from OpenAI Message format
/// This converts OpenAI messages to the appropriate format for each provider type
pub fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
match self {
Self::ChatCompletionsRequest(r) => {
r.messages = messages.to_vec();
}
Self::MessagesRequest(r) => {
// Convert OpenAI messages to Anthropic format
// Separate system messages from regular messages
let mut system_messages = Vec::new();
let mut regular_messages = Vec::new();
for msg in messages {
if msg.role == crate::apis::openai::Role::System {
system_messages.push(msg.clone());
} else {
regular_messages.push(msg.clone());
}
}
// Set system prompt if there are system messages
if !system_messages.is_empty() {
// Combine all system messages into one
let system_text = system_messages.iter()
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
r.system = Some(crate::apis::anthropic::MessagesSystemPrompt::Single(system_text));
}
// Convert regular messages
r.messages = regular_messages.iter()
.filter_map(|msg| {
msg.clone().try_into().ok()
})
.collect();
}
Self::BedrockConverse(r) | Self::BedrockConverseStream(r) => {
// Convert OpenAI messages to Bedrock format
use crate::apis::amazon_bedrock::{ContentBlock, ConversationRole, SystemContentBlock};
let mut system_blocks = Vec::new();
let mut bedrock_messages = Vec::new();
for msg in messages {
match msg.role {
crate::apis::openai::Role::System => {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
system_blocks.push(SystemContentBlock::Text { text: text.clone() });
}
}
crate::apis::openai::Role::User | crate::apis::openai::Role::Assistant => {
let role = match msg.role {
crate::apis::openai::Role::User => ConversationRole::User,
crate::apis::openai::Role::Assistant => ConversationRole::Assistant,
_ => continue,
};
let content = if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
vec![ContentBlock::Text { text: text.clone() }]
} else {
vec![]
};
bedrock_messages.push(crate::apis::amazon_bedrock::Message {
role,
content,
});
}
_ => {}
}
}
if !system_blocks.is_empty() {
r.system = Some(system_blocks);
}
r.messages = Some(bedrock_messages);
}
Self::ResponsesAPIRequest(r) => {
// For ResponsesAPI, we need to convert messages back to input format
// Extract system messages as instructions
let system_text = messages.iter()
.filter(|msg| msg.role == crate::apis::openai::Role::System)
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
if !system_text.is_empty() {
r.instructions = Some(system_text);
}
// Convert user/assistant messages to InputParam
// For simplicity, we'll use the last user message as the input
// or combine all non-system messages
let input_messages: Vec<_> = messages.iter()
.filter(|msg| msg.role != crate::apis::openai::Role::System)
.collect();
if !input_messages.is_empty() {
// If there's only one message, use Text format
if input_messages.len() == 1 {
if let crate::apis::openai::MessageContent::Text(text) = &input_messages[0].content {
r.input = crate::apis::openai_responses::InputParam::Text(text.clone());
}
} else {
// Multiple messages - combine them as text for now
// A more sophisticated approach would use InputParam::Items
let combined_text = input_messages.iter()
.filter_map(|msg| {
if let crate::apis::openai::MessageContent::Text(text) = &msg.content {
Some(format!("{}: {}",
match msg.role {
crate::apis::openai::Role::User => "User",
crate::apis::openai::Role::Assistant => "Assistant",
_ => "Unknown",
},
text
))
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
r.input = crate::apis::openai_responses::InputParam::Text(combined_text);
}
}
}
Self::ChatCompletionsRequest(r) => r.set_messages(messages),
Self::MessagesRequest(r) => r.set_messages(messages),
Self::BedrockConverse(r) => r.set_messages(messages),
Self::BedrockConverseStream(r) => r.set_messages(messages),
Self::ResponsesAPIRequest(r) => r.set_messages(messages),
}
}
}
@ -505,6 +171,26 @@ impl ProviderRequest for ProviderRequestType {
Self::ResponsesAPIRequest(r) => r.get_temperature(),
}
}
fn get_messages(&self) -> Vec<crate::apis::openai::Message> {
match self {
Self::ChatCompletionsRequest(r) => r.get_messages(),
Self::MessagesRequest(r) => r.get_messages(),
Self::BedrockConverse(r) => r.get_messages(),
Self::BedrockConverseStream(r) => r.get_messages(),
Self::ResponsesAPIRequest(r) => r.get_messages(),
}
}
fn set_messages(&mut self, messages: &[crate::apis::openai::Message]) {
match self {
Self::ChatCompletionsRequest(r) => r.set_messages(messages),
Self::MessagesRequest(r) => r.set_messages(messages),
Self::BedrockConverse(r) => r.set_messages(messages),
Self::BedrockConverseStream(r) => r.set_messages(messages),
Self::ResponsesAPIRequest(r) => r.set_messages(messages),
}
}
}
/// Parse the client API from a byte slice.
@ -1317,7 +1003,7 @@ mod tests {
};
let provider_req = ProviderRequestType::ChatCompletionsRequest(chat_req);
let messages = provider_req.get_message_history();
let messages = provider_req.get_messages();
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, Role::System);
@ -1356,7 +1042,7 @@ mod tests {
};
let provider_req = ProviderRequestType::MessagesRequest(anthropic_req);
let messages = provider_req.get_message_history();
let messages = provider_req.get_messages();
// Should have system message + user message
assert_eq!(messages.len(), 2);
@ -1404,7 +1090,7 @@ mod tests {
};
let provider_req = ProviderRequestType::ResponsesAPIRequest(responses_req);
let messages = provider_req.get_message_history();
let messages = provider_req.get_messages();
// Should have system message (instructions) + user message (input)
assert_eq!(messages.len(), 2);

View file

@ -58,7 +58,7 @@ curl -X POST http://localhost:8001/v1/chat/completions \
The `arch_config.yaml` defines how agents are connected:
```yaml
agent_filters:
filters:
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw # MCP tool name

View file

@ -4,7 +4,7 @@ agents:
- id: rag_agent
url: http://host.docker.internal:10505
agent_filters:
filters:
- id: query_rewriter
url: http://host.docker.internal:10501
# type: mcp # default is mcp