Compare commits

...

4 commits

Author SHA1 Message Date
Syed Hashmi
25da5b4293 Merge remote-tracking branch 'origin/main' into syed/signals_port
Made-with: Cursor

# Conflicts:
#	crates/brightstaff/src/streaming.rs
2026-04-22 21:40:03 -07:00
Syed Hashmi
f7d067269c signals: group summary by taxonomy, factor misalignment_ratio
Addresses #903 review feedback from @nehcgs:

- generate_summary() now renders explicit Interaction / Execution /
  Environment headers so the paper taxonomy is visible at a glance,
  even when no signals fired in a given layer. Quality-driving callouts
  (high misalignment rate, looping detected, escalation requested) are
  appended after the layer summary as an alerts tail.

- repair_ratio (legacy taxonomy name) renamed to misalignment_ratio
  and factored into a single InteractionSignals::misalignment_ratio()
  helper so assess_quality and generate_summary share one source of
  truth instead of recomputing the same divide twice.

Two new unit tests pin the layer headers and the (sev N) severity
suffix. Parity with the python reference is preserved at the Tier-A
level (per-type counts + overall_quality); only the human-readable
summary string diverges, which the parity comparator already classifies
as Tier-C.

Made-with: Cursor
2026-04-22 21:34:32 -07:00
Syed Hashmi
12b6b3d814 style: format parity harness with black
Made-with: Cursor
2026-04-22 21:34:23 -07:00
Adil Hafeez
22f332f62d
Add Prometheus metrics endpoint and Grafana dashboard for brightstaff (#904)
Some checks are pending
CI / pre-commit (push) Waiting to run
CI / plano-tools-tests (push) Waiting to run
CI / native-smoke-test (push) Waiting to run
CI / docker-build (push) Waiting to run
CI / validate-config (push) Waiting to run
CI / security-scan (push) Blocked by required conditions
CI / test-prompt-gateway (push) Blocked by required conditions
CI / test-model-alias-routing (push) Blocked by required conditions
CI / test-responses-api-with-state (push) Blocked by required conditions
CI / e2e-plano-tests (3.10) (push) Blocked by required conditions
CI / e2e-plano-tests (3.11) (push) Blocked by required conditions
CI / e2e-plano-tests (3.12) (push) Blocked by required conditions
CI / e2e-plano-tests (3.13) (push) Blocked by required conditions
CI / e2e-plano-tests (3.14) (push) Blocked by required conditions
CI / e2e-demo-preference (push) Blocked by required conditions
CI / e2e-demo-currency (push) Blocked by required conditions
Publish docker image (latest) / build-arm64 (push) Waiting to run
Publish docker image (latest) / build-amd64 (push) Waiting to run
Publish docker image (latest) / create-manifest (push) Blocked by required conditions
Build and Deploy Documentation / build (push) Waiting to run
2026-04-22 11:19:10 -07:00
22 changed files with 1848 additions and 65 deletions

View file

@ -0,0 +1,541 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"description": "RED, LLM upstream, routing service, and process metrics for brightstaff. Pair with Envoy admin metrics from cluster=bright_staff.",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
"id": null,
"links": [],
"liveNow": false,
"panels": [
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
"id": 100,
"panels": [],
"title": "HTTP RED",
"type": "row"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": {
"axisLabel": "req/s",
"drawStyle": "line",
"fillOpacity": 10,
"lineWidth": 1,
"showPoints": "never"
},
"unit": "reqps"
}
},
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 1 },
"id": 1,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (handler) (rate(brightstaff_http_requests_total[1m]))",
"legendFormat": "{{handler}}",
"refId": "A"
}
],
"title": "Rate — brightstaff RPS by handler",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "5xx fraction over 5m. Page-worthy when sustained above ~1%.",
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 0.01 },
{ "color": "red", "value": 0.05 }
]
},
"unit": "percentunit"
}
},
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 1 },
"id": 2,
"options": {
"colorMode": "background",
"graphMode": "area",
"reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum(rate(brightstaff_http_requests_total{status_class=\"5xx\"}[5m])) / clamp_min(sum(rate(brightstaff_http_requests_total[5m])), 1)",
"legendFormat": "5xx rate",
"refId": "A"
}
],
"title": "Errors — brightstaff 5xx rate",
"type": "stat"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "p50/p95/p99 by handler, computed from histogram buckets over 5m.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 5, "lineWidth": 1, "showPoints": "never" },
"unit": "s"
}
},
"gridPos": { "h": 9, "w": 24, "x": 0, "y": 9 },
"id": 3,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.50, sum by (le, handler) (rate(brightstaff_http_request_duration_seconds_bucket[5m])))",
"legendFormat": "p50 {{handler}}",
"refId": "A"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.95, sum by (le, handler) (rate(brightstaff_http_request_duration_seconds_bucket[5m])))",
"legendFormat": "p95 {{handler}}",
"refId": "B"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.99, sum by (le, handler) (rate(brightstaff_http_request_duration_seconds_bucket[5m])))",
"legendFormat": "p99 {{handler}}",
"refId": "C"
}
],
"title": "Duration — p50 / p95 / p99 by handler",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "In-flight requests by handler. Climbs before latency does when brightstaff is saturated.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 10, "lineWidth": 1, "showPoints": "never" },
"unit": "short"
}
},
"gridPos": { "h": 8, "w": 24, "x": 0, "y": 18 },
"id": 4,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (handler) (brightstaff_http_in_flight_requests)",
"legendFormat": "{{handler}}",
"refId": "A"
}
],
"title": "In-flight requests by handler",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 26 },
"id": 200,
"panels": [],
"title": "LLM upstream",
"type": "row"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 5, "lineWidth": 1, "showPoints": "never" },
"unit": "s"
}
},
"gridPos": { "h": 9, "w": 12, "x": 0, "y": 27 },
"id": 5,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.95, sum by (le, provider, model) (rate(brightstaff_llm_upstream_duration_seconds_bucket[5m])))",
"legendFormat": "p95 {{provider}}/{{model}}",
"refId": "A"
}
],
"title": "LLM upstream p95 by provider/model",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "All non-success error classes. timeout/connect = network, 5xx/429 = provider, parse = body shape mismatch, stream = mid-stream disconnect.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 30, "lineWidth": 1, "showPoints": "never", "stacking": { "mode": "normal" } },
"unit": "reqps"
}
},
"gridPos": { "h": 9, "w": 12, "x": 12, "y": 27 },
"id": 6,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (provider, error_class) (rate(brightstaff_llm_upstream_requests_total{error_class!=\"none\"}[5m]))",
"legendFormat": "{{provider}} / {{error_class}}",
"refId": "A"
}
],
"title": "LLM upstream errors by provider / class",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Streaming only. Empty if the route never streams.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 5, "lineWidth": 1, "showPoints": "never" },
"unit": "s"
}
},
"gridPos": { "h": 9, "w": 12, "x": 0, "y": 36 },
"id": 7,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.95, sum by (le, provider, model) (rate(brightstaff_llm_time_to_first_token_seconds_bucket[5m])))",
"legendFormat": "p95 {{provider}}/{{model}}",
"refId": "A"
}
],
"title": "Time-to-first-token p95 (streaming)",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Tokens/sec by provider/model/kind — proxy for cost. Stacked.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 30, "lineWidth": 1, "showPoints": "never", "stacking": { "mode": "normal" } },
"unit": "tokens/s"
}
},
"gridPos": { "h": 9, "w": 12, "x": 12, "y": 36 },
"id": 8,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (provider, model, kind) (rate(brightstaff_llm_tokens_total[5m]))",
"legendFormat": "{{provider}}/{{model}} {{kind}}",
"refId": "A"
}
],
"title": "Token throughput by provider / model / kind",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 45 },
"id": 300,
"panels": [],
"title": "Routing service",
"type": "row"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Which models the orchestrator picked over the last 15 minutes.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"unit": "short"
}
},
"gridPos": { "h": 9, "w": 12, "x": 0, "y": 46 },
"id": 9,
"options": {
"displayMode": "gradient",
"orientation": "horizontal",
"reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (selected_model) (increase(brightstaff_router_decisions_total[15m]))",
"legendFormat": "{{selected_model}}",
"refId": "A"
}
],
"title": "Model selection distribution (last 15m)",
"type": "bargauge"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Fraction of decisions that fell back (orchestrator returned `none` or errored). High = router can't classify intent or no candidates configured.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 10, "lineWidth": 1, "showPoints": "never" },
"unit": "percentunit"
}
},
"gridPos": { "h": 9, "w": 12, "x": 12, "y": 46 },
"id": 10,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (route) (rate(brightstaff_router_decisions_total{fallback=\"true\"}[5m])) / clamp_min(sum by (route) (rate(brightstaff_router_decisions_total[5m])), 1)",
"legendFormat": "{{route}}",
"refId": "A"
}
],
"title": "Fallback rate by route",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 5, "lineWidth": 1, "showPoints": "never" },
"unit": "s"
}
},
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 55 },
"id": 11,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "histogram_quantile(0.95, sum by (le, route) (rate(brightstaff_router_decision_duration_seconds_bucket[5m])))",
"legendFormat": "p95 {{route}}",
"refId": "A"
}
],
"title": "Router decision p95 latency",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Hit / (hit + miss). Low ratio = sessions aren't being reused or TTL too short.",
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "red", "value": null },
{ "color": "yellow", "value": 0.5 },
{ "color": "green", "value": 0.8 }
]
},
"unit": "percentunit",
"min": 0,
"max": 1
}
},
"gridPos": { "h": 8, "w": 6, "x": 12, "y": 55 },
"id": 12,
"options": {
"colorMode": "background",
"graphMode": "area",
"reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum(rate(brightstaff_session_cache_events_total{outcome=\"hit\"}[5m])) / clamp_min(sum(rate(brightstaff_session_cache_events_total{outcome=~\"hit|miss\"}[5m])), 1)",
"legendFormat": "hit rate",
"refId": "A"
}
],
"title": "Session cache hit rate",
"type": "stat"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "decision_served = a real model picked. no_candidates = sentinel `none` returned. policy_error = orchestrator failed.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 30, "lineWidth": 1, "showPoints": "never", "stacking": { "mode": "normal" } },
"unit": "reqps"
}
},
"gridPos": { "h": 8, "w": 6, "x": 18, "y": 55 },
"id": 13,
"options": {
"legend": { "displayMode": "list", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum by (outcome) (rate(brightstaff_routing_service_requests_total[5m]))",
"legendFormat": "{{outcome}}",
"refId": "A"
}
],
"title": "/routing/* outcomes",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 63 },
"id": 400,
"panels": [],
"title": "Process & Envoy link",
"type": "row"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"description": "Compare to brightstaff RPS (panel 1) — sustained gap = network or Envoy queueing.",
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 10, "lineWidth": 1, "showPoints": "never" },
"unit": "reqps"
}
},
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 64 },
"id": 14,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum(rate(envoy_cluster_upstream_rq_total{envoy_cluster_name=\"bright_staff\"}[1m]))",
"legendFormat": "envoy → bright_staff",
"refId": "A"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "sum(rate(brightstaff_http_requests_total[1m]))",
"legendFormat": "brightstaff served",
"refId": "B"
}
],
"title": "Envoy → brightstaff link health",
"type": "timeseries"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": { "drawStyle": "line", "fillOpacity": 10, "lineWidth": 1, "showPoints": "never" }
},
"overrides": [
{
"matcher": { "id": "byName", "options": "RSS" },
"properties": [{ "id": "unit", "value": "bytes" }]
},
{
"matcher": { "id": "byName", "options": "CPU" },
"properties": [{ "id": "unit", "value": "percentunit" }]
}
]
},
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 64 },
"id": 15,
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "showLegend": true },
"tooltip": { "mode": "multi" }
},
"targets": [
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "process_resident_memory_bytes{job=\"brightstaff\"}",
"legendFormat": "RSS",
"refId": "A"
},
{
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"expr": "rate(process_cpu_seconds_total{job=\"brightstaff\"}[1m])",
"legendFormat": "CPU",
"refId": "B"
}
],
"title": "Brightstaff process RSS / CPU",
"type": "timeseries"
}
],
"refresh": "30s",
"schemaVersion": 39,
"tags": ["plano", "brightstaff", "llm"],
"templating": {
"list": [
{
"name": "DS_PROMETHEUS",
"label": "Prometheus",
"type": "datasource",
"query": "prometheus",
"current": { "selected": false, "text": "Prometheus", "value": "DS_PROMETHEUS" },
"hide": 0,
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"includeAll": false,
"multi": false
}
]
},
"time": { "from": "now-1h", "to": "now" },
"timepicker": {},
"timezone": "browser",
"title": "Brightstaff (Plano dataplane)",
"uid": "brightstaff",
"version": 1,
"weekStart": ""
}

View file

@ -0,0 +1,43 @@
# One-command Prometheus + Grafana stack for observing a locally-running
# Plano (Envoy admin :9901 + brightstaff :9092 on the host).
#
# cd config/grafana
# docker compose up -d
# open http://localhost:3000 (admin / admin)
#
# Grafana is preloaded with:
# - Prometheus datasource (uid=DS_PROMETHEUS) → http://prometheus:9090
# - Brightstaff dashboard (auto-imported from brightstaff_dashboard.json)
#
# Prometheus scrapes the host's :9092 and :9901 via host.docker.internal.
# On Linux this works because of the `extra_hosts: host-gateway` mapping
# below. On Mac/Win it works natively.
services:
prometheus:
image: prom/prometheus:latest
container_name: plano-prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus_scrape.yaml:/etc/prometheus/prometheus.yml:ro
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
grafana:
image: grafana/grafana:latest
container_name: plano-grafana
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: admin
GF_AUTH_ANONYMOUS_ENABLED: "true"
GF_AUTH_ANONYMOUS_ORG_ROLE: Viewer
volumes:
- ./provisioning:/etc/grafana/provisioning:ro
- ./brightstaff_dashboard.json:/var/lib/grafana/dashboards/brightstaff_dashboard.json:ro
depends_on:
- prometheus
restart: unless-stopped

View file

@ -0,0 +1,44 @@
# Prometheus config that scrapes Plano (Envoy admin + brightstaff). This is
# a complete Prometheus config — mount it directly at
# /etc/prometheus/prometheus.yml. The included docker-compose.yaml does this
# for you.
#
# Targets:
# - envoy:9901 Envoy admin → envoy_cluster_*, envoy_http_*, envoy_server_*.
# - brightstaff:9092 Native dataplane → brightstaff_http_*, brightstaff_llm_*,
# brightstaff_router_*, process_*.
#
# Hostname `host.docker.internal` works on Docker Desktop (Mac/Win) and on
# Linux when the container is started with `--add-host=host.docker.internal:
# host-gateway` (the included compose does this). If Plano runs *inside*
# Docker on the same network as Prometheus, replace it with the container
# name (e.g. `plano:9092`).
#
# This file is unrelated to demos/llm_routing/model_routing_service/prometheus.yaml,
# which scrapes a fake metrics service to feed the routing engine.
global:
scrape_interval: 15s
scrape_timeout: 10s
evaluation_interval: 15s
scrape_configs:
- job_name: envoy
honor_timestamps: true
metrics_path: /stats
params:
format: ["prometheus"]
static_configs:
- targets:
- host.docker.internal:9901
labels:
service: plano
- job_name: brightstaff
honor_timestamps: true
metrics_path: /metrics
static_configs:
- targets:
- host.docker.internal:9092
labels:
service: plano

View file

@ -0,0 +1,15 @@
# Auto-load the brightstaff dashboard JSON on Grafana startup.
apiVersion: 1
providers:
- name: brightstaff
orgId: 1
folder: Plano
type: file
disableDeletion: false
updateIntervalSeconds: 30
allowUiUpdates: true
options:
path: /var/lib/grafana/dashboards
foldersFromFilesStructure: false

View file

@ -0,0 +1,14 @@
# Auto-provision the Prometheus datasource so the bundled dashboard wires up
# without any clicks. The `uid: DS_PROMETHEUS` matches the templated input in
# brightstaff_dashboard.json.
apiVersion: 1
datasources:
- name: Prometheus
uid: DS_PROMETHEUS
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: true

332
crates/Cargo.lock generated
View file

@ -23,6 +23,18 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@ -257,6 +269,24 @@ dependencies = [
"vsimd",
]
[[package]]
name = "bindgen"
version = "0.72.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"itertools 0.13.0",
"proc-macro2",
"quote",
"regex",
"rustc-hash 2.1.2",
"shlex",
"syn 2.0.117",
]
[[package]]
name = "bit-set"
version = "0.5.3"
@ -316,6 +346,9 @@ dependencies = [
"hyper 1.9.0",
"hyper-util",
"lru",
"metrics 0.23.1",
"metrics-exporter-prometheus",
"metrics-process",
"mockito",
"opentelemetry",
"opentelemetry-http",
@ -392,6 +425,15 @@ dependencies = [
"shlex",
]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "1.0.4"
@ -429,6 +471,17 @@ dependencies = [
"windows-link",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "cmov"
version = "0.5.3"
@ -575,6 +628,21 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "crypto-common"
version = "0.1.7"
@ -1071,6 +1139,12 @@ dependencies = [
"wasip3",
]
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "governor"
version = "0.6.3"
@ -1129,7 +1203,7 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25"
dependencies = [
"ahash",
"ahash 0.3.8",
"autocfg",
]
@ -1139,6 +1213,15 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
]
[[package]]
name = "hashbrown"
version = "0.15.5"
@ -1190,6 +1273,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "hermit-abi"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex"
version = "0.4.3"
@ -1666,6 +1755,27 @@ version = "0.2.185"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f"
[[package]]
name = "libloading"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55"
dependencies = [
"cfg-if",
"windows-link",
]
[[package]]
name = "libproc"
version = "0.14.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a54ad7278b8bc5301d5ffd2a94251c004feb971feba96c971ea4063645990757"
dependencies = [
"bindgen",
"errno",
"libc",
]
[[package]]
name = "libredox"
version = "0.1.16"
@ -1746,6 +1856,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "mach2"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dae608c151f68243f2b000364e1f7b186d9c29845f7d2d85bd31b9ad77ad552b"
[[package]]
name = "matchers"
version = "0.2.0"
@ -1783,6 +1899,77 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "metrics"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3045b4193fbdc5b5681f32f11070da9be3609f189a79f3390706d42587f46bb5"
dependencies = [
"ahash 0.8.12",
"portable-atomic",
]
[[package]]
name = "metrics"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8"
dependencies = [
"ahash 0.8.12",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.15.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6"
dependencies = [
"base64 0.22.1",
"http-body-util",
"hyper 1.9.0",
"hyper-util",
"indexmap 2.14.0",
"ipnet",
"metrics 0.23.1",
"metrics-util",
"quanta",
"thiserror 1.0.69",
"tokio",
"tracing",
]
[[package]]
name = "metrics-process"
version = "2.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4268d87f64a752f5a651314fc683f04da10be65701ea3e721ba4d74f79163cac"
dependencies = [
"libc",
"libproc",
"mach2",
"metrics 0.24.3",
"once_cell",
"procfs",
"rlimit",
"windows",
]
[[package]]
name = "metrics-util"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.14.5",
"metrics 0.23.1",
"num_cpus",
"quanta",
"sketches-ddsketch",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -1936,6 +2123,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "objc2-core-foundation"
version = "0.3.2"
@ -2279,6 +2476,27 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "procfs"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25485360a54d6861439d60facef26de713b1e126bf015ec8f98239467a2b82f7"
dependencies = [
"bitflags",
"procfs-core",
"rustix",
]
[[package]]
name = "procfs-core"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6401bf7b6af22f78b563665d15a22e9aef27775b79b149a66ca022468a4e405"
dependencies = [
"bitflags",
"hex",
]
[[package]]
name = "prompt_gateway"
version = "0.1.0"
@ -2334,6 +2552,21 @@ dependencies = [
"log",
]
[[package]]
name = "quanta"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.1+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quinn"
version = "0.11.9"
@ -2486,6 +2719,15 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
[[package]]
name = "raw-cpuid"
version = "11.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
dependencies = [
"bitflags",
]
[[package]]
name = "redis"
version = "0.27.6"
@ -2647,6 +2889,15 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rlimit"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f35ee2729c56bb610f6dba436bf78135f728b7373bdffae2ec815b2d3eb98cc3"
dependencies = [
"libc",
]
[[package]]
name = "rustc-hash"
version = "1.1.0"
@ -3099,6 +3350,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "sketches-ddsketch"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c"
[[package]]
name = "slab"
version = "0.4.12"
@ -4004,6 +4261,49 @@ dependencies = [
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580"
dependencies = [
"windows-collections",
"windows-core",
"windows-future",
"windows-numerics",
]
[[package]]
name = "windows-collections"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610"
dependencies = [
"windows-core",
]
[[package]]
name = "windows-core"
version = "0.62.2"
@ -4017,6 +4317,17 @@ dependencies = [
"windows-strings",
]
[[package]]
name = "windows-future"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb"
dependencies = [
"windows-core",
"windows-link",
"windows-threading",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
@ -4045,6 +4356,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-numerics"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26"
dependencies = [
"windows-core",
"windows-link",
]
[[package]]
name = "windows-registry"
version = "0.6.1"
@ -4134,6 +4455,15 @@ dependencies = [
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows-threading"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37"
dependencies = [
"windows-link",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"

View file

@ -36,6 +36,9 @@ pretty_assertions = "1.4.1"
rand = "0.9.2"
regex = "1.10"
lru = "0.12"
metrics = "0.23"
metrics-exporter-prometheus = { version = "0.15", default-features = false, features = ["http-listener"] }
metrics-process = "2.1"
redis = { version = "0.27", features = ["tokio-comp"] }
reqwest = { version = "0.12.15", features = ["stream"] }
serde = { version = "1.0.219", features = ["derive"] }

View file

@ -24,13 +24,14 @@ use crate::app_state::AppState;
use crate::handlers::agents::pipeline::PipelineProcessor;
use crate::handlers::extract_request_id;
use crate::handlers::full;
use crate::metrics as bs_metrics;
use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::streaming::{
create_streaming_response, create_streaming_response_with_output_filter, truncate_message,
ObservableStreamProcessor, StreamProcessor,
LlmMetricsCtx, ObservableStreamProcessor, StreamProcessor,
};
use crate::tracing::{
collect_custom_trace_attributes, llm as tracing_llm, operation_component,
@ -686,6 +687,13 @@ async fn send_upstream(
let request_start_time = std::time::Instant::now();
// Labels for LLM upstream metrics. We prefer `resolved_model` (post-routing)
// and derive the provider from its `provider/model` prefix. This matches the
// same model id the cost/latency router keys off.
let (metric_provider_raw, metric_model_raw) = bs_metrics::split_provider_model(resolved_model);
let metric_provider = metric_provider_raw.to_string();
let metric_model = metric_model_raw.to_string();
let llm_response = match http_client
.post(upstream_url)
.headers(request_headers.clone())
@ -695,6 +703,14 @@ async fn send_upstream(
{
Ok(res) => res,
Err(err) => {
let err_class = bs_metrics::llm_error_class_from_reqwest(&err);
bs_metrics::record_llm_upstream(
&metric_provider,
&metric_model,
0,
err_class,
request_start_time.elapsed(),
);
let err_msg = format!("Failed to send request: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
@ -750,7 +766,12 @@ async fn send_upstream(
span_name,
request_start_time,
messages_for_signals,
);
)
.with_llm_metrics(LlmMetricsCtx {
provider: metric_provider.clone(),
model: metric_model.clone(),
upstream_status: upstream_status.as_u16(),
});
let output_filter_request_headers = if filter_pipeline.has_output_filters() {
Some(request_headers.clone())

View file

@ -5,10 +5,24 @@ use hyper::StatusCode;
use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::metrics as bs_metrics;
use crate::metrics::labels as metric_labels;
use crate::router::orchestrator::OrchestratorService;
use crate::streaming::truncate_message;
use crate::tracing::routing;
/// Classify a request path (already stripped of `/agents` or `/routing` by
/// the caller) into the fixed `route` label used on routing metrics.
fn route_label_for_path(request_path: &str) -> &'static str {
if request_path.starts_with("/agents") {
metric_labels::ROUTE_AGENT
} else if request_path.starts_with("/routing") {
metric_labels::ROUTE_ROUTING
} else {
metric_labels::ROUTE_LLM
}
}
pub struct RoutingResult {
/// Primary model to use (first in the ranked list).
pub model_name: String,
@ -106,15 +120,23 @@ pub async fn router_chat_get_upstream_model(
)
.await;
let determination_ms = routing_start_time.elapsed().as_millis() as i64;
let determination_elapsed = routing_start_time.elapsed();
let determination_ms = determination_elapsed.as_millis() as i64;
let current_span = tracing::Span::current();
current_span.record(routing::ROUTE_DETERMINATION_MS, determination_ms);
let route_label = route_label_for_path(request_path);
match routing_result {
Ok(route) => match route {
Some((route_name, ranked_models)) => {
let model_name = ranked_models.first().cloned().unwrap_or_default();
current_span.record("route.selected_model", model_name.as_str());
bs_metrics::record_router_decision(
route_label,
&model_name,
false,
determination_elapsed,
);
Ok(RoutingResult {
model_name,
models: ranked_models,
@ -126,6 +148,12 @@ pub async fn router_chat_get_upstream_model(
// This signals to llm.rs to use the original validated request model
current_span.record("route.selected_model", "none");
info!("no route determined, using default model");
bs_metrics::record_router_decision(
route_label,
"none",
true,
determination_elapsed,
);
Ok(RoutingResult {
model_name: "none".to_string(),
@ -136,6 +164,7 @@ pub async fn router_chat_get_upstream_model(
},
Err(err) => {
current_span.record("route.selected_model", "unknown");
bs_metrics::record_router_decision(route_label, "unknown", true, determination_elapsed);
Err(RoutingError::internal_error(format!(
"Failed to determine route: {}",
err

View file

@ -12,6 +12,8 @@ use tracing::{debug, info, info_span, warn, Instrument};
use super::extract_or_generate_traceparent;
use crate::handlers::llm::model_selection::router_chat_get_upstream_model;
use crate::metrics as bs_metrics;
use crate::metrics::labels as metric_labels;
use crate::router::orchestrator::OrchestratorService;
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
@ -230,6 +232,17 @@ async fn routing_decision_inner(
pinned: false,
};
// Distinguish "decision served" (a concrete model picked) from
// "no_candidates" (the sentinel "none" returned when nothing
// matched). The handler still responds 200 in both cases, so RED
// metrics alone can't tell them apart.
let outcome = if response.models.first().map(|m| m == "none").unwrap_or(true) {
metric_labels::ROUTING_SVC_NO_CANDIDATES
} else {
metric_labels::ROUTING_SVC_DECISION_SERVED
};
bs_metrics::record_routing_service_outcome(outcome);
info!(
primary_model = %response.models.first().map(|s| s.as_str()).unwrap_or("none"),
total_models = response.models.len(),
@ -249,6 +262,7 @@ async fn routing_decision_inner(
.unwrap())
}
Err(err) => {
bs_metrics::record_routing_service_outcome(metric_labels::ROUTING_SVC_POLICY_ERROR);
warn!(error = %err.message, "routing decision failed");
Ok(BrightStaffError::InternalServerError(err.message).into_response())
}

View file

@ -1,5 +1,6 @@
pub mod app_state;
pub mod handlers;
pub mod metrics;
pub mod router;
pub mod session_cache;
pub mod signals;

View file

@ -5,6 +5,8 @@ use brightstaff::handlers::function_calling::function_calling_chat_handler;
use brightstaff::handlers::llm::llm_chat;
use brightstaff::handlers::models::list_models;
use brightstaff::handlers::routing_service::routing_decision;
use brightstaff::metrics as bs_metrics;
use brightstaff::metrics::labels as metric_labels;
use brightstaff::router::model_metrics::ModelMetricsService;
use brightstaff::router::orchestrator::OrchestratorService;
use brightstaff::session_cache::init_session_cache;
@ -384,10 +386,79 @@ async fn init_state_storage(
// Request routing
// ---------------------------------------------------------------------------
/// Normalized method label — limited set so we never emit a free-form string.
fn method_label(method: &Method) -> &'static str {
match *method {
Method::GET => "GET",
Method::POST => "POST",
Method::PUT => "PUT",
Method::DELETE => "DELETE",
Method::PATCH => "PATCH",
Method::HEAD => "HEAD",
Method::OPTIONS => "OPTIONS",
_ => "OTHER",
}
}
/// Compute the fixed `handler` metric label from the request's path+method.
/// Returning `None` for fall-through means `route()` will hand the request to
/// the catch-all 404 branch.
fn handler_label_for(method: &Method, path: &str) -> &'static str {
if let Some(stripped) = path.strip_prefix("/agents") {
if matches!(
stripped,
CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH
) {
return metric_labels::HANDLER_AGENT_CHAT;
}
}
if let Some(stripped) = path.strip_prefix("/routing") {
if matches!(
stripped,
CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH
) {
return metric_labels::HANDLER_ROUTING_DECISION;
}
}
match (method, path) {
(&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH) => {
metric_labels::HANDLER_LLM_CHAT
}
(&Method::POST, "/function_calling") => metric_labels::HANDLER_FUNCTION_CALLING,
(&Method::GET, "/v1/models" | "/agents/v1/models") => metric_labels::HANDLER_LIST_MODELS,
(&Method::OPTIONS, "/v1/models" | "/agents/v1/models") => {
metric_labels::HANDLER_CORS_PREFLIGHT
}
_ => metric_labels::HANDLER_NOT_FOUND,
}
}
/// Route an incoming HTTP request to the appropriate handler.
async fn route(
req: Request<Incoming>,
state: Arc<AppState>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let handler = handler_label_for(req.method(), req.uri().path());
let method = method_label(req.method());
let started = std::time::Instant::now();
let _in_flight = bs_metrics::InFlightGuard::new(handler);
let result = dispatch(req, state).await;
let status = match &result {
Ok(resp) => resp.status().as_u16(),
// hyper::Error here means the body couldn't be produced; conventionally 500.
Err(_) => 500,
};
bs_metrics::record_http(handler, method, status, started);
result
}
/// Inner dispatcher split out so `route()` can wrap it with metrics without
/// duplicating the match tree.
async fn dispatch(
req: Request<Incoming>,
state: Arc<AppState>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let parent_cx = global::get_text_map_propagator(|p| p.extract(&HeaderExtractor(req.headers())));
let path = req.uri().path().to_string();
@ -503,6 +574,7 @@ async fn run_server(state: Arc<AppState>) -> Result<(), Box<dyn std::error::Erro
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let config = load_config()?;
let _tracer_provider = init_tracer(config.tracing.as_ref());
bs_metrics::init();
info!("loaded plano_config.yaml");
let state = Arc::new(init_app_state(&config).await?);
run_server(state).await

View file

@ -0,0 +1,38 @@
//! Fixed label-value constants so callers never emit free-form strings
//! (which would blow up cardinality).
// Handler enum — derived from the path+method match in `route()`.
pub const HANDLER_AGENT_CHAT: &str = "agent_chat";
pub const HANDLER_ROUTING_DECISION: &str = "routing_decision";
pub const HANDLER_LLM_CHAT: &str = "llm_chat";
pub const HANDLER_FUNCTION_CALLING: &str = "function_calling";
pub const HANDLER_LIST_MODELS: &str = "list_models";
pub const HANDLER_CORS_PREFLIGHT: &str = "cors_preflight";
pub const HANDLER_NOT_FOUND: &str = "not_found";
// Router "route" class — which brightstaff endpoint prompted the decision.
pub const ROUTE_AGENT: &str = "agent";
pub const ROUTE_ROUTING: &str = "routing";
pub const ROUTE_LLM: &str = "llm";
// Token kind for brightstaff_llm_tokens_total.
pub const TOKEN_KIND_PROMPT: &str = "prompt";
pub const TOKEN_KIND_COMPLETION: &str = "completion";
// LLM error_class values (match docstring in metrics/mod.rs).
pub const LLM_ERR_NONE: &str = "none";
pub const LLM_ERR_TIMEOUT: &str = "timeout";
pub const LLM_ERR_CONNECT: &str = "connect";
pub const LLM_ERR_PARSE: &str = "parse";
pub const LLM_ERR_OTHER: &str = "other";
pub const LLM_ERR_STREAM: &str = "stream";
// Routing service outcome values.
pub const ROUTING_SVC_DECISION_SERVED: &str = "decision_served";
pub const ROUTING_SVC_NO_CANDIDATES: &str = "no_candidates";
pub const ROUTING_SVC_POLICY_ERROR: &str = "policy_error";
// Session cache outcome values.
pub const SESSION_CACHE_HIT: &str = "hit";
pub const SESSION_CACHE_MISS: &str = "miss";
pub const SESSION_CACHE_STORE: &str = "store";

View file

@ -0,0 +1,377 @@
//! Prometheus metrics for brightstaff.
//!
//! Installs the `metrics` global recorder backed by
//! `metrics-exporter-prometheus` and exposes a `/metrics` HTTP endpoint on a
//! dedicated admin port (default `0.0.0.0:9092`, overridable via
//! `METRICS_BIND_ADDRESS`).
//!
//! Emitted metric families (see `describe_all` for full list):
//! - HTTP RED: `brightstaff_http_requests_total`,
//! `brightstaff_http_request_duration_seconds`,
//! `brightstaff_http_in_flight_requests`.
//! - LLM upstream: `brightstaff_llm_upstream_requests_total`,
//! `brightstaff_llm_upstream_duration_seconds`,
//! `brightstaff_llm_time_to_first_token_seconds`,
//! `brightstaff_llm_tokens_total`,
//! `brightstaff_llm_tokens_usage_missing_total`.
//! - Routing: `brightstaff_router_decisions_total`,
//! `brightstaff_router_decision_duration_seconds`,
//! `brightstaff_routing_service_requests_total`,
//! `brightstaff_session_cache_events_total`.
//! - Process: via `metrics-process`.
//! - Build: `brightstaff_build_info`.
use std::net::SocketAddr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
use tracing::{info, warn};
pub mod labels;
/// Guard flag so tests don't re-install the global recorder.
static INIT: OnceLock<()> = OnceLock::new();
const DEFAULT_METRICS_BIND: &str = "0.0.0.0:9092";
/// HTTP request duration buckets (seconds). Capped at 60s.
const HTTP_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
];
/// LLM upstream / TTFT buckets (seconds). Capped at 120s because provider
/// completions routinely run that long.
const LLM_BUCKETS: &[f64] = &[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0];
/// Router decision buckets (seconds). The orchestrator call itself is usually
/// sub-second but bucketed generously in case of upstream slowness.
const ROUTER_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0,
];
/// Install the global recorder and spawn the `/metrics` HTTP listener.
///
/// Safe to call more than once; subsequent calls are no-ops so tests that
/// construct their own recorder still work.
pub fn init() {
if INIT.get().is_some() {
return;
}
let bind: SocketAddr = std::env::var("METRICS_BIND_ADDRESS")
.unwrap_or_else(|_| DEFAULT_METRICS_BIND.to_string())
.parse()
.unwrap_or_else(|err| {
warn!(error = %err, default = DEFAULT_METRICS_BIND, "invalid METRICS_BIND_ADDRESS, falling back to default");
DEFAULT_METRICS_BIND.parse().expect("default bind parses")
});
let builder = PrometheusBuilder::new()
.with_http_listener(bind)
.set_buckets_for_metric(
Matcher::Full("brightstaff_http_request_duration_seconds".to_string()),
HTTP_BUCKETS,
)
.and_then(|b| {
b.set_buckets_for_metric(Matcher::Prefix("brightstaff_llm_".to_string()), LLM_BUCKETS)
})
.and_then(|b| {
b.set_buckets_for_metric(
Matcher::Full("brightstaff_router_decision_duration_seconds".to_string()),
ROUTER_BUCKETS,
)
});
let builder = match builder {
Ok(b) => b,
Err(err) => {
warn!(error = %err, "failed to configure metrics buckets, using defaults");
PrometheusBuilder::new().with_http_listener(bind)
}
};
if let Err(err) = builder.install() {
warn!(error = %err, "failed to install Prometheus recorder; metrics disabled");
return;
}
let _ = INIT.set(());
describe_all();
emit_build_info();
// Register process-level collector (RSS, CPU, FDs).
let collector = metrics_process::Collector::default();
collector.describe();
// Prime once at startup; subsequent scrapes refresh via the exporter's
// per-scrape render, so we additionally refresh on a short interval to
// keep gauges moving between scrapes without requiring client pull.
collector.collect();
tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_secs(10));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tick.tick().await;
collector.collect();
}
});
info!(address = %bind, "metrics listener started");
}
fn describe_all() {
describe_counter!(
"brightstaff_http_requests_total",
"Total HTTP requests served by brightstaff, by handler and status class."
);
describe_histogram!(
"brightstaff_http_request_duration_seconds",
"Wall-clock duration of HTTP requests served by brightstaff, by handler."
);
describe_gauge!(
"brightstaff_http_in_flight_requests",
"Number of HTTP requests currently being served by brightstaff, by handler."
);
describe_counter!(
"brightstaff_llm_upstream_requests_total",
"LLM upstream request outcomes, by provider, model, status class and error class."
);
describe_histogram!(
"brightstaff_llm_upstream_duration_seconds",
"Wall-clock duration of LLM upstream calls (stream close for streaming), by provider and model."
);
describe_histogram!(
"brightstaff_llm_time_to_first_token_seconds",
"Time from request start to first streamed byte, by provider and model (streaming only)."
);
describe_counter!(
"brightstaff_llm_tokens_total",
"Tokens reported in the provider `usage` field, by provider, model and kind (prompt/completion)."
);
describe_counter!(
"brightstaff_llm_tokens_usage_missing_total",
"LLM responses that completed without a usable `usage` block (so token counts are unknown)."
);
describe_counter!(
"brightstaff_router_decisions_total",
"Routing decisions made by the orchestrator, by route, selected model, and whether a fallback was used."
);
describe_histogram!(
"brightstaff_router_decision_duration_seconds",
"Time spent in the orchestrator deciding a route, by route."
);
describe_counter!(
"brightstaff_routing_service_requests_total",
"Outcomes of /routing/* decision requests: decision_served, no_candidates, policy_error."
);
describe_counter!(
"brightstaff_session_cache_events_total",
"Session affinity cache lookups and stores, by outcome."
);
describe_gauge!(
"brightstaff_build_info",
"Build metadata. Always 1; labels carry version and git SHA."
);
}
fn emit_build_info() {
let version = env!("CARGO_PKG_VERSION");
let git_sha = option_env!("GIT_SHA").unwrap_or("unknown");
gauge!(
"brightstaff_build_info",
"version" => version.to_string(),
"git_sha" => git_sha.to_string(),
)
.set(1.0);
}
/// Split a provider-qualified model id like `"openai/gpt-4o"` into
/// `(provider, model)`. Returns `("unknown", raw)` when there is no `/`.
pub fn split_provider_model(full: &str) -> (&str, &str) {
match full.split_once('/') {
Some((p, m)) => (p, m),
None => ("unknown", full),
}
}
/// Bucket an HTTP status code into `"2xx"` / `"4xx"` / `"5xx"` / `"1xx"` / `"3xx"`.
pub fn status_class(status: u16) -> &'static str {
match status {
100..=199 => "1xx",
200..=299 => "2xx",
300..=399 => "3xx",
400..=499 => "4xx",
500..=599 => "5xx",
_ => "other",
}
}
// ---------------------------------------------------------------------------
// HTTP RED helpers
// ---------------------------------------------------------------------------
/// RAII guard that increments the in-flight gauge on construction and
/// decrements on drop. Pair with [`HttpTimer`] in the `route()` wrapper so the
/// gauge drops even on error paths.
pub struct InFlightGuard {
handler: &'static str,
}
impl InFlightGuard {
pub fn new(handler: &'static str) -> Self {
gauge!(
"brightstaff_http_in_flight_requests",
"handler" => handler,
)
.increment(1.0);
Self { handler }
}
}
impl Drop for InFlightGuard {
fn drop(&mut self) {
gauge!(
"brightstaff_http_in_flight_requests",
"handler" => self.handler,
)
.decrement(1.0);
}
}
/// Record the HTTP request counter + duration histogram.
pub fn record_http(handler: &'static str, method: &'static str, status: u16, started: Instant) {
let class = status_class(status);
counter!(
"brightstaff_http_requests_total",
"handler" => handler,
"method" => method,
"status_class" => class,
)
.increment(1);
histogram!(
"brightstaff_http_request_duration_seconds",
"handler" => handler,
)
.record(started.elapsed().as_secs_f64());
}
// ---------------------------------------------------------------------------
// LLM upstream helpers
// ---------------------------------------------------------------------------
/// Classify an outcome of an LLM upstream call for the `error_class` label.
pub fn llm_error_class_from_reqwest(err: &reqwest::Error) -> &'static str {
if err.is_timeout() {
"timeout"
} else if err.is_connect() {
"connect"
} else if err.is_decode() {
"parse"
} else {
"other"
}
}
/// Record the outcome of an LLM upstream call. `status` is the HTTP status
/// the upstream returned (0 if the call never produced one, e.g. send failure).
/// `error_class` is `"none"` on success, or a discriminated error label.
pub fn record_llm_upstream(
provider: &str,
model: &str,
status: u16,
error_class: &str,
duration: Duration,
) {
let class = if status == 0 {
"error"
} else {
status_class(status)
};
counter!(
"brightstaff_llm_upstream_requests_total",
"provider" => provider.to_string(),
"model" => model.to_string(),
"status_class" => class,
"error_class" => error_class.to_string(),
)
.increment(1);
histogram!(
"brightstaff_llm_upstream_duration_seconds",
"provider" => provider.to_string(),
"model" => model.to_string(),
)
.record(duration.as_secs_f64());
}
pub fn record_llm_ttft(provider: &str, model: &str, ttft: Duration) {
histogram!(
"brightstaff_llm_time_to_first_token_seconds",
"provider" => provider.to_string(),
"model" => model.to_string(),
)
.record(ttft.as_secs_f64());
}
pub fn record_llm_tokens(provider: &str, model: &str, kind: &'static str, count: u64) {
counter!(
"brightstaff_llm_tokens_total",
"provider" => provider.to_string(),
"model" => model.to_string(),
"kind" => kind,
)
.increment(count);
}
pub fn record_llm_tokens_usage_missing(provider: &str, model: &str) {
counter!(
"brightstaff_llm_tokens_usage_missing_total",
"provider" => provider.to_string(),
"model" => model.to_string(),
)
.increment(1);
}
// ---------------------------------------------------------------------------
// Router helpers
// ---------------------------------------------------------------------------
pub fn record_router_decision(
route: &'static str,
selected_model: &str,
fallback: bool,
duration: Duration,
) {
counter!(
"brightstaff_router_decisions_total",
"route" => route,
"selected_model" => selected_model.to_string(),
"fallback" => if fallback { "true" } else { "false" },
)
.increment(1);
histogram!(
"brightstaff_router_decision_duration_seconds",
"route" => route,
)
.record(duration.as_secs_f64());
}
pub fn record_routing_service_outcome(outcome: &'static str) {
counter!(
"brightstaff_routing_service_requests_total",
"outcome" => outcome,
)
.increment(1);
}
pub fn record_session_cache_event(outcome: &'static str) {
counter!(
"brightstaff_session_cache_events_total",
"outcome" => outcome,
)
.increment(1);
}

View file

@ -15,6 +15,8 @@ use super::http::{self, post_and_extract_content};
use super::model_metrics::ModelMetricsService;
use super::orchestrator_model::OrchestratorModel;
use crate::metrics as bs_metrics;
use crate::metrics::labels as metric_labels;
use crate::router::orchestrator_model_v1;
use crate::session_cache::SessionCache;
@ -130,7 +132,13 @@ impl OrchestratorService {
tenant_id: Option<&str>,
) -> Option<CachedRoute> {
let cache = self.session_cache.as_ref()?;
cache.get(&Self::session_key(tenant_id, session_id)).await
let result = cache.get(&Self::session_key(tenant_id, session_id)).await;
bs_metrics::record_session_cache_event(if result.is_some() {
metric_labels::SESSION_CACHE_HIT
} else {
metric_labels::SESSION_CACHE_MISS
});
result
}
pub async fn cache_route(
@ -151,6 +159,7 @@ impl OrchestratorService {
self.session_ttl,
)
.await;
bs_metrics::record_session_cache_event(metric_labels::SESSION_CACHE_STORE);
}
}

View file

@ -265,11 +265,8 @@ fn assess_quality(
if interaction.disengagement.count > 0 {
score -= interaction.disengagement.severity as f32 * 10.0;
}
if interaction.misalignment.severity > 0 {
let denom = user_turns.max(1) as f32;
if interaction.misalignment.count as f32 / denom > 0.3 {
score -= 15.0;
}
if interaction.misalignment.severity > 0 && interaction.misalignment_ratio(user_turns) > 0.3 {
score -= 15.0;
}
if interaction.stagnation.count > 2 {
score -= interaction.stagnation.severity as f32 * 8.0;
@ -301,6 +298,24 @@ fn assess_quality(
(quality, score)
}
/// Render the per-conversation summary string.
///
/// Output is structurally grouped by the paper taxonomy so a reader can see
/// at a glance which layer fired:
///
/// ```text
/// Overall Quality: severe | Turns: 7 (efficiency: 71.4%)
/// | Interaction — misalignment: 2 (sev 1), stagnation: 0, disengagement: 2 (sev 1), satisfaction: 0
/// | Execution — failure: 0, loops: 0
/// | Environment — exhaustion: 0
/// | High misalignment rate: 50.0% of user turns
/// | Escalation requested: 1
/// ```
///
/// Layer headers are always present (even when their counts are all zero) so
/// the taxonomy is visible by inspection. Quality-driving callouts —
/// "high misalignment rate", "looping detected", "escalation requested" —
/// are appended after the layer summary as a separate "alerts" tail.
fn generate_summary(
turn_metrics: &TurnMetrics,
interaction: &InteractionSignals,
@ -311,57 +326,43 @@ fn generate_summary(
let mut parts: Vec<String> = Vec::new();
parts.push(format!("Overall Quality: {}", quality.as_str()));
parts.push(format!(
"Turn Count: {} turns (efficiency: {:.1}%)",
"Turns: {} (efficiency: {:.1}%)",
turn_metrics.total_turns,
turn_metrics.efficiency_score * 100.0
));
parts.push(format!(
"Interaction \u{2014} {}, {}, {}, {}",
fmt_group("misalignment", &interaction.misalignment),
fmt_group("stagnation", &interaction.stagnation),
fmt_group("disengagement", &interaction.disengagement),
fmt_group("satisfaction", &interaction.satisfaction),
));
parts.push(format!(
"Execution \u{2014} {}, {}",
fmt_group("failure", &execution.failure),
fmt_group("loops", &execution.loops),
));
parts.push(format!(
"Environment \u{2014} {}",
fmt_group("exhaustion", &environment.exhaustion),
));
if interaction.misalignment.count > 0 {
let denom = turn_metrics.user_turns.max(1) as f32;
let repair_ratio = interaction.misalignment.count as f32 / denom;
if repair_ratio > 0.3 {
let misalignment_ratio = interaction.misalignment_ratio(turn_metrics.user_turns);
if misalignment_ratio > 0.3 {
parts.push(format!(
"High misalignment rate: {:.1}% of user turns",
repair_ratio * 100.0
misalignment_ratio * 100.0
));
}
}
if interaction.disengagement.count > 0 {
parts.push(format!(
"Disengagement detected: {} indicators (severity: {})",
interaction.disengagement.count, interaction.disengagement.severity
));
}
if interaction.stagnation.count > 2 {
parts.push(format!(
"Looping detected: {} repetitions",
interaction.stagnation.count
));
}
if interaction.satisfaction.count > 0 {
parts.push(format!(
"Positive feedback: {} indicators",
interaction.satisfaction.count
));
}
if execution.failure.count > 0 {
parts.push(format!(
"Execution failures: {} (agent-caused)",
execution.failure.count
));
}
if environment.exhaustion.count > 0 {
parts.push(format!(
"Environment issues: {} (external)",
environment.exhaustion.count
));
}
let escalation_count = interaction
.disengagement
.signals
@ -369,15 +370,22 @@ fn generate_summary(
.filter(|s| matches!(s.signal_type, SignalType::DisengagementEscalation))
.count();
if escalation_count > 0 {
parts.push(format!(
"Escalation requested: {} requests",
escalation_count
));
parts.push(format!("Escalation requested: {}", escalation_count));
}
parts.join(" | ")
}
/// Render `"<name>: <count> (sev <severity>)"`, dropping the severity suffix
/// when the count is zero (keeps the summary readable for clean conversations).
fn fmt_group(name: &str, group: &super::schemas::SignalGroup) -> String {
if group.count == 0 {
format!("{}: 0", name)
} else {
format!("{}: {} (sev {})", name, group.count, group.severity)
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -473,6 +481,59 @@ mod tests {
assert!(r.interaction.satisfaction.count > 0);
}
#[test]
fn summary_groups_signals_by_taxonomy() {
// Even on a clean conversation the summary should expose the three
// layer headers so the taxonomy is visible.
let msgs = vec![
user("Hello"),
assistant("Hi! How can I help?"),
user("What's 2 + 2?"),
assistant("4"),
];
let r = SignalAnalyzer::default().analyze_openai(&msgs);
assert!(
r.summary.contains("Interaction \u{2014}"),
"missing Interaction header in: {}",
r.summary
);
assert!(
r.summary.contains("Execution \u{2014}"),
"missing Execution header in: {}",
r.summary
);
assert!(
r.summary.contains("Environment \u{2014}"),
"missing Environment header in: {}",
r.summary
);
assert!(r.summary.contains("misalignment: 0"));
assert!(r.summary.contains("loops: 0"));
assert!(r.summary.contains("exhaustion: 0"));
}
#[test]
fn summary_includes_severity_when_signals_fire() {
let msgs = vec![
user("This isn't helpful at all"),
assistant("I'm sorry, can you tell me more?"),
user("Get me a human, this is useless"),
];
let r = SignalAnalyzer::default().analyze_openai(&msgs);
// Disengagement fires; should render with `(sev N)` and the
// escalation-requested alert tail.
assert!(
r.summary.contains("disengagement:") && r.summary.contains("(sev "),
"expected severity rendered for disengagement: {}",
r.summary
);
assert!(
r.summary.contains("Escalation requested:"),
"expected escalation alert in: {}",
r.summary
);
}
#[test]
fn execution_failures_lower_quality() {
let msgs = vec![ShareGptMessage {

View file

@ -272,6 +272,17 @@ impl Default for InteractionSignals {
}
}
impl InteractionSignals {
/// Ratio of misalignment instances to user turns. Used as a quality
/// scoring input and as a threshold for the "high misalignment rate"
/// summary callout. Mirrors `misalignment.count / max(user_turns, 1)`
/// from the Python reference's `_assess_quality` and `_generate_summary`.
pub fn misalignment_ratio(&self, user_turns: usize) -> f32 {
let denom = user_turns.max(1) as f32;
self.misalignment.count as f32 / denom
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSignals {
pub failure: SignalGroup,

View file

@ -20,6 +20,8 @@ const STREAM_BUFFER_SIZE: usize = 16;
/// Most chat responses are well under this; pathological ones are dropped without
/// affecting pass-through streaming to the client.
const USAGE_BUFFER_MAX: usize = 2 * 1024 * 1024;
use crate::metrics as bs_metrics;
use crate::metrics::labels as metric_labels;
use crate::signals::otel::emit_signals_to_span;
use crate::signals::{SignalAnalyzer, FLAG_MARKER};
use crate::tracing::{llm, set_service_name};
@ -173,6 +175,18 @@ impl StreamProcessor for Box<dyn StreamProcessor> {
}
}
/// Optional Prometheus-metric context for an LLM upstream call. When present,
/// [`ObservableStreamProcessor`] emits `brightstaff_llm_*` metrics at
/// first-byte / complete / error callbacks.
#[derive(Debug, Clone)]
pub struct LlmMetricsCtx {
pub provider: String,
pub model: String,
/// HTTP status of the upstream response. Used to pick `status_class` and
/// `error_class` on `on_complete`.
pub upstream_status: u16,
}
/// A processor that tracks streaming metrics
pub struct ObservableStreamProcessor {
service_name: String,
@ -186,6 +200,8 @@ pub struct ObservableStreamProcessor {
/// on `on_complete`. Capped at `USAGE_BUFFER_MAX`; excess chunks are dropped
/// from the buffer (they still pass through to the client).
response_buffer: Vec<u8>,
llm_metrics: Option<LlmMetricsCtx>,
metrics_recorded: bool,
}
impl ObservableStreamProcessor {
@ -220,8 +236,17 @@ impl ObservableStreamProcessor {
time_to_first_token: None,
messages,
response_buffer: Vec::new(),
llm_metrics: None,
metrics_recorded: false,
}
}
/// Attach LLM upstream metric context so the processor emits
/// `brightstaff_llm_*` metrics on first-byte / complete / error.
pub fn with_llm_metrics(mut self, ctx: LlmMetricsCtx) -> Self {
self.llm_metrics = Some(ctx);
self
}
}
impl StreamProcessor for ObservableStreamProcessor {
@ -241,7 +266,11 @@ impl StreamProcessor for ObservableStreamProcessor {
fn on_first_bytes(&mut self) {
// Record time to first token (only for streaming)
if self.time_to_first_token.is_none() {
self.time_to_first_token = Some(self.start_time.elapsed().as_millis());
let elapsed = self.start_time.elapsed();
self.time_to_first_token = Some(elapsed.as_millis());
if let Some(ref ctx) = self.llm_metrics {
bs_metrics::record_llm_ttft(&ctx.provider, &ctx.model, elapsed);
}
}
}
@ -300,6 +329,39 @@ impl StreamProcessor for ObservableStreamProcessor {
otel_span.set_attribute(KeyValue::new(llm::MODEL_NAME, resolved));
}
}
// Emit LLM upstream prometheus metrics (duration + tokens) if wired.
// The upstream responded (we have a status), so status_class alone
// carries the non-2xx signal — error_class stays "none".
if let Some(ref ctx) = self.llm_metrics {
bs_metrics::record_llm_upstream(
&ctx.provider,
&ctx.model,
ctx.upstream_status,
metric_labels::LLM_ERR_NONE,
self.start_time.elapsed(),
);
if let Some(v) = usage.prompt_tokens {
bs_metrics::record_llm_tokens(
&ctx.provider,
&ctx.model,
metric_labels::TOKEN_KIND_PROMPT,
v.max(0) as u64,
);
}
if let Some(v) = usage.completion_tokens {
bs_metrics::record_llm_tokens(
&ctx.provider,
&ctx.model,
metric_labels::TOKEN_KIND_COMPLETION,
v.max(0) as u64,
);
}
if usage.prompt_tokens.is_none() && usage.completion_tokens.is_none() {
bs_metrics::record_llm_tokens_usage_missing(&ctx.provider, &ctx.model);
}
self.metrics_recorded = true;
}
// Release the buffered bytes early; nothing downstream needs them.
self.response_buffer.clear();
self.response_buffer.shrink_to_fit();
@ -339,6 +401,18 @@ impl StreamProcessor for ObservableStreamProcessor {
duration_ms = self.start_time.elapsed().as_millis(),
"stream error"
);
if let Some(ref ctx) = self.llm_metrics {
if !self.metrics_recorded {
bs_metrics::record_llm_upstream(
&ctx.provider,
&ctx.model,
ctx.upstream_status,
metric_labels::LLM_ERR_STREAM,
self.start_time.elapsed(),
);
self.metrics_recorded = true;
}
}
}
}

View file

@ -75,3 +75,54 @@ are some sample configuration files for both, respectively.
isDefault: true
access: proxy
editable: true
Brightstaff metrics
~~~~~~~~~~~~~~~~~~~
In addition to Envoy's stats on ``:9901``, the brightstaff dataplane
process exposes its own Prometheus endpoint on ``0.0.0.0:9092`` (override
with ``METRICS_BIND_ADDRESS``). It publishes:
* HTTP RED — ``brightstaff_http_requests_total``,
``brightstaff_http_request_duration_seconds``,
``brightstaff_http_in_flight_requests`` (labels: ``handler``, ``method``,
``status_class``).
* LLM upstream — ``brightstaff_llm_upstream_requests_total``,
``brightstaff_llm_upstream_duration_seconds``,
``brightstaff_llm_time_to_first_token_seconds``,
``brightstaff_llm_tokens_total`` (labels: ``provider``, ``model``,
``error_class``, ``kind``).
* Routing — ``brightstaff_router_decisions_total``,
``brightstaff_router_decision_duration_seconds``,
``brightstaff_routing_service_requests_total``,
``brightstaff_session_cache_events_total``.
* Process & build — ``process_resident_memory_bytes``,
``process_cpu_seconds_total``, ``brightstaff_build_info``.
A self-contained Prometheus + Grafana stack is shipped under
``config/grafana/``. With Plano already running on the host, bring it up
with one command:
.. code-block:: bash
cd config/grafana
docker compose up -d
open http://localhost:3000 # admin / admin (anonymous viewer also enabled)
Grafana auto-loads the Prometheus datasource and the brightstaff
dashboard (look under the *Plano* folder). Prometheus scrapes the host's
``:9092`` and ``:9901`` via ``host.docker.internal``.
Files:
* ``config/grafana/docker-compose.yaml`` — one-command Prom + Grafana
stack with provisioning.
* ``config/grafana/prometheus_scrape.yaml`` — complete Prometheus config
with ``envoy`` and ``brightstaff`` scrape jobs (mounted by the
compose).
* ``config/grafana/brightstaff_dashboard.json`` — 19-panel dashboard
across HTTP RED, LLM upstream, Routing service, and Process & Envoy
link rows. Auto-provisioned by the compose; can also be imported by
hand via *Dashboards → New → Import*.
* ``config/grafana/provisioning/`` — Grafana provisioning files for the
datasource and dashboard provider.

View file

@ -6,6 +6,7 @@ hand-picked set of conversations without touching the lmsys dataset.
Run from this directory:
python _smoke_test.py --rust-binary <path>
"""
from __future__ import annotations
import argparse
@ -38,7 +39,10 @@ SAMPLES = [
"messages": [
{"from": "human", "value": "Book me a flight to NYC for tomorrow"},
{"from": "gpt", "value": "Sure, here are flights to NYC for Friday."},
{"from": "human", "value": "No, I meant flights for Saturday, not tomorrow"},
{
"from": "human",
"value": "No, I meant flights for Saturday, not tomorrow",
},
],
},
{
@ -75,7 +79,9 @@ def main() -> int:
f.write(json.dumps(s) + "\n")
with conv_path.open("rb") as fin, rust_path.open("wb") as fout:
proc = subprocess.run([str(args.rust_binary)], stdin=fin, stdout=fout, stderr=subprocess.PIPE)
proc = subprocess.run(
[str(args.rust_binary)], stdin=fin, stdout=fout, stderr=subprocess.PIPE
)
if proc.returncode != 0:
sys.stderr.write(proc.stderr.decode("utf-8", errors="replace"))
return 2

View file

@ -5,6 +5,7 @@ Diff Rust vs Python signal reports produced by run_parity.py.
See README.md for the tier definitions. Exits non-zero iff any Tier-A
divergence is found.
"""
from __future__ import annotations
import argparse
@ -15,7 +16,12 @@ from pathlib import Path
from typing import Any, Dict, List, Tuple
CATEGORIES_BY_LAYER = {
"interaction_signals": ["misalignment", "stagnation", "disengagement", "satisfaction"],
"interaction_signals": [
"misalignment",
"stagnation",
"disengagement",
"satisfaction",
],
"execution_signals": ["failure", "loops"],
"environment_signals": ["exhaustion"],
}
@ -69,9 +75,7 @@ def per_type_indices(report: Dict[str, Any]) -> Dict[str, List[int]]:
return dict(out)
def diff_counts(
a: Dict[str, int], b: Dict[str, int]
) -> List[Tuple[str, int, int]]:
def diff_counts(a: Dict[str, int], b: Dict[str, int]) -> List[Tuple[str, int, int]]:
"""Return [(signal_type, a_count, b_count)] for entries that differ."""
keys = set(a) | set(b)
out = []
@ -242,7 +246,12 @@ def main() -> int:
write_summary_md(out_dir / "summary.md", metrics, diffs[:20])
print(json.dumps({k: v for k, v in metrics.items() if k != "quality_confusion_matrix"}, indent=2))
print(
json.dumps(
{k: v for k, v in metrics.items() if k != "quality_confusion_matrix"},
indent=2,
)
)
print(f"\ndiffs: {out_dir / 'diffs.jsonl'} metrics: {out_dir / 'metrics.json'}")
print(f"summary: {out_dir / 'summary.md'}")
@ -252,7 +261,9 @@ def main() -> int:
return 0
def write_summary_md(path: Path, metrics: Dict[str, Any], sample_diffs: List[Dict[str, Any]]) -> None:
def write_summary_md(
path: Path, metrics: Dict[str, Any], sample_diffs: List[Dict[str, Any]]
) -> None:
lines: List[str] = []
lines.append("# Signals Parity Report")
lines.append("")
@ -299,7 +310,9 @@ def write_summary_md(path: Path, metrics: Dict[str, Any], sample_diffs: List[Dic
lines.append("| | " + " | ".join(labels) + " |")
lines.append("|---|" + "|".join(["---:"] * len(labels)) + "|")
for r in labels:
lines.append(f"| {r} | " + " | ".join(str(cm[r].get(c, 0)) for c in labels) + " |")
lines.append(
f"| {r} | " + " | ".join(str(cm[r].get(c, 0)) for c in labels) + " |"
)
lines.append("")
if sample_diffs:

View file

@ -14,6 +14,7 @@ Usage:
--rust-binary ../../../crates/target/release/signals_replay \\
--output-dir out/
"""
from __future__ import annotations
import argparse
@ -30,7 +31,10 @@ try:
import pyarrow.parquet as pq
from huggingface_hub import hf_hub_download, list_repo_files
except ImportError:
print("error: install dependencies first: pip install -r requirements.txt", file=sys.stderr)
print(
"error: install dependencies first: pip install -r requirements.txt",
file=sys.stderr,
)
sys.exit(2)
try:
@ -46,6 +50,7 @@ except ImportError:
try:
from tqdm import tqdm
except ImportError:
def tqdm(it, **_kwargs): # type: ignore[no-redef]
return it
@ -154,7 +159,10 @@ def sample_conversations(
pf = pq.ParquetFile(str(p))
shard_row_counts.append(pf.metadata.num_rows)
total_rows = sum(shard_row_counts)
print(f"dataset has {total_rows:,} rows across {len(local_paths)} shards", file=sys.stderr)
print(
f"dataset has {total_rows:,} rows across {len(local_paths)} shards",
file=sys.stderr,
)
rng = random.Random(seed)
global_indices = sorted(rng.sample(range(total_rows), num_samples))
@ -255,9 +263,13 @@ def stamp_metadata(args: argparse.Namespace, output_dir: Path, n_samples: int) -
"""Write the input metadata so compare.py can include it in the report."""
binary_sha = hashlib.sha256(args.rust_binary.read_bytes()).hexdigest()
try:
plano_sha = subprocess.check_output(
["git", "rev-parse", "HEAD"], cwd=Path(__file__).parent
).decode().strip()
plano_sha = (
subprocess.check_output(
["git", "rev-parse", "HEAD"], cwd=Path(__file__).parent
)
.decode()
.strip()
)
except Exception:
plano_sha = "unknown"
try:
@ -265,7 +277,11 @@ def stamp_metadata(args: argparse.Namespace, output_dir: Path, n_samples: int) -
[sys.executable, "-m", "pip", "show", "signals"]
).decode()
signals_version = next(
(l.split(":", 1)[1].strip() for l in signals_version.splitlines() if l.startswith("Version")),
(
l.split(":", 1)[1].strip()
for l in signals_version.splitlines()
if l.startswith("Version")
),
"unknown",
)
except Exception: