Fix request closures during long-running streaming

This commit is contained in:
adilhafeez 2026-04-18 18:10:23 -07:00
parent ffea891dba
commit 9c0fc6b3a1
7 changed files with 637 additions and 149 deletions

View file

@ -7,6 +7,7 @@ Single-source: one fetch at startup, cached for the life of the process.
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
import threading import threading
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
@ -123,13 +124,28 @@ class PricingCatalog:
return round(cost, 6) return round(cost, 6)
_DATE_SUFFIX_RE = re.compile(r"-\d{8}$")
_PROVIDER_PREFIXES = ("anthropic", "openai", "google", "meta", "cohere", "mistral")
_ANTHROPIC_FAMILIES = {"opus", "sonnet", "haiku"}
def _model_key_candidates(model_name: str) -> list[str]: def _model_key_candidates(model_name: str) -> list[str]:
"""Lookup-side variants of a Plano-emitted model name.
Plano resolves names like ``claude-haiku-4-5-20251001``; the catalog stores
them as ``anthropic-claude-haiku-4.5``. We strip the date suffix and the
``provider/`` prefix here; the catalog itself registers the dash/dot and
family-order aliases at parse time (see :func:`_expand_aliases`).
"""
base = model_name.strip() base = model_name.strip()
out = [base] out = [base]
if "/" in base: if "/" in base:
out.append(base.split("/", 1)[1]) out.append(base.split("/", 1)[1])
for k in list(out):
stripped = _DATE_SUFFIX_RE.sub("", k)
if stripped != k:
out.append(stripped)
out.extend([v.lower() for v in list(out)]) out.extend([v.lower() for v in list(out)])
# Dedup while preserving order.
seen: set[str] = set() seen: set[str] = set()
uniq = [] uniq = []
for key in out: for key in out:
@ -139,6 +155,54 @@ def _model_key_candidates(model_name: str) -> list[str]:
return uniq return uniq
def _expand_aliases(model_id: str) -> set[str]:
"""Catalog-side variants of a DO model id.
DO publishes Anthropic models under ids like ``anthropic-claude-opus-4.7``
or ``anthropic-claude-4.6-sonnet`` while Plano emits ``claude-opus-4-7`` /
``claude-sonnet-4-6``. Generate a set covering provider-prefix stripping,
dashdot in version segments, and familyversion word order so a single
catalog entry matches every name shape we'll see at lookup.
"""
aliases: set[str] = set()
def add(name: str) -> None:
if not name:
return
aliases.add(name)
aliases.add(name.lower())
add(model_id)
base = model_id
head, _, rest = base.partition("-")
if head.lower() in _PROVIDER_PREFIXES and rest:
add(rest)
base = rest
for key in list(aliases):
if "." in key:
add(key.replace(".", "-"))
parts = base.split("-")
if len(parts) >= 3 and parts[0].lower() == "claude":
rest_parts = parts[1:]
for i, p in enumerate(rest_parts):
if p.lower() in _ANTHROPIC_FAMILIES:
others = rest_parts[:i] + rest_parts[i + 1 :]
if not others:
break
family_last = "claude-" + "-".join(others) + "-" + p
family_first = "claude-" + p + "-" + "-".join(others)
add(family_last)
add(family_first)
add(family_last.replace(".", "-"))
add(family_first.replace(".", "-"))
break
return aliases
def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]: def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]:
"""Parse DO catalog response into a ModelPrice map keyed by model id. """Parse DO catalog response into a ModelPrice map keyed by model id.
@ -204,11 +268,13 @@ def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]:
# rates for promo/open-weight models. # rates for promo/open-weight models.
if input_rate == 0 and output_rate == 0: if input_rate == 0 and output_rate == 0:
continue continue
prices[str(model_id)] = ModelPrice( price = ModelPrice(
input_per_token_usd=input_rate, input_per_token_usd=input_rate,
output_per_token_usd=output_rate, output_per_token_usd=output_rate,
cached_input_per_token_usd=cached_rate, cached_input_per_token_usd=cached_rate,
) )
for alias in _expand_aliases(str(model_id)):
prices.setdefault(alias, price)
return prices return prices

View file

@ -4,15 +4,18 @@ from __future__ import annotations
from collections import Counter from collections import Counter
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime
from http import HTTPStatus
from rich.box import SIMPLE from rich.align import Align
from rich.columns import Columns from rich.box import SIMPLE, SIMPLE_HEAVY
from rich.console import Group from rich.console import Group
from rich.panel import Panel from rich.panel import Panel
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
MAX_WIDTH = 160
from planoai.obs.collector import LLMCall from planoai.obs.collector import LLMCall
@ -24,6 +27,16 @@ class AggregateStats:
total_output_tokens: int total_output_tokens: int
distinct_sessions: int distinct_sessions: int
current_session: str | None current_session: str | None
p50_latency_ms: float | None = None
p95_latency_ms: float | None = None
p99_latency_ms: float | None = None
p50_ttft_ms: float | None = None
p95_ttft_ms: float | None = None
p99_ttft_ms: float | None = None
error_count: int = 0
errors_4xx: int = 0
errors_5xx: int = 0
has_cost: bool = False
@dataclass @dataclass
@ -35,10 +48,16 @@ class ModelRollup:
cache_write: int cache_write: int
cache_read: int cache_read: int
cost_usd: float cost_usd: float
has_cost: bool = False
avg_tokens_per_sec: float | None = None
def _now() -> datetime: def _percentile(values: list[float], pct: float) -> float | None:
return datetime.now(tz=timezone.utc).astimezone() if not values:
return None
s = sorted(values)
k = max(0, min(len(s) - 1, int(round((pct / 100.0) * (len(s) - 1)))))
return s[k]
def aggregates(calls: list[LLMCall]) -> AggregateStats: def aggregates(calls: list[LLMCall]) -> AggregateStats:
@ -49,6 +68,15 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats:
current = next( current = next(
(c.session_id for c in reversed(calls) if c.session_id is not None), None (c.session_id for c in reversed(calls) if c.session_id is not None), None
) )
durations = [c.duration_ms for c in calls if c.duration_ms is not None]
ttfts = [c.ttft_ms for c in calls if c.ttft_ms is not None]
errors_4xx = sum(
1 for c in calls if c.status_code is not None and 400 <= c.status_code < 500
)
errors_5xx = sum(
1 for c in calls if c.status_code is not None and c.status_code >= 500
)
has_cost = any(c.cost_usd is not None for c in calls)
return AggregateStats( return AggregateStats(
count=len(calls), count=len(calls),
total_cost_usd=total_cost, total_cost_usd=total_cost,
@ -56,11 +84,22 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats:
total_output_tokens=total_output, total_output_tokens=total_output,
distinct_sessions=len(session_ids), distinct_sessions=len(session_ids),
current_session=current, current_session=current,
p50_latency_ms=_percentile(durations, 50),
p95_latency_ms=_percentile(durations, 95),
p99_latency_ms=_percentile(durations, 99),
p50_ttft_ms=_percentile(ttfts, 50),
p95_ttft_ms=_percentile(ttfts, 95),
p99_ttft_ms=_percentile(ttfts, 99),
error_count=errors_4xx + errors_5xx,
errors_4xx=errors_4xx,
errors_5xx=errors_5xx,
has_cost=has_cost,
) )
def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
buckets: dict[str, dict[str, float | int]] = {} buckets: dict[str, dict[str, float | int | bool]] = {}
tps_samples: dict[str, list[float]] = {}
for c in calls: for c in calls:
key = c.model key = c.model
b = buckets.setdefault( b = buckets.setdefault(
@ -72,6 +111,7 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
"cache_write": 0, "cache_write": 0,
"cache_read": 0, "cache_read": 0,
"cost": 0.0, "cost": 0.0,
"has_cost": False,
}, },
) )
b["requests"] = int(b["requests"]) + 1 b["requests"] = int(b["requests"]) + 1
@ -80,9 +120,16 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0) b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0)
b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0) b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0)
b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0) b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0)
if c.cost_usd is not None:
b["has_cost"] = True
tps = c.tokens_per_sec
if tps is not None:
tps_samples.setdefault(key, []).append(tps)
rollups: list[ModelRollup] = [] rollups: list[ModelRollup] = []
for model, b in buckets.items(): for model, b in buckets.items():
samples = tps_samples.get(model)
avg_tps = (sum(samples) / len(samples)) if samples else None
rollups.append( rollups.append(
ModelRollup( ModelRollup(
model=model, model=model,
@ -92,34 +139,62 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
cache_write=int(b["cache_write"]), cache_write=int(b["cache_write"]),
cache_read=int(b["cache_read"]), cache_read=int(b["cache_read"]),
cost_usd=float(b["cost"]), cost_usd=float(b["cost"]),
has_cost=bool(b["has_cost"]),
avg_tokens_per_sec=avg_tps,
) )
) )
rollups.sort(key=lambda r: r.cost_usd, reverse=True) rollups.sort(key=lambda r: (r.cost_usd, r.requests), reverse=True)
return rollups return rollups
def route_hits(calls: list[LLMCall]) -> list[tuple[str, int, float]]: @dataclass
class RouteHit:
route: str
hits: int
pct: float
p95_latency_ms: float | None
error_count: int
def route_hits(calls: list[LLMCall]) -> list[RouteHit]:
counts: Counter[str] = Counter() counts: Counter[str] = Counter()
per_route_latency: dict[str, list[float]] = {}
per_route_errors: dict[str, int] = {}
for c in calls: for c in calls:
if c.route_name: if not c.route_name:
counts[c.route_name] += 1 continue
counts[c.route_name] += 1
if c.duration_ms is not None:
per_route_latency.setdefault(c.route_name, []).append(c.duration_ms)
if c.status_code is not None and c.status_code >= 400:
per_route_errors[c.route_name] = per_route_errors.get(c.route_name, 0) + 1
total = sum(counts.values()) total = sum(counts.values())
if total == 0: if total == 0:
return [] return []
return [(r, n, (n / total) * 100.0) for r, n in counts.most_common()] return [
RouteHit(
route=r,
hits=n,
pct=(n / total) * 100.0,
p95_latency_ms=_percentile(per_route_latency.get(r, []), 95),
error_count=per_route_errors.get(r, 0),
)
for r, n in counts.most_common()
]
def _fmt_cost(v: float | None) -> str: def _fmt_cost(v: float | None, *, zero: str = "") -> str:
if v is None: if v is None:
return "" return ""
if v == 0: if v == 0:
return "$0" return zero
# Adaptive precision so tiny costs ($3.8e-5) remain readable.
if abs(v) < 0.0001: if abs(v) < 0.0001:
return f"${v:.8f}".rstrip("0").rstrip(".") return f"${v:.8f}".rstrip("0").rstrip(".")
if abs(v) < 0.01: if abs(v) < 0.01:
return f"${v:.6f}".rstrip("0").rstrip(".") return f"${v:.6f}".rstrip("0").rstrip(".")
return f"${v:.4f}" if abs(v) < 1:
return f"${v:.4f}"
return f"${v:,.2f}"
def _fmt_ms(v: float | None) -> str: def _fmt_ms(v: float | None) -> str:
@ -142,187 +217,412 @@ def _fmt_tokens(v: int | None) -> str:
return f"{v:,}" return f"{v:,}"
def _request_panel(last: LLMCall | None) -> Panel: def _fmt_tps(v: float | None) -> str:
if v is None or v <= 0:
return ""
if v >= 100:
return f"{v:.0f}/s"
return f"{v:.1f}/s"
def _latency_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 500:
return "green"
if v < 2000:
return "yellow"
return "red"
def _ttft_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 300:
return "green"
if v < 1000:
return "yellow"
return "red"
def _truncate_model(name: str, limit: int = 32) -> str:
if len(name) <= limit:
return name
return name[: limit - 1] + ""
def _status_text(code: int | None) -> Text:
if code is None:
return Text("", style="dim")
if 200 <= code < 300:
return Text("● ok", style="green")
if 300 <= code < 400:
return Text(f"{code}", style="yellow")
if 400 <= code < 500:
return Text(f"{code}", style="yellow bold")
return Text(f"{code}", style="red bold")
def _summary_panel(last: LLMCall | None, stats: AggregateStats) -> Panel:
# Content-sized columns with a fixed gutter keep the two blocks close
# together instead of stretching across the full terminal on wide screens.
grid = Table.grid(padding=(0, 4))
grid.add_column(no_wrap=True)
grid.add_column(no_wrap=True)
# Left: latest request snapshot.
left = Table.grid(padding=(0, 1))
left.add_column(style="dim", no_wrap=True)
left.add_column(no_wrap=True)
if last is None: if last is None:
body = Text("no requests yet", style="dim") left.add_row("latest", Text("waiting for spans…", style="dim italic"))
else: else:
t = Table.grid(padding=(0, 1)) model_text = Text(_truncate_model(last.model, 48), style="bold cyan")
t.add_column(style="bold cyan") if last.is_streaming:
t.add_column() model_text.append(" ⟳ stream", style="dim")
t.add_row("Endpoint", "chat/completions") left.add_row("model", model_text)
status = "" if last.status_code is None else str(last.status_code)
t.add_row("Status", status)
t.add_row("Model", last.model)
if last.request_model and last.request_model != last.model: if last.request_model and last.request_model != last.model:
t.add_row("Req model", last.request_model) left.add_row(
"requested", Text(_truncate_model(last.request_model, 48), style="cyan")
)
if last.route_name: if last.route_name:
t.add_row("Route", last.route_name) left.add_row("route", Text(last.route_name, style="yellow"))
body = t left.add_row("status", _status_text(last.status_code))
return Panel(body, title="[bold]Request[/]", border_style="cyan", box=SIMPLE) tokens = Text()
tokens.append(_fmt_tokens(last.prompt_tokens))
tokens.append(" in", style="dim")
def _cost_panel(last: LLMCall | None) -> Panel: tokens.append(" · ", style="dim")
if last is None: tokens.append(_fmt_tokens(last.completion_tokens), style="green")
body = Text("", style="dim") tokens.append(" out", style="dim")
else:
t = Table.grid(padding=(0, 1))
t.add_column(style="bold green")
t.add_column()
t.add_row("Request", _fmt_cost(last.cost_usd))
t.add_row("Input", _fmt_tokens(last.prompt_tokens))
t.add_row("Output", _fmt_tokens(last.completion_tokens))
if last.cached_input_tokens: if last.cached_input_tokens:
t.add_row("Cached", _fmt_tokens(last.cached_input_tokens)) tokens.append(" · ", style="dim")
body = t tokens.append(_fmt_tokens(last.cached_input_tokens), style="yellow")
return Panel(body, title="[bold]Cost[/]", border_style="green", box=SIMPLE) tokens.append(" cached", style="dim")
left.add_row("tokens", tokens)
timing = Text()
timing.append("TTFT ", style="dim")
timing.append(_fmt_ms(last.ttft_ms), style=_ttft_style(last.ttft_ms))
timing.append(" · ", style="dim")
timing.append("lat ", style="dim")
timing.append(
_fmt_ms(last.duration_ms), style=_latency_style(last.duration_ms)
)
tps = last.tokens_per_sec
if tps:
timing.append(" · ", style="dim")
timing.append(_fmt_tps(tps), style="green")
left.add_row("timing", timing)
left.add_row("cost", Text(_fmt_cost(last.cost_usd), style="green bold"))
# Right: lifetime totals.
right = Table.grid(padding=(0, 1))
right.add_column(style="dim", no_wrap=True)
right.add_column(no_wrap=True)
right.add_row(
"requests",
Text(f"{stats.count:,}", style="bold"),
)
if stats.error_count:
err_text = Text()
err_text.append(f"{stats.error_count:,}", style="red bold")
parts: list[str] = []
if stats.errors_4xx:
parts.append(f"{stats.errors_4xx} 4xx")
if stats.errors_5xx:
parts.append(f"{stats.errors_5xx} 5xx")
if parts:
err_text.append(f" ({' · '.join(parts)})", style="dim")
right.add_row("errors", err_text)
cost_str = _fmt_cost(stats.total_cost_usd) if stats.has_cost else ""
right.add_row("total cost", Text(cost_str, style="green bold"))
tokens_total = Text()
tokens_total.append(_fmt_tokens(stats.total_input_tokens))
tokens_total.append(" in", style="dim")
tokens_total.append(" · ", style="dim")
tokens_total.append(_fmt_tokens(stats.total_output_tokens), style="green")
tokens_total.append(" out", style="dim")
right.add_row("tokens", tokens_total)
lat_text = Text()
lat_text.append("p50 ", style="dim")
lat_text.append(_fmt_ms(stats.p50_latency_ms), style=_latency_style(stats.p50_latency_ms))
lat_text.append(" · ", style="dim")
lat_text.append("p95 ", style="dim")
lat_text.append(_fmt_ms(stats.p95_latency_ms), style=_latency_style(stats.p95_latency_ms))
lat_text.append(" · ", style="dim")
lat_text.append("p99 ", style="dim")
lat_text.append(_fmt_ms(stats.p99_latency_ms), style=_latency_style(stats.p99_latency_ms))
right.add_row("latency", lat_text)
ttft_text = Text()
ttft_text.append("p50 ", style="dim")
ttft_text.append(_fmt_ms(stats.p50_ttft_ms), style=_ttft_style(stats.p50_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p95 ", style="dim")
ttft_text.append(_fmt_ms(stats.p95_ttft_ms), style=_ttft_style(stats.p95_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p99 ", style="dim")
ttft_text.append(_fmt_ms(stats.p99_ttft_ms), style=_ttft_style(stats.p99_ttft_ms))
right.add_row("TTFT", ttft_text)
sess = Text()
sess.append(f"{stats.distinct_sessions}")
if stats.current_session:
sess.append(" · current ", style="dim")
sess.append(stats.current_session, style="magenta")
right.add_row("sessions", sess)
def _totals_panel(stats: AggregateStats) -> Panel: grid.add_row(left, right)
t = Table.grid(padding=(0, 1)) return Panel(
t.add_column(style="bold magenta") grid,
t.add_column() title="[bold]live LLM traffic[/]",
t.add_column(style="bold magenta") border_style="cyan",
t.add_column() box=SIMPLE_HEAVY,
t.add_row( padding=(0, 1),
"Total cost",
_fmt_cost(stats.total_cost_usd),
"Requests",
str(stats.count),
) )
t.add_row(
"Input",
_fmt_tokens(stats.total_input_tokens),
"Output",
_fmt_tokens(stats.total_output_tokens),
)
t.add_row(
"Sessions",
str(stats.distinct_sessions),
"Current session",
stats.current_session or "",
)
return Panel(t, title="[bold]Totals[/]", border_style="magenta", box=SIMPLE)
def _model_rollup_table(rollups: list[ModelRollup]) -> Table: def _model_rollup_table(rollups: list[ModelRollup]) -> Table:
table = Table( table = Table(
title="Totals by model", title="by model",
title_justify="left",
title_style="bold dim",
caption="cost via DigitalOcean Gradient catalog",
caption_justify="left",
caption_style="dim italic",
box=SIMPLE, box=SIMPLE,
header_style="bold", header_style="bold",
expand=True, pad_edge=False,
padding=(0, 1),
) )
table.add_column("Model", style="cyan") table.add_column("model", style="cyan", no_wrap=True)
table.add_column("Req", justify="right") table.add_column("req", justify="right")
table.add_column("Input", justify="right") table.add_column("input", justify="right")
table.add_column("Output", justify="right", style="green") table.add_column("output", justify="right", style="green")
table.add_column("Cache write", justify="right", style="yellow") table.add_column("cache wr", justify="right", style="yellow")
table.add_column("Cache read", justify="right", style="yellow") table.add_column("cache rd", justify="right", style="yellow")
table.add_column("Cost", justify="right", style="green") table.add_column("tok/s", justify="right")
table.add_column("cost", justify="right", style="green")
if not rollups: if not rollups:
table.add_row("", "", "", "", "", "", "")
for r in rollups:
table.add_row( table.add_row(
r.model, Text("no requests yet", style="dim italic"),
str(r.requests), *([""] * 7),
)
return table
for r in rollups:
cost_cell = _fmt_cost(r.cost_usd) if r.has_cost else ""
table.add_row(
_truncate_model(r.model),
f"{r.requests:,}",
_fmt_tokens(r.input_tokens), _fmt_tokens(r.input_tokens),
_fmt_tokens(r.output_tokens), _fmt_tokens(r.output_tokens),
_fmt_int(r.cache_write), _fmt_int(r.cache_write),
_fmt_int(r.cache_read), _fmt_int(r.cache_read),
_fmt_cost(r.cost_usd), _fmt_tps(r.avg_tokens_per_sec),
cost_cell,
) )
return table return table
def _route_hit_table(hits: list[tuple[str, int, float]]) -> Table: def _route_hit_table(hits: list[RouteHit]) -> Table:
table = Table( table = Table(
title="Route hit %", title="route share",
title_justify="left",
title_style="bold dim",
box=SIMPLE, box=SIMPLE,
header_style="bold", header_style="bold",
expand=True, pad_edge=False,
padding=(0, 1),
) )
table.add_column("Route", style="cyan") table.add_column("route", style="cyan")
table.add_column("Hits", justify="right") table.add_column("hits", justify="right")
table.add_column("%", justify="right") table.add_column("%", justify="right")
for route, n, pct in hits: table.add_column("p95", justify="right")
table.add_row(route, str(n), f"{pct:.1f}") table.add_column("err", justify="right")
for h in hits:
err_cell = (
Text(f"{h.error_count:,}", style="red bold") if h.error_count else ""
)
table.add_row(
h.route,
f"{h.hits:,}",
f"{h.pct:5.1f}%",
Text(_fmt_ms(h.p95_latency_ms), style=_latency_style(h.p95_latency_ms)),
err_cell,
)
return table return table
def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table: def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table:
show_route = any(c.route_name for c in calls) show_route = any(c.route_name for c in calls)
show_cache = any((c.cached_input_tokens or 0) > 0 for c in calls)
show_rsn = any((c.reasoning_tokens or 0) > 0 for c in calls)
caption_parts = ["in·new = fresh prompt tokens"]
if show_cache:
caption_parts.append("in·cache = cached read")
if show_rsn:
caption_parts.append("rsn = reasoning")
caption_parts.append("lat = total latency")
table = Table( table = Table(
title="Recent requests", title=f"recent · last {min(limit, len(calls)) if calls else 0}",
title_justify="left",
title_style="bold dim",
caption=" · ".join(caption_parts),
caption_justify="left",
caption_style="dim italic",
box=SIMPLE, box=SIMPLE,
header_style="bold", header_style="bold",
expand=True, pad_edge=False,
padding=(0, 1),
) )
table.add_column("time") table.add_column("time", no_wrap=True)
table.add_column("model", style="cyan") table.add_column("model", style="cyan", no_wrap=True)
if show_route: if show_route:
table.add_column("route", style="yellow") table.add_column("route", style="yellow", no_wrap=True)
table.add_column("in", justify="right") table.add_column("in·new", justify="right")
table.add_column("cache", justify="right", style="yellow") if show_cache:
table.add_column("in·cache", justify="right", style="yellow")
table.add_column("out", justify="right", style="green") table.add_column("out", justify="right", style="green")
table.add_column("rsn", justify="right") if show_rsn:
table.add_column("cost", justify="right", style="green") table.add_column("rsn", justify="right")
table.add_column("tok/s", justify="right")
table.add_column("TTFT", justify="right") table.add_column("TTFT", justify="right")
table.add_column("lat", justify="right") table.add_column("lat", justify="right")
table.add_column("st") table.add_column("cost", justify="right", style="green")
table.add_column("status")
if not calls:
cols = len(table.columns)
table.add_row(
Text("waiting for spans…", style="dim italic"),
*([""] * (cols - 1)),
)
return table
recent = list(reversed(calls))[:limit] recent = list(reversed(calls))[:limit]
for c in recent: for idx, c in enumerate(recent):
status_cell = ( is_newest = idx == 0
"ok" time_style = "bold white" if is_newest else None
if c.status_code and 200 <= c.status_code < 400 model_style = "bold cyan" if is_newest else "cyan"
else str(c.status_code or "") row: list[object] = [
) Text(c.timestamp.strftime("%H:%M:%S"), style=time_style)
row = [ if time_style
c.timestamp.strftime("%H:%M:%S"), else c.timestamp.strftime("%H:%M:%S"),
c.model, Text(_truncate_model(c.model), style=model_style),
] ]
if show_route: if show_route:
row.append(c.route_name or "") row.append(c.route_name or "")
row.append(_fmt_tokens(c.prompt_tokens))
if show_cache:
row.append(_fmt_int(c.cached_input_tokens))
row.append(_fmt_tokens(c.completion_tokens))
if show_rsn:
row.append(_fmt_int(c.reasoning_tokens))
row.extend( row.extend(
[ [
_fmt_tokens(c.prompt_tokens), _fmt_tps(c.tokens_per_sec),
_fmt_int(c.cached_input_tokens), Text(_fmt_ms(c.ttft_ms), style=_ttft_style(c.ttft_ms)),
_fmt_tokens(c.completion_tokens), Text(_fmt_ms(c.duration_ms), style=_latency_style(c.duration_ms)),
_fmt_int(c.reasoning_tokens),
_fmt_cost(c.cost_usd), _fmt_cost(c.cost_usd),
_fmt_ms(c.ttft_ms), _status_text(c.status_code),
_fmt_ms(c.duration_ms),
status_cell,
] ]
) )
table.add_row(*row) table.add_row(*row)
if not recent:
table.add_row(*(["no requests yet"] + [""] * (10 if show_route else 9)))
return table return table
def render(calls: list[LLMCall]) -> Group: def _last_error(calls: list[LLMCall]) -> LLMCall | None:
for c in reversed(calls):
if c.status_code is not None and c.status_code >= 400:
return c
return None
def _http_reason(code: int) -> str:
try:
return HTTPStatus(code).phrase
except ValueError:
return ""
def _fmt_ago(ts: datetime) -> str:
# `ts` is produced in collector.py via datetime.now(tz=...), but fall back
# gracefully if a naive timestamp ever sneaks in.
now = datetime.now(tz=ts.tzinfo) if ts.tzinfo else datetime.now()
delta = (now - ts).total_seconds()
if delta < 0:
delta = 0
if delta < 60:
return f"{int(delta)}s ago"
if delta < 3600:
return f"{int(delta // 60)}m ago"
return f"{int(delta // 3600)}h ago"
def _error_banner(call: LLMCall) -> Panel:
code = call.status_code or 0
border = "red" if code >= 500 else "yellow"
header = Text()
header.append(f"{code}", style=f"{border} bold")
reason = _http_reason(code)
if reason:
header.append(f" {reason}", style=border)
header.append(" · ", style="dim")
header.append(_truncate_model(call.model, 48), style="cyan")
if call.route_name:
header.append(" · ", style="dim")
header.append(call.route_name, style="yellow")
header.append(" · ", style="dim")
header.append(_fmt_ago(call.timestamp), style="dim")
if call.request_id:
header.append(" · req ", style="dim")
header.append(call.request_id, style="magenta")
return Panel(
header,
title="[bold]last error[/]",
title_align="left",
border_style=border,
box=SIMPLE,
padding=(0, 1),
)
def _footer(stats: AggregateStats) -> Text:
waiting = stats.count == 0
text = Text()
text.append("Ctrl-C ", style="bold")
text.append("exit", style="dim")
text.append(" · OTLP :4317", style="dim")
text.append(" · pricing: DigitalOcean ", style="dim")
if waiting:
text.append("waiting for spans", style="yellow")
text.append(
" — set tracing.opentracing_grpc_endpoint=localhost:4317", style="dim"
)
else:
text.append(f"receiving · {stats.count:,} call(s) buffered", style="green")
return text
def render(calls: list[LLMCall]) -> Align:
last = calls[-1] if calls else None last = calls[-1] if calls else None
stats = aggregates(calls) stats = aggregates(calls)
rollups = model_rollups(calls) rollups = model_rollups(calls)
hits = route_hits(calls) hits = route_hits(calls)
header = Columns( parts: list[object] = [_summary_panel(last, stats)]
[_request_panel(last), _cost_panel(last), _totals_panel(stats)], err = _last_error(calls)
expand=True, if err is not None:
equal=True, parts.append(_error_banner(err))
)
parts = [
header,
_model_rollup_table(rollups),
]
if hits: if hits:
parts.append(_route_hit_table(hits)) split = Table.grid(padding=(0, 2))
split.add_column(no_wrap=False)
split.add_column(no_wrap=False)
split.add_row(_model_rollup_table(rollups), _route_hit_table(hits))
parts.append(split)
else:
parts.append(_model_rollup_table(rollups))
parts.append(_recent_table(calls)) parts.append(_recent_table(calls))
parts.append( parts.append(_footer(stats))
Text( # Cap overall width so wide terminals don't stretch the layout into a
"q quit · c clear · waiting for spans on OTLP :4317 — brightstaff needs " # mostly-whitespace gap between columns.
"tracing.opentracing_grpc_endpoint=localhost:4317", return Align.left(Group(*parts), width=MAX_WIDTH)
style="dim",
)
)
return Group(*parts)

View file

@ -91,7 +91,12 @@ def convert_legacy_listeners(
"type": "model", "type": "model",
"port": 12000, "port": 12000,
"address": "0.0.0.0", "address": "0.0.0.0",
"timeout": "30s", # LLM streaming responses routinely exceed 30s (extended thinking,
# long tool reasoning, large completions). Match the 300s ceiling
# used by the direct upstream-provider routes so Envoy doesn't
# abort streams with UT mid-response. Users can override via their
# plano_config.yaml `listeners.timeout` field.
"timeout": "300s",
"model_providers": model_providers or [], "model_providers": model_providers or [],
} }
@ -100,7 +105,7 @@ def convert_legacy_listeners(
"type": "prompt", "type": "prompt",
"port": 10000, "port": 10000,
"address": "0.0.0.0", "address": "0.0.0.0",
"timeout": "30s", "timeout": "300s",
} }
# Handle None case # Handle None case

View file

@ -83,6 +83,49 @@ def test_parse_do_catalog_treats_small_values_as_per_token():
assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7 assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7
def test_anthropic_aliases_match_plano_emitted_names():
"""DO publishes 'anthropic-claude-opus-4.7' and 'anthropic-claude-haiku-4.5';
Plano emits 'claude-opus-4-7' and 'claude-haiku-4-5-20251001'. Aliases
registered at parse time should bridge the gap."""
from planoai.obs.pricing import _parse_do_pricing
sample = {
"data": [
{
"model_id": "anthropic-claude-opus-4.7",
"pricing": {
"input_price_per_million": 15.0,
"output_price_per_million": 75.0,
},
},
{
"model_id": "anthropic-claude-haiku-4.5",
"pricing": {
"input_price_per_million": 1.0,
"output_price_per_million": 5.0,
},
},
{
"model_id": "anthropic-claude-4.6-sonnet",
"pricing": {
"input_price_per_million": 3.0,
"output_price_per_million": 15.0,
},
},
]
}
catalog = PricingCatalog(_parse_do_pricing(sample))
# Family-last shapes Plano emits.
assert catalog.price_for("claude-opus-4-7") is not None
assert catalog.price_for("claude-haiku-4-5") is not None
# Date-suffixed name (Anthropic API style).
assert catalog.price_for("claude-haiku-4-5-20251001") is not None
# Word-order swap: DO has 'claude-4.6-sonnet', Plano emits 'claude-sonnet-4-6'.
assert catalog.price_for("claude-sonnet-4-6") is not None
# Original DO ids still resolve.
assert catalog.price_for("anthropic-claude-opus-4.7") is not None
def test_parse_do_catalog_divides_large_values_as_per_million(): def test_parse_do_catalog_divides_large_values_as_per_million():
"""A provider that genuinely reports $5-per-million in that field gets divided.""" """A provider that genuinely reports $5-per-million in that field gets divided."""
from planoai.obs.pricing import _parse_do_pricing from planoai.obs.pricing import _parse_do_pricing

View file

@ -94,10 +94,10 @@ def test_route_hits_only_for_routed_calls():
] ]
hits = route_hits(calls) hits = route_hits(calls)
# Only calls with route names are counted. # Only calls with route names are counted.
assert sum(n for _, n, _ in hits) == 3 assert sum(h.hits for h in hits) == 3
hits_by_name = {name: (n, pct) for name, n, pct in hits} hits_by_name = {h.route: h for h in hits}
assert hits_by_name["code"][0] == 2 assert hits_by_name["code"].hits == 2
assert hits_by_name["summarization"][0] == 1 assert hits_by_name["summarization"].hits == 1
def test_route_hits_empty_when_no_routes(): def test_route_hits_empty_when_no_routes():

View file

@ -100,6 +100,12 @@ impl ExtractedUsage {
/// Try to pull usage out of an accumulated response body. /// Try to pull usage out of an accumulated response body.
/// Handles both a single JSON object (non-streaming) and SSE streams where the /// Handles both a single JSON object (non-streaming) and SSE streams where the
/// final `data: {...}` event carries the `usage` field. /// final `data: {...}` event carries the `usage` field.
///
/// Resolved model is captured independently from usage: many providers (DO
/// Inference Cloud, OpenAI in some modes) emit `model` on every SSE chunk but
/// `usage` only on the terminal one — and that terminal chunk may omit the
/// model field entirely. Scanning both fields separately means we still
/// surface the real upstream model even when usage extraction fails.
fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage { fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
if buf.is_empty() { if buf.is_empty() {
return ExtractedUsage::default(); return ExtractedUsage::default();
@ -113,11 +119,14 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
} }
} }
// SSE path: scan from the end for a `data:` line containing a usage object.
let text = match std::str::from_utf8(buf) { let text = match std::str::from_utf8(buf) {
Ok(t) => t, Ok(t) => t,
Err(_) => return ExtractedUsage::default(), Err(_) => return ExtractedUsage::default(),
}; };
let mut out = ExtractedUsage::default();
let mut found_usage = false;
for line in text.lines().rev() { for line in text.lines().rev() {
let trimmed = line.trim_start(); let trimmed = line.trim_start();
let payload = match trimmed.strip_prefix("data:") { let payload = match trimmed.strip_prefix("data:") {
@ -127,18 +136,50 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
if payload == "[DONE]" || payload.is_empty() { if payload == "[DONE]" || payload.is_empty() {
continue; continue;
} }
if !payload.contains("\"usage\"") {
let has_model_field = out.resolved_model.is_none() && payload.contains("\"model\"");
let has_usage_field = !found_usage && payload.contains("\"usage\"");
if !has_model_field && !has_usage_field {
continue; continue;
} }
if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload) {
let value = match serde_json::from_str::<serde_json::Value>(payload) {
Ok(v) => v,
Err(_) => continue,
};
if has_usage_field {
let u = ExtractedUsage::from_json(&value); let u = ExtractedUsage::from_json(&value);
if !u.is_empty() { if u.prompt_tokens.is_some()
return u; || u.completion_tokens.is_some()
|| u.total_tokens.is_some()
{
let prior_model = out.resolved_model.take();
out = u;
if out.resolved_model.is_none() {
out.resolved_model = prior_model;
}
found_usage = true;
if out.resolved_model.is_some() {
break;
}
continue;
}
}
if out.resolved_model.is_none() {
if let Some(model) = value.get("model").and_then(|v| v.as_str()) {
if !model.is_empty() {
out.resolved_model = Some(model.to_string());
if found_usage {
break;
}
}
} }
} }
} }
ExtractedUsage::default() out
} }
/// Trait for processing streaming chunks /// Trait for processing streaming chunks
@ -634,4 +675,36 @@ data: [DONE]
fn no_usage_in_body_returns_default() { fn no_usage_in_body_returns_default() {
assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty()); assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty());
} }
#[test]
fn streaming_resolved_model_from_chunk_without_usage() {
// DO Inference Cloud often emits model on every chunk and usage only on
// the last — and the usage chunk itself omits the model field.
let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}
data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":3,\"total_tokens\":10}}
data: [DONE]
";
let u = extract_usage_from_bytes(sse);
assert_eq!(u.prompt_tokens, Some(7));
assert_eq!(u.completion_tokens, Some(3));
assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4"));
}
#[test]
fn streaming_resolved_model_when_usage_missing() {
// Even without any usage chunk, we should still surface the upstream
// model so the obs view doesn't fall back to the router alias.
let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}
data: [DONE]
";
let u = extract_usage_from_bytes(sse);
assert_eq!(u.prompt_tokens, None);
assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4"));
assert!(!u.is_empty());
}
} }

View file

@ -95,6 +95,7 @@ providers:
anthropic: anthropic:
- anthropic/claude-sonnet-4-6 - anthropic/claude-sonnet-4-6
- anthropic/claude-opus-4-6 - anthropic/claude-opus-4-6
- anthropic/claude-opus-4-7
- anthropic/claude-opus-4-5-20251101 - anthropic/claude-opus-4-5-20251101
- anthropic/claude-opus-4-5 - anthropic/claude-opus-4-5
- anthropic/claude-haiku-4-5-20251001 - anthropic/claude-haiku-4-5-20251001