mirror of
https://github.com/katanemo/plano.git
synced 2026-06-20 15:28:07 +02:00
use standard logging format
This commit is contained in:
parent
891f3a7413
commit
b861f41c03
18 changed files with 458 additions and 378 deletions
|
|
@ -1018,7 +1018,7 @@ static_resources:
|
||||||
- endpoint:
|
- endpoint:
|
||||||
address:
|
address:
|
||||||
socket_address:
|
socket_address:
|
||||||
address: host.docker.internal
|
address: 0.0.0.0
|
||||||
port_value: 9091
|
port_value: 9091
|
||||||
hostname: localhost
|
hostname: localhost
|
||||||
|
|
||||||
|
|
|
||||||
2
crates/.vscode/launch.json
vendored
2
crates/.vscode/launch.json
vendored
|
|
@ -13,7 +13,7 @@
|
||||||
"env": {
|
"env": {
|
||||||
"RUST_LOG": "debug",
|
"RUST_LOG": "debug",
|
||||||
"RUST_BACKTRACE": "1",
|
"RUST_BACKTRACE": "1",
|
||||||
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/mcp_filter/config.yaml_rendered",
|
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/multi_agent_with_crewai_langchain/config.yaml_rendered",
|
||||||
"OTEL_COLLECTOR_URL": "http://localhost:4317",
|
"OTEL_COLLECTOR_URL": "http://localhost:4317",
|
||||||
"OTEL_TRACING_ENABLED": "true"
|
"OTEL_TRACING_ENABLED": "true"
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ use http_body_util::combinators::BoxBody;
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::{Request, Response};
|
use hyper::{Request, Response};
|
||||||
use serde::ser::Error as SerError;
|
use serde::ser::Error as SerError;
|
||||||
use tracing::{debug, info, instrument, warn};
|
use tracing::{debug, info, info_span, warn, Instrument};
|
||||||
|
|
||||||
use super::agent_selector::{AgentSelectionError, AgentSelector};
|
use super::agent_selector::{AgentSelectionError, AgentSelector};
|
||||||
use super::pipeline_processor::{PipelineError, PipelineProcessor};
|
use super::pipeline_processor::{PipelineError, PipelineProcessor};
|
||||||
|
|
@ -39,90 +39,117 @@ pub async fn agent_chat(
|
||||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
match handle_agent_chat(request, orchestrator_service, agents_list, listeners).await {
|
// Extract request_id from headers or generate a new one
|
||||||
Ok(response) => Ok(response),
|
let request_id: String = match request
|
||||||
Err(err) => {
|
.headers()
|
||||||
// Check if this is a client error from the pipeline that should be cascaded
|
.get(common::consts::REQUEST_ID_HEADER)
|
||||||
if let AgentFilterChainError::Pipeline(PipelineError::ClientError {
|
.and_then(|h| h.to_str().ok())
|
||||||
agent,
|
.map(|s| s.to_string())
|
||||||
status,
|
{
|
||||||
body,
|
Some(id) => id,
|
||||||
}) = &err
|
None => uuid::Uuid::new_v4().to_string(),
|
||||||
{
|
};
|
||||||
warn!(
|
|
||||||
"Client error from agent '{}' (HTTP {}): {}",
|
|
||||||
agent, status, body
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create error response with the original status code and body
|
// Create a span with request_id that will be included in all log lines
|
||||||
let error_json = serde_json::json!({
|
let request_span = info_span!(
|
||||||
"error": "ClientError",
|
"agent_chat_handler",
|
||||||
"agent": agent,
|
request_id = %request_id,
|
||||||
"status": status,
|
|
||||||
"agent_response": body
|
|
||||||
});
|
|
||||||
|
|
||||||
let json_string = error_json.to_string();
|
|
||||||
let mut response = Response::new(ResponseHandler::create_full_body(json_string));
|
|
||||||
*response.status_mut() =
|
|
||||||
hyper::StatusCode::from_u16(*status).unwrap_or(hyper::StatusCode::BAD_REQUEST);
|
|
||||||
response.headers_mut().insert(
|
|
||||||
hyper::header::CONTENT_TYPE,
|
|
||||||
"application/json".parse().unwrap(),
|
|
||||||
);
|
|
||||||
return Ok(response);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Print detailed error information with full error chain for other errors
|
|
||||||
let mut error_chain = Vec::new();
|
|
||||||
let mut current_error: &dyn std::error::Error = &err;
|
|
||||||
|
|
||||||
// Collect the full error chain
|
|
||||||
loop {
|
|
||||||
error_chain.push(current_error.to_string());
|
|
||||||
match current_error.source() {
|
|
||||||
Some(source) => current_error = source,
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log the complete error chain
|
|
||||||
warn!("Agent chat error chain: {:#?}", error_chain);
|
|
||||||
warn!("Root error: {:?}", err);
|
|
||||||
|
|
||||||
// Create structured error response as JSON
|
|
||||||
let error_json = serde_json::json!({
|
|
||||||
"error": {
|
|
||||||
"type": "AgentFilterChainError",
|
|
||||||
"message": err.to_string(),
|
|
||||||
"error_chain": error_chain,
|
|
||||||
"debug_info": format!("{:?}", err)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Log the error for debugging
|
|
||||||
info!("Structured error info: {}", error_json);
|
|
||||||
|
|
||||||
// Return JSON error response
|
|
||||||
Ok(ResponseHandler::create_json_error_response(&error_json))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(
|
|
||||||
name = "agent_chat_handler",
|
|
||||||
skip(request, orchestrator_service, agents_list, listeners),
|
|
||||||
level = "info",
|
|
||||||
fields(
|
|
||||||
http.method = %request.method(),
|
http.method = %request.method(),
|
||||||
http.path = %request.uri().path()
|
http.path = %request.uri().path()
|
||||||
)
|
);
|
||||||
)]
|
|
||||||
async fn handle_agent_chat(
|
// Execute the handler inside the span
|
||||||
|
async {
|
||||||
|
match handle_agent_chat_inner(
|
||||||
|
request,
|
||||||
|
orchestrator_service,
|
||||||
|
agents_list,
|
||||||
|
listeners,
|
||||||
|
request_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(response) => Ok(response),
|
||||||
|
Err(err) => {
|
||||||
|
// Check if this is a client error from the pipeline that should be cascaded
|
||||||
|
if let AgentFilterChainError::Pipeline(PipelineError::ClientError {
|
||||||
|
agent,
|
||||||
|
status,
|
||||||
|
body,
|
||||||
|
}) = &err
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
agent = %agent,
|
||||||
|
status = %status,
|
||||||
|
body = %body,
|
||||||
|
"client error from agent"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Create error response with the original status code and body
|
||||||
|
let error_json = serde_json::json!({
|
||||||
|
"error": "ClientError",
|
||||||
|
"agent": agent,
|
||||||
|
"status": status,
|
||||||
|
"agent_response": body
|
||||||
|
});
|
||||||
|
|
||||||
|
let json_string = error_json.to_string();
|
||||||
|
let mut response =
|
||||||
|
Response::new(ResponseHandler::create_full_body(json_string));
|
||||||
|
*response.status_mut() = hyper::StatusCode::from_u16(*status)
|
||||||
|
.unwrap_or(hyper::StatusCode::BAD_REQUEST);
|
||||||
|
response.headers_mut().insert(
|
||||||
|
hyper::header::CONTENT_TYPE,
|
||||||
|
"application/json".parse().unwrap(),
|
||||||
|
);
|
||||||
|
return Ok(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print detailed error information with full error chain for other errors
|
||||||
|
let mut error_chain = Vec::new();
|
||||||
|
let mut current_error: &dyn std::error::Error = &err;
|
||||||
|
|
||||||
|
// Collect the full error chain
|
||||||
|
loop {
|
||||||
|
error_chain.push(current_error.to_string());
|
||||||
|
match current_error.source() {
|
||||||
|
Some(source) => current_error = source,
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log the complete error chain
|
||||||
|
warn!(error_chain = ?error_chain, "agent chat error chain");
|
||||||
|
warn!(root_error = ?err, "root error");
|
||||||
|
|
||||||
|
// Create structured error response as JSON
|
||||||
|
let error_json = serde_json::json!({
|
||||||
|
"error": {
|
||||||
|
"type": "AgentFilterChainError",
|
||||||
|
"message": err.to_string(),
|
||||||
|
"error_chain": error_chain,
|
||||||
|
"debug_info": format!("{:?}", err)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Log the error for debugging
|
||||||
|
info!(error = %error_json, "structured error info");
|
||||||
|
|
||||||
|
// Return JSON error response
|
||||||
|
Ok(ResponseHandler::create_json_error_response(&error_json))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.instrument(request_span)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_agent_chat_inner(
|
||||||
request: Request<hyper::body::Incoming>,
|
request: Request<hyper::body::Incoming>,
|
||||||
orchestrator_service: Arc<OrchestratorService>,
|
orchestrator_service: Arc<OrchestratorService>,
|
||||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||||
|
request_id: String,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||||
// Initialize services
|
// Initialize services
|
||||||
let agent_selector = AgentSelector::new(orchestrator_service);
|
let agent_selector = AgentSelector::new(orchestrator_service);
|
||||||
|
|
@ -143,7 +170,7 @@ async fn handle_agent_chat(
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Handling request for listener: {}", listener.name);
|
info!(listener = %listener.name, "handling request");
|
||||||
|
|
||||||
// Parse request body
|
// Parse request body
|
||||||
let request_path = request
|
let request_path = request
|
||||||
|
|
@ -158,12 +185,8 @@ async fn handle_agent_chat(
|
||||||
let mut headers = request.headers().clone();
|
let mut headers = request.headers().clone();
|
||||||
headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER);
|
headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER);
|
||||||
|
|
||||||
|
// Set the request_id in headers if not already present
|
||||||
if !headers.contains_key(common::consts::REQUEST_ID_HEADER) {
|
if !headers.contains_key(common::consts::REQUEST_ID_HEADER) {
|
||||||
let request_id = uuid::Uuid::new_v4().to_string();
|
|
||||||
info!(
|
|
||||||
"Request id not found in headers, generated new request id: {}",
|
|
||||||
request_id
|
|
||||||
);
|
|
||||||
headers.insert(
|
headers.insert(
|
||||||
common::consts::REQUEST_ID_HEADER,
|
common::consts::REQUEST_ID_HEADER,
|
||||||
hyper::header::HeaderValue::from_str(&request_id).unwrap(),
|
hyper::header::HeaderValue::from_str(&request_id).unwrap(),
|
||||||
|
|
@ -176,8 +199,8 @@ async fn handle_agent_chat(
|
||||||
let chat_request_bytes = request.collect().await?.to_bytes();
|
let chat_request_bytes = request.collect().await?.to_bytes();
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Received request body (raw utf8): {}",
|
body = %String::from_utf8_lossy(&chat_request_bytes),
|
||||||
String::from_utf8_lossy(&chat_request_bytes)
|
"received request body"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Determine the API type from the endpoint
|
// Determine the API type from the endpoint
|
||||||
|
|
@ -191,7 +214,7 @@ async fn handle_agent_chat(
|
||||||
let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
|
let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
|
||||||
Ok(request) => request,
|
Ok(request) => request,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Failed to parse request as ProviderRequestType: {}", err);
|
warn!("failed to parse request as ProviderRequestType: {}", err);
|
||||||
let err_msg = format!("Failed to parse request: {}", err);
|
let err_msg = format!("Failed to parse request: {}", err);
|
||||||
return Err(AgentFilterChainError::RequestParsing(
|
return Err(AgentFilterChainError::RequestParsing(
|
||||||
serde_json::Error::custom(err_msg),
|
serde_json::Error::custom(err_msg),
|
||||||
|
|
@ -224,7 +247,10 @@ async fn handle_agent_chat(
|
||||||
.select_agents(&message, &listener, traceparent.clone(), request_id.clone())
|
.select_agents(&message, &listener, traceparent.clone(), request_id.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Selected {} agent(s) for execution", selected_agents.len());
|
info!(
|
||||||
|
count = selected_agents.len(),
|
||||||
|
"selected agents for execution"
|
||||||
|
);
|
||||||
|
|
||||||
// Execute agents sequentially, passing output from one to the next
|
// Execute agents sequentially, passing output from one to the next
|
||||||
let mut current_messages = message.clone();
|
let mut current_messages = message.clone();
|
||||||
|
|
@ -234,10 +260,10 @@ async fn handle_agent_chat(
|
||||||
let is_last_agent = agent_index == agent_count - 1;
|
let is_last_agent = agent_index == agent_count - 1;
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Processing agent {}/{}: {}",
|
agent_index = agent_index + 1,
|
||||||
agent_index + 1,
|
total = agent_count,
|
||||||
agent_count,
|
agent = %selected_agent.id,
|
||||||
selected_agent.id
|
"processing agent"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Get agent name
|
// Get agent name
|
||||||
|
|
@ -256,7 +282,7 @@ async fn handle_agent_chat(
|
||||||
// Get agent details and invoke
|
// Get agent details and invoke
|
||||||
let agent = agent_map.get(&agent_name).unwrap();
|
let agent = agent_map.get(&agent_name).unwrap();
|
||||||
|
|
||||||
debug!("Invoking agent: {}", agent_name);
|
debug!(agent = %agent_name, "invoking agent");
|
||||||
|
|
||||||
let llm_response = pipeline_processor
|
let llm_response = pipeline_processor
|
||||||
.invoke_agent(
|
.invoke_agent(
|
||||||
|
|
@ -270,8 +296,8 @@ async fn handle_agent_chat(
|
||||||
// If this is the last agent, return the streaming response
|
// If this is the last agent, return the streaming response
|
||||||
if is_last_agent {
|
if is_last_agent {
|
||||||
info!(
|
info!(
|
||||||
"Completed agent chain, returning response from last agent: {}",
|
agent = %agent_name,
|
||||||
agent_name
|
"completed agent chain, returning response"
|
||||||
);
|
);
|
||||||
return response_handler
|
return response_handler
|
||||||
.create_streaming_response(llm_response)
|
.create_streaming_response(llm_response)
|
||||||
|
|
@ -280,16 +306,13 @@ async fn handle_agent_chat(
|
||||||
}
|
}
|
||||||
|
|
||||||
// For intermediate agents, collect the full response and pass to next agent
|
// For intermediate agents, collect the full response and pass to next agent
|
||||||
debug!(
|
debug!(agent = %agent_name, "collecting response from intermediate agent");
|
||||||
"Collecting response from intermediate agent: {}",
|
|
||||||
agent_name
|
|
||||||
);
|
|
||||||
let response_text = response_handler.collect_full_response(llm_response).await?;
|
let response_text = response_handler.collect_full_response(llm_response).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Agent {} completed, passing {} character response to next agent",
|
agent = %agent_name,
|
||||||
agent_name,
|
response_len = response_text.len(),
|
||||||
response_text.len()
|
"agent completed, passing response to next agent"
|
||||||
);
|
);
|
||||||
|
|
||||||
// remove last message and add new one at the end
|
// remove last message and add new one at the end
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ impl AgentSelector {
|
||||||
.cloned()
|
.cloned()
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
warn!(
|
warn!(
|
||||||
"No default agent found, routing request to first agent: {}",
|
"no default agent found, routing request to first agent: {}",
|
||||||
agents[0].id
|
agents[0].id
|
||||||
);
|
);
|
||||||
Some(agents[0].clone())
|
Some(agents[0].clone())
|
||||||
|
|
@ -118,7 +118,7 @@ impl AgentSelector {
|
||||||
|
|
||||||
// If only one agent, skip orchestration
|
// If only one agent, skip orchestration
|
||||||
if agents.len() == 1 {
|
if agents.len() == 1 {
|
||||||
debug!("Only one agent available, skipping orchestration");
|
debug!("only one agent available, skipping orchestration");
|
||||||
return Ok(vec![agents[0].clone()]);
|
return Ok(vec![agents[0].clone()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -136,11 +136,11 @@ impl AgentSelector {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(routes)) => {
|
Ok(Some(routes)) => {
|
||||||
debug!("Determined {} agent(s) via orchestration", routes.len());
|
debug!(count = routes.len(), "determined agents via orchestration");
|
||||||
let mut selected_agents = Vec::new();
|
let mut selected_agents = Vec::new();
|
||||||
|
|
||||||
for (route_name, agent_name) in routes {
|
for (route_name, agent_name) in routes {
|
||||||
debug!("Processing route: {}, agent: {}", route_name, agent_name);
|
debug!(route = %route_name, agent = %agent_name, "processing route");
|
||||||
let selected_agent = agents
|
let selected_agent = agents
|
||||||
.iter()
|
.iter()
|
||||||
.find(|a| a.id == agent_name)
|
.find(|a| a.id == agent_name)
|
||||||
|
|
@ -155,14 +155,14 @@ impl AgentSelector {
|
||||||
}
|
}
|
||||||
|
|
||||||
if selected_agents.is_empty() {
|
if selected_agents.is_empty() {
|
||||||
debug!("No agents determined using orchestration, using default agent");
|
debug!("no agents determined via orchestration, using default");
|
||||||
Ok(vec![self.get_default_agent(agents, &listener.name)?])
|
Ok(vec![self.get_default_agent(agents, &listener.name)?])
|
||||||
} else {
|
} else {
|
||||||
Ok(selected_agents)
|
Ok(selected_agents)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!("No agents determined using orchestration, using default agent");
|
debug!("no agents determined using orchestration, using default agent");
|
||||||
Ok(vec![self.get_default_agent(agents, &listener.name)?])
|
Ok(vec![self.get_default_agent(agents, &listener.name)?])
|
||||||
}
|
}
|
||||||
Err(err) => Err(AgentSelectionError::OrchestrationError(err.to_string())),
|
Err(err) => Err(AgentSelectionError::OrchestrationError(err.to_string())),
|
||||||
|
|
|
||||||
|
|
@ -944,7 +944,7 @@ impl ArchFunctionHandler {
|
||||||
) -> Result<ChatCompletionsResponse> {
|
) -> Result<ChatCompletionsResponse> {
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
info!("[Arch-Function] - ChatCompletion");
|
info!("processing chat completion request");
|
||||||
|
|
||||||
let messages = self.process_messages(
|
let messages = self.process_messages(
|
||||||
&request.messages,
|
&request.messages,
|
||||||
|
|
@ -955,9 +955,9 @@ impl ArchFunctionHandler {
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"[request to arch-fc]: model: {}, messages count: {}",
|
model = %self.model_name,
|
||||||
self.model_name,
|
message_count = messages.len(),
|
||||||
messages.len()
|
"sending request to arch-fc"
|
||||||
);
|
);
|
||||||
|
|
||||||
let use_agent_orchestrator = request
|
let use_agent_orchestrator = request
|
||||||
|
|
@ -991,7 +991,7 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("[Agent Orchestrator]: response received");
|
info!("agent orchestrator response received");
|
||||||
} else if let Some(tools) = request.tools.as_ref() {
|
} else if let Some(tools) = request.tools.as_ref() {
|
||||||
let mut hallucination_state = HallucinationState::new(tools);
|
let mut hallucination_state = HallucinationState::new(tools);
|
||||||
let mut has_tool_calls = None;
|
let mut has_tool_calls = None;
|
||||||
|
|
@ -1040,7 +1040,10 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
if has_tool_calls == Some(true) && has_hallucination {
|
if has_tool_calls == Some(true) && has_hallucination {
|
||||||
info!("[Hallucination]: {}", hallucination_state.error_message);
|
info!(
|
||||||
|
"detected hallucination: {}",
|
||||||
|
hallucination_state.error_message
|
||||||
|
);
|
||||||
|
|
||||||
let clarify_messages = self.prefill_message(messages.clone(), &self.clarify_prefix);
|
let clarify_messages = self.prefill_message(messages.clone(), &self.clarify_prefix);
|
||||||
let clarify_request = self.create_request_with_extra_body(clarify_messages, false);
|
let clarify_request = self.create_request_with_extra_body(clarify_messages, false);
|
||||||
|
|
@ -1075,8 +1078,8 @@ impl ArchFunctionHandler {
|
||||||
let response_dict = self.parse_model_response(&model_response);
|
let response_dict = self.parse_model_response(&model_response);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"[arch-fc]: raw model response: {}",
|
raw_response = %response_dict.raw_response,
|
||||||
response_dict.raw_response
|
"arch-fc model response"
|
||||||
);
|
);
|
||||||
|
|
||||||
// General model response (no intent matched - should route to default target)
|
// General model response (no intent matched - should route to default target)
|
||||||
|
|
@ -1126,7 +1129,7 @@ impl ArchFunctionHandler {
|
||||||
|
|
||||||
if verification.is_valid {
|
if verification.is_valid {
|
||||||
info!(
|
info!(
|
||||||
"[Tool calls]: {:?}",
|
"tool calls extracted: {:?}",
|
||||||
response_dict
|
response_dict
|
||||||
.tool_calls
|
.tool_calls
|
||||||
.iter()
|
.iter()
|
||||||
|
|
@ -1143,7 +1146,7 @@ impl ArchFunctionHandler {
|
||||||
tool_calls: Some(response_dict.tool_calls.clone()),
|
tool_calls: Some(response_dict.tool_calls.clone()),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Invalid tool call - {}", verification.error_message);
|
error!(error = %verification.error_message, "invalid tool call");
|
||||||
ResponseMessage {
|
ResponseMessage {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
content: Some(String::new()),
|
content: Some(String::new()),
|
||||||
|
|
@ -1155,7 +1158,7 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Tool calls present but no tools provided in request");
|
error!("tool calls present but no tools provided in request");
|
||||||
ResponseMessage {
|
ResponseMessage {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
content: Some(String::new()),
|
content: Some(String::new()),
|
||||||
|
|
@ -1168,7 +1171,7 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"[Tool calls]: {:?}",
|
"tool calls extracted: {:?}",
|
||||||
response_dict
|
response_dict
|
||||||
.tool_calls
|
.tool_calls
|
||||||
.iter()
|
.iter()
|
||||||
|
|
@ -1187,8 +1190,8 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!(
|
error!(
|
||||||
"Invalid tool calls in response: {}",
|
error = %response_dict.error_message,
|
||||||
response_dict.error_message
|
"invalid tool calls in response"
|
||||||
);
|
);
|
||||||
ResponseMessage {
|
ResponseMessage {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
|
|
@ -1201,7 +1204,7 @@ impl ArchFunctionHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Invalid model response - {}", model_response);
|
error!(response = %model_response, "invalid model response");
|
||||||
ResponseMessage {
|
ResponseMessage {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
content: Some(String::new()),
|
content: Some(String::new()),
|
||||||
|
|
@ -1244,7 +1247,7 @@ impl ArchFunctionHandler {
|
||||||
metadata: Some(metadata),
|
metadata: Some(metadata),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("[response arch-fc]: {:?}", chat_completion_response);
|
info!(response = ?chat_completion_response, "arch-fc response");
|
||||||
|
|
||||||
Ok(chat_completion_response)
|
Ok(chat_completion_response)
|
||||||
}
|
}
|
||||||
|
|
@ -1331,7 +1334,7 @@ pub async fn function_calling_chat_handler(
|
||||||
let mut body_json: Value = match serde_json::from_slice(&whole_body) {
|
let mut body_json: Value = match serde_json::from_slice(&whole_body) {
|
||||||
Ok(json) => json,
|
Ok(json) => json,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to parse request body as JSON: {}", e);
|
error!(error = %e, "failed to parse request body as json");
|
||||||
let mut response = Response::new(full(
|
let mut response = Response::new(full(
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"error": format!("Invalid request body: {}", e)
|
"error": format!("Invalid request body: {}", e)
|
||||||
|
|
@ -1355,13 +1358,13 @@ pub async fn function_calling_chat_handler(
|
||||||
let chat_request: ChatCompletionsRequest = match serde_json::from_value(body_json) {
|
let chat_request: ChatCompletionsRequest = match serde_json::from_value(body_json) {
|
||||||
Ok(req) => {
|
Ok(req) => {
|
||||||
info!(
|
info!(
|
||||||
"[request body]: {}",
|
request_body = %serde_json::to_string(&req).unwrap_or_default(),
|
||||||
serde_json::to_string(&req).unwrap_or_default()
|
"received request"
|
||||||
);
|
);
|
||||||
req
|
req
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to parse request body: {}", e);
|
error!(error = %e, "failed to parse request body");
|
||||||
let mut response = Response::new(full(
|
let mut response = Response::new(full(
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"error": format!("Invalid request body: {}", e)
|
"error": format!("Invalid request body: {}", e)
|
||||||
|
|
@ -1384,7 +1387,10 @@ pub async fn function_calling_chat_handler(
|
||||||
.and_then(|v| v.as_bool())
|
.and_then(|v| v.as_bool())
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
info!("Use agent orchestrator: {}", use_agent_orchestrator);
|
info!(
|
||||||
|
use_agent_orchestrator = use_agent_orchestrator,
|
||||||
|
"handler mode"
|
||||||
|
);
|
||||||
|
|
||||||
// Create the appropriate handler
|
// Create the appropriate handler
|
||||||
let handler_name = if use_agent_orchestrator {
|
let handler_name = if use_agent_orchestrator {
|
||||||
|
|
@ -1415,7 +1421,7 @@ pub async fn function_calling_chat_handler(
|
||||||
match final_response {
|
match final_response {
|
||||||
Ok(response_data) => {
|
Ok(response_data) => {
|
||||||
let response_json = serde_json::to_string(&response_data).unwrap_or_else(|e| {
|
let response_json = serde_json::to_string(&response_data).unwrap_or_else(|e| {
|
||||||
error!("Failed to serialize response: {}", e);
|
error!(error = %e, "failed to serialize response");
|
||||||
serde_json::json!({"error": "Failed to serialize response"}).to_string()
|
serde_json::json!({"error": "Failed to serialize response"}).to_string()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -1428,7 +1434,7 @@ pub async fn function_calling_chat_handler(
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("[{}] - Error in function calling: {}", handler_name, e);
|
error!(handler = handler_name, error = %e, "error in function calling");
|
||||||
|
|
||||||
let error_response = serde_json::json!({
|
let error_response = serde_json::json!({
|
||||||
"error": format!("[{}] - Error in function calling: {}", handler_name, e)
|
"error": format!("[{}] - Error in function calling: {}", handler_name, e)
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ use opentelemetry::trace::get_active_span;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tracing::{debug, info, instrument, warn};
|
use tracing::{debug, info, info_span, warn, Instrument};
|
||||||
|
|
||||||
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
||||||
use crate::handlers::utils::{
|
use crate::handlers::utils::{
|
||||||
|
|
@ -34,17 +34,6 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(
|
|
||||||
name = "llm_chat_handler",
|
|
||||||
skip_all,
|
|
||||||
fields(
|
|
||||||
http.method = %request.method(),
|
|
||||||
http.path = %request.uri().path(),
|
|
||||||
model.requested = tracing::field::Empty,
|
|
||||||
model.alias_resolved = tracing::field::Empty,
|
|
||||||
model.routing_resolved = tracing::field::Empty
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
pub async fn llm_chat(
|
pub async fn llm_chat(
|
||||||
request: Request<hyper::body::Incoming>,
|
request: Request<hyper::body::Incoming>,
|
||||||
router_service: Arc<RouterService>,
|
router_service: Arc<RouterService>,
|
||||||
|
|
@ -61,16 +50,48 @@ pub async fn llm_chat(
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
{
|
{
|
||||||
Some(id) => id,
|
Some(id) => id,
|
||||||
None => {
|
None => uuid::Uuid::new_v4().to_string(),
|
||||||
let generated_id = uuid::Uuid::new_v4().to_string();
|
|
||||||
warn!(
|
|
||||||
"[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID",
|
|
||||||
generated_id
|
|
||||||
);
|
|
||||||
generated_id
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Create a span with request_id that will be included in all log lines
|
||||||
|
let request_span = info_span!(
|
||||||
|
"llm_chat_handler",
|
||||||
|
request_id = %request_id,
|
||||||
|
http.method = %request.method(),
|
||||||
|
http.path = %request_path,
|
||||||
|
model.requested = tracing::field::Empty,
|
||||||
|
model.alias_resolved = tracing::field::Empty,
|
||||||
|
model.routing_resolved = tracing::field::Empty
|
||||||
|
);
|
||||||
|
|
||||||
|
// Execute the rest of the handler inside the span
|
||||||
|
llm_chat_inner(
|
||||||
|
request,
|
||||||
|
router_service,
|
||||||
|
full_qualified_llm_provider_url,
|
||||||
|
model_aliases,
|
||||||
|
llm_providers,
|
||||||
|
state_storage,
|
||||||
|
request_id,
|
||||||
|
request_path,
|
||||||
|
request_headers,
|
||||||
|
)
|
||||||
|
.instrument(request_span)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn llm_chat_inner(
|
||||||
|
request: Request<hyper::body::Incoming>,
|
||||||
|
router_service: Arc<RouterService>,
|
||||||
|
full_qualified_llm_provider_url: String,
|
||||||
|
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||||
|
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||||
|
state_storage: Option<Arc<dyn StateStorage>>,
|
||||||
|
request_id: String,
|
||||||
|
request_path: String,
|
||||||
|
mut request_headers: hyper::HeaderMap,
|
||||||
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
// Extract or generate traceparent - this establishes the trace context for all spans
|
// Extract or generate traceparent - this establishes the trace context for all spans
|
||||||
let traceparent: String = match request_headers
|
let traceparent: String = match request_headers
|
||||||
.get(TRACE_PARENT_HEADER)
|
.get(TRACE_PARENT_HEADER)
|
||||||
|
|
@ -83,20 +104,18 @@ pub async fn llm_chat(
|
||||||
let trace_id = Uuid::new_v4().to_string().replace("-", "");
|
let trace_id = Uuid::new_v4().to_string().replace("-", "");
|
||||||
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}",
|
generated_traceparent = %generated_tp,
|
||||||
request_id, generated_tp
|
"TRACE_PARENT header missing, generated new traceparent"
|
||||||
);
|
);
|
||||||
generated_tp
|
generated_tp
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut request_headers = request_headers;
|
|
||||||
let chat_request_bytes = request.collect().await?.to_bytes();
|
let chat_request_bytes = request.collect().await?.to_bytes();
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}",
|
body = %String::from_utf8_lossy(&chat_request_bytes),
|
||||||
request_id,
|
"request body received"
|
||||||
String::from_utf8_lossy(&chat_request_bytes)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut client_request = match ProviderRequestType::try_from((
|
let mut client_request = match ProviderRequestType::try_from((
|
||||||
|
|
@ -106,13 +125,10 @@ pub async fn llm_chat(
|
||||||
Ok(request) => request,
|
Ok(request) => request,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}",
|
error = %err,
|
||||||
request_id, err
|
"failed to parse request as ProviderRequestType"
|
||||||
);
|
|
||||||
let err_msg = format!(
|
|
||||||
"[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}",
|
|
||||||
request_id, err
|
|
||||||
);
|
);
|
||||||
|
let err_msg = format!("Failed to parse request: {}", err);
|
||||||
let mut bad_request = Response::new(full(err_msg));
|
let mut bad_request = Response::new(full(err_msg));
|
||||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||||
return Ok(bad_request);
|
return Ok(bad_request);
|
||||||
|
|
@ -145,7 +161,7 @@ pub async fn llm_chat(
|
||||||
"Model '{}' not found in configured providers",
|
"Model '{}' not found in configured providers",
|
||||||
resolved_model
|
resolved_model
|
||||||
);
|
);
|
||||||
warn!("[PLANO_REQ_ID:{}] | FAILURE | {}", request_id, err_msg);
|
warn!(model = %resolved_model, "model not found in configured providers");
|
||||||
let mut bad_request = Response::new(full(err_msg));
|
let mut bad_request = Response::new(full(err_msg));
|
||||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||||
return Ok(bad_request);
|
return Ok(bad_request);
|
||||||
|
|
@ -172,10 +188,7 @@ pub async fn llm_chat(
|
||||||
// This ensures upstream receives "gpt-4" not "openai/gpt-4"
|
// This ensures upstream receives "gpt-4" not "openai/gpt-4"
|
||||||
client_request.set_model(model_name_only.clone());
|
client_request.set_model(model_name_only.clone());
|
||||||
if client_request.remove_metadata_key("archgw_preference_config") {
|
if client_request.remove_metadata_key("archgw_preference_config") {
|
||||||
debug!(
|
debug!("removed archgw_preference_config from metadata");
|
||||||
"[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata",
|
|
||||||
request_id
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// === v1/responses state management: Determine upstream API and combine input if needed ===
|
// === v1/responses state management: Determine upstream API and combine input if needed ===
|
||||||
|
|
@ -223,14 +236,17 @@ pub async fn llm_chat(
|
||||||
// Update both the request and original_input_items
|
// Update both the request and original_input_items
|
||||||
responses_req.input = InputParam::Items(combined_input.clone());
|
responses_req.input = InputParam::Items(combined_input.clone());
|
||||||
original_input_items = combined_input;
|
original_input_items = combined_input;
|
||||||
info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len());
|
info!(
|
||||||
|
items = original_input_items.len(),
|
||||||
|
"updated request with conversation history"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Err(StateStorageError::NotFound(_)) => {
|
Err(StateStorageError::NotFound(_)) => {
|
||||||
// Return 409 Conflict when previous_response_id not found
|
// Return 409 Conflict when previous_response_id not found
|
||||||
warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id);
|
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
|
||||||
let err_msg = format!(
|
let err_msg = format!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}",
|
"Conversation state not found for previous_response_id: {}",
|
||||||
request_id, prev_resp_id
|
prev_resp_id
|
||||||
);
|
);
|
||||||
let mut conflict_response = Response::new(full(err_msg));
|
let mut conflict_response = Response::new(full(err_msg));
|
||||||
*conflict_response.status_mut() = StatusCode::CONFLICT;
|
*conflict_response.status_mut() = StatusCode::CONFLICT;
|
||||||
|
|
@ -239,8 +255,9 @@ pub async fn llm_chat(
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log warning but continue on other storage errors
|
// Log warning but continue on other storage errors
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}",
|
previous_response_id = %prev_resp_id,
|
||||||
request_id, prev_resp_id, e
|
error = %e,
|
||||||
|
"failed to retrieve conversation state"
|
||||||
);
|
);
|
||||||
// Restore original_input_items since we passed ownership
|
// Restore original_input_items since we passed ownership
|
||||||
original_input_items = extract_input_items(&responses_req.input);
|
original_input_items = extract_input_items(&responses_req.input);
|
||||||
|
|
@ -248,10 +265,7 @@ pub async fn llm_chat(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!("upstream supports ResponsesAPI natively");
|
||||||
"[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.",
|
|
||||||
request_id
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -296,8 +310,10 @@ pub async fn llm_chat(
|
||||||
});
|
});
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Provider Hint: {}, Model for upstream: {}",
|
url = %full_qualified_llm_provider_url,
|
||||||
request_id, full_qualified_llm_provider_url, model_name, model_name_only
|
provider_hint = %model_name,
|
||||||
|
upstream_model = %model_name_only,
|
||||||
|
"Routing to upstream"
|
||||||
);
|
);
|
||||||
|
|
||||||
request_headers.insert(
|
request_headers.insert(
|
||||||
|
|
|
||||||
|
|
@ -116,7 +116,7 @@ impl PipelineProcessor {
|
||||||
};
|
};
|
||||||
|
|
||||||
for agent_name in filter_chain {
|
for agent_name in filter_chain {
|
||||||
debug!("Processing filter agent: {}", agent_name);
|
debug!(agent = %agent_name, "processing filter agent");
|
||||||
|
|
||||||
let agent = agent_map
|
let agent = agent_map
|
||||||
.get(agent_name)
|
.get(agent_name)
|
||||||
|
|
@ -125,12 +125,12 @@ impl PipelineProcessor {
|
||||||
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"executing filter: {}/{}, url: {}, type: {}, conversation length: {}",
|
agent = %agent_name,
|
||||||
agent_name,
|
tool = %tool_name,
|
||||||
tool_name,
|
url = %agent.url,
|
||||||
agent.url,
|
agent_type = %agent.agent_type.as_deref().unwrap_or("mcp"),
|
||||||
agent.agent_type.as_deref().unwrap_or("mcp"),
|
conversation_len = chat_history.len(),
|
||||||
chat_history.len()
|
"executing filter"
|
||||||
);
|
);
|
||||||
|
|
||||||
if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" {
|
if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" {
|
||||||
|
|
@ -144,9 +144,9 @@ impl PipelineProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Filter '{}' completed, updated conversation length: {}",
|
agent = %agent_name,
|
||||||
agent_name,
|
updated_len = chat_history_updated.len(),
|
||||||
chat_history_updated.len()
|
"filter completed"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -214,9 +214,9 @@ impl PipelineProcessor {
|
||||||
// Validate SSE format: first line should be "event: message"
|
// Validate SSE format: first line should be "event: message"
|
||||||
if lines.is_empty() || lines[0] != "event: message" {
|
if lines.is_empty() || lines[0] != "event: message" {
|
||||||
warn!(
|
warn!(
|
||||||
"Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}",
|
agent = %agent_id,
|
||||||
agent_id,
|
first_line = ?lines.first(),
|
||||||
lines.first()
|
"invalid SSE response format"
|
||||||
);
|
);
|
||||||
return Err(PipelineError::NoContentInResponse(format!(
|
return Err(PipelineError::NoContentInResponse(format!(
|
||||||
"Invalid SSE response format from agent {}: expected 'event: message' as first line",
|
"Invalid SSE response format from agent {}: expected 'event: message' as first line",
|
||||||
|
|
@ -233,9 +233,9 @@ impl PipelineProcessor {
|
||||||
|
|
||||||
if data_lines.len() != 1 {
|
if data_lines.len() != 1 {
|
||||||
warn!(
|
warn!(
|
||||||
"Expected exactly one 'data:' line from agent {}, found {}",
|
agent = %agent_id,
|
||||||
agent_id,
|
found = data_lines.len(),
|
||||||
data_lines.len()
|
"expected exactly one 'data:' line"
|
||||||
);
|
);
|
||||||
return Err(PipelineError::NoContentInResponse(format!(
|
return Err(PipelineError::NoContentInResponse(format!(
|
||||||
"Expected exactly one 'data:' line from agent {}, found {}",
|
"Expected exactly one 'data:' line from agent {}, found {}",
|
||||||
|
|
@ -453,7 +453,7 @@ impl PipelineProcessor {
|
||||||
};
|
};
|
||||||
|
|
||||||
let notification_body = serde_json::to_string(&initialized_notification)?;
|
let notification_body = serde_json::to_string(&initialized_notification)?;
|
||||||
debug!("Sending initialized notification for agent {}", agent_id);
|
debug!("sending initialized notification for agent {}", agent_id);
|
||||||
|
|
||||||
let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?;
|
let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?;
|
||||||
|
|
||||||
|
|
@ -466,7 +466,7 @@ impl PipelineProcessor {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Initialized notification response status: {}",
|
"initialized notification response status: {}",
|
||||||
response.status()
|
response.status()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -474,7 +474,7 @@ impl PipelineProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String {
|
async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String {
|
||||||
info!("Initializing MCP session for agent {}", agent_id);
|
info!("initializing MCP session for agent {}", agent_id);
|
||||||
|
|
||||||
let initialize_request = self.build_initialize_request();
|
let initialize_request = self.build_initialize_request();
|
||||||
let headers = self
|
let headers = self
|
||||||
|
|
@ -486,7 +486,7 @@ impl PipelineProcessor {
|
||||||
.await
|
.await
|
||||||
.expect("Failed to initialize MCP session");
|
.expect("Failed to initialize MCP session");
|
||||||
|
|
||||||
info!("Initialize response status: {}", response.status());
|
info!("initialize response status: {}", response.status());
|
||||||
|
|
||||||
let session_id = response
|
let session_id = response
|
||||||
.headers()
|
.headers()
|
||||||
|
|
@ -496,7 +496,7 @@ impl PipelineProcessor {
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Created new MCP session for agent {}: {}",
|
"created new MCP session for agent {}: {}",
|
||||||
agent_id, session_id
|
agent_id, session_id
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -631,7 +631,7 @@ impl PipelineProcessor {
|
||||||
|
|
||||||
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();
|
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();
|
||||||
// let request_body = serde_json::to_string(&request)?;
|
// let request_body = serde_json::to_string(&request)?;
|
||||||
debug!("Sending request to terminal agent {}", terminal_agent.id);
|
debug!("sending request to terminal agent {}", terminal_agent.id);
|
||||||
|
|
||||||
let mut agent_headers = request_headers.clone();
|
let mut agent_headers = request_headers.clone();
|
||||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||||
|
|
|
||||||
|
|
@ -97,13 +97,13 @@ impl ResponseHandler {
|
||||||
let chunk = match item {
|
let chunk = match item {
|
||||||
Ok(chunk) => chunk,
|
Ok(chunk) => chunk,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Error receiving chunk: {:?}", err);
|
warn!(error = ?err, "error receiving chunk");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if tx.send(chunk).await.is_err() {
|
if tx.send(chunk).await.is_err() {
|
||||||
warn!("Receiver dropped");
|
warn!("receiver dropped");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -164,11 +164,11 @@ impl ResponseHandler {
|
||||||
if let Some(content) = provider_response.content_delta() {
|
if let Some(content) = provider_response.content_delta() {
|
||||||
accumulated_text.push_str(content);
|
accumulated_text.push_str(content);
|
||||||
} else {
|
} else {
|
||||||
info!("No content delta in provider response");
|
info!("no content delta in provider response");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to parse provider response: {:?}", e);
|
warn!(error = ?e, "failed to parse provider response");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,14 +52,14 @@ pub async fn router_chat_get_upstream_model(
|
||||||
| ProviderRequestType::BedrockConverseStream(_)
|
| ProviderRequestType::BedrockConverseStream(_)
|
||||||
| ProviderRequestType::ResponsesAPIRequest(_),
|
| ProviderRequestType::ResponsesAPIRequest(_),
|
||||||
) => {
|
) => {
|
||||||
warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format");
|
warn!("unexpected: got non-ChatCompletions request after converting to OpenAI format");
|
||||||
return Err(RoutingError::internal_error(
|
return Err(RoutingError::internal_error(
|
||||||
"Request conversion failed".to_string(),
|
"Request conversion failed".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to convert request to ChatCompletionsRequest: {}",
|
"failed to convert request to ChatCompletionsRequest: {}",
|
||||||
err
|
err
|
||||||
);
|
);
|
||||||
return Err(RoutingError::internal_error(format!(
|
return Err(RoutingError::internal_error(format!(
|
||||||
|
|
@ -70,9 +70,8 @@ pub async fn router_chat_get_upstream_model(
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}",
|
request = %serde_json::to_string(&chat_request).unwrap(),
|
||||||
request_id,
|
"router request"
|
||||||
&serde_json::to_string(&chat_request).unwrap()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Extract usage preferences from metadata
|
// Extract usage preferences from metadata
|
||||||
|
|
@ -108,11 +107,10 @@ pub async fn router_chat_get_upstream_model(
|
||||||
};
|
};
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
|
has_usage_preferences = usage_preferences.is_some(),
|
||||||
request_id,
|
path = %request_path,
|
||||||
usage_preferences.is_some(),
|
latest_message = %latest_message_for_log,
|
||||||
request_path,
|
"processing router request"
|
||||||
latest_message_for_log
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Capture start time for routing span
|
// Capture start time for routing span
|
||||||
|
|
@ -135,10 +133,7 @@ pub async fn router_chat_get_upstream_model(
|
||||||
None => {
|
None => {
|
||||||
// No route determined, return sentinel value "none"
|
// No route determined, return sentinel value "none"
|
||||||
// This signals to llm.rs to use the original validated request model
|
// This signals to llm.rs to use the original validated request model
|
||||||
info!(
|
info!("no route determined, using default model");
|
||||||
"[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, returning sentinel 'none'",
|
|
||||||
request_id
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(RoutingResult {
|
Ok(RoutingResult {
|
||||||
model_name: "none".to_string(),
|
model_name: "none".to_string(),
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ use std::time::Instant;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn, Instrument};
|
||||||
|
|
||||||
use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer};
|
use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer};
|
||||||
use hermesllm::apis::openai::Message;
|
use hermesllm::apis::openai::Message;
|
||||||
|
|
@ -88,7 +88,7 @@ impl StreamProcessor for ObservableStreamProcessor {
|
||||||
chunk_count = self.chunk_count,
|
chunk_count = self.chunk_count,
|
||||||
duration_ms = self.start_time.elapsed().as_millis(),
|
duration_ms = self.start_time.elapsed().as_millis(),
|
||||||
time_to_first_token_ms = ?self.time_to_first_token,
|
time_to_first_token_ms = ?self.time_to_first_token,
|
||||||
"Streaming completed"
|
"streaming completed"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,7 +97,7 @@ impl StreamProcessor for ObservableStreamProcessor {
|
||||||
service = %self.service_name,
|
service = %self.service_name,
|
||||||
error = error_msg,
|
error = error_msg,
|
||||||
duration_ms = self.start_time.elapsed().as_millis(),
|
duration_ms = self.start_time.elapsed().as_millis(),
|
||||||
"Stream error"
|
"stream error"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -119,49 +119,55 @@ where
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::channel::<Bytes>(buffer_size);
|
let (tx, rx) = mpsc::channel::<Bytes>(buffer_size);
|
||||||
|
|
||||||
|
// Capture the current span so the spawned task inherits the request context
|
||||||
|
let current_span = tracing::Span::current();
|
||||||
|
|
||||||
// Spawn a task to process and forward chunks
|
// Spawn a task to process and forward chunks
|
||||||
let processor_handle = tokio::spawn(async move {
|
let processor_handle = tokio::spawn(
|
||||||
let mut is_first_chunk = true;
|
async move {
|
||||||
|
let mut is_first_chunk = true;
|
||||||
|
|
||||||
while let Some(item) = byte_stream.next().await {
|
while let Some(item) = byte_stream.next().await {
|
||||||
let chunk = match item {
|
let chunk = match item {
|
||||||
Ok(chunk) => chunk,
|
Ok(chunk) => chunk,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let err_msg = format!("Error receiving chunk: {:?}", err);
|
let err_msg = format!("Error receiving chunk: {:?}", err);
|
||||||
warn!("{}", err_msg);
|
warn!(error = %err_msg, "stream error");
|
||||||
processor.on_error(&err_msg);
|
processor.on_error(&err_msg);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Call on_first_bytes for the first chunk
|
||||||
|
if is_first_chunk {
|
||||||
|
processor.on_first_bytes();
|
||||||
|
is_first_chunk = false;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
// Call on_first_bytes for the first chunk
|
// Process the chunk
|
||||||
if is_first_chunk {
|
match processor.process_chunk(chunk) {
|
||||||
processor.on_first_bytes();
|
Ok(Some(processed_chunk)) => {
|
||||||
is_first_chunk = false;
|
if tx.send(processed_chunk).await.is_err() {
|
||||||
}
|
warn!("receiver dropped");
|
||||||
|
break;
|
||||||
// Process the chunk
|
}
|
||||||
match processor.process_chunk(chunk) {
|
}
|
||||||
Ok(Some(processed_chunk)) => {
|
Ok(None) => {
|
||||||
if tx.send(processed_chunk).await.is_err() {
|
// Skip this chunk
|
||||||
warn!("Receiver dropped");
|
continue;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!("processor error: {}", err);
|
||||||
|
processor.on_error(&err);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
|
||||||
// Skip this chunk
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Processor error: {}", err);
|
|
||||||
processor.on_error(&err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
processor.on_complete();
|
processor.on_complete();
|
||||||
});
|
}
|
||||||
|
.instrument(current_span),
|
||||||
|
);
|
||||||
|
|
||||||
// Convert channel receiver to HTTP stream
|
// Convert channel receiver to HTTP stream
|
||||||
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
|
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
// loading arch_config.yaml file
|
// loading arch_config.yaml file
|
||||||
let arch_config_path = env::var("ARCH_CONFIG_PATH_RENDERED")
|
let arch_config_path = env::var("ARCH_CONFIG_PATH_RENDERED")
|
||||||
.unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string());
|
.unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string());
|
||||||
info!("Loading arch_config.yaml from {}", arch_config_path);
|
info!(path = %arch_config_path, "loading arch_config.yaml");
|
||||||
|
|
||||||
let config_contents =
|
let config_contents =
|
||||||
fs::read_to_string(&arch_config_path).expect("Failed to read arch_config.yaml");
|
fs::read_to_string(&arch_config_path).expect("Failed to read arch_config.yaml");
|
||||||
|
|
@ -125,7 +125,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
if let Some(storage_config) = &arch_config.state_storage {
|
if let Some(storage_config) = &arch_config.state_storage {
|
||||||
let storage: Arc<dyn StateStorage> = match storage_config.storage_type {
|
let storage: Arc<dyn StateStorage> = match storage_config.storage_type {
|
||||||
common::configuration::StateStorageType::Memory => {
|
common::configuration::StateStorageType::Memory => {
|
||||||
info!("Initialized conversation state storage: Memory");
|
info!(
|
||||||
|
storage_type = "memory",
|
||||||
|
"initialized conversation state storage"
|
||||||
|
);
|
||||||
Arc::new(MemoryConversationalStorage::new())
|
Arc::new(MemoryConversationalStorage::new())
|
||||||
}
|
}
|
||||||
common::configuration::StateStorageType::Postgres => {
|
common::configuration::StateStorageType::Postgres => {
|
||||||
|
|
@ -134,8 +137,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("connection_string is required for postgres state_storage");
|
.expect("connection_string is required for postgres state_storage");
|
||||||
|
|
||||||
debug!("Postgres connection string (full): {}", connection_string);
|
debug!(connection_string = %connection_string, "postgres connection");
|
||||||
info!("Initializing conversation state storage: Postgres");
|
info!(
|
||||||
|
storage_type = "postgres",
|
||||||
|
"initializing conversation state storage"
|
||||||
|
);
|
||||||
Arc::new(
|
Arc::new(
|
||||||
PostgreSQLConversationStorage::new(connection_string.clone())
|
PostgreSQLConversationStorage::new(connection_string.clone())
|
||||||
.await
|
.await
|
||||||
|
|
@ -145,7 +151,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
};
|
};
|
||||||
Some(storage)
|
Some(storage)
|
||||||
} else {
|
} else {
|
||||||
info!("No state_storage configured - conversation state management disabled");
|
info!("no state_storage configured, conversation state management disabled");
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -250,7 +256,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
debug!("No route for {} {}", req.method(), req.uri().path());
|
debug!(method = %req.method(), path = %req.uri().path(), "no route found");
|
||||||
let mut not_found = Response::new(empty());
|
let mut not_found = Response::new(empty());
|
||||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||||
Ok(not_found)
|
Ok(not_found)
|
||||||
|
|
@ -260,13 +266,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
debug!("Accepted connection from {:?}", peer_addr);
|
debug!(peer = ?peer_addr, "accepted connection");
|
||||||
if let Err(err) = http1::Builder::new()
|
if let Err(err) = http1::Builder::new()
|
||||||
// .serve_connection(io, service_fn(chat_completion))
|
// .serve_connection(io, service_fn(chat_completion))
|
||||||
.serve_connection(io, service)
|
.serve_connection(io, service)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
warn!("Error serving connection: {:?}", err);
|
warn!(error = ?err, "error serving connection");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -96,14 +96,14 @@ impl RouterService {
|
||||||
.generate_request(messages, &usage_preferences);
|
.generate_request(messages, &usage_preferences);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"sending request to arch-router model: {}, endpoint: {}",
|
model = %self.router_model.get_model_name(),
|
||||||
self.router_model.get_model_name(),
|
endpoint = %self.router_url,
|
||||||
self.router_url
|
"sending request to arch-router"
|
||||||
);
|
);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"arch request body: {}",
|
body = %serde_json::to_string(&router_request).unwrap(),
|
||||||
&serde_json::to_string(&router_request).unwrap(),
|
"arch router request"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut llm_route_request_headers = header::HeaderMap::new();
|
let mut llm_route_request_headers = header::HeaderMap::new();
|
||||||
|
|
@ -148,9 +148,9 @@ impl RouterService {
|
||||||
Ok(response) => response,
|
Ok(response) => response,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to parse JSON: {}. Body: {}",
|
error = %err,
|
||||||
err,
|
body = %serde_json::to_string(&body).unwrap(),
|
||||||
&serde_json::to_string(&body).unwrap()
|
"failed to parse json response"
|
||||||
);
|
);
|
||||||
return Err(RoutingError::JsonError(
|
return Err(RoutingError::JsonError(
|
||||||
err,
|
err,
|
||||||
|
|
@ -160,7 +160,7 @@ impl RouterService {
|
||||||
};
|
};
|
||||||
|
|
||||||
if chat_completion_response.choices.is_empty() {
|
if chat_completion_response.choices.is_empty() {
|
||||||
warn!("No choices in router response: {}", body);
|
warn!(body = %body, "no choices in router response");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,10 +169,10 @@ impl RouterService {
|
||||||
.router_model
|
.router_model
|
||||||
.parse_response(content, &usage_preferences)?;
|
.parse_response(content, &usage_preferences)?;
|
||||||
info!(
|
info!(
|
||||||
"arch-router determined route: {}, selected_model: {:?}, response time: {}ms",
|
content = %content.replace("\n", "\\n"),
|
||||||
content.replace("\n", "\\n"),
|
selected_model = ?parsed_response,
|
||||||
parsed_response,
|
response_time_ms = router_response_time.as_millis(),
|
||||||
router_response_time.as_millis()
|
"arch-router determined route"
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(ref parsed_response) = parsed_response {
|
if let Some(ref parsed_response) = parsed_response {
|
||||||
|
|
|
||||||
|
|
@ -197,12 +197,12 @@ impl OrchestratorModel for OrchestratorModelV1 {
|
||||||
token_count += message_token_count;
|
token_count += message_token_count;
|
||||||
if token_count > self.max_token_length {
|
if token_count > self.max_token_length {
|
||||||
debug!(
|
debug!(
|
||||||
"OrchestratorModelV1: token count {} exceeds max token length {}, truncating conversation, selected message count {}, total message count: {}",
|
token_count = token_count,
|
||||||
token_count,
|
max_tokens = self.max_token_length,
|
||||||
self.max_token_length
|
selected = selected_messsage_count,
|
||||||
, selected_messsage_count,
|
total = messages_vec.len(),
|
||||||
messages_vec.len()
|
"token count exceeds max, truncating conversation"
|
||||||
);
|
);
|
||||||
if message.role == Role::User {
|
if message.role == Role::User {
|
||||||
// If message that exceeds max token length is from user, we need to keep it
|
// If message that exceeds max token length is from user, we need to keep it
|
||||||
selected_messages_list_reversed.push(message);
|
selected_messages_list_reversed.push(message);
|
||||||
|
|
@ -214,9 +214,7 @@ impl OrchestratorModel for OrchestratorModelV1 {
|
||||||
}
|
}
|
||||||
|
|
||||||
if selected_messages_list_reversed.is_empty() {
|
if selected_messages_list_reversed.is_empty() {
|
||||||
debug!(
|
debug!("no messages selected, using last message");
|
||||||
"OrchestratorModelV1: no messages selected, using the last message in the conversation"
|
|
||||||
);
|
|
||||||
if let Some(last_message) = messages_vec.last() {
|
if let Some(last_message) = messages_vec.last() {
|
||||||
selected_messages_list_reversed.push(last_message);
|
selected_messages_list_reversed.push(last_message);
|
||||||
}
|
}
|
||||||
|
|
@ -228,12 +226,12 @@ impl OrchestratorModel for OrchestratorModelV1 {
|
||||||
// - last() is the first message in the original conversation
|
// - last() is the first message in the original conversation
|
||||||
if let Some(first_message) = selected_messages_list_reversed.first() {
|
if let Some(first_message) = selected_messages_list_reversed.first() {
|
||||||
if first_message.role != Role::User {
|
if first_message.role != Role::User {
|
||||||
warn!("OrchestratorModelV1: last message in the conversation is not from user, this may lead to incorrect orchestration");
|
warn!("last message is not from user, may lead to incorrect orchestration");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(last_message) = selected_messages_list_reversed.last() {
|
if let Some(last_message) = selected_messages_list_reversed.last() {
|
||||||
if last_message.role != Role::User {
|
if last_message.role != Role::User {
|
||||||
warn!("OrchestratorModelV1: first message in the selected conversation is not from user, this may lead to incorrect orchestration");
|
warn!("first message is not from user, may lead to incorrect orchestration");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -323,8 +321,9 @@ impl OrchestratorModel for OrchestratorModelV1 {
|
||||||
result.push((selected_route, model_name));
|
result.push((selected_route, model_name));
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"No matching model found for route: {}, usage preferences: {:?}",
|
route = %selected_route,
|
||||||
selected_route, usage_preferences
|
preferences = ?usage_preferences,
|
||||||
|
"no matching model found for route"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -339,8 +338,9 @@ impl OrchestratorModel for OrchestratorModelV1 {
|
||||||
result.push((selected_route, model));
|
result.push((selected_route, model));
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"No model found for route: {}, orchestrator model preferences: {:?}",
|
route = %selected_route,
|
||||||
selected_route, self.agent_orchestration_to_model_map
|
preferences = ?self.agent_orchestration_to_model_map,
|
||||||
|
"no model found for route"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,14 +75,14 @@ impl OrchestratorService {
|
||||||
.generate_request(messages, &usage_preferences);
|
.generate_request(messages, &usage_preferences);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"sending request to arch-orchestrator model: {}, endpoint: {}",
|
model = %self.orchestrator_model.get_model_name(),
|
||||||
self.orchestrator_model.get_model_name(),
|
endpoint = %self.orchestrator_url,
|
||||||
self.orchestrator_url
|
"sending request to arch-orchestrator"
|
||||||
);
|
);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"arch orchestrator request body: {}",
|
body = %serde_json::to_string(&orchestrator_request).unwrap(),
|
||||||
&serde_json::to_string(&orchestrator_request).unwrap(),
|
"arch orchestrator request"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut orchestration_request_headers = header::HeaderMap::new();
|
let mut orchestration_request_headers = header::HeaderMap::new();
|
||||||
|
|
@ -131,9 +131,9 @@ impl OrchestratorService {
|
||||||
Ok(response) => response,
|
Ok(response) => response,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to parse JSON: {}. Body: {}",
|
error = %err,
|
||||||
err,
|
body = %serde_json::to_string(&body).unwrap(),
|
||||||
&serde_json::to_string(&body).unwrap()
|
"failed to parse json response"
|
||||||
);
|
);
|
||||||
return Err(OrchestrationError::JsonError(
|
return Err(OrchestrationError::JsonError(
|
||||||
err,
|
err,
|
||||||
|
|
@ -143,7 +143,7 @@ impl OrchestratorService {
|
||||||
};
|
};
|
||||||
|
|
||||||
if chat_completion_response.choices.is_empty() {
|
if chat_completion_response.choices.is_empty() {
|
||||||
warn!("No choices in orchestrator response: {}", body);
|
warn!(body = %body, "no choices in orchestrator response");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -152,10 +152,10 @@ impl OrchestratorService {
|
||||||
.orchestrator_model
|
.orchestrator_model
|
||||||
.parse_response(content, &usage_preferences)?;
|
.parse_response(content, &usage_preferences)?;
|
||||||
info!(
|
info!(
|
||||||
"arch-orchestrator determined routes: {}, selected_routes: {:?}, response time: {}ms",
|
content = %content.replace("\n", "\\n"),
|
||||||
content.replace("\n", "\\n"),
|
selected_routes = ?parsed_response,
|
||||||
parsed_response,
|
response_time_ms = orchestrator_response_time.as_millis(),
|
||||||
orchestrator_response_time.as_millis()
|
"arch-orchestrator determined routes"
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(ref parsed_response) = parsed_response {
|
if let Some(ref parsed_response) = parsed_response {
|
||||||
|
|
|
||||||
|
|
@ -94,12 +94,12 @@ impl RouterModel for RouterModelV1 {
|
||||||
token_count += message_token_count;
|
token_count += message_token_count;
|
||||||
if token_count > self.max_token_length {
|
if token_count > self.max_token_length {
|
||||||
debug!(
|
debug!(
|
||||||
"RouterModelV1: token count {} exceeds max token length {}, truncating conversation, selected message count {}, total message count: {}",
|
token_count = token_count,
|
||||||
token_count,
|
max_tokens = self.max_token_length,
|
||||||
self.max_token_length
|
selected = selected_messsage_count,
|
||||||
, selected_messsage_count,
|
total = messages_vec.len(),
|
||||||
messages_vec.len()
|
"token count exceeds max, truncating conversation"
|
||||||
);
|
);
|
||||||
if message.role == Role::User {
|
if message.role == Role::User {
|
||||||
// If message that exceeds max token length is from user, we need to keep it
|
// If message that exceeds max token length is from user, we need to keep it
|
||||||
selected_messages_list_reversed.push(message);
|
selected_messages_list_reversed.push(message);
|
||||||
|
|
@ -111,9 +111,7 @@ impl RouterModel for RouterModelV1 {
|
||||||
}
|
}
|
||||||
|
|
||||||
if selected_messages_list_reversed.is_empty() {
|
if selected_messages_list_reversed.is_empty() {
|
||||||
debug!(
|
debug!("no messages selected, using last message");
|
||||||
"RouterModelV1: no messages selected, using the last message in the conversation"
|
|
||||||
);
|
|
||||||
if let Some(last_message) = messages_vec.last() {
|
if let Some(last_message) = messages_vec.last() {
|
||||||
selected_messages_list_reversed.push(last_message);
|
selected_messages_list_reversed.push(last_message);
|
||||||
}
|
}
|
||||||
|
|
@ -122,12 +120,12 @@ impl RouterModel for RouterModelV1 {
|
||||||
// ensure that first and last selected message is from user
|
// ensure that first and last selected message is from user
|
||||||
if let Some(first_message) = selected_messages_list_reversed.first() {
|
if let Some(first_message) = selected_messages_list_reversed.first() {
|
||||||
if first_message.role != Role::User {
|
if first_message.role != Role::User {
|
||||||
warn!("RouterModelV1: last message in the conversation is not from user, this may lead to incorrect routing");
|
warn!("last message is not from user, may lead to incorrect routing");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(last_message) = selected_messages_list_reversed.last() {
|
if let Some(last_message) = selected_messages_list_reversed.last() {
|
||||||
if last_message.role != Role::User {
|
if last_message.role != Role::User {
|
||||||
warn!("RouterModelV1: first message in the conversation is not from user, this may lead to incorrect routing");
|
warn!("first message is not from user, may lead to incorrect routing");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -206,8 +204,9 @@ impl RouterModel for RouterModelV1 {
|
||||||
return Ok(Some((selected_route, model_name)));
|
return Ok(Some((selected_route, model_name)));
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"No matching model found for route: {}, usage preferences: {:?}",
|
route = %selected_route,
|
||||||
selected_route, usage_preferences
|
preferences = ?usage_preferences,
|
||||||
|
"no matching model found for route"
|
||||||
);
|
);
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
@ -219,8 +218,9 @@ impl RouterModel for RouterModelV1 {
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!(
|
warn!(
|
||||||
"No model found for route: {}, router model preferences: {:?}",
|
route = %selected_route,
|
||||||
selected_route, self.llm_route_to_model_map
|
preferences = ?self.llm_route_to_model_map,
|
||||||
|
"no model found for route"
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
|
|
||||||
|
|
@ -92,18 +92,16 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
|
||||||
match decoder.read_to_end(&mut decompressed) {
|
match decoder.read_to_end(&mut decompressed) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes",
|
original_bytes = self.chunk_buffer.len(),
|
||||||
self.request_id,
|
decompressed_bytes = decompressed.len(),
|
||||||
self.chunk_buffer.len(),
|
"Successfully decompressed response"
|
||||||
decompressed.len()
|
|
||||||
);
|
);
|
||||||
decompressed
|
decompressed
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to decompress gzip buffer: {}",
|
error = %e,
|
||||||
self.request_id,
|
"Failed to decompress gzip buffer"
|
||||||
e
|
|
||||||
);
|
);
|
||||||
self.chunk_buffer.clone()
|
self.chunk_buffer.clone()
|
||||||
}
|
}
|
||||||
|
|
@ -111,9 +109,8 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
|
||||||
}
|
}
|
||||||
Some(encoding) => {
|
Some(encoding) => {
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.",
|
encoding = %encoding,
|
||||||
self.request_id,
|
"Unsupported Content-Encoding, only gzip is supported"
|
||||||
encoding
|
|
||||||
);
|
);
|
||||||
self.chunk_buffer.clone()
|
self.chunk_buffer.clone()
|
||||||
}
|
}
|
||||||
|
|
@ -143,10 +140,9 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
|
||||||
serde_json::from_str::<ResponsesAPIStreamEvent>(data_str)
|
serde_json::from_str::<ResponsesAPIStreamEvent>(data_str)
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}",
|
response_id = %response.id,
|
||||||
self.request_id,
|
output_items = response.output.len(),
|
||||||
response.id,
|
"Captured streaming response"
|
||||||
response.output.len()
|
|
||||||
);
|
);
|
||||||
self.response_id = Some(response.id.clone());
|
self.response_id = Some(response.id.clone());
|
||||||
self.output_items = Some(response.output.clone());
|
self.output_items = Some(response.output.clone());
|
||||||
|
|
@ -175,24 +171,20 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
|
||||||
) {
|
) {
|
||||||
Ok(response) => {
|
Ok(response) => {
|
||||||
info!(
|
info!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}",
|
response_id = %response.id,
|
||||||
self.request_id,
|
output_items = response.output.len(),
|
||||||
response.id,
|
"Captured non-streaming response"
|
||||||
response.output.len()
|
|
||||||
);
|
);
|
||||||
self.response_id = Some(response.id.clone());
|
self.response_id = Some(response.id.clone());
|
||||||
self.output_items = Some(response.output.clone());
|
self.output_items = Some(response.output.clone());
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log parse error with chunk preview for debugging
|
|
||||||
let chunk_preview = String::from_utf8_lossy(&decompressed);
|
let chunk_preview = String::from_utf8_lossy(&decompressed);
|
||||||
let preview_len = chunk_preview.len().min(200);
|
let preview_len = chunk_preview.len().min(200);
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}",
|
error = %e,
|
||||||
self.request_id,
|
preview = %&chunk_preview[..preview_len],
|
||||||
e,
|
"Failed to parse non-streaming ResponsesAPIResponse"
|
||||||
preview_len,
|
|
||||||
&chunk_preview[..preview_len]
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -221,10 +213,7 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
|
||||||
|
|
||||||
// Skip storage for OpenAI upstream
|
// Skip storage for OpenAI upstream
|
||||||
if self.is_openai_upstream {
|
if self.is_openai_upstream {
|
||||||
debug!(
|
debug!("Skipping state storage for OpenAI upstream");
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider",
|
|
||||||
self.request_id
|
|
||||||
);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -234,8 +223,9 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
|
||||||
let output_as_inputs = outputs_to_inputs(output_items);
|
let output_as_inputs = outputs_to_inputs(output_items);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}",
|
output_items = output_items.len(),
|
||||||
self.request_id, output_items.len(), output_as_inputs.len()
|
converted_items = output_as_inputs.len(),
|
||||||
|
"Converting outputs to inputs"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Combine original input + output as new input history
|
// Combine original input + output as new input history
|
||||||
|
|
@ -243,11 +233,9 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
|
||||||
combined_input.extend(output_as_inputs);
|
combined_input.extend(output_as_inputs);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}",
|
original_input = self.original_input.len(),
|
||||||
self.request_id,
|
combined_input = combined_input.len(),
|
||||||
self.original_input.len(),
|
"Storing conversation state"
|
||||||
combined_input.len(),
|
|
||||||
serde_json::to_string(&combined_input).unwrap_or_else(|_| "serialization_error".to_string())
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let state = OpenAIConversationState {
|
let state = OpenAIConversationState {
|
||||||
|
|
@ -270,28 +258,27 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
|
||||||
match storage.put(state).await {
|
match storage.put(state).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
info!(
|
info!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}, items_count={}",
|
request_id = %request_id,
|
||||||
request_id,
|
response_id = %response_id_clone,
|
||||||
response_id_clone,
|
items = items_count,
|
||||||
items_count
|
"Stored conversation state"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}",
|
request_id = %request_id,
|
||||||
request_id,
|
response_id = %response_id_clone,
|
||||||
response_id_clone,
|
error = %e,
|
||||||
e
|
"Failed to store conversation state"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}",
|
has_response_id = self.response_id.is_some(),
|
||||||
self.request_id,
|
has_output = self.output_items.is_some(),
|
||||||
self.response_id.is_some(),
|
"No response_id captured, cannot store conversation state"
|
||||||
self.output_items.is_some()
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ use time::macros::format_description;
|
||||||
use tracing::{Event, Subscriber};
|
use tracing::{Event, Subscriber};
|
||||||
use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields};
|
use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields};
|
||||||
use tracing_subscriber::layer::SubscriberExt;
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
|
use tracing_subscriber::registry::LookupSpan;
|
||||||
|
use tracing_subscriber::util::SubscriberInitExt;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
struct BracketedTime;
|
struct BracketedTime;
|
||||||
|
|
@ -30,7 +32,7 @@ struct BracketedFormatter;
|
||||||
|
|
||||||
impl<S, N> FormatEvent<S, N> for BracketedFormatter
|
impl<S, N> FormatEvent<S, N> for BracketedFormatter
|
||||||
where
|
where
|
||||||
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
|
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||||
N: for<'a> FormatFields<'a> + 'static,
|
N: for<'a> FormatFields<'a> + 'static,
|
||||||
{
|
{
|
||||||
fn format_event(
|
fn format_event(
|
||||||
|
|
@ -44,16 +46,37 @@ where
|
||||||
|
|
||||||
write!(
|
write!(
|
||||||
writer,
|
writer,
|
||||||
"[{}] ",
|
"[{}]",
|
||||||
event.metadata().level().to_string().to_lowercase()
|
event.metadata().level().to_string().to_lowercase()
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Extract request_id from span context if present
|
||||||
|
if let Some(scope) = ctx.event_scope() {
|
||||||
|
for span in scope.from_root() {
|
||||||
|
let extensions = span.extensions();
|
||||||
|
if let Some(fields) = extensions.get::<FormattedFields<N>>() {
|
||||||
|
let fields_str = fields.fields.as_str();
|
||||||
|
// Look for request_id in the formatted fields
|
||||||
|
if let Some(start) = fields_str.find("request_id=") {
|
||||||
|
let rest = &fields_str[start + 11..]; // Skip "request_id="
|
||||||
|
let end = rest.find(|c: char| c.is_whitespace()).unwrap_or(rest.len());
|
||||||
|
let rid = &rest[..end];
|
||||||
|
write!(writer, "[request_id={}]", rid)?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
write!(writer, " ")?;
|
||||||
ctx.field_format().format_fields(writer.by_ref(), event)?;
|
ctx.field_format().format_fields(writer.by_ref(), event)?;
|
||||||
|
|
||||||
writeln!(writer)
|
writeln!(writer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use tracing_subscriber::fmt::FormattedFields;
|
||||||
|
|
||||||
static INIT_LOGGER: OnceLock<SdkTracerProvider> = OnceLock::new();
|
static INIT_LOGGER: OnceLock<SdkTracerProvider> = OnceLock::new();
|
||||||
|
|
||||||
pub fn init_tracer() -> &'static SdkTracerProvider {
|
pub fn init_tracer() -> &'static SdkTracerProvider {
|
||||||
|
|
@ -94,10 +117,19 @@ pub fn init_tracer() -> &'static SdkTracerProvider {
|
||||||
tracing_opentelemetry::layer().with_tracer(provider.tracer("brightstaff"));
|
tracing_opentelemetry::layer().with_tracer(provider.tracer("brightstaff"));
|
||||||
|
|
||||||
// Combine the OpenTelemetry layer with fmt layer using the registry
|
// Combine the OpenTelemetry layer with fmt layer using the registry
|
||||||
|
let env_filter =
|
||||||
|
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||||
|
|
||||||
|
// Create fmt layer with span field formatting enabled (no ANSI to keep fields parseable)
|
||||||
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||||
|
.event_format(BracketedFormatter)
|
||||||
|
.fmt_fields(format::DefaultFields::new())
|
||||||
|
.with_ansi(false);
|
||||||
|
|
||||||
let subscriber = tracing_subscriber::registry()
|
let subscriber = tracing_subscriber::registry()
|
||||||
.with(telemetry_layer)
|
.with(telemetry_layer)
|
||||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
.with(env_filter)
|
||||||
.with(tracing_subscriber::fmt::layer().event_format(BracketedFormatter));
|
.with(fmt_layer);
|
||||||
|
|
||||||
tracing::subscriber::set_global_default(subscriber)
|
tracing::subscriber::set_global_default(subscriber)
|
||||||
.expect("Failed to set tracing subscriber");
|
.expect("Failed to set tracing subscriber");
|
||||||
|
|
@ -108,11 +140,18 @@ pub fn init_tracer() -> &'static SdkTracerProvider {
|
||||||
let provider = SdkTracerProvider::builder().build();
|
let provider = SdkTracerProvider::builder().build();
|
||||||
global::set_tracer_provider(provider.clone());
|
global::set_tracer_provider(provider.clone());
|
||||||
|
|
||||||
tracing_subscriber::fmt()
|
let env_filter =
|
||||||
.with_env_filter(
|
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||||
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
|
|
||||||
)
|
// Create fmt layer with span field formatting enabled (no ANSI to keep fields parseable)
|
||||||
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||||
.event_format(BracketedFormatter)
|
.event_format(BracketedFormatter)
|
||||||
|
.fmt_fields(format::DefaultFields::new())
|
||||||
|
.with_ansi(false);
|
||||||
|
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(env_filter)
|
||||||
|
.with(fmt_layer)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
provider
|
provider
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@ services:
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8001:8001"
|
- "8001:8001"
|
||||||
|
- "11000:11000"
|
||||||
|
- "12001:12001"
|
||||||
environment:
|
environment:
|
||||||
- ARCH_CONFIG_PATH=/app/arch_config.yaml
|
- ARCH_CONFIG_PATH=/app/arch_config.yaml
|
||||||
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
|
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue