diff --git a/Dockerfile b/Dockerfile index a61cc825..8a0ab81e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -49,6 +49,6 @@ COPY config/supervisord.conf /etc/supervisor/conf.d/supervisord.conf RUN mkdir -p /var/log/supervisor && touch /var/log/envoy.log /var/log/supervisor/supervisord.log RUN mkdir -p /var/log && \ - touch /var/log/access_ingress.log /var/log/access_ingress_prompt.log /var/log/access_internal.log /var/log/access_llm.log + touch /var/log/access_ingress.log /var/log/access_ingress_prompt.log /var/log/access_internal.log /var/log/access_llm.log /var/log/access_agent.log ENTRYPOINT ["sh","-c", "/usr/bin/supervisord"] diff --git a/config/envoy.template.yaml b/config/envoy.template.yaml index 4ead29c2..00006444 100644 --- a/config/envoy.template.yaml +++ b/config/envoy.template.yaml @@ -311,7 +311,7 @@ static_resources: - name: envoy.access_loggers.file typed_config: "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - path: "/var/log/access_llm.log" + path: "/var/log/access_agent.log" route_config: name: local_routes request_headers_to_add: @@ -483,13 +483,13 @@ static_resources: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager stat_prefix: otel_proxy codec_type: AUTO - access_log: - - name: envoy.access_loggers.file - typed_config: - "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - path: "/var/log/access_otel.log" - format: | - [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" + # access_log: + # - name: envoy.access_loggers.file + # typed_config: + # "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + # path: "/var/log/access_otel.log" + # format: | + # [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" route_config: name: otel_route virtual_hosts: diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index df358190..46e9ead1 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -157,7 +157,26 @@ async fn handle_agent_chat( .strip_prefix("/agents") .unwrap() .to_string(); - let request_headers = request.headers().clone(); + + let request_headers = { + let mut headers = request.headers().clone(); + headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER); + + if !headers.contains_key(common::consts::REQUEST_ID_HEADER) { + let request_id = uuid::Uuid::new_v4().to_string(); + info!( + "Request id not found in headers, generated new request id: {}", + request_id + ); + headers.insert( + common::consts::REQUEST_ID_HEADER, + hyper::header::HeaderValue::from_str(&request_id).unwrap(), + ); + } + + headers + }; + let chat_request_bytes = request.collect().await?.to_bytes(); debug!( diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 2c1d9859..8b9bf21a 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -386,7 +386,7 @@ impl PipelineProcessor { async fn send_mcp_request( &self, json_rpc_request: &JsonRpcRequest, - headers: HeaderMap, + headers: &HeaderMap, agent_id: &str, ) -> Result { let request_body = serde_json::to_string(json_rpc_request)?; @@ -399,7 +399,7 @@ impl PipelineProcessor { let response = self .client .post(format!("{}/mcp", self.url)) - .headers(headers) + .headers(headers.clone()) .body(request_body) .send() .await?; @@ -443,7 +443,12 @@ impl PipelineProcessor { session_id.clone() } else { let session_id = self - .get_new_session_id(&agent.id, trace_id.clone(), filter_span_id.clone()) + .get_new_session_id( + &agent.id, + trace_id.clone(), + filter_span_id.clone(), + request_headers, + ) .await; self.agent_id_session_map .insert(agent.id.clone(), session_id.clone()); @@ -476,7 +481,7 @@ impl PipelineProcessor { let start_instant = Instant::now(); let response = self - .send_mcp_request(&json_rpc_request, agent_headers, &agent.id) + .send_mcp_request(&json_rpc_request, &agent_headers, &agent.id) .await?; let http_status = response.status(); let response_bytes = response.bytes().await?; @@ -608,6 +613,7 @@ impl PipelineProcessor { session_id: &str, trace_id: String, parent_span_id: String, + request_headers: &HeaderMap, ) -> Result<(), PipelineError> { let initialized_notification = JsonRpcNotification { jsonrpc: JSON_RPC_VERSION.to_string(), @@ -619,7 +625,7 @@ impl PipelineProcessor { debug!("Sending initialized notification for agent {}", agent_id); let headers = self.build_mcp_headers( - &HeaderMap::new(), + request_headers, agent_id, Some(session_id), trace_id.clone(), @@ -647,13 +653,14 @@ impl PipelineProcessor { agent_id: &str, trace_id: String, parent_span_id: String, + request_headers: &HeaderMap, ) -> String { info!("Initializing MCP session for agent {}", agent_id); let initialize_request = self.build_initialize_request(); let headers = self .build_mcp_headers( - &HeaderMap::new(), + request_headers, agent_id, None, trace_id.clone(), @@ -662,7 +669,7 @@ impl PipelineProcessor { .expect("Failed to build headers for initialization"); let response = self - .send_mcp_request(&initialize_request, headers, agent_id) + .send_mcp_request(&initialize_request, &headers, agent_id) .await .expect("Failed to initialize MCP session"); @@ -686,6 +693,7 @@ impl PipelineProcessor { &session_id, trace_id.clone(), parent_span_id.clone(), + &headers, ) .await .expect("Failed to send initialized notification"); diff --git a/crates/common/src/consts.rs b/crates/common/src/consts.rs index e1b5dc1a..23260747 100644 --- a/crates/common/src/consts.rs +++ b/crates/common/src/consts.rs @@ -22,6 +22,7 @@ pub const X_ARCH_TOOL_CALL: &str = "x-arch-tool-call-message"; pub const X_ARCH_FC_MODEL_RESPONSE: &str = "x-arch-fc-model-response"; pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function"; pub const REQUEST_ID_HEADER: &str = "x-request-id"; +pub const ENVOY_ORIGINAL_PATH_HEADER: &str = "x-envoy-original-path"; pub const TRACE_PARENT_HEADER: &str = "traceparent"; pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal"; pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream"; diff --git a/demos/use_cases/mcp_filter/src/rag_agent/context_builder.py b/demos/use_cases/mcp_filter/src/rag_agent/context_builder.py index 5512fcc0..bc1438d2 100644 --- a/demos/use_cases/mcp_filter/src/rag_agent/context_builder.py +++ b/demos/use_cases/mcp_filter/src/rag_agent/context_builder.py @@ -57,7 +57,10 @@ def load_knowledge_base(): async def find_relevant_passages( - query: str, traceparent: Optional[str] = None, top_k: int = 3 + query: str, + traceparent: Optional[str] = None, + request_id: Optional[str] = None, + top_k: int = 3, ) -> List[Dict[str, str]]: """Use the LLM to find the most relevant passages from the knowledge base.""" @@ -92,7 +95,11 @@ async def find_relevant_passages( logger.info(f"Calling archgw to find relevant passages for query: '{query}'") # Prepare extra headers if traceparent is provided - extra_headers = {"x-envoy-max-retries": "3"} + extra_headers = { + "x-envoy-max-retries": "3", + } + if request_id: + extra_headers["x-request-id"] = request_id if traceparent: extra_headers["traceparent"] = traceparent @@ -129,7 +136,9 @@ async def find_relevant_passages( async def augment_query_with_context( - messages: List[ChatMessage], traceparent: Optional[str] = None + messages: List[ChatMessage], + traceparent: Optional[str] = None, + request_id: Optional[str] = None, ) -> List[ChatMessage]: """Extract user query, find relevant context, and augment the messages.""" @@ -150,7 +159,9 @@ async def augment_query_with_context( logger.info(f"Processing user query: '{last_user_message}'") # Find relevant passages - relevant_passages = await find_relevant_passages(last_user_message, traceparent) + relevant_passages = await find_relevant_passages( + last_user_message, traceparent, request_id + ) if not relevant_passages: logger.info("No relevant passages found, returning original messages") @@ -191,6 +202,8 @@ async def context_builder(messages: List[ChatMessage]) -> List[ChatMessage]: # Get traceparent header from MCP request headers = get_http_headers() traceparent_header = headers.get("traceparent") + request_id = headers.get("x-request-id") + logger.info(f"Received request ID: {request_id}") if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") @@ -198,7 +211,9 @@ async def context_builder(messages: List[ChatMessage]) -> List[ChatMessage]: logger.info("No traceparent header found") # Augment the user query with relevant context - updated_messages = await augment_query_with_context(messages, traceparent_header) + updated_messages = await augment_query_with_context( + messages, traceparent_header, request_id + ) # Return as dict to minimize text serialization return [{"role": msg.role, "content": msg.content} for msg in updated_messages] diff --git a/demos/use_cases/mcp_filter/src/rag_agent/input_guards.py b/demos/use_cases/mcp_filter/src/rag_agent/input_guards.py index 633ab5d0..fce2c470 100644 --- a/demos/use_cases/mcp_filter/src/rag_agent/input_guards.py +++ b/demos/use_cases/mcp_filter/src/rag_agent/input_guards.py @@ -34,7 +34,9 @@ app = FastAPI() async def validate_query_scope( - messages: List[ChatMessage], traceparent_header: str + messages: List[ChatMessage], + traceparent_header: str, + request_id: Optional[str] = None, ) -> Dict[str, Any]: """Validate that the user query is within TechCorp's domain. @@ -94,6 +96,9 @@ Respond in JSON format: if traceparent_header: extra_headers["traceparent"] = traceparent_header + if request_id: + extra_headers["x-request-id"] = request_id + logger.info(f"Validating query scope: '{last_user_message}'") response = await archgw_client.chat.completions.create( model=GUARD_MODEL, @@ -132,6 +137,7 @@ async def input_guards(messages: List[ChatMessage]) -> List[ChatMessage]: # Get traceparent header from HTTP request using FastMCP's dependency function headers = get_http_headers() traceparent_header = headers.get("traceparent") + request_id = headers.get("x-request-id") if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") @@ -139,7 +145,9 @@ async def input_guards(messages: List[ChatMessage]) -> List[ChatMessage]: logger.info("No traceparent header found") # Validate the query scope - validation_result = await validate_query_scope(messages, traceparent_header) + validation_result = await validate_query_scope( + messages, traceparent_header, request_id + ) if not validation_result.get("is_valid", True): reason = validation_result.get("reason", "Query is outside TechCorp's domain") diff --git a/demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py b/demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py index 81362e1f..050b49de 100644 --- a/demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py +++ b/demos/use_cases/mcp_filter/src/rag_agent/query_rewriter.py @@ -33,7 +33,9 @@ app = FastAPI() async def rewrite_query_with_archgw( - messages: List[ChatMessage], traceparent_header: str + messages: List[ChatMessage], + traceparent_header: str, + request_id: Optional[str] = None, ) -> str: """Rewrite the user query using LLM for better retrieval.""" system_prompt = """You are a query rewriter that improves user queries for better retrieval. @@ -59,6 +61,8 @@ async def rewrite_query_with_archgw( extra_headers = {"x-envoy-max-retries": "3"} if traceparent_header: extra_headers["traceparent"] = traceparent_header + if request_id: + extra_headers["x-request-id"] = request_id logger.info(f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to rewrite query") response = await archgw_client.chat.completions.create( model=QUERY_REWRITE_MODEL, @@ -93,6 +97,7 @@ async def query_rewriter(messages: List[ChatMessage]) -> List[ChatMessage]: # Get traceparent header from HTTP request using FastMCP's dependency function headers = get_http_headers() traceparent_header = headers.get("traceparent") + request_id = headers.get("x-request-id") if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") @@ -100,7 +105,9 @@ async def query_rewriter(messages: List[ChatMessage]) -> List[ChatMessage]: logger.info("No traceparent header found") # Call archgw to rewrite the last user query - rewritten_query = await rewrite_query_with_archgw(messages, traceparent_header) + rewritten_query = await rewrite_query_with_archgw( + messages, traceparent_header, request_id + ) # Create updated messages with the rewritten query updated_messages = messages.copy() diff --git a/demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py b/demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py index db7270f0..1a387ab7 100644 --- a/demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py +++ b/demos/use_cases/mcp_filter/src/rag_agent/rag_agent.py @@ -68,6 +68,7 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq # Get traceparent header from HTTP request traceparent_header = request.headers.get("traceparent") + request_id = request.headers.get("x-request-id") if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") @@ -75,7 +76,7 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq logger.info("No traceparent header found") return StreamingResponse( - stream_chat_completions(request_body, traceparent_header), + stream_chat_completions(request_body, traceparent_header, request_id), media_type="text/plain", headers={ "content-type": "text/event-stream", @@ -84,7 +85,9 @@ async def chat_completion_http(request: Request, request_body: ChatCompletionReq async def stream_chat_completions( - request_body: ChatCompletionRequest, traceparent_header: str = None + request_body: ChatCompletionRequest, + traceparent_header: str = None, + request_id: str = None, ): """Generate streaming chat completions.""" # Prepare messages for response generation @@ -96,8 +99,11 @@ async def stream_chat_completions( f"Calling archgw at {LLM_GATEWAY_ENDPOINT} to generate streaming response" ) + logger.info(f"rag_agent - request_id: {request_id}") # Prepare extra headers if traceparent is provided extra_headers = {"x-envoy-max-retries": "3"} + if request_id: + extra_headers["x-request-id"] = request_id if traceparent_header: extra_headers["traceparent"] = traceparent_header