Fix request closures during long-running streaming

This commit is contained in:
adilhafeez 2026-04-18 18:10:23 -07:00
parent ffea891dba
commit c8c6b87d1e
7 changed files with 637 additions and 149 deletions

View file

@ -7,6 +7,7 @@ Single-source: one fetch at startup, cached for the life of the process.
from __future__ import annotations
import logging
import re
import threading
from dataclasses import dataclass
from typing import Any
@ -123,13 +124,28 @@ class PricingCatalog:
return round(cost, 6)
_DATE_SUFFIX_RE = re.compile(r"-\d{8}$")
_PROVIDER_PREFIXES = ("anthropic", "openai", "google", "meta", "cohere", "mistral")
_ANTHROPIC_FAMILIES = {"opus", "sonnet", "haiku"}
def _model_key_candidates(model_name: str) -> list[str]:
"""Lookup-side variants of a Plano-emitted model name.
Plano resolves names like ``claude-haiku-4-5-20251001``; the catalog stores
them as ``anthropic-claude-haiku-4.5``. We strip the date suffix and the
``provider/`` prefix here; the catalog itself registers the dash/dot and
family-order aliases at parse time (see :func:`_expand_aliases`).
"""
base = model_name.strip()
out = [base]
if "/" in base:
out.append(base.split("/", 1)[1])
for k in list(out):
stripped = _DATE_SUFFIX_RE.sub("", k)
if stripped != k:
out.append(stripped)
out.extend([v.lower() for v in list(out)])
# Dedup while preserving order.
seen: set[str] = set()
uniq = []
for key in out:
@ -139,6 +155,54 @@ def _model_key_candidates(model_name: str) -> list[str]:
return uniq
def _expand_aliases(model_id: str) -> set[str]:
"""Catalog-side variants of a DO model id.
DO publishes Anthropic models under ids like ``anthropic-claude-opus-4.7``
or ``anthropic-claude-4.6-sonnet`` while Plano emits ``claude-opus-4-7`` /
``claude-sonnet-4-6``. Generate a set covering provider-prefix stripping,
dashdot in version segments, and familyversion word order so a single
catalog entry matches every name shape we'll see at lookup.
"""
aliases: set[str] = set()
def add(name: str) -> None:
if not name:
return
aliases.add(name)
aliases.add(name.lower())
add(model_id)
base = model_id
head, _, rest = base.partition("-")
if head.lower() in _PROVIDER_PREFIXES and rest:
add(rest)
base = rest
for key in list(aliases):
if "." in key:
add(key.replace(".", "-"))
parts = base.split("-")
if len(parts) >= 3 and parts[0].lower() == "claude":
rest_parts = parts[1:]
for i, p in enumerate(rest_parts):
if p.lower() in _ANTHROPIC_FAMILIES:
others = rest_parts[:i] + rest_parts[i + 1 :]
if not others:
break
family_last = "claude-" + "-".join(others) + "-" + p
family_first = "claude-" + p + "-" + "-".join(others)
add(family_last)
add(family_first)
add(family_last.replace(".", "-"))
add(family_first.replace(".", "-"))
break
return aliases
def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]:
"""Parse DO catalog response into a ModelPrice map keyed by model id.
@ -204,11 +268,13 @@ def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]:
# rates for promo/open-weight models.
if input_rate == 0 and output_rate == 0:
continue
prices[str(model_id)] = ModelPrice(
price = ModelPrice(
input_per_token_usd=input_rate,
output_per_token_usd=output_rate,
cached_input_per_token_usd=cached_rate,
)
for alias in _expand_aliases(str(model_id)):
prices.setdefault(alias, price)
return prices

View file

@ -4,15 +4,18 @@ from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone
from datetime import datetime
from http import HTTPStatus
from rich.box import SIMPLE
from rich.columns import Columns
from rich.align import Align
from rich.box import SIMPLE, SIMPLE_HEAVY
from rich.console import Group
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
MAX_WIDTH = 160
from planoai.obs.collector import LLMCall
@ -24,6 +27,16 @@ class AggregateStats:
total_output_tokens: int
distinct_sessions: int
current_session: str | None
p50_latency_ms: float | None = None
p95_latency_ms: float | None = None
p99_latency_ms: float | None = None
p50_ttft_ms: float | None = None
p95_ttft_ms: float | None = None
p99_ttft_ms: float | None = None
error_count: int = 0
errors_4xx: int = 0
errors_5xx: int = 0
has_cost: bool = False
@dataclass
@ -35,10 +48,16 @@ class ModelRollup:
cache_write: int
cache_read: int
cost_usd: float
has_cost: bool = False
avg_tokens_per_sec: float | None = None
def _now() -> datetime:
return datetime.now(tz=timezone.utc).astimezone()
def _percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
s = sorted(values)
k = max(0, min(len(s) - 1, int(round((pct / 100.0) * (len(s) - 1)))))
return s[k]
def aggregates(calls: list[LLMCall]) -> AggregateStats:
@ -49,6 +68,15 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats:
current = next(
(c.session_id for c in reversed(calls) if c.session_id is not None), None
)
durations = [c.duration_ms for c in calls if c.duration_ms is not None]
ttfts = [c.ttft_ms for c in calls if c.ttft_ms is not None]
errors_4xx = sum(
1 for c in calls if c.status_code is not None and 400 <= c.status_code < 500
)
errors_5xx = sum(
1 for c in calls if c.status_code is not None and c.status_code >= 500
)
has_cost = any(c.cost_usd is not None for c in calls)
return AggregateStats(
count=len(calls),
total_cost_usd=total_cost,
@ -56,11 +84,22 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats:
total_output_tokens=total_output,
distinct_sessions=len(session_ids),
current_session=current,
p50_latency_ms=_percentile(durations, 50),
p95_latency_ms=_percentile(durations, 95),
p99_latency_ms=_percentile(durations, 99),
p50_ttft_ms=_percentile(ttfts, 50),
p95_ttft_ms=_percentile(ttfts, 95),
p99_ttft_ms=_percentile(ttfts, 99),
error_count=errors_4xx + errors_5xx,
errors_4xx=errors_4xx,
errors_5xx=errors_5xx,
has_cost=has_cost,
)
def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
buckets: dict[str, dict[str, float | int]] = {}
buckets: dict[str, dict[str, float | int | bool]] = {}
tps_samples: dict[str, list[float]] = {}
for c in calls:
key = c.model
b = buckets.setdefault(
@ -72,6 +111,7 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
"cache_write": 0,
"cache_read": 0,
"cost": 0.0,
"has_cost": False,
},
)
b["requests"] = int(b["requests"]) + 1
@ -80,9 +120,16 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0)
b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0)
b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0)
if c.cost_usd is not None:
b["has_cost"] = True
tps = c.tokens_per_sec
if tps is not None:
tps_samples.setdefault(key, []).append(tps)
rollups: list[ModelRollup] = []
for model, b in buckets.items():
samples = tps_samples.get(model)
avg_tps = (sum(samples) / len(samples)) if samples else None
rollups.append(
ModelRollup(
model=model,
@ -92,34 +139,62 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
cache_write=int(b["cache_write"]),
cache_read=int(b["cache_read"]),
cost_usd=float(b["cost"]),
has_cost=bool(b["has_cost"]),
avg_tokens_per_sec=avg_tps,
)
)
rollups.sort(key=lambda r: r.cost_usd, reverse=True)
rollups.sort(key=lambda r: (r.cost_usd, r.requests), reverse=True)
return rollups
def route_hits(calls: list[LLMCall]) -> list[tuple[str, int, float]]:
@dataclass
class RouteHit:
route: str
hits: int
pct: float
p95_latency_ms: float | None
error_count: int
def route_hits(calls: list[LLMCall]) -> list[RouteHit]:
counts: Counter[str] = Counter()
per_route_latency: dict[str, list[float]] = {}
per_route_errors: dict[str, int] = {}
for c in calls:
if c.route_name:
counts[c.route_name] += 1
if not c.route_name:
continue
counts[c.route_name] += 1
if c.duration_ms is not None:
per_route_latency.setdefault(c.route_name, []).append(c.duration_ms)
if c.status_code is not None and c.status_code >= 400:
per_route_errors[c.route_name] = per_route_errors.get(c.route_name, 0) + 1
total = sum(counts.values())
if total == 0:
return []
return [(r, n, (n / total) * 100.0) for r, n in counts.most_common()]
return [
RouteHit(
route=r,
hits=n,
pct=(n / total) * 100.0,
p95_latency_ms=_percentile(per_route_latency.get(r, []), 95),
error_count=per_route_errors.get(r, 0),
)
for r, n in counts.most_common()
]
def _fmt_cost(v: float | None) -> str:
def _fmt_cost(v: float | None, *, zero: str = "") -> str:
if v is None:
return ""
if v == 0:
return "$0"
# Adaptive precision so tiny costs ($3.8e-5) remain readable.
return zero
if abs(v) < 0.0001:
return f"${v:.8f}".rstrip("0").rstrip(".")
if abs(v) < 0.01:
return f"${v:.6f}".rstrip("0").rstrip(".")
return f"${v:.4f}"
if abs(v) < 1:
return f"${v:.4f}"
return f"${v:,.2f}"
def _fmt_ms(v: float | None) -> str:
@ -142,187 +217,412 @@ def _fmt_tokens(v: int | None) -> str:
return f"{v:,}"
def _request_panel(last: LLMCall | None) -> Panel:
def _fmt_tps(v: float | None) -> str:
if v is None or v <= 0:
return ""
if v >= 100:
return f"{v:.0f}/s"
return f"{v:.1f}/s"
def _latency_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 500:
return "green"
if v < 2000:
return "yellow"
return "red"
def _ttft_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 300:
return "green"
if v < 1000:
return "yellow"
return "red"
def _truncate_model(name: str, limit: int = 32) -> str:
if len(name) <= limit:
return name
return name[: limit - 1] + ""
def _status_text(code: int | None) -> Text:
if code is None:
return Text("", style="dim")
if 200 <= code < 300:
return Text("● ok", style="green")
if 300 <= code < 400:
return Text(f"{code}", style="yellow")
if 400 <= code < 500:
return Text(f"{code}", style="yellow bold")
return Text(f"{code}", style="red bold")
def _summary_panel(last: LLMCall | None, stats: AggregateStats) -> Panel:
# Content-sized columns with a fixed gutter keep the two blocks close
# together instead of stretching across the full terminal on wide screens.
grid = Table.grid(padding=(0, 4))
grid.add_column(no_wrap=True)
grid.add_column(no_wrap=True)
# Left: latest request snapshot.
left = Table.grid(padding=(0, 1))
left.add_column(style="dim", no_wrap=True)
left.add_column(no_wrap=True)
if last is None:
body = Text("no requests yet", style="dim")
left.add_row("latest", Text("waiting for spans…", style="dim italic"))
else:
t = Table.grid(padding=(0, 1))
t.add_column(style="bold cyan")
t.add_column()
t.add_row("Endpoint", "chat/completions")
status = "" if last.status_code is None else str(last.status_code)
t.add_row("Status", status)
t.add_row("Model", last.model)
model_text = Text(_truncate_model(last.model, 48), style="bold cyan")
if last.is_streaming:
model_text.append(" ⟳ stream", style="dim")
left.add_row("model", model_text)
if last.request_model and last.request_model != last.model:
t.add_row("Req model", last.request_model)
left.add_row(
"requested", Text(_truncate_model(last.request_model, 48), style="cyan")
)
if last.route_name:
t.add_row("Route", last.route_name)
body = t
return Panel(body, title="[bold]Request[/]", border_style="cyan", box=SIMPLE)
def _cost_panel(last: LLMCall | None) -> Panel:
if last is None:
body = Text("", style="dim")
else:
t = Table.grid(padding=(0, 1))
t.add_column(style="bold green")
t.add_column()
t.add_row("Request", _fmt_cost(last.cost_usd))
t.add_row("Input", _fmt_tokens(last.prompt_tokens))
t.add_row("Output", _fmt_tokens(last.completion_tokens))
left.add_row("route", Text(last.route_name, style="yellow"))
left.add_row("status", _status_text(last.status_code))
tokens = Text()
tokens.append(_fmt_tokens(last.prompt_tokens))
tokens.append(" in", style="dim")
tokens.append(" · ", style="dim")
tokens.append(_fmt_tokens(last.completion_tokens), style="green")
tokens.append(" out", style="dim")
if last.cached_input_tokens:
t.add_row("Cached", _fmt_tokens(last.cached_input_tokens))
body = t
return Panel(body, title="[bold]Cost[/]", border_style="green", box=SIMPLE)
tokens.append(" · ", style="dim")
tokens.append(_fmt_tokens(last.cached_input_tokens), style="yellow")
tokens.append(" cached", style="dim")
left.add_row("tokens", tokens)
timing = Text()
timing.append("TTFT ", style="dim")
timing.append(_fmt_ms(last.ttft_ms), style=_ttft_style(last.ttft_ms))
timing.append(" · ", style="dim")
timing.append("lat ", style="dim")
timing.append(
_fmt_ms(last.duration_ms), style=_latency_style(last.duration_ms)
)
tps = last.tokens_per_sec
if tps:
timing.append(" · ", style="dim")
timing.append(_fmt_tps(tps), style="green")
left.add_row("timing", timing)
left.add_row("cost", Text(_fmt_cost(last.cost_usd), style="green bold"))
# Right: lifetime totals.
right = Table.grid(padding=(0, 1))
right.add_column(style="dim", no_wrap=True)
right.add_column(no_wrap=True)
right.add_row(
"requests",
Text(f"{stats.count:,}", style="bold"),
)
if stats.error_count:
err_text = Text()
err_text.append(f"{stats.error_count:,}", style="red bold")
parts: list[str] = []
if stats.errors_4xx:
parts.append(f"{stats.errors_4xx} 4xx")
if stats.errors_5xx:
parts.append(f"{stats.errors_5xx} 5xx")
if parts:
err_text.append(f" ({' · '.join(parts)})", style="dim")
right.add_row("errors", err_text)
cost_str = _fmt_cost(stats.total_cost_usd) if stats.has_cost else ""
right.add_row("total cost", Text(cost_str, style="green bold"))
tokens_total = Text()
tokens_total.append(_fmt_tokens(stats.total_input_tokens))
tokens_total.append(" in", style="dim")
tokens_total.append(" · ", style="dim")
tokens_total.append(_fmt_tokens(stats.total_output_tokens), style="green")
tokens_total.append(" out", style="dim")
right.add_row("tokens", tokens_total)
lat_text = Text()
lat_text.append("p50 ", style="dim")
lat_text.append(_fmt_ms(stats.p50_latency_ms), style=_latency_style(stats.p50_latency_ms))
lat_text.append(" · ", style="dim")
lat_text.append("p95 ", style="dim")
lat_text.append(_fmt_ms(stats.p95_latency_ms), style=_latency_style(stats.p95_latency_ms))
lat_text.append(" · ", style="dim")
lat_text.append("p99 ", style="dim")
lat_text.append(_fmt_ms(stats.p99_latency_ms), style=_latency_style(stats.p99_latency_ms))
right.add_row("latency", lat_text)
ttft_text = Text()
ttft_text.append("p50 ", style="dim")
ttft_text.append(_fmt_ms(stats.p50_ttft_ms), style=_ttft_style(stats.p50_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p95 ", style="dim")
ttft_text.append(_fmt_ms(stats.p95_ttft_ms), style=_ttft_style(stats.p95_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p99 ", style="dim")
ttft_text.append(_fmt_ms(stats.p99_ttft_ms), style=_ttft_style(stats.p99_ttft_ms))
right.add_row("TTFT", ttft_text)
sess = Text()
sess.append(f"{stats.distinct_sessions}")
if stats.current_session:
sess.append(" · current ", style="dim")
sess.append(stats.current_session, style="magenta")
right.add_row("sessions", sess)
def _totals_panel(stats: AggregateStats) -> Panel:
t = Table.grid(padding=(0, 1))
t.add_column(style="bold magenta")
t.add_column()
t.add_column(style="bold magenta")
t.add_column()
t.add_row(
"Total cost",
_fmt_cost(stats.total_cost_usd),
"Requests",
str(stats.count),
grid.add_row(left, right)
return Panel(
grid,
title="[bold]live LLM traffic[/]",
border_style="cyan",
box=SIMPLE_HEAVY,
padding=(0, 1),
)
t.add_row(
"Input",
_fmt_tokens(stats.total_input_tokens),
"Output",
_fmt_tokens(stats.total_output_tokens),
)
t.add_row(
"Sessions",
str(stats.distinct_sessions),
"Current session",
stats.current_session or "",
)
return Panel(t, title="[bold]Totals[/]", border_style="magenta", box=SIMPLE)
def _model_rollup_table(rollups: list[ModelRollup]) -> Table:
table = Table(
title="Totals by model",
title="by model",
title_justify="left",
title_style="bold dim",
caption="cost via DigitalOcean Gradient catalog",
caption_justify="left",
caption_style="dim italic",
box=SIMPLE,
header_style="bold",
expand=True,
pad_edge=False,
padding=(0, 1),
)
table.add_column("Model", style="cyan")
table.add_column("Req", justify="right")
table.add_column("Input", justify="right")
table.add_column("Output", justify="right", style="green")
table.add_column("Cache write", justify="right", style="yellow")
table.add_column("Cache read", justify="right", style="yellow")
table.add_column("Cost", justify="right", style="green")
table.add_column("model", style="cyan", no_wrap=True)
table.add_column("req", justify="right")
table.add_column("input", justify="right")
table.add_column("output", justify="right", style="green")
table.add_column("cache wr", justify="right", style="yellow")
table.add_column("cache rd", justify="right", style="yellow")
table.add_column("tok/s", justify="right")
table.add_column("cost", justify="right", style="green")
if not rollups:
table.add_row("", "", "", "", "", "", "")
for r in rollups:
table.add_row(
r.model,
str(r.requests),
Text("no requests yet", style="dim italic"),
*([""] * 7),
)
return table
for r in rollups:
cost_cell = _fmt_cost(r.cost_usd) if r.has_cost else ""
table.add_row(
_truncate_model(r.model),
f"{r.requests:,}",
_fmt_tokens(r.input_tokens),
_fmt_tokens(r.output_tokens),
_fmt_int(r.cache_write),
_fmt_int(r.cache_read),
_fmt_cost(r.cost_usd),
_fmt_tps(r.avg_tokens_per_sec),
cost_cell,
)
return table
def _route_hit_table(hits: list[tuple[str, int, float]]) -> Table:
def _route_hit_table(hits: list[RouteHit]) -> Table:
table = Table(
title="Route hit %",
title="route share",
title_justify="left",
title_style="bold dim",
box=SIMPLE,
header_style="bold",
expand=True,
pad_edge=False,
padding=(0, 1),
)
table.add_column("Route", style="cyan")
table.add_column("Hits", justify="right")
table.add_column("route", style="cyan")
table.add_column("hits", justify="right")
table.add_column("%", justify="right")
for route, n, pct in hits:
table.add_row(route, str(n), f"{pct:.1f}")
table.add_column("p95", justify="right")
table.add_column("err", justify="right")
for h in hits:
err_cell = (
Text(f"{h.error_count:,}", style="red bold") if h.error_count else ""
)
table.add_row(
h.route,
f"{h.hits:,}",
f"{h.pct:5.1f}%",
Text(_fmt_ms(h.p95_latency_ms), style=_latency_style(h.p95_latency_ms)),
err_cell,
)
return table
def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table:
show_route = any(c.route_name for c in calls)
show_cache = any((c.cached_input_tokens or 0) > 0 for c in calls)
show_rsn = any((c.reasoning_tokens or 0) > 0 for c in calls)
caption_parts = ["in·new = fresh prompt tokens"]
if show_cache:
caption_parts.append("in·cache = cached read")
if show_rsn:
caption_parts.append("rsn = reasoning")
caption_parts.append("lat = total latency")
table = Table(
title="Recent requests",
title=f"recent · last {min(limit, len(calls)) if calls else 0}",
title_justify="left",
title_style="bold dim",
caption=" · ".join(caption_parts),
caption_justify="left",
caption_style="dim italic",
box=SIMPLE,
header_style="bold",
expand=True,
pad_edge=False,
padding=(0, 1),
)
table.add_column("time")
table.add_column("model", style="cyan")
table.add_column("time", no_wrap=True)
table.add_column("model", style="cyan", no_wrap=True)
if show_route:
table.add_column("route", style="yellow")
table.add_column("in", justify="right")
table.add_column("cache", justify="right", style="yellow")
table.add_column("route", style="yellow", no_wrap=True)
table.add_column("in·new", justify="right")
if show_cache:
table.add_column("in·cache", justify="right", style="yellow")
table.add_column("out", justify="right", style="green")
table.add_column("rsn", justify="right")
table.add_column("cost", justify="right", style="green")
if show_rsn:
table.add_column("rsn", justify="right")
table.add_column("tok/s", justify="right")
table.add_column("TTFT", justify="right")
table.add_column("lat", justify="right")
table.add_column("st")
table.add_column("cost", justify="right", style="green")
table.add_column("status")
if not calls:
cols = len(table.columns)
table.add_row(
Text("waiting for spans…", style="dim italic"),
*([""] * (cols - 1)),
)
return table
recent = list(reversed(calls))[:limit]
for c in recent:
status_cell = (
"ok"
if c.status_code and 200 <= c.status_code < 400
else str(c.status_code or "")
)
row = [
c.timestamp.strftime("%H:%M:%S"),
c.model,
for idx, c in enumerate(recent):
is_newest = idx == 0
time_style = "bold white" if is_newest else None
model_style = "bold cyan" if is_newest else "cyan"
row: list[object] = [
Text(c.timestamp.strftime("%H:%M:%S"), style=time_style)
if time_style
else c.timestamp.strftime("%H:%M:%S"),
Text(_truncate_model(c.model), style=model_style),
]
if show_route:
row.append(c.route_name or "")
row.append(_fmt_tokens(c.prompt_tokens))
if show_cache:
row.append(_fmt_int(c.cached_input_tokens))
row.append(_fmt_tokens(c.completion_tokens))
if show_rsn:
row.append(_fmt_int(c.reasoning_tokens))
row.extend(
[
_fmt_tokens(c.prompt_tokens),
_fmt_int(c.cached_input_tokens),
_fmt_tokens(c.completion_tokens),
_fmt_int(c.reasoning_tokens),
_fmt_tps(c.tokens_per_sec),
Text(_fmt_ms(c.ttft_ms), style=_ttft_style(c.ttft_ms)),
Text(_fmt_ms(c.duration_ms), style=_latency_style(c.duration_ms)),
_fmt_cost(c.cost_usd),
_fmt_ms(c.ttft_ms),
_fmt_ms(c.duration_ms),
status_cell,
_status_text(c.status_code),
]
)
table.add_row(*row)
if not recent:
table.add_row(*(["no requests yet"] + [""] * (10 if show_route else 9)))
return table
def render(calls: list[LLMCall]) -> Group:
def _last_error(calls: list[LLMCall]) -> LLMCall | None:
for c in reversed(calls):
if c.status_code is not None and c.status_code >= 400:
return c
return None
def _http_reason(code: int) -> str:
try:
return HTTPStatus(code).phrase
except ValueError:
return ""
def _fmt_ago(ts: datetime) -> str:
# `ts` is produced in collector.py via datetime.now(tz=...), but fall back
# gracefully if a naive timestamp ever sneaks in.
now = datetime.now(tz=ts.tzinfo) if ts.tzinfo else datetime.now()
delta = (now - ts).total_seconds()
if delta < 0:
delta = 0
if delta < 60:
return f"{int(delta)}s ago"
if delta < 3600:
return f"{int(delta // 60)}m ago"
return f"{int(delta // 3600)}h ago"
def _error_banner(call: LLMCall) -> Panel:
code = call.status_code or 0
border = "red" if code >= 500 else "yellow"
header = Text()
header.append(f"{code}", style=f"{border} bold")
reason = _http_reason(code)
if reason:
header.append(f" {reason}", style=border)
header.append(" · ", style="dim")
header.append(_truncate_model(call.model, 48), style="cyan")
if call.route_name:
header.append(" · ", style="dim")
header.append(call.route_name, style="yellow")
header.append(" · ", style="dim")
header.append(_fmt_ago(call.timestamp), style="dim")
if call.request_id:
header.append(" · req ", style="dim")
header.append(call.request_id, style="magenta")
return Panel(
header,
title="[bold]last error[/]",
title_align="left",
border_style=border,
box=SIMPLE,
padding=(0, 1),
)
def _footer(stats: AggregateStats) -> Text:
waiting = stats.count == 0
text = Text()
text.append("Ctrl-C ", style="bold")
text.append("exit", style="dim")
text.append(" · OTLP :4317", style="dim")
text.append(" · pricing: DigitalOcean ", style="dim")
if waiting:
text.append("waiting for spans", style="yellow")
text.append(
" — set tracing.opentracing_grpc_endpoint=localhost:4317", style="dim"
)
else:
text.append(f"receiving · {stats.count:,} call(s) buffered", style="green")
return text
def render(calls: list[LLMCall]) -> Align:
last = calls[-1] if calls else None
stats = aggregates(calls)
rollups = model_rollups(calls)
hits = route_hits(calls)
header = Columns(
[_request_panel(last), _cost_panel(last), _totals_panel(stats)],
expand=True,
equal=True,
)
parts = [
header,
_model_rollup_table(rollups),
]
parts: list[object] = [_summary_panel(last, stats)]
err = _last_error(calls)
if err is not None:
parts.append(_error_banner(err))
if hits:
parts.append(_route_hit_table(hits))
split = Table.grid(padding=(0, 2))
split.add_column(no_wrap=False)
split.add_column(no_wrap=False)
split.add_row(_model_rollup_table(rollups), _route_hit_table(hits))
parts.append(split)
else:
parts.append(_model_rollup_table(rollups))
parts.append(_recent_table(calls))
parts.append(
Text(
"q quit · c clear · waiting for spans on OTLP :4317 — brightstaff needs "
"tracing.opentracing_grpc_endpoint=localhost:4317",
style="dim",
)
)
return Group(*parts)
parts.append(_footer(stats))
# Cap overall width so wide terminals don't stretch the layout into a
# mostly-whitespace gap between columns.
return Align.left(Group(*parts), width=MAX_WIDTH)

View file

@ -91,7 +91,12 @@ def convert_legacy_listeners(
"type": "model",
"port": 12000,
"address": "0.0.0.0",
"timeout": "30s",
# LLM streaming responses routinely exceed 30s (extended thinking,
# long tool reasoning, large completions). Match the 300s ceiling
# used by the direct upstream-provider routes so Envoy doesn't
# abort streams with UT mid-response. Users can override via their
# plano_config.yaml `listeners.timeout` field.
"timeout": "300s",
"model_providers": model_providers or [],
}
@ -100,7 +105,7 @@ def convert_legacy_listeners(
"type": "prompt",
"port": 10000,
"address": "0.0.0.0",
"timeout": "30s",
"timeout": "300s",
}
# Handle None case

View file

@ -83,6 +83,49 @@ def test_parse_do_catalog_treats_small_values_as_per_token():
assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7
def test_anthropic_aliases_match_plano_emitted_names():
"""DO publishes 'anthropic-claude-opus-4.7' and 'anthropic-claude-haiku-4.5';
Plano emits 'claude-opus-4-7' and 'claude-haiku-4-5-20251001'. Aliases
registered at parse time should bridge the gap."""
from planoai.obs.pricing import _parse_do_pricing
sample = {
"data": [
{
"model_id": "anthropic-claude-opus-4.7",
"pricing": {
"input_price_per_million": 15.0,
"output_price_per_million": 75.0,
},
},
{
"model_id": "anthropic-claude-haiku-4.5",
"pricing": {
"input_price_per_million": 1.0,
"output_price_per_million": 5.0,
},
},
{
"model_id": "anthropic-claude-4.6-sonnet",
"pricing": {
"input_price_per_million": 3.0,
"output_price_per_million": 15.0,
},
},
]
}
catalog = PricingCatalog(_parse_do_pricing(sample))
# Family-last shapes Plano emits.
assert catalog.price_for("claude-opus-4-7") is not None
assert catalog.price_for("claude-haiku-4-5") is not None
# Date-suffixed name (Anthropic API style).
assert catalog.price_for("claude-haiku-4-5-20251001") is not None
# Word-order swap: DO has 'claude-4.6-sonnet', Plano emits 'claude-sonnet-4-6'.
assert catalog.price_for("claude-sonnet-4-6") is not None
# Original DO ids still resolve.
assert catalog.price_for("anthropic-claude-opus-4.7") is not None
def test_parse_do_catalog_divides_large_values_as_per_million():
"""A provider that genuinely reports $5-per-million in that field gets divided."""
from planoai.obs.pricing import _parse_do_pricing

View file

@ -94,10 +94,10 @@ def test_route_hits_only_for_routed_calls():
]
hits = route_hits(calls)
# Only calls with route names are counted.
assert sum(n for _, n, _ in hits) == 3
hits_by_name = {name: (n, pct) for name, n, pct in hits}
assert hits_by_name["code"][0] == 2
assert hits_by_name["summarization"][0] == 1
assert sum(h.hits for h in hits) == 3
hits_by_name = {h.route: h for h in hits}
assert hits_by_name["code"].hits == 2
assert hits_by_name["summarization"].hits == 1
def test_route_hits_empty_when_no_routes():

View file

@ -100,6 +100,12 @@ impl ExtractedUsage {
/// Try to pull usage out of an accumulated response body.
/// Handles both a single JSON object (non-streaming) and SSE streams where the
/// final `data: {...}` event carries the `usage` field.
///
/// Resolved model is captured independently from usage: many providers (DO
/// Inference Cloud, OpenAI in some modes) emit `model` on every SSE chunk but
/// `usage` only on the terminal one — and that terminal chunk may omit the
/// model field entirely. Scanning both fields separately means we still
/// surface the real upstream model even when usage extraction fails.
fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
if buf.is_empty() {
return ExtractedUsage::default();
@ -113,11 +119,14 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
}
}
// SSE path: scan from the end for a `data:` line containing a usage object.
let text = match std::str::from_utf8(buf) {
Ok(t) => t,
Err(_) => return ExtractedUsage::default(),
};
let mut out = ExtractedUsage::default();
let mut found_usage = false;
for line in text.lines().rev() {
let trimmed = line.trim_start();
let payload = match trimmed.strip_prefix("data:") {
@ -127,18 +136,50 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
if payload == "[DONE]" || payload.is_empty() {
continue;
}
if !payload.contains("\"usage\"") {
let has_model_field = out.resolved_model.is_none() && payload.contains("\"model\"");
let has_usage_field = !found_usage && payload.contains("\"usage\"");
if !has_model_field && !has_usage_field {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload) {
let value = match serde_json::from_str::<serde_json::Value>(payload) {
Ok(v) => v,
Err(_) => continue,
};
if has_usage_field {
let u = ExtractedUsage::from_json(&value);
if !u.is_empty() {
return u;
if u.prompt_tokens.is_some()
|| u.completion_tokens.is_some()
|| u.total_tokens.is_some()
{
let prior_model = out.resolved_model.take();
out = u;
if out.resolved_model.is_none() {
out.resolved_model = prior_model;
}
found_usage = true;
if out.resolved_model.is_some() {
break;
}
continue;
}
}
if out.resolved_model.is_none() {
if let Some(model) = value.get("model").and_then(|v| v.as_str()) {
if !model.is_empty() {
out.resolved_model = Some(model.to_string());
if found_usage {
break;
}
}
}
}
}
ExtractedUsage::default()
out
}
/// Trait for processing streaming chunks
@ -634,4 +675,36 @@ data: [DONE]
fn no_usage_in_body_returns_default() {
assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty());
}
#[test]
fn streaming_resolved_model_from_chunk_without_usage() {
// DO Inference Cloud often emits model on every chunk and usage only on
// the last — and the usage chunk itself omits the model field.
let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}
data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":3,\"total_tokens\":10}}
data: [DONE]
";
let u = extract_usage_from_bytes(sse);
assert_eq!(u.prompt_tokens, Some(7));
assert_eq!(u.completion_tokens, Some(3));
assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4"));
}
#[test]
fn streaming_resolved_model_when_usage_missing() {
// Even without any usage chunk, we should still surface the upstream
// model so the obs view doesn't fall back to the router alias.
let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}
data: [DONE]
";
let u = extract_usage_from_bytes(sse);
assert_eq!(u.prompt_tokens, None);
assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4"));
assert!(!u.is_empty());
}
}

View file

@ -95,6 +95,7 @@ providers:
anthropic:
- anthropic/claude-sonnet-4-6
- anthropic/claude-opus-4-6
- anthropic/claude-opus-4-7
- anthropic/claude-opus-4-5-20251101
- anthropic/claude-opus-4-5
- anthropic/claude-haiku-4-5-20251001