mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
fix span
This commit is contained in:
parent
bb9503e873
commit
4d52acf60c
7 changed files with 198 additions and 149 deletions
|
|
@ -214,21 +214,21 @@ static_resources:
|
|||
- name: envoy.filters.network.http_connection_manager
|
||||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
|
||||
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
|
||||
generate_request_id: true
|
||||
tracing:
|
||||
provider:
|
||||
name: envoy.tracers.opentelemetry
|
||||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
|
||||
grpc_service:
|
||||
envoy_grpc:
|
||||
cluster_name: opentelemetry_collector
|
||||
timeout: 0.250s
|
||||
service_name: tools
|
||||
random_sampling:
|
||||
value: {{ arch_tracing.random_sampling }}
|
||||
{% endif %}
|
||||
# {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
|
||||
# generate_request_id: true
|
||||
# tracing:
|
||||
# provider:
|
||||
# name: envoy.tracers.opentelemetry
|
||||
# typed_config:
|
||||
# "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
|
||||
# grpc_service:
|
||||
# envoy_grpc:
|
||||
# cluster_name: opentelemetry_collector
|
||||
# timeout: 0.250s
|
||||
# service_name: tools
|
||||
# random_sampling:
|
||||
# value: {{ arch_tracing.random_sampling }}
|
||||
# {% endif %}
|
||||
stat_prefix: outbound_api_traffic
|
||||
codec_type: AUTO
|
||||
scheme_header_transformation:
|
||||
|
|
@ -299,7 +299,7 @@ static_resources:
|
|||
envoy_grpc:
|
||||
cluster_name: opentelemetry_collector
|
||||
timeout: 0.250s
|
||||
service_name: arch_gateway
|
||||
service_name: plano(inbound)
|
||||
random_sampling:
|
||||
value: {{ arch_tracing.random_sampling }}
|
||||
{% endif %}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ use std::time::{Instant, SystemTime};
|
|||
|
||||
use bytes::Bytes;
|
||||
use common::consts::TRACE_PARENT_HEADER;
|
||||
use common::traces::{SpanBuilder, SpanKind, parse_traceparent};
|
||||
use common::traces::{SpanBuilder, SpanKind, parse_traceparent, generate_random_span_id};
|
||||
use hermesllm::apis::OpenAIMessage;
|
||||
use hermesllm::clients::SupportedAPIsFromClient;
|
||||
use hermesllm::providers::request::ProviderRequest;
|
||||
|
|
@ -208,6 +208,13 @@ async fn handle_agent_chat(
|
|||
agent_selector.create_agent_map(agents)
|
||||
};
|
||||
|
||||
// Parse trace parent to get trace_id and parent_span_id
|
||||
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
|
||||
parse_traceparent(tp)
|
||||
} else {
|
||||
(String::new(), None)
|
||||
};
|
||||
|
||||
// Select appropriate agent using arch router llm model
|
||||
let selected_agent = agent_selector
|
||||
.select_agent(&message, &listener, trace_parent.clone())
|
||||
|
|
@ -218,6 +225,14 @@ async fn handle_agent_chat(
|
|||
// Record the start time for agent span
|
||||
let agent_start_time = SystemTime::now();
|
||||
let agent_start_instant = Instant::now();
|
||||
// let (span_id, trace_id) = trace_collector.start_span(
|
||||
// trace_parent.clone(),
|
||||
// operation_component::AGENT,
|
||||
// &format!("/agents{}", request_path),
|
||||
// &selected_agent.id,
|
||||
// );
|
||||
|
||||
let span_id = generate_random_span_id();
|
||||
|
||||
// Process the filter chain
|
||||
let chat_history = pipeline_processor
|
||||
|
|
@ -227,6 +242,8 @@ async fn handle_agent_chat(
|
|||
&agent_map,
|
||||
&request_headers,
|
||||
Some(&trace_collector),
|
||||
trace_id.clone(),
|
||||
span_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
@ -243,6 +260,8 @@ async fn handle_agent_chat(
|
|||
client_request,
|
||||
terminal_agent,
|
||||
&request_headers,
|
||||
trace_id.clone(),
|
||||
span_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
@ -260,14 +279,8 @@ async fn handle_agent_chat(
|
|||
.with_target(&terminal_agent_name)
|
||||
.build();
|
||||
|
||||
// Parse trace parent to get trace_id and parent_span_id
|
||||
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
|
||||
parse_traceparent(tp)
|
||||
} else {
|
||||
(String::new(), None)
|
||||
};
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(agent_start_time)
|
||||
.with_end_time(agent_end_time)
|
||||
|
|
|
|||
|
|
@ -117,6 +117,8 @@ mod integration_tests {
|
|||
&agent_map,
|
||||
&headers,
|
||||
None,
|
||||
String::new(),
|
||||
String::new(),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,19 +1,23 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use common::configuration::{Agent, AgentFilterChain};
|
||||
use common::consts::{ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER};
|
||||
use common::traces::{SpanBuilder, SpanKind};
|
||||
use common::consts::{
|
||||
ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER, TRACE_PARENT_HEADER,
|
||||
};
|
||||
use common::traces::{SpanBuilder, SpanKind, generate_random_span_id};
|
||||
use hermesllm::apis::openai::Message;
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use hermesllm::apis::openai::{Message};
|
||||
use hyper::header::HeaderMap;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use tracing::{debug, info, warn};
|
||||
use std::time::{Instant, SystemTime};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::tracing::operation_component::{self};
|
||||
use crate::tracing::{OperationNameBuilder, http};
|
||||
use crate::tracing::{http, OperationNameBuilder};
|
||||
|
||||
use crate::handlers::jsonrpc::{JSON_RPC_VERSION, JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD};
|
||||
use crate::handlers::jsonrpc::{
|
||||
JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION,
|
||||
MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Errors that can occur during pipeline processing
|
||||
|
|
@ -77,27 +81,6 @@ impl PipelineProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Extract trace context from current OpenTelemetry context
|
||||
fn extract_trace_context(&self) -> (String, Option<String>) {
|
||||
let current_cx = opentelemetry::Context::current();
|
||||
let span_ref = current_cx.span();
|
||||
let span_context = span_ref.span_context();
|
||||
|
||||
let trace_id = if span_context.is_valid() {
|
||||
format!("{:032x}", span_context.trace_id())
|
||||
} else {
|
||||
String::new() // SpanBuilder will generate one
|
||||
};
|
||||
|
||||
let parent_span_id = if span_context.is_valid() {
|
||||
Some(format!("{:016x}", span_context.span_id()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(trace_id, parent_span_id)
|
||||
}
|
||||
|
||||
/// Record a span for filter execution
|
||||
fn record_filter_span(
|
||||
&self,
|
||||
|
|
@ -107,8 +90,11 @@ impl PipelineProcessor {
|
|||
start_time: SystemTime,
|
||||
end_time: SystemTime,
|
||||
elapsed: std::time::Duration,
|
||||
) {
|
||||
let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
span_id: String,
|
||||
) -> String {
|
||||
// let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
|
||||
// Build operation name: POST /agents/* {filter_name}
|
||||
// Using generic path since we don't have access to specific endpoint here
|
||||
|
|
@ -119,6 +105,7 @@ impl PipelineProcessor {
|
|||
.build();
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id.clone())
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_time)
|
||||
.with_end_time(end_time)
|
||||
|
|
@ -126,18 +113,22 @@ impl PipelineProcessor {
|
|||
.with_attribute(http::TARGET, "/agents/*")
|
||||
.with_attribute("filter.name", agent_name.to_string())
|
||||
.with_attribute("filter.tool_name", tool_name.to_string())
|
||||
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id);
|
||||
}
|
||||
if let Some(parent_id) = parent_span_id {
|
||||
span_builder = span_builder.with_parent_span_id(parent_id);
|
||||
if !parent_span_id.is_empty() {
|
||||
span_builder = span_builder.with_parent_span_id(parent_span_id);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
// Use plano(filter) as service name for filter execution spans
|
||||
collector.record_span(operation_component::AGENT_FILTER, span);
|
||||
span_id.clone()
|
||||
}
|
||||
|
||||
/// Record a span for MCP protocol interactions
|
||||
|
|
@ -150,8 +141,11 @@ impl PipelineProcessor {
|
|||
end_time: SystemTime,
|
||||
elapsed: std::time::Duration,
|
||||
additional_attrs: Option<HashMap<&str, String>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
span_id: Option<String>,
|
||||
) {
|
||||
let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
// let (trace_id, parent_span_id) = self.extract_trace_context();
|
||||
|
||||
// Build operation name: POST /mcp {agent_id}
|
||||
let operation_name = OperationNameBuilder::new()
|
||||
|
|
@ -162,6 +156,7 @@ impl PipelineProcessor {
|
|||
.build();
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id.unwrap_or_else(|| generate_random_span_id()))
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_time)
|
||||
.with_end_time(end_time)
|
||||
|
|
@ -169,7 +164,10 @@ impl PipelineProcessor {
|
|||
.with_attribute(http::TARGET, &format!("/mcp ({})", operation.to_string()))
|
||||
.with_attribute("mcp.operation", operation.to_string())
|
||||
.with_attribute("mcp.agent_id", agent_id.to_string())
|
||||
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
|
||||
if let Some(attrs) = additional_attrs {
|
||||
for (key, value) in attrs {
|
||||
|
|
@ -180,8 +178,8 @@ impl PipelineProcessor {
|
|||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id);
|
||||
}
|
||||
if let Some(parent_id) = parent_span_id {
|
||||
span_builder = span_builder.with_parent_span_id(parent_id);
|
||||
if !parent_span_id.is_empty() {
|
||||
span_builder = span_builder.with_parent_span_id(parent_span_id);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
|
|
@ -197,6 +195,8 @@ impl PipelineProcessor {
|
|||
agent_map: &HashMap<String, Agent>,
|
||||
request_headers: &HeaderMap,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> Result<Vec<Message>, PipelineError> {
|
||||
let mut chat_history_updated = chat_history.to_vec();
|
||||
|
||||
|
|
@ -220,8 +220,18 @@ impl PipelineProcessor {
|
|||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
// Generate filter span ID before execution so MCP spans can use it as parent
|
||||
let filter_span_id = generate_random_span_id();
|
||||
|
||||
chat_history_updated = self
|
||||
.execute_filter(&chat_history_updated, agent, request_headers, trace_collector)
|
||||
.execute_filter(
|
||||
&chat_history_updated,
|
||||
agent,
|
||||
request_headers,
|
||||
trace_collector,
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
|
|
@ -243,6 +253,9 @@ impl PipelineProcessor {
|
|||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
filter_span_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -256,10 +269,19 @@ impl PipelineProcessor {
|
|||
request_headers: &HeaderMap,
|
||||
agent_id: &str,
|
||||
session_id: Option<&str>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> Result<HeaderMap, PipelineError> {
|
||||
let trace_parent = format!("00-{}-{}-01", trace_id, parent_span_id);
|
||||
let mut headers = request_headers.clone();
|
||||
headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
headers.remove(TRACE_PARENT_HEADER);
|
||||
headers.insert(
|
||||
TRACE_PARENT_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
|
||||
headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
hyper::header::HeaderValue::from_str(agent_id)
|
||||
|
|
@ -292,7 +314,11 @@ impl PipelineProcessor {
|
|||
}
|
||||
|
||||
/// Parse SSE formatted response and extract JSON-RPC data
|
||||
fn parse_sse_response(&self, response_bytes: &[u8], agent_id: &str) -> Result<String, PipelineError> {
|
||||
fn parse_sse_response(
|
||||
&self,
|
||||
response_bytes: &[u8],
|
||||
agent_id: &str,
|
||||
) -> Result<String, PipelineError> {
|
||||
let response_str = String::from_utf8_lossy(response_bytes);
|
||||
let lines: Vec<&str> = response_str.lines().collect();
|
||||
|
||||
|
|
@ -342,7 +368,10 @@ impl PipelineProcessor {
|
|||
) -> Result<reqwest::Response, PipelineError> {
|
||||
let request_body = serde_json::to_string(json_rpc_request)?;
|
||||
|
||||
debug!("Sending MCP request to agent {}: {}", agent_id, request_body);
|
||||
debug!(
|
||||
"Sending MCP request to agent {}: {}",
|
||||
agent_id, request_body
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
|
@ -362,10 +391,7 @@ impl PipelineProcessor {
|
|||
messages: &[Message],
|
||||
) -> Result<JsonRpcRequest, PipelineError> {
|
||||
let mut arguments = HashMap::new();
|
||||
arguments.insert(
|
||||
"messages".to_string(),
|
||||
serde_json::to_value(messages)?,
|
||||
);
|
||||
arguments.insert("messages".to_string(), serde_json::to_value(messages)?);
|
||||
|
||||
let mut params = HashMap::new();
|
||||
params.insert("name".to_string(), serde_json::to_value(tool_name)?);
|
||||
|
|
@ -386,35 +412,52 @@ impl PipelineProcessor {
|
|||
agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
filter_span_id: String,
|
||||
) -> Result<Vec<Message>, PipelineError> {
|
||||
// Get or create MCP session
|
||||
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
|
||||
session_id.clone()
|
||||
} else {
|
||||
let session_id = self.get_new_session_id(&agent.id, trace_collector).await;
|
||||
let session_id = self
|
||||
.get_new_session_id(
|
||||
&agent.id,
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
)
|
||||
.await;
|
||||
self.agent_id_session_map
|
||||
.insert(agent.id.clone(), session_id.clone());
|
||||
session_id
|
||||
};
|
||||
|
||||
info!("Using MCP session ID {} for agent {}", mcp_session_id, agent.id);
|
||||
info!(
|
||||
"Using MCP session ID {} for agent {}",
|
||||
mcp_session_id, agent.id
|
||||
);
|
||||
|
||||
// Build JSON-RPC request
|
||||
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
|
||||
let json_rpc_request = self.build_tool_call_request(tool_name, messages)?;
|
||||
|
||||
// Generate span ID for this MCP tool call (child of filter span)
|
||||
let mcp_span_id = generate_random_span_id();
|
||||
|
||||
// Build headers
|
||||
let agent_headers = self.build_mcp_headers(
|
||||
request_headers,
|
||||
&agent.id,
|
||||
Some(&mcp_session_id),
|
||||
)?;
|
||||
let agent_headers =
|
||||
self.build_mcp_headers(request_headers, &agent.id, Some(&mcp_session_id), trace_id.clone(), mcp_span_id.clone())?;
|
||||
|
||||
// Send request with tracing
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
let response = self.send_mcp_request(&json_rpc_request, agent_headers, &agent.id).await?;
|
||||
let response = self
|
||||
.send_mcp_request(
|
||||
&json_rpc_request,
|
||||
agent_headers,
|
||||
&agent.id,
|
||||
)
|
||||
.await?;
|
||||
let http_status = response.status();
|
||||
let response_bytes = response.bytes().await?;
|
||||
|
||||
|
|
@ -437,6 +480,9 @@ impl PipelineProcessor {
|
|||
end_time,
|
||||
elapsed,
|
||||
Some(attrs),
|
||||
trace_id.clone(),
|
||||
filter_span_id.clone(),
|
||||
Some(mcp_span_id),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -458,7 +504,11 @@ impl PipelineProcessor {
|
|||
});
|
||||
}
|
||||
|
||||
info!("Response from agent {}: {}", agent.id, String::from_utf8_lossy(&response_bytes));
|
||||
info!(
|
||||
"Response from agent {}: {}",
|
||||
agent.id,
|
||||
String::from_utf8_lossy(&response_bytes)
|
||||
);
|
||||
|
||||
// Parse SSE response
|
||||
let data_chunk = self.parse_sse_response(&response_bytes, &agent.id)?;
|
||||
|
|
@ -468,7 +518,11 @@ impl PipelineProcessor {
|
|||
.ok_or_else(|| PipelineError::NoResultInResponse(agent.id.clone()))?;
|
||||
|
||||
// Check if error field is set in response result
|
||||
if response_result.get("isError").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
if response_result
|
||||
.get("isError")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let error_message = response_result
|
||||
.get("content")
|
||||
.and_then(|v| v.as_array())
|
||||
|
|
@ -532,7 +586,8 @@ impl PipelineProcessor {
|
|||
&self,
|
||||
agent_id: &str,
|
||||
session_id: &str,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> Result<(), PipelineError> {
|
||||
let initialized_notification = JsonRpcNotification {
|
||||
jsonrpc: JSON_RPC_VERSION.to_string(),
|
||||
|
|
@ -543,10 +598,7 @@ impl PipelineProcessor {
|
|||
let notification_body = serde_json::to_string(&initialized_notification)?;
|
||||
debug!("Sending initialized notification for agent {}", agent_id);
|
||||
|
||||
let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id))?;
|
||||
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id), trace_id.clone(), parent_span_id.clone())?;
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
|
@ -556,28 +608,10 @@ impl PipelineProcessor {
|
|||
.send()
|
||||
.await?;
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
info!("Initialized notification response status: {}", response.status());
|
||||
|
||||
// Record MCP notification span
|
||||
if let Some(collector) = trace_collector {
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("mcp.method", "notifications/initialized".to_string());
|
||||
attrs.insert("mcp.session_id", session_id.to_string());
|
||||
attrs.insert("http.status_code", response.status().as_u16().to_string());
|
||||
|
||||
self.record_mcp_span(
|
||||
collector,
|
||||
"notification",
|
||||
agent_id,
|
||||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
Some(attrs),
|
||||
);
|
||||
}
|
||||
info!(
|
||||
"Initialized notification response status: {}",
|
||||
response.status()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -585,15 +619,14 @@ impl PipelineProcessor {
|
|||
async fn get_new_session_id(
|
||||
&self,
|
||||
agent_id: &str,
|
||||
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
|
||||
trace_id: String,
|
||||
parent_span_id: String,
|
||||
) -> String {
|
||||
info!("Initializing MCP session for agent {}", agent_id);
|
||||
|
||||
let start_time = SystemTime::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
let initialize_request = self.build_initialize_request();
|
||||
let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, None)
|
||||
let headers = self
|
||||
.build_mcp_headers(&HeaderMap::new(), agent_id, None, trace_id.clone(), parent_span_id.clone())
|
||||
.expect("Failed to build headers for initialization");
|
||||
|
||||
let response = self
|
||||
|
|
@ -610,33 +643,20 @@ impl PipelineProcessor {
|
|||
.expect("No mcp-session-id in response")
|
||||
.to_string();
|
||||
|
||||
info!("Created new MCP session for agent {}: {}", agent_id, session_id);
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
// Record MCP session initialization span
|
||||
if let Some(collector) = trace_collector {
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("mcp.method", "initialize".to_string());
|
||||
attrs.insert("mcp.session_id", session_id.clone());
|
||||
attrs.insert("mcp.protocol_version", "2024-11-05".to_string());
|
||||
|
||||
self.record_mcp_span(
|
||||
collector,
|
||||
"session_init",
|
||||
agent_id,
|
||||
start_time,
|
||||
end_time,
|
||||
elapsed,
|
||||
Some(attrs),
|
||||
);
|
||||
}
|
||||
info!(
|
||||
"Created new MCP session for agent {}: {}",
|
||||
agent_id, session_id
|
||||
);
|
||||
|
||||
// Send initialized notification
|
||||
self.send_initialized_notification(agent_id, &session_id, trace_collector)
|
||||
.await
|
||||
.expect("Failed to send initialized notification");
|
||||
self.send_initialized_notification(
|
||||
agent_id,
|
||||
&session_id,
|
||||
trace_id.clone(),
|
||||
parent_span_id.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("Failed to send initialized notification");
|
||||
|
||||
session_id
|
||||
}
|
||||
|
|
@ -648,6 +668,8 @@ impl PipelineProcessor {
|
|||
mut original_request: ProviderRequestType,
|
||||
terminal_agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
trace_id: String,
|
||||
agent_span_id: String,
|
||||
) -> Result<reqwest::Response, PipelineError> {
|
||||
// let mut request = original_request.clone();
|
||||
original_request.set_messages(messages);
|
||||
|
|
@ -658,6 +680,17 @@ impl PipelineProcessor {
|
|||
|
||||
let mut agent_headers = request_headers.clone();
|
||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
// Set traceparent header to make the egress span a child of the agent span
|
||||
if !trace_id.is_empty() && !agent_span_id.is_empty() {
|
||||
let trace_parent = format!("00-{}-{}-01", trace_id, agent_span_id);
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
agent_headers.insert(
|
||||
TRACE_PARENT_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
agent_headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&terminal_agent.id)
|
||||
|
|
@ -718,13 +751,7 @@ mod tests {
|
|||
let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]);
|
||||
|
||||
let result = processor
|
||||
.process_filter_chain(
|
||||
&messages,
|
||||
&pipeline,
|
||||
&agent_map,
|
||||
&request_headers,
|
||||
None,
|
||||
)
|
||||
.process_filter_chain(&messages, &pipeline, &agent_map, &request_headers, None, String::new(), String::new())
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
|
|
@ -758,7 +785,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_filter(&messages, &agent, &request_headers, None)
|
||||
.execute_filter(&messages, &agent, &request_headers, None, "trace-123".to_string(), "span-123".to_string())
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
@ -797,7 +824,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_filter(&messages, &agent, &request_headers, None)
|
||||
.execute_filter(&messages, &agent, &request_headers, None, "trace-456".to_string(), "span-456".to_string())
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
@ -849,7 +876,7 @@ mod tests {
|
|||
let request_headers = HeaderMap::new();
|
||||
|
||||
let result = processor
|
||||
.execute_filter(&messages, &agent, &request_headers, None)
|
||||
.execute_filter(&messages, &agent, &request_headers, None, "trace-789".to_string(), "span-789".to_string())
|
||||
.await;
|
||||
|
||||
match result {
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ pub mod operation_component {
|
|||
pub const HANDOFF: &str = "plano(handoff)";
|
||||
|
||||
/// Agent filter execution
|
||||
pub const AGENT_FILTER: &str = "plano(agent filter)";
|
||||
pub const AGENT_FILTER: &str = "plano(filter)";
|
||||
|
||||
/// Agent execution
|
||||
pub const AGENT: &str = "plano(agent)";
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ pub use shapes::{
|
|||
};
|
||||
|
||||
// Re-export new utilities
|
||||
pub use span_builder::{SpanBuilder, SpanKind};
|
||||
pub use span_builder::{SpanBuilder, SpanKind, generate_random_span_id};
|
||||
pub use resource_span_builder::ResourceSpanBuilder;
|
||||
pub use constants::*;
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ pub struct SpanBuilder {
|
|||
end_time: Option<SystemTime>,
|
||||
kind: SpanKind,
|
||||
attributes: HashMap<String, String>,
|
||||
span_id: Option<String>,
|
||||
}
|
||||
|
||||
impl SpanBuilder {
|
||||
|
|
@ -53,6 +54,7 @@ impl SpanBuilder {
|
|||
end_time: None,
|
||||
kind: SpanKind::Internal,
|
||||
attributes: HashMap::new(),
|
||||
span_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -62,6 +64,11 @@ impl SpanBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
|
||||
self.span_id = Some(span_id.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the parent span ID to link this span to its parent
|
||||
pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
|
||||
self.parent_span_id = Some(parent_span_id.into());
|
||||
|
|
@ -125,7 +132,7 @@ impl SpanBuilder {
|
|||
// Build span directly without going through Span::new()
|
||||
Span {
|
||||
trace_id,
|
||||
span_id: generate_random_span_id(),
|
||||
span_id: self.span_id.unwrap_or_else(|| generate_random_span_id()),
|
||||
parent_span_id: self.parent_span_id,
|
||||
name: self.name,
|
||||
start_time_unix_nano: format!("{}", start_nanos),
|
||||
|
|
@ -145,7 +152,7 @@ fn system_time_to_nanos(time: SystemTime) -> u128 {
|
|||
}
|
||||
|
||||
/// Generate a random span ID (16 hex characters = 8 bytes)
|
||||
fn generate_random_span_id() -> String {
|
||||
pub fn generate_random_span_id() -> String {
|
||||
use rand::RngCore;
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut random_bytes = [0u8; 8];
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue