From d30018cf35a2e439b9c9b2dbb380dc28f7509ff0 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Fri, 17 Apr 2026 00:52:46 -0700 Subject: [PATCH] add planoai obs: live LLM observability TUI --- cli/planoai/main.py | 2 + cli/planoai/obs/__init__.py | 6 + cli/planoai/obs/collector.py | 281 +++++++++++++++ cli/planoai/obs/pricing.py | 276 +++++++++++++++ cli/planoai/obs/render.py | 324 ++++++++++++++++++ cli/planoai/obs_cmd.py | 99 ++++++ cli/planoai/rich_click_config.py | 2 +- cli/test/test_obs_collector.py | 141 ++++++++ cli/test/test_obs_pricing.py | 103 ++++++ cli/test/test_obs_render.py | 73 ++++ crates/brightstaff/src/handlers/llm/mod.rs | 70 +++- crates/brightstaff/src/streaming.rs | 231 +++++++++++++ crates/brightstaff/src/tracing/constants.rs | 28 ++ crates/brightstaff/src/tracing/mod.rs | 2 +- crates/hermesllm/src/apis/anthropic.rs | 6 + crates/hermesllm/src/apis/openai.rs | 12 + crates/hermesllm/src/apis/openai_responses.rs | 12 + crates/hermesllm/src/providers/response.rs | 37 ++ docs/source/get_started/quickstart.rst | 36 ++ 19 files changed, 1736 insertions(+), 5 deletions(-) create mode 100644 cli/planoai/obs/__init__.py create mode 100644 cli/planoai/obs/collector.py create mode 100644 cli/planoai/obs/pricing.py create mode 100644 cli/planoai/obs/render.py create mode 100644 cli/planoai/obs_cmd.py create mode 100644 cli/test/test_obs_collector.py create mode 100644 cli/test/test_obs_pricing.py create mode 100644 cli/test/test_obs_render.py diff --git a/cli/planoai/main.py b/cli/planoai/main.py index 3e094a69..5686b0ff 100644 --- a/cli/planoai/main.py +++ b/cli/planoai/main.py @@ -37,6 +37,7 @@ from planoai.core import ( ) from planoai.init_cmd import init as init_cmd from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_background +from planoai.obs_cmd import obs as obs_cmd from planoai.consts import ( DEFAULT_OTEL_TRACING_GRPC_ENDPOINT, DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT, @@ -714,6 +715,7 @@ main.add_command(cli_agent) main.add_command(generate_prompt_targets) main.add_command(init_cmd, name="init") main.add_command(trace_cmd, name="trace") +main.add_command(obs_cmd, name="obs") if __name__ == "__main__": main() diff --git a/cli/planoai/obs/__init__.py b/cli/planoai/obs/__init__.py new file mode 100644 index 00000000..2f4e14af --- /dev/null +++ b/cli/planoai/obs/__init__.py @@ -0,0 +1,6 @@ +"""Plano observability console: in-memory live view of LLM traffic.""" + +from planoai.obs.collector import LLMCall, LLMCallStore, ObsCollector +from planoai.obs.pricing import PricingCatalog + +__all__ = ["LLMCall", "LLMCallStore", "ObsCollector", "PricingCatalog"] diff --git a/cli/planoai/obs/collector.py b/cli/planoai/obs/collector.py new file mode 100644 index 00000000..9308c4b6 --- /dev/null +++ b/cli/planoai/obs/collector.py @@ -0,0 +1,281 @@ +"""In-memory collector for LLM calls, fed by OTLP/gRPC spans from brightstaff.""" + +from __future__ import annotations + +import threading +from collections import deque +from concurrent import futures +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Iterable + +import grpc +from opentelemetry.proto.collector.trace.v1 import ( + trace_service_pb2, + trace_service_pb2_grpc, +) + + +DEFAULT_GRPC_PORT = 4317 +DEFAULT_CAPACITY = 1000 + + +@dataclass +class LLMCall: + """One LLM call as reconstructed from a brightstaff LLM span. + + Fields default to ``None`` when the underlying span attribute was absent. + """ + + request_id: str + timestamp: datetime + model: str + provider: str | None = None + request_model: str | None = None + session_id: str | None = None + route_name: str | None = None + is_streaming: bool | None = None + status_code: int | None = None + prompt_tokens: int | None = None + completion_tokens: int | None = None + total_tokens: int | None = None + cached_input_tokens: int | None = None + cache_creation_tokens: int | None = None + reasoning_tokens: int | None = None + ttft_ms: float | None = None + duration_ms: float | None = None + routing_strategy: str | None = None + routing_reason: str | None = None + cost_usd: float | None = None + + @property + def tpt_ms(self) -> float | None: + if self.duration_ms is None or self.completion_tokens in (None, 0): + return None + ttft = self.ttft_ms or 0.0 + generate_ms = max(0.0, self.duration_ms - ttft) + if generate_ms <= 0: + return None + return generate_ms / self.completion_tokens + + @property + def tokens_per_sec(self) -> float | None: + tpt = self.tpt_ms + if tpt is None or tpt <= 0: + return None + return 1000.0 / tpt + + +class LLMCallStore: + """Thread-safe ring buffer of recent LLM calls.""" + + def __init__(self, capacity: int = DEFAULT_CAPACITY) -> None: + self._capacity = capacity + self._calls: deque[LLMCall] = deque(maxlen=capacity) + self._lock = threading.Lock() + + @property + def capacity(self) -> int: + return self._capacity + + def add(self, call: LLMCall) -> None: + with self._lock: + self._calls.append(call) + + def clear(self) -> None: + with self._lock: + self._calls.clear() + + def snapshot(self) -> list[LLMCall]: + with self._lock: + return list(self._calls) + + def __len__(self) -> int: + with self._lock: + return len(self._calls) + + +# Attribute keys mirror crates/brightstaff/src/tracing/constants.rs. +_LLM_MODEL = "llm.model" +_LLM_PROVIDER = "llm.provider" +_LLM_IS_STREAMING = "llm.is_streaming" +_LLM_DURATION_MS = "llm.duration_ms" +_LLM_TTFT_MS = "llm.time_to_first_token" +_LLM_PROMPT_TOKENS = "llm.usage.prompt_tokens" +_LLM_COMPLETION_TOKENS = "llm.usage.completion_tokens" +_LLM_TOTAL_TOKENS = "llm.usage.total_tokens" +_LLM_CACHED_INPUT_TOKENS = "llm.usage.cached_input_tokens" +_LLM_CACHE_CREATION_TOKENS = "llm.usage.cache_creation_tokens" +_LLM_REASONING_TOKENS = "llm.usage.reasoning_tokens" +_HTTP_STATUS = "http.status_code" +_MODEL_REQUESTED = "model.requested" +_PLANO_SESSION_ID = "plano.session_id" +_PLANO_ROUTE_NAME = "plano.route.name" +_ROUTING_STRATEGY = "routing.strategy" +_ROUTING_SELECTION_REASON = "routing.selection_reason" +_REQUEST_ID_KEYS = ("request_id", "http.request_id") + + +def _anyvalue_to_python(value: Any) -> Any: # AnyValue from OTLP + kind = value.WhichOneof("value") + if kind == "string_value": + return value.string_value + if kind == "bool_value": + return value.bool_value + if kind == "int_value": + return value.int_value + if kind == "double_value": + return value.double_value + return None + + +def _attrs_to_dict(attrs: Iterable[Any]) -> dict[str, Any]: + out: dict[str, Any] = {} + for kv in attrs: + py = _anyvalue_to_python(kv.value) + if py is not None: + out[kv.key] = py + return out + + +def _maybe_int(value: Any) -> int | None: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _maybe_float(value: Any) -> float | None: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def span_to_llm_call( + span: Any, service_name: str, pricing: Any | None = None +) -> LLMCall | None: + """Convert an OTLP span into an LLMCall, or return None if it isn't one. + + A span is considered an LLM call iff it carries the ``llm.model`` attribute. + """ + attrs = _attrs_to_dict(span.attributes) + model = attrs.get(_LLM_MODEL) + if not model: + return None + + # Prefer explicit span attributes; fall back to likely aliases. + request_id = next( + ( + str(attrs[key]) + for key in _REQUEST_ID_KEYS + if key in attrs and attrs[key] is not None + ), + span.span_id.hex() if span.span_id else "", + ) + start_ns = span.start_time_unix_nano or 0 + ts = ( + datetime.fromtimestamp(start_ns / 1_000_000_000, tz=timezone.utc).astimezone() + if start_ns + else datetime.now().astimezone() + ) + + call = LLMCall( + request_id=str(request_id), + timestamp=ts, + model=str(model), + provider=str(attrs[_LLM_PROVIDER]) if _LLM_PROVIDER in attrs else service_name, + request_model=( + str(attrs[_MODEL_REQUESTED]) if _MODEL_REQUESTED in attrs else None + ), + session_id=( + str(attrs[_PLANO_SESSION_ID]) if _PLANO_SESSION_ID in attrs else None + ), + route_name=( + str(attrs[_PLANO_ROUTE_NAME]) if _PLANO_ROUTE_NAME in attrs else None + ), + is_streaming=bool(attrs[_LLM_IS_STREAMING]) + if _LLM_IS_STREAMING in attrs + else None, + status_code=_maybe_int(attrs.get(_HTTP_STATUS)), + prompt_tokens=_maybe_int(attrs.get(_LLM_PROMPT_TOKENS)), + completion_tokens=_maybe_int(attrs.get(_LLM_COMPLETION_TOKENS)), + total_tokens=_maybe_int(attrs.get(_LLM_TOTAL_TOKENS)), + cached_input_tokens=_maybe_int(attrs.get(_LLM_CACHED_INPUT_TOKENS)), + cache_creation_tokens=_maybe_int(attrs.get(_LLM_CACHE_CREATION_TOKENS)), + reasoning_tokens=_maybe_int(attrs.get(_LLM_REASONING_TOKENS)), + ttft_ms=_maybe_float(attrs.get(_LLM_TTFT_MS)), + duration_ms=_maybe_float(attrs.get(_LLM_DURATION_MS)), + routing_strategy=( + str(attrs[_ROUTING_STRATEGY]) if _ROUTING_STRATEGY in attrs else None + ), + routing_reason=( + str(attrs[_ROUTING_SELECTION_REASON]) + if _ROUTING_SELECTION_REASON in attrs + else None + ), + ) + + if pricing is not None: + call.cost_usd = pricing.cost_for_call(call) + + return call + + +class _ObsServicer(trace_service_pb2_grpc.TraceServiceServicer): + def __init__(self, store: LLMCallStore, pricing: Any | None) -> None: + self._store = store + self._pricing = pricing + + def Export(self, request, context): # noqa: N802 — gRPC generated name + for resource_spans in request.resource_spans: + service_name = "unknown" + for attr in resource_spans.resource.attributes: + if attr.key == "service.name": + val = _anyvalue_to_python(attr.value) + if val is not None: + service_name = str(val) + break + for scope_spans in resource_spans.scope_spans: + for span in scope_spans.spans: + call = span_to_llm_call(span, service_name, self._pricing) + if call is not None: + self._store.add(call) + return trace_service_pb2.ExportTraceServiceResponse() + + +@dataclass +class ObsCollector: + """Owns the OTLP/gRPC server and the in-memory LLMCall ring buffer.""" + + store: LLMCallStore = field(default_factory=LLMCallStore) + pricing: Any | None = None + host: str = "0.0.0.0" + port: int = DEFAULT_GRPC_PORT + _server: grpc.Server | None = field(default=None, init=False, repr=False) + + def start(self) -> None: + if self._server is not None: + return + server = grpc.server(futures.ThreadPoolExecutor(max_workers=4)) + trace_service_pb2_grpc.add_TraceServiceServicer_to_server( + _ObsServicer(self.store, self.pricing), server + ) + address = f"{self.host}:{self.port}" + bound = server.add_insecure_port(address) + if bound == 0: + raise OSError( + f"Failed to bind OTLP listener on {address}: port already in use. " + "Stop `planoai trace listen` or pick another port with --port." + ) + server.start() + self._server = server + + def stop(self, grace: float = 2.0) -> None: + if self._server is not None: + self._server.stop(grace) + self._server = None diff --git a/cli/planoai/obs/pricing.py b/cli/planoai/obs/pricing.py new file mode 100644 index 00000000..5c36c751 --- /dev/null +++ b/cli/planoai/obs/pricing.py @@ -0,0 +1,276 @@ +"""DigitalOcean Gradient pricing catalog for the obs console. + +Ported loosely from ``crates/brightstaff/src/router/model_metrics.rs::fetch_do_pricing``. +Single-source: one fetch at startup, cached for the life of the process. +""" + +from __future__ import annotations + +import logging +import threading +from dataclasses import dataclass +from typing import Any + +import requests + + +DEFAULT_PRICING_URL = "https://api.digitalocean.com/v2/gen-ai/models/catalog" +FETCH_TIMEOUT_SECS = 5.0 + + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class ModelPrice: + """Input/output $/token rates. Token counts are multiplied by these.""" + + input_per_token_usd: float + output_per_token_usd: float + cached_input_per_token_usd: float | None = None + + +class PricingCatalog: + """In-memory pricing lookup keyed by model id. + + DO's catalog uses ids like ``openai-gpt-5.4``; Plano's resolved model names + may arrive as ``do/openai-gpt-5.4`` or bare ``openai-gpt-5.4``. We strip the + leading provider prefix when looking up. + """ + + def __init__(self, prices: dict[str, ModelPrice] | None = None) -> None: + self._prices: dict[str, ModelPrice] = prices or {} + self._lock = threading.Lock() + + def __len__(self) -> int: + with self._lock: + return len(self._prices) + + def sample_models(self, n: int = 5) -> list[str]: + with self._lock: + return list(self._prices.keys())[:n] + + @classmethod + def fetch( + cls, + url: str = DEFAULT_PRICING_URL, + api_key: str | None = None, + ) -> "PricingCatalog": + """Fetch pricing from DO's catalog endpoint. On failure, returns an + empty catalog (cost column will be blank). + + The catalog endpoint requires a DigitalOcean Personal Access Token — + this is *not* the same as the inference ``MODEL_ACCESS_KEY`` used at + runtime. We check ``DIGITALOCEAN_TOKEN`` first (standard DO CLI env + var), then ``DO_PAT``, then fall back to ``DO_API_KEY``. + """ + import os + + headers = {} + token = ( + api_key + or os.environ.get("DIGITALOCEAN_TOKEN") + or os.environ.get("DO_PAT") + or os.environ.get("DO_API_KEY") + ) + if token: + headers["Authorization"] = f"Bearer {token}" + + try: + resp = requests.get(url, headers=headers, timeout=FETCH_TIMEOUT_SECS) + resp.raise_for_status() + data = resp.json() + except Exception as exc: # noqa: BLE001 — best-effort; never fatal + logger.warning( + "DO pricing fetch failed: %s; cost column will be blank. " + "Set DIGITALOCEAN_TOKEN with a DO Personal Access Token to " + "enable cost.", + exc, + ) + return cls() + + prices = _parse_do_pricing(data) + if not prices: + # Dump the first entry's raw shape so we can see which fields DO + # actually returned — helps when the catalog adds new fields or + # the response doesn't match our parser. + import json as _json + + sample_items = _coerce_items(data) + sample = sample_items[0] if sample_items else data + logger.warning( + "DO pricing response had no parseable entries; cost column " + "will be blank. Sample entry: %s", + _json.dumps(sample, default=str)[:400], + ) + return cls(prices) + + def price_for(self, model_name: str | None) -> ModelPrice | None: + if not model_name: + return None + with self._lock: + # Try the full name first, then stripped prefix, then lowercased variants. + for candidate in _model_key_candidates(model_name): + hit = self._prices.get(candidate) + if hit is not None: + return hit + return None + + def cost_for_call(self, call: Any) -> float | None: + """Compute USD cost for an LLMCall. Returns None when pricing is unknown.""" + price = self.price_for(getattr(call, "model", None)) or self.price_for( + getattr(call, "request_model", None) + ) + if price is None: + return None + prompt = int(getattr(call, "prompt_tokens", 0) or 0) + completion = int(getattr(call, "completion_tokens", 0) or 0) + cached = int(getattr(call, "cached_input_tokens", 0) or 0) + + # Cached input tokens are priced separately at the cached rate when known; + # otherwise they're already counted in prompt tokens at the regular rate. + fresh_prompt = prompt + if price.cached_input_per_token_usd is not None and cached: + fresh_prompt = max(0, prompt - cached) + cost_cached = cached * price.cached_input_per_token_usd + else: + cost_cached = 0.0 + + cost = ( + fresh_prompt * price.input_per_token_usd + + completion * price.output_per_token_usd + + cost_cached + ) + return round(cost, 6) + + +def _model_key_candidates(model_name: str) -> list[str]: + base = model_name.strip() + out = [base] + if "/" in base: + out.append(base.split("/", 1)[1]) + out.extend([v.lower() for v in list(out)]) + # Dedup while preserving order. + seen: set[str] = set() + uniq = [] + for key in out: + if key not in seen: + seen.add(key) + uniq.append(key) + return uniq + + +def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]: + """Parse DO catalog response into a ModelPrice map keyed by model id. + + DO's shape (as of 2026-04): + { + "data": [ + {"model_id": "openai-gpt-5.4", + "pricing": {"input_price_per_million": 5.0, + "output_price_per_million": 15.0}}, + ... + ] + } + + Older/alternate shapes are also accepted (flat top-level fields, or the + ``id``/``model``/``name`` key). + """ + prices: dict[str, ModelPrice] = {} + items = _coerce_items(data) + for item in items: + model_id = ( + item.get("model_id") + or item.get("id") + or item.get("model") + or item.get("name") + ) + if not model_id: + continue + + # DO nests rates under `pricing`; try that first, then fall back to + # top-level fields for alternate response shapes. + sources = [item] + if isinstance(item.get("pricing"), dict): + sources.insert(0, item["pricing"]) + + input_rate = _extract_rate_from_sources( + sources, + ["input_per_token", "input_token_price", "price_input"], + ["input_price_per_million", "input_per_million", "input_per_mtok"], + ) + output_rate = _extract_rate_from_sources( + sources, + ["output_per_token", "output_token_price", "price_output"], + ["output_price_per_million", "output_per_million", "output_per_mtok"], + ) + cached_rate = _extract_rate_from_sources( + sources, + [ + "cached_input_per_token", + "cached_input_token_price", + "prompt_cache_read_per_token", + ], + [ + "cached_input_price_per_million", + "cached_input_per_million", + "cached_input_per_mtok", + ], + ) + + if input_rate is None or output_rate is None: + continue + # Treat 0-rate entries as "unknown" so cost falls back to `—` rather + # than showing a misleading $0.0000. DO's catalog sometimes omits + # rates for promo/open-weight models. + if input_rate == 0 and output_rate == 0: + continue + prices[str(model_id)] = ModelPrice( + input_per_token_usd=input_rate, + output_per_token_usd=output_rate, + cached_input_per_token_usd=cached_rate, + ) + return prices + + +def _coerce_items(data: Any) -> list[dict]: + if isinstance(data, list): + return [x for x in data if isinstance(x, dict)] + if isinstance(data, dict): + for key in ("data", "models", "pricing", "items"): + val = data.get(key) + if isinstance(val, list): + return [x for x in val if isinstance(x, dict)] + return [] + + +def _extract_rate_from_sources( + sources: list[dict], + per_token_keys: list[str], + per_million_keys: list[str], +) -> float | None: + """Return a per-token rate in USD, or None if unknown. + + Some DO catalog responses put per-token values under a field whose name + says ``_per_million`` (e.g. ``input_price_per_million: 5E-8`` — that's + $5e-8 per token, not per million). Heuristic: values < 1 are already + per-token (real per-million rates are ~0.1 to ~100); values >= 1 are + treated as per-million and divided by 1,000,000. + """ + for src in sources: + for key in per_token_keys: + if key in src and src[key] is not None: + try: + return float(src[key]) + except (TypeError, ValueError): + continue + for key in per_million_keys: + if key in src and src[key] is not None: + try: + v = float(src[key]) + except (TypeError, ValueError): + continue + if v >= 1: + return v / 1_000_000 + return v + return None diff --git a/cli/planoai/obs/render.py b/cli/planoai/obs/render.py new file mode 100644 index 00000000..47a3742e --- /dev/null +++ b/cli/planoai/obs/render.py @@ -0,0 +1,324 @@ +"""Rich TUI renderer for the observability console.""" + +from __future__ import annotations + +from collections import Counter +from dataclasses import dataclass +from datetime import datetime, timezone + +from rich.box import SIMPLE +from rich.columns import Columns +from rich.console import Group +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from planoai.obs.collector import LLMCall + + +@dataclass +class AggregateStats: + count: int + total_cost_usd: float + total_input_tokens: int + total_output_tokens: int + distinct_sessions: int + current_session: str | None + + +@dataclass +class ModelRollup: + model: str + requests: int + input_tokens: int + output_tokens: int + cache_write: int + cache_read: int + cost_usd: float + + +def _now() -> datetime: + return datetime.now(tz=timezone.utc).astimezone() + + +def aggregates(calls: list[LLMCall]) -> AggregateStats: + total_cost = sum((c.cost_usd or 0.0) for c in calls) + total_input = sum(int(c.prompt_tokens or 0) for c in calls) + total_output = sum(int(c.completion_tokens or 0) for c in calls) + session_ids = {c.session_id for c in calls if c.session_id} + current = next( + (c.session_id for c in reversed(calls) if c.session_id is not None), None + ) + return AggregateStats( + count=len(calls), + total_cost_usd=total_cost, + total_input_tokens=total_input, + total_output_tokens=total_output, + distinct_sessions=len(session_ids), + current_session=current, + ) + + +def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: + buckets: dict[str, dict[str, float | int]] = {} + for c in calls: + key = c.model + b = buckets.setdefault( + key, + { + "requests": 0, + "input": 0, + "output": 0, + "cache_write": 0, + "cache_read": 0, + "cost": 0.0, + }, + ) + b["requests"] = int(b["requests"]) + 1 + b["input"] = int(b["input"]) + int(c.prompt_tokens or 0) + b["output"] = int(b["output"]) + int(c.completion_tokens or 0) + b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0) + b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0) + b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0) + + rollups: list[ModelRollup] = [] + for model, b in buckets.items(): + rollups.append( + ModelRollup( + model=model, + requests=int(b["requests"]), + input_tokens=int(b["input"]), + output_tokens=int(b["output"]), + cache_write=int(b["cache_write"]), + cache_read=int(b["cache_read"]), + cost_usd=float(b["cost"]), + ) + ) + rollups.sort(key=lambda r: r.cost_usd, reverse=True) + return rollups + + +def route_hits(calls: list[LLMCall]) -> list[tuple[str, int, float]]: + counts: Counter[str] = Counter() + for c in calls: + if c.route_name: + counts[c.route_name] += 1 + total = sum(counts.values()) + if total == 0: + return [] + return [(r, n, (n / total) * 100.0) for r, n in counts.most_common()] + + +def _fmt_cost(v: float | None) -> str: + if v is None: + return "—" + if v == 0: + return "$0" + # Adaptive precision so tiny costs ($3.8e-5) remain readable. + if abs(v) < 0.0001: + return f"${v:.8f}".rstrip("0").rstrip(".") + if abs(v) < 0.01: + return f"${v:.6f}".rstrip("0").rstrip(".") + return f"${v:.4f}" + + +def _fmt_ms(v: float | None) -> str: + if v is None: + return "—" + if v >= 1000: + return f"{v / 1000:.1f}s" + return f"{v:.0f}ms" + + +def _fmt_int(v: int | None) -> str: + if v is None or v == 0: + return "—" + return f"{v:,}" + + +def _fmt_tokens(v: int | None) -> str: + if v is None: + return "—" + return f"{v:,}" + + +def _request_panel(last: LLMCall | None) -> Panel: + if last is None: + body = Text("no requests yet", style="dim") + else: + t = Table.grid(padding=(0, 1)) + t.add_column(style="bold cyan") + t.add_column() + t.add_row("Endpoint", "chat/completions") + status = "—" if last.status_code is None else str(last.status_code) + t.add_row("Status", status) + t.add_row("Model", last.model) + if last.request_model and last.request_model != last.model: + t.add_row("Req model", last.request_model) + if last.route_name: + t.add_row("Route", last.route_name) + body = t + return Panel(body, title="[bold]Request[/]", border_style="cyan", box=SIMPLE) + + +def _cost_panel(last: LLMCall | None) -> Panel: + if last is None: + body = Text("—", style="dim") + else: + t = Table.grid(padding=(0, 1)) + t.add_column(style="bold green") + t.add_column() + t.add_row("Request", _fmt_cost(last.cost_usd)) + t.add_row("Input", _fmt_tokens(last.prompt_tokens)) + t.add_row("Output", _fmt_tokens(last.completion_tokens)) + if last.cached_input_tokens: + t.add_row("Cached", _fmt_tokens(last.cached_input_tokens)) + body = t + return Panel(body, title="[bold]Cost[/]", border_style="green", box=SIMPLE) + + +def _totals_panel(stats: AggregateStats) -> Panel: + t = Table.grid(padding=(0, 1)) + t.add_column(style="bold magenta") + t.add_column() + t.add_column(style="bold magenta") + t.add_column() + t.add_row( + "Total cost", + _fmt_cost(stats.total_cost_usd), + "Requests", + str(stats.count), + ) + t.add_row( + "Input", + _fmt_tokens(stats.total_input_tokens), + "Output", + _fmt_tokens(stats.total_output_tokens), + ) + t.add_row( + "Sessions", + str(stats.distinct_sessions), + "Current session", + stats.current_session or "—", + ) + return Panel(t, title="[bold]Totals[/]", border_style="magenta", box=SIMPLE) + + +def _model_rollup_table(rollups: list[ModelRollup]) -> Table: + table = Table( + title="Totals by model", + box=SIMPLE, + header_style="bold", + expand=True, + ) + table.add_column("Model", style="cyan") + table.add_column("Req", justify="right") + table.add_column("Input", justify="right") + table.add_column("Output", justify="right", style="green") + table.add_column("Cache write", justify="right", style="yellow") + table.add_column("Cache read", justify="right", style="yellow") + table.add_column("Cost", justify="right", style="green") + if not rollups: + table.add_row("—", "—", "—", "—", "—", "—", "—") + for r in rollups: + table.add_row( + r.model, + str(r.requests), + _fmt_tokens(r.input_tokens), + _fmt_tokens(r.output_tokens), + _fmt_int(r.cache_write), + _fmt_int(r.cache_read), + _fmt_cost(r.cost_usd), + ) + return table + + +def _route_hit_table(hits: list[tuple[str, int, float]]) -> Table: + table = Table( + title="Route hit %", + box=SIMPLE, + header_style="bold", + expand=True, + ) + table.add_column("Route", style="cyan") + table.add_column("Hits", justify="right") + table.add_column("%", justify="right") + for route, n, pct in hits: + table.add_row(route, str(n), f"{pct:.1f}") + return table + + +def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table: + show_route = any(c.route_name for c in calls) + table = Table( + title="Recent requests", + box=SIMPLE, + header_style="bold", + expand=True, + ) + table.add_column("time") + table.add_column("model", style="cyan") + if show_route: + table.add_column("route", style="yellow") + table.add_column("in", justify="right") + table.add_column("cache", justify="right", style="yellow") + table.add_column("out", justify="right", style="green") + table.add_column("rsn", justify="right") + table.add_column("cost", justify="right", style="green") + table.add_column("TTFT", justify="right") + table.add_column("lat", justify="right") + table.add_column("st") + + recent = list(reversed(calls))[:limit] + for c in recent: + status_cell = "ok" if c.status_code and 200 <= c.status_code < 400 else str(c.status_code or "—") + row = [ + c.timestamp.strftime("%H:%M:%S"), + c.model, + ] + if show_route: + row.append(c.route_name or "—") + row.extend( + [ + _fmt_tokens(c.prompt_tokens), + _fmt_int(c.cached_input_tokens), + _fmt_tokens(c.completion_tokens), + _fmt_int(c.reasoning_tokens), + _fmt_cost(c.cost_usd), + _fmt_ms(c.ttft_ms), + _fmt_ms(c.duration_ms), + status_cell, + ] + ) + table.add_row(*row) + if not recent: + table.add_row(*(["no requests yet"] + ["—"] * (10 if show_route else 9))) + return table + + +def render(calls: list[LLMCall]) -> Group: + last = calls[-1] if calls else None + stats = aggregates(calls) + rollups = model_rollups(calls) + hits = route_hits(calls) + + header = Columns( + [_request_panel(last), _cost_panel(last), _totals_panel(stats)], + expand=True, + equal=True, + ) + parts = [ + header, + _model_rollup_table(rollups), + ] + if hits: + parts.append(_route_hit_table(hits)) + parts.append(_recent_table(calls)) + parts.append( + Text( + "q quit · c clear · waiting for spans on OTLP :4317 — brightstaff needs " + "tracing.opentracing_grpc_endpoint=localhost:4317", + style="dim", + ) + ) + return Group(*parts) diff --git a/cli/planoai/obs_cmd.py b/cli/planoai/obs_cmd.py new file mode 100644 index 00000000..2bd51fbc --- /dev/null +++ b/cli/planoai/obs_cmd.py @@ -0,0 +1,99 @@ +"""`planoai obs` — live observability TUI.""" + +from __future__ import annotations + +import time + +import rich_click as click +from rich.console import Console +from rich.live import Live + +from planoai.consts import PLANO_COLOR +from planoai.obs.collector import ( + DEFAULT_CAPACITY, + DEFAULT_GRPC_PORT, + LLMCallStore, + ObsCollector, +) +from planoai.obs.pricing import PricingCatalog +from planoai.obs.render import render + + +@click.command(name="obs", help="Live observability console for Plano LLM traffic.") +@click.option( + "--port", + type=int, + default=DEFAULT_GRPC_PORT, + show_default=True, + help="OTLP/gRPC port to listen on. Must match the brightstaff tracing endpoint.", +) +@click.option( + "--host", + type=str, + default="0.0.0.0", + show_default=True, + help="Host to bind the OTLP listener.", +) +@click.option( + "--capacity", + type=int, + default=DEFAULT_CAPACITY, + show_default=True, + help="Max LLM calls kept in memory; older calls evicted FIFO.", +) +@click.option( + "--refresh-ms", + type=int, + default=500, + show_default=True, + help="TUI refresh interval.", +) +def obs(port: int, host: str, capacity: int, refresh_ms: int) -> None: + console = Console() + console.print( + f"[bold {PLANO_COLOR}]planoai obs[/] — loading DO pricing catalog...", + end="", + ) + pricing = PricingCatalog.fetch() + if len(pricing): + sample = ", ".join(pricing.sample_models(3)) + console.print( + f" [green]{len(pricing)} models loaded[/] [dim]({sample}, ...)[/]" + ) + else: + console.print( + " [yellow]no pricing loaded[/] — " + "[dim]set DIGITALOCEAN_TOKEN (DO Personal Access Token) to enable cost[/]" + ) + + store = LLMCallStore(capacity=capacity) + collector = ObsCollector(store=store, pricing=pricing, host=host, port=port) + try: + collector.start() + except OSError as exc: + console.print(f"[red]{exc}[/]") + raise SystemExit(1) + + console.print( + f"Listening for OTLP spans on [bold]{host}:{port}[/]. " + "Ensure plano config has [cyan]tracing.opentracing_grpc_endpoint: http://localhost:4317[/] " + "and [cyan]tracing.random_sampling: 100[/] (or run [bold]planoai up[/] " + "with no config — it wires this automatically)." + ) + console.print("Press [bold]Ctrl-C[/] to exit.\n") + + refresh = max(0.05, refresh_ms / 1000.0) + try: + with Live( + render(store.snapshot()), + console=console, + refresh_per_second=1.0 / refresh, + screen=False, + ) as live: + while True: + time.sleep(refresh) + live.update(render(store.snapshot())) + except KeyboardInterrupt: + console.print("\n[dim]obs stopped[/]") + finally: + collector.stop() diff --git a/cli/planoai/rich_click_config.py b/cli/planoai/rich_click_config.py index ba75bc23..fe90dcf1 100644 --- a/cli/planoai/rich_click_config.py +++ b/cli/planoai/rich_click_config.py @@ -61,7 +61,7 @@ def configure_rich_click(plano_color: str) -> None: }, { "name": "Observability", - "commands": ["trace"], + "commands": ["trace", "obs"], }, { "name": "Utilities", diff --git a/cli/test/test_obs_collector.py b/cli/test/test_obs_collector.py new file mode 100644 index 00000000..6a503337 --- /dev/null +++ b/cli/test/test_obs_collector.py @@ -0,0 +1,141 @@ +import time +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from planoai.obs.collector import LLMCall, LLMCallStore, span_to_llm_call + + +def _mk_attr(key: str, value): + v = MagicMock() + if isinstance(value, bool): + v.WhichOneof.return_value = "bool_value" + v.bool_value = value + elif isinstance(value, int): + v.WhichOneof.return_value = "int_value" + v.int_value = value + elif isinstance(value, float): + v.WhichOneof.return_value = "double_value" + v.double_value = value + else: + v.WhichOneof.return_value = "string_value" + v.string_value = str(value) + kv = MagicMock() + kv.key = key + kv.value = v + return kv + + +def _mk_span(attrs: dict, start_ns: int | None = None, span_id_hex: str = "ab") -> MagicMock: + span = MagicMock() + span.attributes = [_mk_attr(k, v) for k, v in attrs.items()] + span.start_time_unix_nano = start_ns or int(time.time() * 1_000_000_000) + span.span_id.hex.return_value = span_id_hex + return span + + +def test_span_without_llm_model_is_ignored(): + span = _mk_span({"http.method": "POST"}) + assert span_to_llm_call(span, "plano(llm)") is None + + +def test_span_with_full_llm_attrs_produces_call(): + span = _mk_span( + { + "llm.model": "openai-gpt-5.4", + "model.requested": "router:software-engineering", + "plano.session_id": "sess-abc", + "plano.route.name": "software-engineering", + "llm.is_streaming": False, + "llm.duration_ms": 1234, + "llm.time_to_first_token": 210, + "llm.usage.prompt_tokens": 100, + "llm.usage.completion_tokens": 50, + "llm.usage.total_tokens": 150, + "llm.usage.cached_input_tokens": 30, + "llm.usage.cache_creation_tokens": 5, + "llm.usage.reasoning_tokens": 200, + "http.status_code": 200, + "request_id": "req-42", + } + ) + call = span_to_llm_call(span, "plano(llm)") + assert call is not None + assert call.request_id == "req-42" + assert call.model == "openai-gpt-5.4" + assert call.request_model == "router:software-engineering" + assert call.session_id == "sess-abc" + assert call.route_name == "software-engineering" + assert call.is_streaming is False + assert call.duration_ms == 1234.0 + assert call.ttft_ms == 210.0 + assert call.prompt_tokens == 100 + assert call.completion_tokens == 50 + assert call.total_tokens == 150 + assert call.cached_input_tokens == 30 + assert call.cache_creation_tokens == 5 + assert call.reasoning_tokens == 200 + assert call.status_code == 200 + + +def test_pricing_lookup_attaches_cost(): + class StubPricing: + def cost_for_call(self, call): + # Simple: 2 * prompt + 3 * completion, in cents + return 0.02 * (call.prompt_tokens or 0) + 0.03 * (call.completion_tokens or 0) + + span = _mk_span( + { + "llm.model": "do/openai-gpt-5.4", + "llm.usage.prompt_tokens": 10, + "llm.usage.completion_tokens": 2, + } + ) + call = span_to_llm_call(span, "plano(llm)", pricing=StubPricing()) + assert call is not None + assert call.cost_usd == pytest.approx(0.26) + + +def test_tpt_and_tokens_per_sec_derived(): + call = LLMCall( + request_id="x", + timestamp=datetime.now(tz=timezone.utc), + model="m", + duration_ms=1000, + ttft_ms=200, + completion_tokens=80, + ) + # (1000 - 200) / 80 = 10ms per token => 100 tokens/sec + assert call.tpt_ms == 10.0 + assert call.tokens_per_sec == 100.0 + + +def test_tpt_returns_none_when_no_completion_tokens(): + call = LLMCall( + request_id="x", + timestamp=datetime.now(tz=timezone.utc), + model="m", + duration_ms=1000, + ttft_ms=200, + completion_tokens=0, + ) + assert call.tpt_ms is None + assert call.tokens_per_sec is None + + +def test_store_evicts_fifo_at_capacity(): + store = LLMCallStore(capacity=3) + now = datetime.now(tz=timezone.utc) + for i in range(5): + store.add( + LLMCall( + request_id=f"r{i}", + timestamp=now, + model="m", + ) + ) + snap = store.snapshot() + assert len(snap) == 3 + assert [c.request_id for c in snap] == ["r2", "r3", "r4"] diff --git a/cli/test/test_obs_pricing.py b/cli/test/test_obs_pricing.py new file mode 100644 index 00000000..95f9a2da --- /dev/null +++ b/cli/test/test_obs_pricing.py @@ -0,0 +1,103 @@ +from datetime import datetime, timezone + +from planoai.obs.collector import LLMCall +from planoai.obs.pricing import ModelPrice, PricingCatalog + + +def _call(model: str, prompt: int, completion: int, cached: int = 0) -> LLMCall: + return LLMCall( + request_id="r", + timestamp=datetime.now(tz=timezone.utc), + model=model, + prompt_tokens=prompt, + completion_tokens=completion, + cached_input_tokens=cached, + ) + + +def test_lookup_matches_bare_and_prefixed(): + prices = { + "openai-gpt-5.4": ModelPrice( + input_per_token_usd=0.000001, output_per_token_usd=0.000002 + ) + } + catalog = PricingCatalog(prices) + assert catalog.price_for("openai-gpt-5.4") is not None + # do/openai-gpt-5.4 should resolve after stripping the provider prefix. + assert catalog.price_for("do/openai-gpt-5.4") is not None + assert catalog.price_for("unknown-model") is None + + +def test_cost_computation_without_cache(): + prices = { + "m": ModelPrice(input_per_token_usd=0.000001, output_per_token_usd=0.000002) + } + cost = PricingCatalog(prices).cost_for_call(_call("m", 1000, 500)) + assert cost == 0.002 # 1000 * 1e-6 + 500 * 2e-6 + + +def test_cost_computation_with_cached_discount(): + prices = { + "m": ModelPrice( + input_per_token_usd=0.000001, + output_per_token_usd=0.000002, + cached_input_per_token_usd=0.0000001, + ) + } + # 800 fresh @ 1e-6 = 8e-4; 200 cached @ 1e-7 = 2e-5; 500 out @ 2e-6 = 1e-3 + cost = PricingCatalog(prices).cost_for_call(_call("m", 1000, 500, cached=200)) + assert cost == round(0.0008 + 0.00002 + 0.001, 6) + + +def test_empty_catalog_returns_none(): + assert PricingCatalog().cost_for_call(_call("m", 100, 50)) is None + + +def test_parse_do_catalog_treats_small_values_as_per_token(): + """DO's real catalog uses per-token values under the `_per_million` key + (e.g. 5E-8 for GPT-oss-20b). We treat values < 1 as already per-token.""" + from planoai.obs.pricing import _parse_do_pricing + + sample = { + "data": [ + { + "model_id": "openai-gpt-oss-20b", + "pricing": { + "input_price_per_million": 5e-8, + "output_price_per_million": 4.5e-7, + }, + }, + { + "model_id": "openai-gpt-oss-120b", + "pricing": { + "input_price_per_million": 1e-7, + "output_price_per_million": 7e-7, + }, + }, + ] + } + prices = _parse_do_pricing(sample) + # Values < 1 are assumed to already be per-token — no extra division. + assert prices["openai-gpt-oss-20b"].input_per_token_usd == 5e-8 + assert prices["openai-gpt-oss-20b"].output_per_token_usd == 4.5e-7 + assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7 + + +def test_parse_do_catalog_divides_large_values_as_per_million(): + """A provider that genuinely reports $5-per-million in that field gets divided.""" + from planoai.obs.pricing import _parse_do_pricing + + sample = { + "data": [ + { + "model_id": "mystery-model", + "pricing": { + "input_price_per_million": 5.0, # > 1 → treated as per-million + "output_price_per_million": 15.0, + }, + }, + ] + } + prices = _parse_do_pricing(sample) + assert prices["mystery-model"].input_per_token_usd == 5.0 / 1_000_000 + assert prices["mystery-model"].output_per_token_usd == 15.0 / 1_000_000 diff --git a/cli/test/test_obs_render.py b/cli/test/test_obs_render.py new file mode 100644 index 00000000..b6438e63 --- /dev/null +++ b/cli/test/test_obs_render.py @@ -0,0 +1,73 @@ +from datetime import datetime, timedelta, timezone + +from planoai.obs.collector import LLMCall +from planoai.obs.render import aggregates, model_rollups, route_hits + + +def _call(model: str, ts: datetime, prompt=0, completion=0, cost=None, route=None, session=None, cache_read=0, cache_write=0): + return LLMCall( + request_id="r", + timestamp=ts, + model=model, + prompt_tokens=prompt, + completion_tokens=completion, + cached_input_tokens=cache_read, + cache_creation_tokens=cache_write, + cost_usd=cost, + route_name=route, + session_id=session, + ) + + +def test_aggregates_sum_and_session_counts(): + now = datetime.now(tz=timezone.utc).astimezone() + calls = [ + _call("m1", now - timedelta(seconds=50), prompt=10, completion=5, cost=0.001, session="s1"), + _call("m2", now - timedelta(seconds=40), prompt=20, completion=10, cost=0.002, session="s1"), + _call("m1", now - timedelta(seconds=30), prompt=30, completion=15, cost=0.003, session="s2"), + ] + stats = aggregates(calls) + assert stats.count == 3 + assert stats.total_cost_usd == 0.006 + assert stats.total_input_tokens == 60 + assert stats.total_output_tokens == 30 + assert stats.distinct_sessions == 2 + assert stats.current_session == "s2" + + +def test_rollups_split_by_model_and_cache(): + now = datetime.now(tz=timezone.utc).astimezone() + calls = [ + _call("m1", now, prompt=10, completion=5, cost=0.001, cache_write=3, cache_read=7), + _call("m1", now, prompt=20, completion=10, cost=0.002, cache_read=1), + _call("m2", now, prompt=30, completion=15, cost=0.004), + ] + rollups = model_rollups(calls) + by_model = {r.model: r for r in rollups} + assert by_model["m1"].requests == 2 + assert by_model["m1"].input_tokens == 30 + assert by_model["m1"].cache_write == 3 + assert by_model["m1"].cache_read == 8 + assert by_model["m2"].input_tokens == 30 + + +def test_route_hits_only_for_routed_calls(): + now = datetime.now(tz=timezone.utc).astimezone() + calls = [ + _call("m", now, route="code"), + _call("m", now, route="code"), + _call("m", now, route="summarization"), + _call("m", now), # no route + ] + hits = route_hits(calls) + # Only calls with route names are counted. + assert sum(n for _, n, _ in hits) == 3 + hits_by_name = {name: (n, pct) for name, n, pct in hits} + assert hits_by_name["code"][0] == 2 + assert hits_by_name["summarization"][0] == 1 + + +def test_route_hits_empty_when_no_routes(): + now = datetime.now(tz=timezone.utc).astimezone() + calls = [_call("m", now), _call("m", now)] + assert route_hits(calls) == [] diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 8f00e4b6..719c048d 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -33,7 +33,8 @@ use crate::streaming::{ ObservableStreamProcessor, StreamProcessor, }; use crate::tracing::{ - collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name, + collect_custom_trace_attributes, llm as tracing_llm, operation_component, + plano as tracing_plano, set_service_name, }; use model_selection::router_chat_get_upstream_model; @@ -102,15 +103,36 @@ async fn llm_chat_inner( .and_then(|hdr| request_headers.get(hdr)) .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); - let pinned_model: Option = if let Some(ref sid) = session_id { + let cached_route = if let Some(ref sid) = session_id { state .orchestrator_service .get_cached_route(sid, tenant_id.as_deref()) .await - .map(|c| c.model_name) } else { None }; + let (pinned_model, pinned_route_name): (Option, Option) = match cached_route { + Some(c) => (Some(c.model_name), c.route_name), + None => (None, None), + }; + + // Record session id on the LLM span for the observability console. + if let Some(ref sid) = session_id { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + tracing_plano::SESSION_ID, + sid.clone(), + )); + }); + } + if let Some(ref route_name) = pinned_route_name { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + tracing_plano::ROUTE_NAME, + route_name.clone(), + )); + }); + } let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path); @@ -311,6 +333,18 @@ async fn llm_chat_inner( alias_resolved_model.clone() }; + // Record route name on the LLM span (only when the orchestrator produced one). + if let Some(ref rn) = route_name { + if !rn.is_empty() && rn != "none" { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + tracing_plano::ROUTE_NAME, + rn.clone(), + )); + }); + } + } + if let Some(ref sid) = session_id { state .orchestrator_service @@ -671,6 +705,36 @@ async fn send_upstream( // Propagate upstream headers and status let response_headers = llm_response.headers().clone(); let upstream_status = llm_response.status(); + + // Upstream routers (e.g. DigitalOcean Gradient) may return an + // `x-model-router-selected-route` header indicating which task-level + // route the request was classified into (e.g. "Code Generation"). Surface + // it as `plano.route.name` so the obs console's Route hit % panel can + // show the breakdown even when Plano's own orchestrator wasn't in the + // routing path. Any value from Plano's orchestrator already set earlier + // takes precedence — this only fires when the span doesn't already have + // a route name. + if let Some(upstream_route) = response_headers + .get("x-model-router-selected-route") + .and_then(|v| v.to_str().ok()) + { + if !upstream_route.is_empty() { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + crate::tracing::plano::ROUTE_NAME, + upstream_route.to_string(), + )); + }); + } + } + // Record the upstream HTTP status on the span for the obs console. + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + crate::tracing::http::STATUS_CODE, + upstream_status.as_u16() as i64, + )); + }); + let mut response = Response::builder().status(upstream_status); if let Some(headers) = response.headers_mut() { for (name, value) in response_headers.iter() { diff --git a/crates/brightstaff/src/streaming.rs b/crates/brightstaff/src/streaming.rs index f7af8ae0..40cbbe7c 100644 --- a/crates/brightstaff/src/streaming.rs +++ b/crates/brightstaff/src/streaming.rs @@ -16,10 +16,131 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::handlers::agents::pipeline::{PipelineError, PipelineProcessor}; const STREAM_BUFFER_SIZE: usize = 16; +/// Cap on accumulated response bytes kept for usage extraction. +/// Most chat responses are well under this; pathological ones are dropped without +/// affecting pass-through streaming to the client. +const USAGE_BUFFER_MAX: usize = 2 * 1024 * 1024; use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER}; use crate::tracing::{llm, set_service_name, signals as signal_constants}; use hermesllm::apis::openai::Message; +/// Parsed usage + resolved-model details from a provider response. +#[derive(Debug, Default, Clone)] +struct ExtractedUsage { + prompt_tokens: Option, + completion_tokens: Option, + total_tokens: Option, + cached_input_tokens: Option, + cache_creation_tokens: Option, + reasoning_tokens: Option, + /// The model the upstream actually used. For router aliases (e.g. + /// `router:software-engineering`), this differs from the request model. + resolved_model: Option, +} + +impl ExtractedUsage { + fn is_empty(&self) -> bool { + self.prompt_tokens.is_none() + && self.completion_tokens.is_none() + && self.total_tokens.is_none() + && self.resolved_model.is_none() + } + + fn from_json(value: &serde_json::Value) -> Self { + let mut out = Self::default(); + if let Some(model) = value.get("model").and_then(|v| v.as_str()) { + if !model.is_empty() { + out.resolved_model = Some(model.to_string()); + } + } + if let Some(u) = value.get("usage") { + // OpenAI-shape usage + out.prompt_tokens = u.get("prompt_tokens").and_then(|v| v.as_i64()); + out.completion_tokens = u.get("completion_tokens").and_then(|v| v.as_i64()); + out.total_tokens = u.get("total_tokens").and_then(|v| v.as_i64()); + out.cached_input_tokens = u + .get("prompt_tokens_details") + .and_then(|d| d.get("cached_tokens")) + .and_then(|v| v.as_i64()); + out.reasoning_tokens = u + .get("completion_tokens_details") + .and_then(|d| d.get("reasoning_tokens")) + .and_then(|v| v.as_i64()); + + // Anthropic-shape fallbacks + if out.prompt_tokens.is_none() { + out.prompt_tokens = u.get("input_tokens").and_then(|v| v.as_i64()); + } + if out.completion_tokens.is_none() { + out.completion_tokens = u.get("output_tokens").and_then(|v| v.as_i64()); + } + if out.total_tokens.is_none() { + if let (Some(p), Some(c)) = (out.prompt_tokens, out.completion_tokens) { + out.total_tokens = Some(p + c); + } + } + if out.cached_input_tokens.is_none() { + out.cached_input_tokens = u.get("cache_read_input_tokens").and_then(|v| v.as_i64()); + } + if out.cached_input_tokens.is_none() { + out.cached_input_tokens = + u.get("cached_content_token_count").and_then(|v| v.as_i64()); + } + out.cache_creation_tokens = u + .get("cache_creation_input_tokens") + .and_then(|v| v.as_i64()); + if out.reasoning_tokens.is_none() { + out.reasoning_tokens = u.get("thoughts_token_count").and_then(|v| v.as_i64()); + } + } + out + } +} + +/// Try to pull usage out of an accumulated response body. +/// Handles both a single JSON object (non-streaming) and SSE streams where the +/// final `data: {...}` event carries the `usage` field. +fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage { + if buf.is_empty() { + return ExtractedUsage::default(); + } + + // Fast path: full-body JSON (non-streaming). + if let Ok(value) = serde_json::from_slice::(buf) { + let u = ExtractedUsage::from_json(&value); + if !u.is_empty() { + return u; + } + } + + // SSE path: scan from the end for a `data:` line containing a usage object. + let text = match std::str::from_utf8(buf) { + Ok(t) => t, + Err(_) => return ExtractedUsage::default(), + }; + for line in text.lines().rev() { + let trimmed = line.trim_start(); + let payload = match trimmed.strip_prefix("data:") { + Some(p) => p.trim_start(), + None => continue, + }; + if payload == "[DONE]" || payload.is_empty() { + continue; + } + if !payload.contains("\"usage\"") { + continue; + } + if let Ok(value) = serde_json::from_str::(payload) { + let u = ExtractedUsage::from_json(&value); + if !u.is_empty() { + return u; + } + } + } + + ExtractedUsage::default() +} + /// Trait for processing streaming chunks /// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging) pub trait StreamProcessor: Send + 'static { @@ -60,6 +181,10 @@ pub struct ObservableStreamProcessor { start_time: Instant, time_to_first_token: Option, messages: Option>, + /// Accumulated response bytes used only for best-effort usage extraction + /// on `on_complete`. Capped at `USAGE_BUFFER_MAX`; excess chunks are dropped + /// from the buffer (they still pass through to the client). + response_buffer: Vec, } impl ObservableStreamProcessor { @@ -93,6 +218,7 @@ impl ObservableStreamProcessor { start_time, time_to_first_token: None, messages, + response_buffer: Vec::new(), } } } @@ -101,6 +227,13 @@ impl StreamProcessor for ObservableStreamProcessor { fn process_chunk(&mut self, chunk: Bytes) -> Result, String> { self.total_bytes += chunk.len(); self.chunk_count += 1; + // Accumulate for best-effort usage extraction; drop further chunks once + // the cap is reached so we don't retain huge response bodies in memory. + if self.response_buffer.len() < USAGE_BUFFER_MAX { + let remaining = USAGE_BUFFER_MAX - self.response_buffer.len(); + let take = chunk.len().min(remaining); + self.response_buffer.extend_from_slice(&chunk[..take]); + } Ok(Some(chunk)) } @@ -124,6 +257,52 @@ impl StreamProcessor for ObservableStreamProcessor { ); } + // Record total duration on the span for the observability console. + let duration_ms = self.start_time.elapsed().as_millis() as i64; + { + let span = tracing::Span::current(); + let otel_context = span.context(); + let otel_span = otel_context.span(); + otel_span.set_attribute(KeyValue::new(llm::DURATION_MS, duration_ms)); + otel_span.set_attribute(KeyValue::new(llm::RESPONSE_BYTES, self.total_bytes as i64)); + } + + // Best-effort usage extraction + emission (works for both streaming + // SSE and non-streaming JSON responses that include a `usage` object). + let usage = extract_usage_from_bytes(&self.response_buffer); + if !usage.is_empty() { + let span = tracing::Span::current(); + let otel_context = span.context(); + let otel_span = otel_context.span(); + if let Some(v) = usage.prompt_tokens { + otel_span.set_attribute(KeyValue::new(llm::PROMPT_TOKENS, v)); + } + if let Some(v) = usage.completion_tokens { + otel_span.set_attribute(KeyValue::new(llm::COMPLETION_TOKENS, v)); + } + if let Some(v) = usage.total_tokens { + otel_span.set_attribute(KeyValue::new(llm::TOTAL_TOKENS, v)); + } + if let Some(v) = usage.cached_input_tokens { + otel_span.set_attribute(KeyValue::new(llm::CACHED_INPUT_TOKENS, v)); + } + if let Some(v) = usage.cache_creation_tokens { + otel_span.set_attribute(KeyValue::new(llm::CACHE_CREATION_TOKENS, v)); + } + if let Some(v) = usage.reasoning_tokens { + otel_span.set_attribute(KeyValue::new(llm::REASONING_TOKENS, v)); + } + // Override `llm.model` with the model the upstream actually ran + // (e.g. `openai-gpt-5.4` resolved from `router:software-engineering`). + // Cost lookup keys off the real model, not the alias. + if let Some(resolved) = usage.resolved_model.clone() { + otel_span.set_attribute(KeyValue::new(llm::MODEL_NAME, resolved)); + } + } + // Release the buffered bytes early; nothing downstream needs them. + self.response_buffer.clear(); + self.response_buffer.shrink_to_fit(); + // Analyze signals if messages are available and record as span attributes if let Some(ref messages) = self.messages { let analyzer: Box = Box::new(TextBasedSignalAnalyzer::new()); @@ -404,3 +583,55 @@ pub fn truncate_message(message: &str, max_length: usize) -> String { message.to_string() } } + +#[cfg(test)] +mod usage_extraction_tests { + use super::*; + + #[test] + fn non_streaming_openai_with_cached() { + let body = br#"{"id":"x","model":"gpt-4o","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34,"total_tokens":46,"prompt_tokens_details":{"cached_tokens":5}}}"#; + let u = extract_usage_from_bytes(body); + assert_eq!(u.prompt_tokens, Some(12)); + assert_eq!(u.completion_tokens, Some(34)); + assert_eq!(u.total_tokens, Some(46)); + assert_eq!(u.cached_input_tokens, Some(5)); + assert_eq!(u.reasoning_tokens, None); + } + + #[test] + fn non_streaming_anthropic_with_cache_creation() { + let body = br#"{"id":"x","model":"claude","usage":{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":20,"cache_read_input_tokens":30}}"#; + let u = extract_usage_from_bytes(body); + assert_eq!(u.prompt_tokens, Some(100)); + assert_eq!(u.completion_tokens, Some(50)); + assert_eq!(u.total_tokens, Some(150)); + assert_eq!(u.cached_input_tokens, Some(30)); + assert_eq!(u.cache_creation_tokens, Some(20)); + } + + #[test] + fn streaming_openai_final_chunk_has_usage() { + let sse = b"data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]} + +data: {\"choices\":[{\"delta\":{}, \"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":3,\"total_tokens\":10}} + +data: [DONE] + +"; + let u = extract_usage_from_bytes(sse); + assert_eq!(u.prompt_tokens, Some(7)); + assert_eq!(u.completion_tokens, Some(3)); + assert_eq!(u.total_tokens, Some(10)); + } + + #[test] + fn empty_returns_default() { + assert!(extract_usage_from_bytes(b"").is_empty()); + } + + #[test] + fn no_usage_in_body_returns_default() { + assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty()); + } +} diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs index 15e3cf57..79a40401 100644 --- a/crates/brightstaff/src/tracing/constants.rs +++ b/crates/brightstaff/src/tracing/constants.rs @@ -80,6 +80,18 @@ pub mod llm { /// Total tokens used (prompt + completion) pub const TOTAL_TOKENS: &str = "llm.usage.total_tokens"; + /// Tokens served from a prompt cache read + /// (OpenAI `prompt_tokens_details.cached_tokens`, Anthropic `cache_read_input_tokens`, + /// Google `cached_content_token_count`) + pub const CACHED_INPUT_TOKENS: &str = "llm.usage.cached_input_tokens"; + + /// Tokens used to write a prompt cache entry (Anthropic `cache_creation_input_tokens`) + pub const CACHE_CREATION_TOKENS: &str = "llm.usage.cache_creation_tokens"; + + /// Reasoning tokens for reasoning models + /// (OpenAI `completion_tokens_details.reasoning_tokens`, Google `thoughts_token_count`) + pub const REASONING_TOKENS: &str = "llm.usage.reasoning_tokens"; + /// Temperature parameter used pub const TEMPERATURE: &str = "llm.temperature"; @@ -119,6 +131,22 @@ pub mod routing { pub const SELECTION_REASON: &str = "routing.selection_reason"; } +// ============================================================================= +// Span Attributes - Plano-specific +// ============================================================================= + +/// Attributes specific to Plano (session affinity, routing decisions). +pub mod plano { + /// Session identifier propagated via the `x-model-affinity` header. + /// Absent when the client did not send the header. + pub const SESSION_ID: &str = "plano.session_id"; + + /// Matched route name from routing (e.g. "code", "summarization", + /// "software-engineering"). Absent when the client routed directly + /// to a concrete model. + pub const ROUTE_NAME: &str = "plano.route.name"; +} + // ============================================================================= // Span Attributes - Error Handling // ============================================================================= diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 644db31a..8e09a21c 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -4,7 +4,7 @@ mod init; mod service_name_exporter; pub use constants::{ - error, http, llm, operation_component, routing, signals, OperationNameBuilder, + error, http, llm, operation_component, plano, routing, signals, OperationNameBuilder, }; pub use custom_attributes::collect_custom_trace_attributes; pub use init::init_tracer; diff --git a/crates/hermesllm/src/apis/anthropic.rs b/crates/hermesllm/src/apis/anthropic.rs index 4df4bb00..ee572268 100644 --- a/crates/hermesllm/src/apis/anthropic.rs +++ b/crates/hermesllm/src/apis/anthropic.rs @@ -435,6 +435,12 @@ impl TokenUsage for MessagesResponse { fn total_tokens(&self) -> usize { (self.usage.input_tokens + self.usage.output_tokens) as usize } + fn cached_input_tokens(&self) -> Option { + self.usage.cache_read_input_tokens.map(|t| t as usize) + } + fn cache_creation_tokens(&self) -> Option { + self.usage.cache_creation_input_tokens.map(|t| t as usize) + } } impl ProviderResponse for MessagesResponse { diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index d22ff756..bb93fd34 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -596,6 +596,18 @@ impl TokenUsage for Usage { fn total_tokens(&self) -> usize { self.total_tokens as usize } + + fn cached_input_tokens(&self) -> Option { + self.prompt_tokens_details + .as_ref() + .and_then(|d| d.cached_tokens.map(|t| t as usize)) + } + + fn reasoning_tokens(&self) -> Option { + self.completion_tokens_details + .as_ref() + .and_then(|d| d.reasoning_tokens.map(|t| t as usize)) + } } /// Implementation of ProviderRequest for ChatCompletionsRequest diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index eac8a452..92d362b2 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -710,6 +710,18 @@ impl crate::providers::response::TokenUsage for ResponseUsage { fn total_tokens(&self) -> usize { self.total_tokens as usize } + + fn cached_input_tokens(&self) -> Option { + self.input_tokens_details + .as_ref() + .map(|d| d.cached_tokens.max(0) as usize) + } + + fn reasoning_tokens(&self) -> Option { + self.output_tokens_details + .as_ref() + .map(|d| d.reasoning_tokens.max(0) as usize) + } } /// Token details diff --git a/crates/hermesllm/src/providers/response.rs b/crates/hermesllm/src/providers/response.rs index 5f46f97b..b8565ddf 100644 --- a/crates/hermesllm/src/providers/response.rs +++ b/crates/hermesllm/src/providers/response.rs @@ -23,6 +23,31 @@ pub trait TokenUsage { fn completion_tokens(&self) -> usize; fn prompt_tokens(&self) -> usize; fn total_tokens(&self) -> usize; + /// Tokens served from a prompt cache read (OpenAI `prompt_tokens_details.cached_tokens`, + /// Anthropic `cache_read_input_tokens`, Google `cached_content_token_count`). + fn cached_input_tokens(&self) -> Option { + None + } + /// Tokens used to write a cache entry (Anthropic `cache_creation_input_tokens`). + fn cache_creation_tokens(&self) -> Option { + None + } + /// Reasoning tokens for reasoning models (OpenAI `completion_tokens_details.reasoning_tokens`, + /// Google `thoughts_token_count`). + fn reasoning_tokens(&self) -> Option { + None + } +} + +/// Rich usage breakdown extracted from a provider response. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct UsageDetails { + pub prompt_tokens: usize, + pub completion_tokens: usize, + pub total_tokens: usize, + pub cached_input_tokens: Option, + pub cache_creation_tokens: Option, + pub reasoning_tokens: Option, } pub trait ProviderResponse: Send + Sync { @@ -34,6 +59,18 @@ pub trait ProviderResponse: Send + Sync { self.usage() .map(|u| (u.prompt_tokens(), u.completion_tokens(), u.total_tokens())) } + + /// Extract a rich usage breakdown including cached/cache-creation/reasoning tokens. + fn extract_usage_details(&self) -> Option { + self.usage().map(|u| UsageDetails { + prompt_tokens: u.prompt_tokens(), + completion_tokens: u.completion_tokens(), + total_tokens: u.total_tokens(), + cached_input_tokens: u.cached_input_tokens(), + cache_creation_tokens: u.cache_creation_tokens(), + reasoning_tokens: u.reasoning_tokens(), + }) + } } impl ProviderResponse for ProviderResponseType { diff --git a/docs/source/get_started/quickstart.rst b/docs/source/get_started/quickstart.rst index 6f1a86ac..fa9e4e22 100644 --- a/docs/source/get_started/quickstart.rst +++ b/docs/source/get_started/quickstart.rst @@ -340,6 +340,42 @@ And to get the list of supported currencies: "Here is a list of the currencies that are supported for conversion from USD, along with their symbols:\n\n1. AUD - Australian Dollar\n2. BGN - Bulgarian Lev\n3. BRL - Brazilian Real\n4. CAD - Canadian Dollar\n5. CHF - Swiss Franc\n6. CNY - Chinese Renminbi Yuan\n7. CZK - Czech Koruna\n8. DKK - Danish Krone\n9. EUR - Euro\n10. GBP - British Pound\n11. HKD - Hong Kong Dollar\n12. HUF - Hungarian Forint\n13. IDR - Indonesian Rupiah\n14. ILS - Israeli New Sheqel\n15. INR - Indian Rupee\n16. ISK - Icelandic Króna\n17. JPY - Japanese Yen\n18. KRW - South Korean Won\n19. MXN - Mexican Peso\n20. MYR - Malaysian Ringgit\n21. NOK - Norwegian Krone\n22. NZD - New Zealand Dollar\n23. PHP - Philippine Peso\n24. PLN - Polish Złoty\n25. RON - Romanian Leu\n26. SEK - Swedish Krona\n27. SGD - Singapore Dollar\n28. THB - Thai Baht\n29. TRY - Turkish Lira\n30. USD - United States Dollar\n31. ZAR - South African Rand\n\nIf you want to convert USD to any of these currencies, you can select the one you are interested in." +Observability Console +--------------------- + +Run ``planoai obs`` in a second terminal for a live, in-memory view of LLM traffic: per-request tokens, cached/cache-creation/reasoning tokens, TTFT, latency, cost (when DO Gradient pricing is available), session grouping, and route distribution. + +.. code-block:: console + + $ planoai obs + # In another terminal, start the proxy — with no config, planoai synthesizes + # a pass-through config for all known providers and auto-wires OTel export + # to localhost:4317 so the console receives spans automatically. + $ planoai up + +With no API keys set, every provider runs in pass-through mode — supply the ``Authorization`` header yourself on each request. For example, using DigitalOcean Gradient: + +.. code-block:: console + + $ curl localhost:12000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $DO_API_KEY" \ + -d '{"model":"do/router:software-engineering", + "messages":[{"role":"user","content":"write code to print prime numbers in python"}], + "stream":false}' + +When you do export ``OPENAI_API_KEY`` / ``ANTHROPIC_API_KEY`` / ``DO_API_KEY`` / etc. before ``planoai up``, Plano picks them up automatically and clients no longer need to send ``Authorization``. + +If you already use your own ``plano_config.yaml``, add this block so spans flow to the console: + +.. code-block:: yaml + + tracing: + random_sampling: 100 + opentracing_grpc_endpoint: http://localhost:4317 + +Press ``Ctrl-C`` in the obs terminal to exit. Data lives in memory only — nothing is persisted to disk. + Next Steps ==========