mirror of
https://github.com/katanemo/plano.git
synced 2026-06-26 15:39:40 +02:00
fix header injector code
This commit is contained in:
parent
75d15301ad
commit
9bbce61e70
3 changed files with 10 additions and 50 deletions
|
|
@ -11,8 +11,9 @@ use http_body_util::combinators::BoxBody;
|
||||||
use http_body_util::{BodyExt, Full};
|
use http_body_util::{BodyExt, Full};
|
||||||
use hyper::header::{self};
|
use hyper::header::{self};
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
use opentelemetry::global;
|
||||||
use opentelemetry::trace::get_active_span;
|
use opentelemetry::trace::get_active_span;
|
||||||
use opentelemetry::{global, propagation::Injector};
|
use opentelemetry_http::HeaderInjector;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
@ -35,19 +36,6 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap
|
|
||||||
struct HeaderMapInjector<'a>(&'a mut header::HeaderMap);
|
|
||||||
|
|
||||||
impl<'a> Injector for HeaderMapInjector<'a> {
|
|
||||||
fn set(&mut self, key: &str, value: String) {
|
|
||||||
if let Ok(name) = header::HeaderName::from_bytes(key.as_bytes()) {
|
|
||||||
if let Ok(val) = header::HeaderValue::from_str(&value) {
|
|
||||||
self.0.insert(name, val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn llm_chat(
|
pub async fn llm_chat(
|
||||||
request: Request<hyper::body::Incoming>,
|
request: Request<hyper::body::Incoming>,
|
||||||
router_service: Arc<RouterService>,
|
router_service: Arc<RouterService>,
|
||||||
|
|
@ -368,7 +356,7 @@ async fn llm_chat_inner(
|
||||||
// Inject current LLM span's trace context so upstream spans are children of plano(llm)
|
// Inject current LLM span's trace context so upstream spans are children of plano(llm)
|
||||||
global::get_text_map_propagator(|propagator| {
|
global::get_text_map_propagator(|propagator| {
|
||||||
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||||
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut request_headers));
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers));
|
||||||
});
|
});
|
||||||
|
|
||||||
// Capture start time right before sending request to upstream
|
// Capture start time right before sending request to upstream
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ use hermesllm::apis::openai::Message;
|
||||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||||
use hyper::header::HeaderMap;
|
use hyper::header::HeaderMap;
|
||||||
use opentelemetry::global;
|
use opentelemetry::global;
|
||||||
use opentelemetry::propagation::Injector;
|
use opentelemetry_http::HeaderInjector;
|
||||||
use tracing::{debug, info, instrument, warn};
|
use tracing::{debug, info, instrument, warn};
|
||||||
|
|
||||||
use crate::handlers::jsonrpc::{
|
use crate::handlers::jsonrpc::{
|
||||||
|
|
@ -51,19 +51,6 @@ pub enum PipelineError {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap
|
|
||||||
struct HeaderMapInjector<'a>(&'a mut HeaderMap);
|
|
||||||
|
|
||||||
impl<'a> Injector for HeaderMapInjector<'a> {
|
|
||||||
fn set(&mut self, key: &str, value: String) {
|
|
||||||
if let Ok(header_name) = hyper::header::HeaderName::from_bytes(key.as_bytes()) {
|
|
||||||
if let Ok(header_value) = hyper::header::HeaderValue::from_str(&value) {
|
|
||||||
self.0.insert(header_name, header_value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Service for processing agent pipelines
|
/// Service for processing agent pipelines
|
||||||
pub struct PipelineProcessor {
|
pub struct PipelineProcessor {
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
|
|
@ -169,7 +156,7 @@ impl PipelineProcessor {
|
||||||
global::get_text_map_propagator(|propagator| {
|
global::get_text_map_propagator(|propagator| {
|
||||||
let cx =
|
let cx =
|
||||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||||
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut headers));
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut headers));
|
||||||
});
|
});
|
||||||
|
|
||||||
headers.insert(
|
headers.insert(
|
||||||
|
|
@ -546,7 +533,7 @@ impl PipelineProcessor {
|
||||||
global::get_text_map_propagator(|propagator| {
|
global::get_text_map_propagator(|propagator| {
|
||||||
let cx =
|
let cx =
|
||||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||||
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers));
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||||
});
|
});
|
||||||
|
|
||||||
agent_headers.insert(
|
agent_headers.insert(
|
||||||
|
|
@ -645,7 +632,7 @@ impl PipelineProcessor {
|
||||||
global::get_text_map_propagator(|propagator| {
|
global::get_text_map_propagator(|propagator| {
|
||||||
let cx =
|
let cx =
|
||||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||||
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers));
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||||
});
|
});
|
||||||
|
|
||||||
agent_headers.insert(
|
agent_headers.insert(
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,8 @@ use common::{
|
||||||
};
|
};
|
||||||
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
|
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
|
||||||
use hyper::header;
|
use hyper::header;
|
||||||
use opentelemetry::{global, propagation::Injector};
|
use opentelemetry::global;
|
||||||
|
use opentelemetry_http::HeaderInjector;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
|
|
@ -14,19 +15,6 @@ use crate::router::orchestrator_model_v1::{self};
|
||||||
|
|
||||||
use super::orchestrator_model::OrchestratorModel;
|
use super::orchestrator_model::OrchestratorModel;
|
||||||
|
|
||||||
/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap
|
|
||||||
struct HeaderMapInjector<'a>(&'a mut header::HeaderMap);
|
|
||||||
|
|
||||||
impl<'a> Injector for HeaderMapInjector<'a> {
|
|
||||||
fn set(&mut self, key: &str, value: String) {
|
|
||||||
if let Ok(header_name) = header::HeaderName::from_bytes(key.as_bytes()) {
|
|
||||||
if let Ok(header_value) = header::HeaderValue::from_str(&value) {
|
|
||||||
self.0.insert(header_name, header_value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct OrchestratorService {
|
pub struct OrchestratorService {
|
||||||
orchestrator_url: String,
|
orchestrator_url: String,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
|
|
@ -110,10 +98,7 @@ impl OrchestratorService {
|
||||||
global::get_text_map_propagator(|propagator| {
|
global::get_text_map_propagator(|propagator| {
|
||||||
let cx =
|
let cx =
|
||||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||||
propagator.inject_context(
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut orchestration_request_headers));
|
||||||
&cx,
|
|
||||||
&mut HeaderMapInjector(&mut orchestration_request_headers),
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some(request_id) = request_id {
|
if let Some(request_id) = request_id {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue