mirror of
https://github.com/katanemo/plano.git
synced 2026-04-25 00:36:34 +02:00
stream access logs and improve access log format (#581)
This commit is contained in:
parent
43fceffd93
commit
f4d65e2469
9 changed files with 73 additions and 11 deletions
|
|
@ -40,4 +40,7 @@ COPY arch/supervisord.conf /etc/supervisor/conf.d/supervisord.conf
|
||||||
RUN pip install requests
|
RUN pip install requests
|
||||||
RUN mkdir -p /var/log/supervisor && touch /var/log/envoy.log /var/log/supervisor/supervisord.log
|
RUN mkdir -p /var/log/supervisor && touch /var/log/envoy.log /var/log/supervisor/supervisord.log
|
||||||
|
|
||||||
|
RUN mkdir -p /var/log && \
|
||||||
|
touch /var/log/access_ingress.log /var/log/access_ingress_prompt.log /var/log/access_internal.log /var/log/access_llm.log
|
||||||
|
|
||||||
ENTRYPOINT ["sh","-c", "/usr/bin/supervisord"]
|
ENTRYPOINT ["sh","-c", "/usr/bin/supervisord"]
|
||||||
|
|
|
||||||
|
|
@ -64,6 +64,8 @@ static_resources:
|
||||||
typed_config:
|
typed_config:
|
||||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||||
path: "/var/log/access_ingress.log"
|
path: "/var/log/access_ingress.log"
|
||||||
|
format: |
|
||||||
|
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||||
route_config:
|
route_config:
|
||||||
name: local_routes
|
name: local_routes
|
||||||
virtual_hosts:
|
virtual_hosts:
|
||||||
|
|
@ -117,6 +119,8 @@ static_resources:
|
||||||
typed_config:
|
typed_config:
|
||||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||||
path: "/var/log/access_ingress_prompt.log"
|
path: "/var/log/access_ingress_prompt.log"
|
||||||
|
format: |
|
||||||
|
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||||
route_config:
|
route_config:
|
||||||
name: local_routes
|
name: local_routes
|
||||||
virtual_hosts:
|
virtual_hosts:
|
||||||
|
|
@ -249,6 +253,8 @@ static_resources:
|
||||||
typed_config:
|
typed_config:
|
||||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||||
path: "/var/log/access_internal.log"
|
path: "/var/log/access_internal.log"
|
||||||
|
format: |
|
||||||
|
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||||
route_config:
|
route_config:
|
||||||
name: local_routes
|
name: local_routes
|
||||||
virtual_hosts:
|
virtual_hosts:
|
||||||
|
|
@ -321,6 +327,8 @@ static_resources:
|
||||||
typed_config:
|
typed_config:
|
||||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||||
path: "/var/log/access_llm.log"
|
path: "/var/log/access_llm.log"
|
||||||
|
format: |
|
||||||
|
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||||
route_config:
|
route_config:
|
||||||
name: local_routes
|
name: local_routes
|
||||||
virtual_hosts:
|
virtual_hosts:
|
||||||
|
|
@ -411,6 +419,8 @@ static_resources:
|
||||||
typed_config:
|
typed_config:
|
||||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||||
path: "/var/log/access_llm.log"
|
path: "/var/log/access_llm.log"
|
||||||
|
format: |
|
||||||
|
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||||
route_config:
|
route_config:
|
||||||
name: local_routes
|
name: local_routes
|
||||||
virtual_hosts:
|
virtual_hosts:
|
||||||
|
|
|
||||||
|
|
@ -2,14 +2,21 @@
|
||||||
nodaemon=true
|
nodaemon=true
|
||||||
|
|
||||||
[program:brightstaff]
|
[program:brightstaff]
|
||||||
command=sh -c "RUST_LOG=debug /app/brightstaff 2>&1 | tee /var/log/brightstaff.log"
|
command=sh -c "RUST_LOG=info /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done"
|
||||||
stdout_logfile=/dev/stdout
|
stdout_logfile=/dev/stdout
|
||||||
redirect_stderr=true
|
redirect_stderr=true
|
||||||
stdout_logfile_maxbytes=0
|
stdout_logfile_maxbytes=0
|
||||||
stderr_logfile_maxbytes=0
|
stderr_logfile_maxbytes=0
|
||||||
|
|
||||||
[program:envoy]
|
[program:envoy]
|
||||||
command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log//envoy.log"
|
command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[envoy_logs] ' \"$line\"; done"
|
||||||
|
stdout_logfile=/dev/stdout
|
||||||
|
redirect_stderr=true
|
||||||
|
stdout_logfile_maxbytes=0
|
||||||
|
stderr_logfile_maxbytes=0
|
||||||
|
|
||||||
|
[program:tail_access_logs]
|
||||||
|
command=/bin/sh -c "tail -F /var/log/access_*.log | while IFS= read -r line; do echo '[access_logs]' \"$line\"; done"
|
||||||
stdout_logfile=/dev/stdout
|
stdout_logfile=/dev/stdout
|
||||||
redirect_stderr=true
|
redirect_stderr=true
|
||||||
stdout_logfile_maxbytes=0
|
stdout_logfile_maxbytes=0
|
||||||
|
|
|
||||||
2
crates/Cargo.lock
generated
2
crates/Cargo.lock
generated
|
|
@ -175,6 +175,7 @@ dependencies = [
|
||||||
"serde_with",
|
"serde_with",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
"thiserror 2.0.12",
|
"thiserror 2.0.12",
|
||||||
|
"time",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
|
@ -2684,6 +2685,7 @@ dependencies = [
|
||||||
"sharded-slab",
|
"sharded-slab",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"thread_local",
|
"thread_local",
|
||||||
|
"time",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
"tracing-log",
|
"tracing-log",
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ serde_yaml = "0.9.34"
|
||||||
thiserror = "2.0.12"
|
thiserror = "2.0.12"
|
||||||
tokio = { version = "1.44.2", features = ["full"] }
|
tokio = { version = "1.44.2", features = ["full"] }
|
||||||
tokio-stream = "0.1.17"
|
tokio-stream = "0.1.17"
|
||||||
|
time = { version = "0.3", features = ["formatting", "macros"] }
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
tracing-opentelemetry = "0.30.0"
|
tracing-opentelemetry = "0.30.0"
|
||||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt"] }
|
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt", "time"] }
|
||||||
|
|
|
||||||
|
|
@ -153,7 +153,7 @@ pub async fn chat(
|
||||||
Ok(route) => match route {
|
Ok(route) => match route {
|
||||||
Some((_, model_name)) => model_name,
|
Some((_, model_name)) => model_name,
|
||||||
None => {
|
None => {
|
||||||
debug!(
|
info!(
|
||||||
"No route determined, using default model from request: {}",
|
"No route determined, using default model from request: {}",
|
||||||
chat_completions_request_for_arch_router.model
|
chat_completions_request_for_arch_router.model
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,46 @@
|
||||||
use std::sync::OnceLock;
|
use std::sync::OnceLock;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
use opentelemetry::global;
|
use opentelemetry::global;
|
||||||
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
|
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
|
||||||
use opentelemetry_stdout::SpanExporter;
|
use opentelemetry_stdout::SpanExporter;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields};
|
||||||
|
use tracing::{Event, Subscriber};
|
||||||
|
use time::macros::format_description;
|
||||||
|
|
||||||
|
struct BracketedTime;
|
||||||
|
|
||||||
|
impl FormatTime for BracketedTime {
|
||||||
|
fn format_time(&self, w: &mut format::Writer<'_>) -> fmt::Result {
|
||||||
|
let now = time::OffsetDateTime::now_utc();
|
||||||
|
write!(w, "[{}]", now.format(&format_description!("[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]")).unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BracketedFormatter;
|
||||||
|
|
||||||
|
impl<S, N> FormatEvent<S, N> for BracketedFormatter
|
||||||
|
where
|
||||||
|
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
|
||||||
|
N: for<'a> FormatFields<'a> + 'static,
|
||||||
|
{
|
||||||
|
fn format_event(
|
||||||
|
&self,
|
||||||
|
ctx: &FmtContext<'_, S, N>,
|
||||||
|
mut writer: format::Writer<'_>,
|
||||||
|
event: &Event<'_>,
|
||||||
|
) -> fmt::Result {
|
||||||
|
let timer = BracketedTime;
|
||||||
|
timer.format_time(&mut writer)?;
|
||||||
|
|
||||||
|
write!(writer, "[{}] ", event.metadata().level().to_string().to_lowercase())?;
|
||||||
|
|
||||||
|
ctx.field_format().format_fields(writer.by_ref(), event)?;
|
||||||
|
|
||||||
|
writeln!(writer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static INIT_LOGGER: OnceLock<SdkTracerProvider> = OnceLock::new();
|
static INIT_LOGGER: OnceLock<SdkTracerProvider> = OnceLock::new();
|
||||||
|
|
||||||
|
|
@ -22,6 +59,7 @@ pub fn init_tracer() -> &'static SdkTracerProvider {
|
||||||
.with_env_filter(
|
.with_env_filter(
|
||||||
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
|
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
|
||||||
)
|
)
|
||||||
|
.event_format(BracketedFormatter)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
provider
|
provider
|
||||||
|
|
|
||||||
|
|
@ -204,7 +204,7 @@ impl StreamContext {
|
||||||
// Tokenize and record token count.
|
// Tokenize and record token count.
|
||||||
let token_count = tokenizer::token_count(model, json_string).unwrap_or(0);
|
let token_count = tokenizer::token_count(model, json_string).unwrap_or(0);
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}",
|
"[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
model,
|
model,
|
||||||
|
|
@ -492,7 +492,7 @@ impl StreamContext {
|
||||||
body: &[u8],
|
body: &[u8],
|
||||||
provider_id: ProviderId,
|
provider_id: ProviderId,
|
||||||
) -> Result<Vec<u8>, Action> {
|
) -> Result<Vec<u8>, Action> {
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}",
|
"[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
provider_id,
|
provider_id,
|
||||||
|
|
@ -531,7 +531,7 @@ impl StreamContext {
|
||||||
if let Some((prompt_tokens, completion_tokens, total_tokens)) =
|
if let Some((prompt_tokens, completion_tokens, total_tokens)) =
|
||||||
response.extract_usage_counts()
|
response.extract_usage_counts()
|
||||||
{
|
{
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}",
|
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
prompt_tokens,
|
prompt_tokens,
|
||||||
|
|
@ -697,7 +697,7 @@ impl HttpContext for StreamContext {
|
||||||
//We need to deserialize the request body based on the resolved API
|
//We need to deserialize the request body based on the resolved API
|
||||||
let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() {
|
let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() {
|
||||||
Some(the_client_api) => {
|
Some(the_client_api) => {
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
|
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
the_client_api,
|
the_client_api,
|
||||||
|
|
@ -785,7 +785,7 @@ impl HttpContext for StreamContext {
|
||||||
// Extract user message for tracing
|
// Extract user message for tracing
|
||||||
self.user_message = deserialized_client_request.get_recent_user_message();
|
self.user_message = deserialized_client_request.get_recent_user_message();
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}",
|
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
model_requested,
|
model_requested,
|
||||||
|
|
@ -871,7 +871,7 @@ impl HttpContext for StreamContext {
|
||||||
if let Ok(status_code) = status_str.parse::<u16>() {
|
if let Ok(status_code) = status_str.parse::<u16>() {
|
||||||
self.upstream_status_code = StatusCode::from_u16(status_code).ok();
|
self.upstream_status_code = StatusCode::from_u16(status_code).ok();
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}",
|
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}",
|
||||||
self.request_identifier(),
|
self.request_identifier(),
|
||||||
status_code
|
status_code
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,8 @@ Content-Type: application/json
|
||||||
"role": "user",
|
"role": "user",
|
||||||
"content": "hello"
|
"content": "hello"
|
||||||
}
|
}
|
||||||
]
|
],
|
||||||
|
"model": "gpt-4o-mini"
|
||||||
}
|
}
|
||||||
|
|
||||||
### llm gateway request (streaming)
|
### llm gateway request (streaming)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue