ensure that request id is consistent

This commit is contained in:
Adil Hafeez 2026-01-06 14:36:51 -08:00
parent 745b36fdef
commit a3c80e8e90
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
9 changed files with 101 additions and 28 deletions

View file

@ -49,6 +49,6 @@ COPY config/supervisord.conf /etc/supervisor/conf.d/supervisord.conf
RUN mkdir -p /var/log/supervisor && touch /var/log/envoy.log /var/log/supervisor/supervisord.log RUN mkdir -p /var/log/supervisor && touch /var/log/envoy.log /var/log/supervisor/supervisord.log
RUN mkdir -p /var/log && \ RUN mkdir -p /var/log && \
touch /var/log/access_ingress.log /var/log/access_ingress_prompt.log /var/log/access_internal.log /var/log/access_llm.log touch /var/log/access_ingress.log /var/log/access_ingress_prompt.log /var/log/access_internal.log /var/log/access_llm.log /var/log/access_agent.log
ENTRYPOINT ["sh","-c", "/usr/bin/supervisord"] ENTRYPOINT ["sh","-c", "/usr/bin/supervisord"]

View file

@ -311,7 +311,7 @@ static_resources:
- name: envoy.access_loggers.file - name: envoy.access_loggers.file
typed_config: typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/access_llm.log" path: "/var/log/access_agent.log"
route_config: route_config:
name: local_routes name: local_routes
request_headers_to_add: request_headers_to_add:
@ -483,13 +483,13 @@ static_resources:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: otel_proxy stat_prefix: otel_proxy
codec_type: AUTO codec_type: AUTO
access_log: # access_log:
- name: envoy.access_loggers.file # - name: envoy.access_loggers.file
typed_config: # typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog # "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/access_otel.log" # path: "/var/log/access_otel.log"
format: | # format: |
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" # [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
route_config: route_config:
name: otel_route name: otel_route
virtual_hosts: virtual_hosts:

View file

@ -157,7 +157,33 @@ async fn handle_agent_chat(
.strip_prefix("/agents") .strip_prefix("/agents")
.unwrap() .unwrap()
.to_string(); .to_string();
let request_headers = request.headers().clone();
let (request_headers, request_id) = {
let mut headers = request.headers().clone();
headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER);
if !headers.contains_key(common::consts::REQUEST_ID_HEADER) {
let request_id = uuid::Uuid::new_v4().to_string();
info!(
"Request id not found in headers, generated new request id: {}",
request_id
);
headers.insert(
common::consts::REQUEST_ID_HEADER,
hyper::header::HeaderValue::from_str(&request_id).unwrap(),
);
}
let request_id = headers
.get(common::consts::REQUEST_ID_HEADER)
.and_then(|v| v.to_str().ok())
.unwrap()
.to_string();
(headers, request_id)
};
info!("Processing request with Request ID: {}", request_id);
let chat_request_bytes = request.collect().await?.to_bytes(); let chat_request_bytes = request.collect().await?.to_bytes();
debug!( debug!(

View file

@ -386,7 +386,7 @@ impl PipelineProcessor {
async fn send_mcp_request( async fn send_mcp_request(
&self, &self,
json_rpc_request: &JsonRpcRequest, json_rpc_request: &JsonRpcRequest,
headers: HeaderMap, headers: &HeaderMap,
agent_id: &str, agent_id: &str,
) -> Result<reqwest::Response, PipelineError> { ) -> Result<reqwest::Response, PipelineError> {
let request_body = serde_json::to_string(json_rpc_request)?; let request_body = serde_json::to_string(json_rpc_request)?;
@ -399,7 +399,7 @@ impl PipelineProcessor {
let response = self let response = self
.client .client
.post(format!("{}/mcp", self.url)) .post(format!("{}/mcp", self.url))
.headers(headers) .headers(headers.clone())
.body(request_body) .body(request_body)
.send() .send()
.await?; .await?;
@ -443,7 +443,12 @@ impl PipelineProcessor {
session_id.clone() session_id.clone()
} else { } else {
let session_id = self let session_id = self
.get_new_session_id(&agent.id, trace_id.clone(), filter_span_id.clone()) .get_new_session_id(
&agent.id,
trace_id.clone(),
filter_span_id.clone(),
request_headers,
)
.await; .await;
self.agent_id_session_map self.agent_id_session_map
.insert(agent.id.clone(), session_id.clone()); .insert(agent.id.clone(), session_id.clone());
@ -476,7 +481,7 @@ impl PipelineProcessor {
let start_instant = Instant::now(); let start_instant = Instant::now();
let response = self let response = self
.send_mcp_request(&json_rpc_request, agent_headers, &agent.id) .send_mcp_request(&json_rpc_request, &agent_headers, &agent.id)
.await?; .await?;
let http_status = response.status(); let http_status = response.status();
let response_bytes = response.bytes().await?; let response_bytes = response.bytes().await?;
@ -608,6 +613,7 @@ impl PipelineProcessor {
session_id: &str, session_id: &str,
trace_id: String, trace_id: String,
parent_span_id: String, parent_span_id: String,
request_headers: &HeaderMap,
) -> Result<(), PipelineError> { ) -> Result<(), PipelineError> {
let initialized_notification = JsonRpcNotification { let initialized_notification = JsonRpcNotification {
jsonrpc: JSON_RPC_VERSION.to_string(), jsonrpc: JSON_RPC_VERSION.to_string(),
@ -619,7 +625,7 @@ impl PipelineProcessor {
debug!("Sending initialized notification for agent {}", agent_id); debug!("Sending initialized notification for agent {}", agent_id);
let headers = self.build_mcp_headers( let headers = self.build_mcp_headers(
&HeaderMap::new(), request_headers,
agent_id, agent_id,
Some(session_id), Some(session_id),
trace_id.clone(), trace_id.clone(),
@ -647,13 +653,14 @@ impl PipelineProcessor {
agent_id: &str, agent_id: &str,
trace_id: String, trace_id: String,
parent_span_id: String, parent_span_id: String,
request_headers: &HeaderMap,
) -> String { ) -> String {
info!("Initializing MCP session for agent {}", agent_id); info!("Initializing MCP session for agent {}", agent_id);
let initialize_request = self.build_initialize_request(); let initialize_request = self.build_initialize_request();
let headers = self let headers = self
.build_mcp_headers( .build_mcp_headers(
&HeaderMap::new(), request_headers,
agent_id, agent_id,
None, None,
trace_id.clone(), trace_id.clone(),
@ -661,8 +668,10 @@ impl PipelineProcessor {
) )
.expect("Failed to build headers for initialization"); .expect("Failed to build headers for initialization");
info!("Initialize request headers: {:?}", headers);
let response = self let response = self
.send_mcp_request(&initialize_request, headers, agent_id) .send_mcp_request(&initialize_request, &headers, agent_id)
.await .await
.expect("Failed to initialize MCP session"); .expect("Failed to initialize MCP session");
@ -686,6 +695,7 @@ impl PipelineProcessor {
&session_id, &session_id,
trace_id.clone(), trace_id.clone(),
parent_span_id.clone(), parent_span_id.clone(),
&headers,
) )
.await .await
.expect("Failed to send initialized notification"); .expect("Failed to send initialized notification");

View file

@ -22,6 +22,7 @@ pub const X_ARCH_TOOL_CALL: &str = "x-arch-tool-call-message";
pub const X_ARCH_FC_MODEL_RESPONSE: &str = "x-arch-fc-model-response"; pub const X_ARCH_FC_MODEL_RESPONSE: &str = "x-arch-fc-model-response";
pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function"; pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function";
pub const REQUEST_ID_HEADER: &str = "x-request-id"; pub const REQUEST_ID_HEADER: &str = "x-request-id";
pub const ENVOY_ORIGINAL_PATH_HEADER: &str = "x-envoy-original-path";
pub const TRACE_PARENT_HEADER: &str = "traceparent"; pub const TRACE_PARENT_HEADER: &str = "traceparent";
pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal"; pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal";
pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream"; pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream";

View file

@ -57,7 +57,10 @@ def load_knowledge_base():
async def find_relevant_passages( async def find_relevant_passages(
query: str, traceparent: Optional[str] = None, top_k: int = 3 query: str,
traceparent: Optional[str] = None,
request_id: Optional[str] = None,
top_k: int = 3,
) -> List[Dict[str, str]]: ) -> List[Dict[str, str]]:
"""Use the LLM to find the most relevant passages from the knowledge base.""" """Use the LLM to find the most relevant passages from the knowledge base."""
@ -92,7 +95,11 @@ async def find_relevant_passages(
logger.info(f"Calling archgw to find relevant passages for query: '{query}'") logger.info(f"Calling archgw to find relevant passages for query: '{query}'")
# Prepare extra headers if traceparent is provided # Prepare extra headers if traceparent is provided
extra_headers = {"x-envoy-max-retries": "3"} extra_headers = {
"x-envoy-max-retries": "3",
}
if request_id:
extra_headers["x-request-id"] = request_id
if traceparent: if traceparent:
extra_headers["traceparent"] = traceparent extra_headers["traceparent"] = traceparent
@ -129,7 +136,9 @@ async def find_relevant_passages(
async def augment_query_with_context( async def augment_query_with_context(
messages: List[ChatMessage], traceparent: Optional[str] = None messages: List[ChatMessage],
traceparent: Optional[str] = None,
request_id: Optional[str] = None,
) -> List[ChatMessage]: ) -> List[ChatMessage]:
"""Extract user query, find relevant context, and augment the messages.""" """Extract user query, find relevant context, and augment the messages."""
@ -150,7 +159,9 @@ async def augment_query_with_context(
logger.info(f"Processing user query: '{last_user_message}'") logger.info(f"Processing user query: '{last_user_message}'")
# Find relevant passages # Find relevant passages
relevant_passages = await find_relevant_passages(last_user_message, traceparent) relevant_passages = await find_relevant_passages(
last_user_message, traceparent, request_id
)
if not relevant_passages: if not relevant_passages:
logger.info("No relevant passages found, returning original messages") logger.info("No relevant passages found, returning original messages")
@ -191,6 +202,8 @@ async def context_builder(messages: List[ChatMessage]) -> List[ChatMessage]:
# Get traceparent header from MCP request # Get traceparent header from MCP request
headers = get_http_headers() headers = get_http_headers()
traceparent_header = headers.get("traceparent") traceparent_header = headers.get("traceparent")
request_id = headers.get("x-request-id")
logger.info(f"Received request ID: {request_id}")
if traceparent_header: if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}") logger.info(f"Received traceparent header: {traceparent_header}")
@ -198,7 +211,9 @@ async def context_builder(messages: List[ChatMessage]) -> List[ChatMessage]:
logger.info("No traceparent header found") logger.info("No traceparent header found")
# Augment the user query with relevant context # Augment the user query with relevant context
updated_messages = await augment_query_with_context(messages, traceparent_header) updated_messages = await augment_query_with_context(
messages, traceparent_header, request_id
)
# Return as dict to minimize text serialization # Return as dict to minimize text serialization
return [{"role": msg.role, "content": msg.content} for msg in updated_messages] return [{"role": msg.role, "content": msg.content} for msg in updated_messages]

View file

@ -34,7 +34,9 @@ app = FastAPI()
async def validate_query_scope( async def validate_query_scope(
messages: List[ChatMessage], traceparent_header: str messages: List[ChatMessage],
traceparent_header: str,
request_id: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Validate that the user query is within TechCorp's domain. """Validate that the user query is within TechCorp's domain.
@ -94,6 +96,9 @@ Respond in JSON format:
if traceparent_header: if traceparent_header:
extra_headers["traceparent"] = traceparent_header extra_headers["traceparent"] = traceparent_header
if request_id:
extra_headers["x-request-id"] = request_id
logger.info(f"Validating query scope: '{last_user_message}'") logger.info(f"Validating query scope: '{last_user_message}'")
response = await archgw_client.chat.completions.create( response = await archgw_client.chat.completions.create(
model=GUARD_MODEL, model=GUARD_MODEL,
@ -132,6 +137,7 @@ async def input_guards(messages: List[ChatMessage]) -> List[ChatMessage]:
# Get traceparent header from HTTP request using FastMCP's dependency function # Get traceparent header from HTTP request using FastMCP's dependency function
headers = get_http_headers() headers = get_http_headers()
traceparent_header = headers.get("traceparent") traceparent_header = headers.get("traceparent")
request_id = headers.get("x-request-id")
if traceparent_header: if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}") logger.info(f"Received traceparent header: {traceparent_header}")
@ -139,7 +145,9 @@ async def input_guards(messages: List[ChatMessage]) -> List[ChatMessage]:
logger.info("No traceparent header found") logger.info("No traceparent header found")
# Validate the query scope # Validate the query scope
validation_result = await validate_query_scope(messages, traceparent_header) validation_result = await validate_query_scope(
messages, traceparent_header, request_id
)
if not validation_result.get("is_valid", True): if not validation_result.get("is_valid", True):
reason = validation_result.get("reason", "Query is outside TechCorp's domain") reason = validation_result.get("reason", "Query is outside TechCorp's domain")

View file

@ -33,7 +33,9 @@ app = FastAPI()
async def rewrite_query_with_archgw( async def rewrite_query_with_archgw(
messages: List[ChatMessage], traceparent_header: str messages: List[ChatMessage],
traceparent_header: str,
request_id: Optional[str] = None,
) -> str: ) -> str:
"""Rewrite the user query using LLM for better retrieval.""" """Rewrite the user query using LLM for better retrieval."""
system_prompt = """You are a query rewriter that improves user queries for better retrieval. system_prompt = """You are a query rewriter that improves user queries for better retrieval.
@ -59,6 +61,8 @@ async def rewrite_query_with_archgw(
extra_headers = {"x-envoy-max-retries": "3"} extra_headers = {"x-envoy-max-retries": "3"}
if traceparent_header: if traceparent_header:
extra_headers["traceparent"] = traceparent_header extra_headers["traceparent"] = traceparent_header
if request_id:
extra_headers["x-request-id"] = request_id
logger.info(f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to rewrite query") logger.info(f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to rewrite query")
response = await archgw_client.chat.completions.create( response = await archgw_client.chat.completions.create(
model=QUERY_REWRITE_MODEL, model=QUERY_REWRITE_MODEL,
@ -93,6 +97,7 @@ async def query_rewriter(messages: List[ChatMessage]) -> List[ChatMessage]:
# Get traceparent header from HTTP request using FastMCP's dependency function # Get traceparent header from HTTP request using FastMCP's dependency function
headers = get_http_headers() headers = get_http_headers()
traceparent_header = headers.get("traceparent") traceparent_header = headers.get("traceparent")
request_id = headers.get("x-request-id")
if traceparent_header: if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}") logger.info(f"Received traceparent header: {traceparent_header}")
@ -100,7 +105,9 @@ async def query_rewriter(messages: List[ChatMessage]) -> List[ChatMessage]:
logger.info("No traceparent header found") logger.info("No traceparent header found")
# Call archgw to rewrite the last user query # Call archgw to rewrite the last user query
rewritten_query = await rewrite_query_with_archgw(messages, traceparent_header) rewritten_query = await rewrite_query_with_archgw(
messages, traceparent_header, request_id
)
# Create updated messages with the rewritten query # Create updated messages with the rewritten query
updated_messages = messages.copy() updated_messages = messages.copy()

View file

@ -68,6 +68,7 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq
# Get traceparent header from HTTP request # Get traceparent header from HTTP request
traceparent_header = request.headers.get("traceparent") traceparent_header = request.headers.get("traceparent")
request_id = request.headers.get("x-request-id")
if traceparent_header: if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}") logger.info(f"Received traceparent header: {traceparent_header}")
@ -75,7 +76,7 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq
logger.info("No traceparent header found") logger.info("No traceparent header found")
return StreamingResponse( return StreamingResponse(
stream_chat_completions(request_body, traceparent_header), stream_chat_completions(request_body, traceparent_header, request_id),
media_type="text/plain", media_type="text/plain",
headers={ headers={
"content-type": "text/event-stream", "content-type": "text/event-stream",
@ -84,7 +85,9 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq
async def stream_chat_completions( async def stream_chat_completions(
request_body: ChatCompletionRequest, traceparent_header: str = None request_body: ChatCompletionRequest,
traceparent_header: str = None,
request_id: str = None,
): ):
"""Generate streaming chat completions.""" """Generate streaming chat completions."""
# Prepare messages for response generation # Prepare messages for response generation
@ -96,8 +99,11 @@ async def stream_chat_completions(
f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to generate streaming response" f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to generate streaming response"
) )
logger.info(f"rag_agent - request_id: {request_id}")
# Prepare extra headers if traceparent is provided # Prepare extra headers if traceparent is provided
extra_headers = {"x-envoy-max-retries": "3"} extra_headers = {"x-envoy-max-retries": "3"}
if request_id:
extra_headers["x-request-id"] = request_id
if traceparent_header: if traceparent_header:
extra_headers["traceparent"] = traceparent_header extra_headers["traceparent"] = traceparent_header