mirror of
https://github.com/katanemo/plano.git
synced 2026-05-12 01:02:56 +02:00
use standard tracing and logging in brightstaff (#721)
This commit is contained in:
parent
4d9ed74b68
commit
46de89590b
55 changed files with 1494 additions and 2432 deletions
|
|
@ -4,20 +4,18 @@ use common::configuration::{Agent, AgentFilterChain};
|
|||
use common::consts::{
|
||||
ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER, TRACE_PARENT_HEADER,
|
||||
};
|
||||
use common::traces::{generate_random_span_id, SpanBuilder, SpanKind};
|
||||
use hermesllm::apis::openai::Message;
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use hyper::header::HeaderMap;
|
||||
use std::time::{Instant, SystemTime};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::tracing::operation_component::{self};
|
||||
use crate::tracing::{http, OperationNameBuilder};
|
||||
use opentelemetry::global;
|
||||
use opentelemetry_http::HeaderInjector;
|
||||
use tracing::{debug, info, instrument, warn};
|
||||
|
||||
use crate::handlers::jsonrpc::{
|
||||
JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION,
|
||||
MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD,
|
||||
};
|
||||
use crate::tracing::{operation_component, set_service_name};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Errors that can occur during pipeline processing
|
||||
|
|
@ -81,115 +79,14 @@ impl PipelineProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Record a span for filter execution
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn record_filter_span(
|
||||
&self,
|
||||
collector: &std::sync::Arc<common::traces::TraceCollector>,
|
||||
agent_name: &str,
|
||||
tool_name: &str,
|
||||
start_time: SystemTime,
|
||||
end_time: SystemTime,
|
||||
elapsed: std::time::Duration,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
span_id: String,
|
||||
) -> String {
|
||||
// let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
|
||||
// Build operation name: POST /agents/* {filter_name}
|
||||
// Using generic path since we don't have access to specific endpoint here
|
||||
let operation_name = OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path("/agents/*")
|
||||
.with_target(agent_name)
|
||||
.build();
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id.clone())
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_time)
|
||||
.with_end_time(end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, "/agents/*")
|
||||
.with_attribute("filter.name", agent_name.to_string())
|
||||
.with_attribute("filter.tool_name", tool_name.to_string())
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id);
|
||||
}
|
||||
if !parent_span_id.is_empty() {
|
||||
span_builder = span_builder.with_parent_span_id(parent_span_id);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
// Use plano(filter) as service name for filter execution spans
|
||||
collector.record_span(operation_component::AGENT_FILTER, span);
|
||||
span_id.clone()
|
||||
}
|
||||
|
||||
/// Record a span for MCP protocol interactions
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn record_agent_filter_span(
|
||||
&self,
|
||||
collector: &std::sync::Arc<common::traces::TraceCollector>,
|
||||
operation: &str,
|
||||
agent_id: &str,
|
||||
start_time: SystemTime,
|
||||
end_time: SystemTime,
|
||||
elapsed: std::time::Duration,
|
||||
additional_attrs: Option<HashMap<&str, String>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
span_id: Option<String>,
|
||||
) {
|
||||
// let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
|
||||
// Build operation name: POST /mcp {agent_id}
|
||||
let operation_name = OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path("/mcp")
|
||||
.with_operation(operation)
|
||||
.with_target(agent_id)
|
||||
.build();
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id.unwrap_or_else(generate_random_span_id))
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_time)
|
||||
.with_end_time(end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, format!("/mcp ({})", operation))
|
||||
.with_attribute("mcp.operation", operation.to_string())
|
||||
.with_attribute("mcp.agent_id", agent_id.to_string())
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
|
||||
if let Some(attrs) = additional_attrs {
|
||||
for (key, value) in attrs {
|
||||
span_builder = span_builder.with_attribute(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id);
|
||||
}
|
||||
if !parent_span_id.is_empty() {
|
||||
span_builder = span_builder.with_parent_span_id(parent_span_id);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
// MCP spans also use plano(filter) service name as they are part of filter operations
|
||||
collector.record_span(operation_component::AGENT_FILTER, span);
|
||||
}
|
||||
|
||||
/// Process the filter chain of agents (all except the terminal agent)
|
||||
// /// Process the filter chain of agents (all except the terminal agent)
|
||||
// #[instrument(
|
||||
// skip(self, chat_history, agent_filter_chain, agent_map, request_headers),
|
||||
// fields(
|
||||
// filter_count = agent_filter_chain.filter_chain.as_ref().map(|fc| fc.len()).unwrap_or(0),
|
||||
// message_count = chat_history.len()
|
||||
// )
|
||||
// )]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn process_filter_chain(
|
||||
&mut self,
|
||||
|
|
@ -197,9 +94,6 @@ impl PipelineProcessor {
|
|||
agent_filter_chain: &AgentFilterChain,
|
||||
agent_map: &HashMap<String, Agent>,
|
||||
request_headers: &HeaderMap,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> Result<Vec<Message>, PipelineError> {
|
||||
let mut chat_history_updated = chat_history.to_vec();
|
||||
|
||||
|
|
@ -210,7 +104,7 @@ impl PipelineProcessor {
|
|||
};
|
||||
|
||||
for agent_name in filter_chain {
|
||||
debug!("Processing filter agent: {}", agent_name);
|
||||
debug!(agent = %agent_name, "processing filter agent");
|
||||
|
||||
let agent = agent_map
|
||||
.get(agent_name)
|
||||
|
|
@ -219,68 +113,29 @@ impl PipelineProcessor {
|
|||
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
||||
|
||||
info!(
|
||||
"executing filter: {}/{}, url: {}, type: {}, conversation length: {}",
|
||||
agent_name,
|
||||
tool_name,
|
||||
agent.url,
|
||||
agent.agent_type.as_deref().unwrap_or("mcp"),
|
||||
chat_history.len()
|
||||
agent = %agent_name,
|
||||
tool = %tool_name,
|
||||
url = %agent.url,
|
||||
agent_type = %agent.agent_type.as_deref().unwrap_or("mcp"),
|
||||
conversation_len = chat_history.len(),
|
||||
"executing filter"
|
||||
);
|
||||
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
// Generate filter span ID before execution so MCP spans can use it as parent
|
||||
let filter_span_id = generate_random_span_id();
|
||||
|
||||
if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" {
|
||||
chat_history_updated = self
|
||||
.execute_mcp_filter(
|
||||
&chat_history_updated,
|
||||
agent,
|
||||
request_headers,
|
||||
trace_collector,
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
)
|
||||
.execute_mcp_filter(&chat_history_updated, agent, request_headers)
|
||||
.await?;
|
||||
} else {
|
||||
chat_history_updated = self
|
||||
.execute_http_filter(
|
||||
&chat_history_updated,
|
||||
agent,
|
||||
request_headers,
|
||||
trace_collector,
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
)
|
||||
.execute_http_filter(&chat_history_updated, agent, request_headers)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
info!(
|
||||
"Filter '{}' completed in {:.2}ms, updated conversation length: {}",
|
||||
agent_name,
|
||||
elapsed.as_secs_f64() * 1000.0,
|
||||
chat_history_updated.len()
|
||||
agent = %agent_name,
|
||||
updated_len = chat_history_updated.len(),
|
||||
"filter completed"
|
||||
);
|
||||
|
||||
// Record span for this filter execution
|
||||
if let Some(collector) = trace_collector {
|
||||
self.record_filter_span(
|
||||
collector,
|
||||
agent_name,
|
||||
tool_name,
|
||||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
filter_span_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(chat_history_updated)
|
||||
|
|
@ -292,18 +147,17 @@ impl PipelineProcessor {
|
|||
request_headers: &HeaderMap,
|
||||
agent_id: &str,
|
||||
session_id: Option<&str>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> Result<HeaderMap, PipelineError> {
|
||||
let trace_parent = format!("00-{}-{}-01", trace_id, parent_span_id);
|
||||
let mut headers = request_headers.clone();
|
||||
headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
// Inject OpenTelemetry trace context automatically
|
||||
headers.remove(TRACE_PARENT_HEADER);
|
||||
headers.insert(
|
||||
TRACE_PARENT_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx =
|
||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut headers));
|
||||
});
|
||||
|
||||
headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
|
|
@ -348,9 +202,9 @@ impl PipelineProcessor {
|
|||
// Validate SSE format: first line should be "event: message"
|
||||
if lines.is_empty() || lines[0] != "event: message" {
|
||||
warn!(
|
||||
"Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}",
|
||||
agent_id,
|
||||
lines.first()
|
||||
agent = %agent_id,
|
||||
first_line = ?lines.first(),
|
||||
"invalid SSE response format"
|
||||
);
|
||||
return Err(PipelineError::NoContentInResponse(format!(
|
||||
"Invalid SSE response format from agent {}: expected 'event: message' as first line",
|
||||
|
|
@ -367,9 +221,9 @@ impl PipelineProcessor {
|
|||
|
||||
if data_lines.len() != 1 {
|
||||
warn!(
|
||||
"Expected exactly one 'data:' line from agent {}, found {}",
|
||||
agent_id,
|
||||
data_lines.len()
|
||||
agent = %agent_id,
|
||||
found = data_lines.len(),
|
||||
"expected exactly one 'data:' line"
|
||||
);
|
||||
return Err(PipelineError::NoContentInResponse(format!(
|
||||
"Expected exactly one 'data:' line from agent {}, found {}",
|
||||
|
|
@ -429,27 +283,34 @@ impl PipelineProcessor {
|
|||
}
|
||||
|
||||
/// Send request to a specific agent and return the response content
|
||||
#[instrument(
|
||||
skip(self, messages, agent, request_headers),
|
||||
fields(
|
||||
agent_id = %agent.id,
|
||||
filter_name = %agent.id,
|
||||
message_count = messages.len()
|
||||
)
|
||||
)]
|
||||
async fn execute_mcp_filter(
|
||||
&mut self,
|
||||
messages: &[Message],
|
||||
agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
filter_span_id: String,
|
||||
) -> Result<Vec<Message>, PipelineError> {
|
||||
// Set service name for this filter span
|
||||
set_service_name(operation_component::AGENT_FILTER);
|
||||
|
||||
// Update current span name to include filter name
|
||||
use opentelemetry::trace::get_active_span;
|
||||
get_active_span(|span| {
|
||||
span.update_name(format!("execute_mcp_filter ({})", agent.id));
|
||||
});
|
||||
|
||||
// Get or create MCP session
|
||||
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
|
||||
session_id.clone()
|
||||
} else {
|
||||
let session_id = self
|
||||
.get_new_session_id(
|
||||
&agent.id,
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
request_headers,
|
||||
)
|
||||
.await;
|
||||
let session_id = self.get_new_session_id(&agent.id, request_headers).await;
|
||||
self.agent_id_session_map
|
||||
.insert(agent.id.clone(), session_id.clone());
|
||||
session_id
|
||||
|
|
@ -464,21 +325,9 @@ impl PipelineProcessor {
|
|||
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
||||
let json_rpc_request = self.build_tool_call_request(tool_name, messages)?;
|
||||
|
||||
// Generate span ID for this MCP tool call (child of filter span)
|
||||
let mcp_span_id = generate_random_span_id();
|
||||
|
||||
// Build headers
|
||||
let agent_headers = self.build_mcp_headers(
|
||||
request_headers,
|
||||
&agent.id,
|
||||
Some(&mcp_session_id),
|
||||
trace_id.clone(),
|
||||
mcp_span_id.clone(),
|
||||
)?;
|
||||
|
||||
// Send request with tracing
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
let agent_headers =
|
||||
self.build_mcp_headers(request_headers, &agent.id, Some(&mcp_session_id))?;
|
||||
|
||||
let response = self
|
||||
.send_mcp_request(&json_rpc_request, &agent_headers, &agent.id)
|
||||
|
|
@ -486,31 +335,6 @@ impl PipelineProcessor {
|
|||
let http_status = response.status();
|
||||
let response_bytes = response.bytes().await?;
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
// Record MCP tool call span
|
||||
if let Some(collector) = trace_collector {
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("mcp.method", "tools/call".to_string());
|
||||
attrs.insert("mcp.tool_name", tool_name.to_string());
|
||||
attrs.insert("mcp.session_id", mcp_session_id.clone());
|
||||
attrs.insert("http.status_code", http_status.as_u16().to_string());
|
||||
|
||||
self.record_agent_filter_span(
|
||||
collector,
|
||||
"tool_call",
|
||||
&agent.id,
|
||||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
Some(attrs),
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
Some(mcp_span_id),
|
||||
);
|
||||
}
|
||||
|
||||
// Handle HTTP errors
|
||||
if !http_status.is_success() {
|
||||
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
|
||||
|
|
@ -611,8 +435,6 @@ impl PipelineProcessor {
|
|||
&self,
|
||||
agent_id: &str,
|
||||
session_id: &str,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
request_headers: &HeaderMap,
|
||||
) -> Result<(), PipelineError> {
|
||||
let initialized_notification = JsonRpcNotification {
|
||||
|
|
@ -622,15 +444,9 @@ impl PipelineProcessor {
|
|||
};
|
||||
|
||||
let notification_body = serde_json::to_string(&initialized_notification)?;
|
||||
debug!("Sending initialized notification for agent {}", agent_id);
|
||||
debug!("sending initialized notification for agent {}", agent_id);
|
||||
|
||||
let headers = self.build_mcp_headers(
|
||||
request_headers,
|
||||
agent_id,
|
||||
Some(session_id),
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
)?;
|
||||
let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?;
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
|
@ -641,31 +457,19 @@ impl PipelineProcessor {
|
|||
.await?;
|
||||
|
||||
info!(
|
||||
"Initialized notification response status: {}",
|
||||
"initialized notification response status: {}",
|
||||
response.status()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_new_session_id(
|
||||
&self,
|
||||
agent_id: &str,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
request_headers: &HeaderMap,
|
||||
) -> String {
|
||||
info!("Initializing MCP session for agent {}", agent_id);
|
||||
async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String {
|
||||
info!("initializing MCP session for agent {}", agent_id);
|
||||
|
||||
let initialize_request = self.build_initialize_request();
|
||||
let headers = self
|
||||
.build_mcp_headers(
|
||||
request_headers,
|
||||
agent_id,
|
||||
None,
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
)
|
||||
.build_mcp_headers(request_headers, agent_id, None)
|
||||
.expect("Failed to build headers for initialization");
|
||||
|
||||
let response = self
|
||||
|
|
@ -673,7 +477,7 @@ impl PipelineProcessor {
|
|||
.await
|
||||
.expect("Failed to initialize MCP session");
|
||||
|
||||
info!("Initialize response status: {}", response.status());
|
||||
info!("initialize response status: {}", response.status());
|
||||
|
||||
let session_id = response
|
||||
.headers()
|
||||
|
|
@ -683,49 +487,54 @@ impl PipelineProcessor {
|
|||
.to_string();
|
||||
|
||||
info!(
|
||||
"Created new MCP session for agent {}: {}",
|
||||
"created new MCP session for agent {}: {}",
|
||||
agent_id, session_id
|
||||
);
|
||||
|
||||
// Send initialized notification
|
||||
self.send_initialized_notification(
|
||||
agent_id,
|
||||
&session_id,
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
&headers,
|
||||
)
|
||||
.await
|
||||
.expect("Failed to send initialized notification");
|
||||
self.send_initialized_notification(agent_id, &session_id, &headers)
|
||||
.await
|
||||
.expect("Failed to send initialized notification");
|
||||
|
||||
session_id
|
||||
}
|
||||
|
||||
/// Execute a HTTP-based filter agent
|
||||
#[instrument(
|
||||
skip(self, messages, agent, request_headers),
|
||||
fields(
|
||||
agent_id = %agent.id,
|
||||
agent_url = %agent.url,
|
||||
filter_name = %agent.id,
|
||||
message_count = messages.len()
|
||||
)
|
||||
)]
|
||||
async fn execute_http_filter(
|
||||
&mut self,
|
||||
messages: &[Message],
|
||||
agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
filter_span_id: String,
|
||||
) -> Result<Vec<Message>, PipelineError> {
|
||||
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
||||
// Set service name for this filter span
|
||||
set_service_name(operation_component::AGENT_FILTER);
|
||||
|
||||
// Generate span ID for this HTTP call (child of filter span)
|
||||
let http_span_id = generate_random_span_id();
|
||||
// Update current span name to include filter name
|
||||
use opentelemetry::trace::get_active_span;
|
||||
get_active_span(|span| {
|
||||
span.update_name(format!("execute_http_filter ({})", agent.id));
|
||||
});
|
||||
|
||||
// Build headers
|
||||
let trace_parent = format!("00-{}-{}-01", trace_id, http_span_id);
|
||||
let mut agent_headers = request_headers.clone();
|
||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
// Inject OpenTelemetry trace context automatically
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
agent_headers.insert(
|
||||
TRACE_PARENT_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx =
|
||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||
});
|
||||
|
||||
agent_headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
|
|
@ -748,10 +557,6 @@ impl PipelineProcessor {
|
|||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
|
||||
// Send request with tracing
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
debug!(
|
||||
"Sending HTTP request to agent {} at URL: {}",
|
||||
agent.id, agent.url
|
||||
|
|
@ -769,30 +574,6 @@ impl PipelineProcessor {
|
|||
let http_status = response.status();
|
||||
let response_bytes = response.bytes().await?;
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
// Record HTTP call span
|
||||
if let Some(collector) = trace_collector {
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("http.tool_name", tool_name.to_string());
|
||||
attrs.insert("http.url", agent.url.clone());
|
||||
attrs.insert("http.status_code", http_status.as_u16().to_string());
|
||||
|
||||
self.record_agent_filter_span(
|
||||
collector,
|
||||
"http_call",
|
||||
&agent.id,
|
||||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
Some(attrs),
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
Some(http_span_id),
|
||||
);
|
||||
}
|
||||
|
||||
// Handle HTTP errors
|
||||
if !http_status.is_success() {
|
||||
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
|
||||
|
|
@ -825,34 +606,34 @@ impl PipelineProcessor {
|
|||
}
|
||||
|
||||
/// Send request to terminal agent and return the raw response for streaming
|
||||
/// Note: The caller is responsible for creating the plano(agent) span that wraps
|
||||
/// both this call and the subsequent response consumption.
|
||||
pub async fn invoke_agent(
|
||||
&self,
|
||||
messages: &[Message],
|
||||
mut original_request: ProviderRequestType,
|
||||
terminal_agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
trace_id: String,
|
||||
agent_span_id: String,
|
||||
) -> Result<reqwest::Response, PipelineError> {
|
||||
// let mut request = original_request.clone();
|
||||
original_request.set_messages(messages);
|
||||
|
||||
let request_url = "/v1/chat/completions";
|
||||
|
||||
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();
|
||||
// let request_body = serde_json::to_string(&request)?;
|
||||
debug!("Sending request to terminal agent {}", terminal_agent.id);
|
||||
debug!("sending request to terminal agent {}", terminal_agent.id);
|
||||
|
||||
let mut agent_headers = request_headers.clone();
|
||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
// Set traceparent header to make the egress span a child of the agent span
|
||||
if !trace_id.is_empty() && !agent_span_id.is_empty() {
|
||||
let trace_parent = format!("00-{}-{}-01", trace_id, agent_span_id);
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
agent_headers.insert(
|
||||
TRACE_PARENT_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
}
|
||||
// Inject OpenTelemetry trace context automatically
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx =
|
||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||
});
|
||||
|
||||
agent_headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
|
|
@ -867,7 +648,7 @@ impl PipelineProcessor {
|
|||
|
||||
let response = self
|
||||
.client
|
||||
.post(format!("{}/v1/chat/completions", self.url))
|
||||
.post(format!("{}{}", self.url, request_url))
|
||||
.headers(agent_headers)
|
||||
.body(request_body)
|
||||
.send()
|
||||
|
|
@ -914,15 +695,7 @@ mod tests {
|
|||
let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]);
|
||||
|
||||
let result = processor
|
||||
.process_filter_chain(
|
||||
&messages,
|
||||
&pipeline,
|
||||
&agent_map,
|
||||
&request_headers,
|
||||
None,
|
||||
String::new(),
|
||||
String::new(),
|
||||
)
|
||||
.process_filter_chain(&messages, &pipeline, &agent_map, &request_headers)
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
|
|
@ -956,14 +729,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_mcp_filter(
|
||||
&messages,
|
||||
&agent,
|
||||
&request_headers,
|
||||
None,
|
||||
"trace-123".to_string(),
|
||||
"span-123".to_string(),
|
||||
)
|
||||
.execute_mcp_filter(&messages, &agent, &request_headers)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
@ -1002,14 +768,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_mcp_filter(
|
||||
&messages,
|
||||
&agent,
|
||||
&request_headers,
|
||||
None,
|
||||
"trace-456".to_string(),
|
||||
"span-456".to_string(),
|
||||
)
|
||||
.execute_mcp_filter(&messages, &agent, &request_headers)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
@ -1061,14 +820,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_mcp_filter(
|
||||
&messages,
|
||||
&agent,
|
||||
&request_headers,
|
||||
None,
|
||||
"trace-789".to_string(),
|
||||
"span-789".to_string(),
|
||||
)
|
||||
.execute_mcp_filter(&messages, &agent, &request_headers)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue