diff --git a/cli/planoai/obs/pricing.py b/cli/planoai/obs/pricing.py index 19eb1297..6f2ce5b4 100644 --- a/cli/planoai/obs/pricing.py +++ b/cli/planoai/obs/pricing.py @@ -7,6 +7,7 @@ Single-source: one fetch at startup, cached for the life of the process. from __future__ import annotations import logging +import re import threading from dataclasses import dataclass from typing import Any @@ -123,13 +124,28 @@ class PricingCatalog: return round(cost, 6) +_DATE_SUFFIX_RE = re.compile(r"-\d{8}$") +_PROVIDER_PREFIXES = ("anthropic", "openai", "google", "meta", "cohere", "mistral") +_ANTHROPIC_FAMILIES = {"opus", "sonnet", "haiku"} + + def _model_key_candidates(model_name: str) -> list[str]: + """Lookup-side variants of a Plano-emitted model name. + + Plano resolves names like ``claude-haiku-4-5-20251001``; the catalog stores + them as ``anthropic-claude-haiku-4.5``. We strip the date suffix and the + ``provider/`` prefix here; the catalog itself registers the dash/dot and + family-order aliases at parse time (see :func:`_expand_aliases`). + """ base = model_name.strip() out = [base] if "/" in base: out.append(base.split("/", 1)[1]) + for k in list(out): + stripped = _DATE_SUFFIX_RE.sub("", k) + if stripped != k: + out.append(stripped) out.extend([v.lower() for v in list(out)]) - # Dedup while preserving order. seen: set[str] = set() uniq = [] for key in out: @@ -139,6 +155,54 @@ def _model_key_candidates(model_name: str) -> list[str]: return uniq +def _expand_aliases(model_id: str) -> set[str]: + """Catalog-side variants of a DO model id. + + DO publishes Anthropic models under ids like ``anthropic-claude-opus-4.7`` + or ``anthropic-claude-4.6-sonnet`` while Plano emits ``claude-opus-4-7`` / + ``claude-sonnet-4-6``. Generate a set covering provider-prefix stripping, + dash↔dot in version segments, and family↔version word order so a single + catalog entry matches every name shape we'll see at lookup. + """ + aliases: set[str] = set() + + def add(name: str) -> None: + if not name: + return + aliases.add(name) + aliases.add(name.lower()) + + add(model_id) + + base = model_id + head, _, rest = base.partition("-") + if head.lower() in _PROVIDER_PREFIXES and rest: + add(rest) + base = rest + + for key in list(aliases): + if "." in key: + add(key.replace(".", "-")) + + parts = base.split("-") + if len(parts) >= 3 and parts[0].lower() == "claude": + rest_parts = parts[1:] + for i, p in enumerate(rest_parts): + if p.lower() in _ANTHROPIC_FAMILIES: + others = rest_parts[:i] + rest_parts[i + 1 :] + if not others: + break + family_last = "claude-" + "-".join(others) + "-" + p + family_first = "claude-" + p + "-" + "-".join(others) + add(family_last) + add(family_first) + add(family_last.replace(".", "-")) + add(family_first.replace(".", "-")) + break + + return aliases + + def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]: """Parse DO catalog response into a ModelPrice map keyed by model id. @@ -204,11 +268,13 @@ def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]: # rates for promo/open-weight models. 
if input_rate == 0 and output_rate == 0: continue - prices[str(model_id)] = ModelPrice( + price = ModelPrice( input_per_token_usd=input_rate, output_per_token_usd=output_rate, cached_input_per_token_usd=cached_rate, ) + for alias in _expand_aliases(str(model_id)): + prices.setdefault(alias, price) return prices diff --git a/cli/planoai/obs/render.py b/cli/planoai/obs/render.py index 602b8aed..76091f3d 100644 --- a/cli/planoai/obs/render.py +++ b/cli/planoai/obs/render.py @@ -4,15 +4,18 @@ from __future__ import annotations from collections import Counter from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime +from http import HTTPStatus -from rich.box import SIMPLE -from rich.columns import Columns +from rich.align import Align +from rich.box import SIMPLE, SIMPLE_HEAVY from rich.console import Group from rich.panel import Panel from rich.table import Table from rich.text import Text +MAX_WIDTH = 160 + from planoai.obs.collector import LLMCall @@ -24,6 +27,16 @@ class AggregateStats: total_output_tokens: int distinct_sessions: int current_session: str | None + p50_latency_ms: float | None = None + p95_latency_ms: float | None = None + p99_latency_ms: float | None = None + p50_ttft_ms: float | None = None + p95_ttft_ms: float | None = None + p99_ttft_ms: float | None = None + error_count: int = 0 + errors_4xx: int = 0 + errors_5xx: int = 0 + has_cost: bool = False @dataclass @@ -35,10 +48,16 @@ class ModelRollup: cache_write: int cache_read: int cost_usd: float + has_cost: bool = False + avg_tokens_per_sec: float | None = None -def _now() -> datetime: - return datetime.now(tz=timezone.utc).astimezone() +def _percentile(values: list[float], pct: float) -> float | None: + if not values: + return None + s = sorted(values) + k = max(0, min(len(s) - 1, int(round((pct / 100.0) * (len(s) - 1))))) + return s[k] def aggregates(calls: list[LLMCall]) -> AggregateStats: @@ -49,6 +68,15 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats: current = next( (c.session_id for c in reversed(calls) if c.session_id is not None), None ) + durations = [c.duration_ms for c in calls if c.duration_ms is not None] + ttfts = [c.ttft_ms for c in calls if c.ttft_ms is not None] + errors_4xx = sum( + 1 for c in calls if c.status_code is not None and 400 <= c.status_code < 500 + ) + errors_5xx = sum( + 1 for c in calls if c.status_code is not None and c.status_code >= 500 + ) + has_cost = any(c.cost_usd is not None for c in calls) return AggregateStats( count=len(calls), total_cost_usd=total_cost, @@ -56,11 +84,22 @@ def aggregates(calls: list[LLMCall]) -> AggregateStats: total_output_tokens=total_output, distinct_sessions=len(session_ids), current_session=current, + p50_latency_ms=_percentile(durations, 50), + p95_latency_ms=_percentile(durations, 95), + p99_latency_ms=_percentile(durations, 99), + p50_ttft_ms=_percentile(ttfts, 50), + p95_ttft_ms=_percentile(ttfts, 95), + p99_ttft_ms=_percentile(ttfts, 99), + error_count=errors_4xx + errors_5xx, + errors_4xx=errors_4xx, + errors_5xx=errors_5xx, + has_cost=has_cost, ) def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: - buckets: dict[str, dict[str, float | int]] = {} + buckets: dict[str, dict[str, float | int | bool]] = {} + tps_samples: dict[str, list[float]] = {} for c in calls: key = c.model b = buckets.setdefault( @@ -72,6 +111,7 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: "cache_write": 0, "cache_read": 0, "cost": 0.0, + "has_cost": False, }, ) b["requests"] = 
int(b["requests"]) + 1 @@ -80,9 +120,16 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0) b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0) b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0) + if c.cost_usd is not None: + b["has_cost"] = True + tps = c.tokens_per_sec + if tps is not None: + tps_samples.setdefault(key, []).append(tps) rollups: list[ModelRollup] = [] for model, b in buckets.items(): + samples = tps_samples.get(model) + avg_tps = (sum(samples) / len(samples)) if samples else None rollups.append( ModelRollup( model=model, @@ -92,34 +139,62 @@ def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]: cache_write=int(b["cache_write"]), cache_read=int(b["cache_read"]), cost_usd=float(b["cost"]), + has_cost=bool(b["has_cost"]), + avg_tokens_per_sec=avg_tps, ) ) - rollups.sort(key=lambda r: r.cost_usd, reverse=True) + rollups.sort(key=lambda r: (r.cost_usd, r.requests), reverse=True) return rollups -def route_hits(calls: list[LLMCall]) -> list[tuple[str, int, float]]: +@dataclass +class RouteHit: + route: str + hits: int + pct: float + p95_latency_ms: float | None + error_count: int + + +def route_hits(calls: list[LLMCall]) -> list[RouteHit]: counts: Counter[str] = Counter() + per_route_latency: dict[str, list[float]] = {} + per_route_errors: dict[str, int] = {} for c in calls: - if c.route_name: - counts[c.route_name] += 1 + if not c.route_name: + continue + counts[c.route_name] += 1 + if c.duration_ms is not None: + per_route_latency.setdefault(c.route_name, []).append(c.duration_ms) + if c.status_code is not None and c.status_code >= 400: + per_route_errors[c.route_name] = per_route_errors.get(c.route_name, 0) + 1 total = sum(counts.values()) if total == 0: return [] - return [(r, n, (n / total) * 100.0) for r, n in counts.most_common()] + return [ + RouteHit( + route=r, + hits=n, + pct=(n / total) * 100.0, + p95_latency_ms=_percentile(per_route_latency.get(r, []), 95), + error_count=per_route_errors.get(r, 0), + ) + for r, n in counts.most_common() + ] -def _fmt_cost(v: float | None) -> str: +def _fmt_cost(v: float | None, *, zero: str = "—") -> str: if v is None: return "—" if v == 0: - return "$0" - # Adaptive precision so tiny costs ($3.8e-5) remain readable. 
+ return zero if abs(v) < 0.0001: return f"${v:.8f}".rstrip("0").rstrip(".") if abs(v) < 0.01: return f"${v:.6f}".rstrip("0").rstrip(".") - return f"${v:.4f}" + if abs(v) < 1: + return f"${v:.4f}" + return f"${v:,.2f}" def _fmt_ms(v: float | None) -> str: @@ -142,187 +217,412 @@ def _fmt_tokens(v: int | None) -> str: return f"{v:,}" -def _request_panel(last: LLMCall | None) -> Panel: +def _fmt_tps(v: float | None) -> str: + if v is None or v <= 0: + return "—" + if v >= 100: + return f"{v:.0f}/s" + return f"{v:.1f}/s" + + +def _latency_style(v: float | None) -> str: + if v is None: + return "dim" + if v < 500: + return "green" + if v < 2000: + return "yellow" + return "red" + + +def _ttft_style(v: float | None) -> str: + if v is None: + return "dim" + if v < 300: + return "green" + if v < 1000: + return "yellow" + return "red" + + +def _truncate_model(name: str, limit: int = 32) -> str: + if len(name) <= limit: + return name + return name[: limit - 1] + "…" + + +def _status_text(code: int | None) -> Text: + if code is None: + return Text("—", style="dim") + if 200 <= code < 300: + return Text("● ok", style="green") + if 300 <= code < 400: + return Text(f"● {code}", style="yellow") + if 400 <= code < 500: + return Text(f"● {code}", style="yellow bold") + return Text(f"● {code}", style="red bold") + + +def _summary_panel(last: LLMCall | None, stats: AggregateStats) -> Panel: + # Content-sized columns with a fixed gutter keep the two blocks close + # together instead of stretching across the full terminal on wide screens. + grid = Table.grid(padding=(0, 4)) + grid.add_column(no_wrap=True) + grid.add_column(no_wrap=True) + + # Left: latest request snapshot. + left = Table.grid(padding=(0, 1)) + left.add_column(style="dim", no_wrap=True) + left.add_column(no_wrap=True) if last is None: - body = Text("no requests yet", style="dim") + left.add_row("latest", Text("waiting for spans…", style="dim italic")) else: - t = Table.grid(padding=(0, 1)) - t.add_column(style="bold cyan") - t.add_column() - t.add_row("Endpoint", "chat/completions") - status = "—" if last.status_code is None else str(last.status_code) - t.add_row("Status", status) - t.add_row("Model", last.model) + model_text = Text(_truncate_model(last.model, 48), style="bold cyan") + if last.is_streaming: + model_text.append(" ⟳ stream", style="dim") + left.add_row("model", model_text) if last.request_model and last.request_model != last.model: - t.add_row("Req model", last.request_model) + left.add_row( + "requested", Text(_truncate_model(last.request_model, 48), style="cyan") + ) if last.route_name: - t.add_row("Route", last.route_name) - body = t - return Panel(body, title="[bold]Request[/]", border_style="cyan", box=SIMPLE) - - -def _cost_panel(last: LLMCall | None) -> Panel: - if last is None: - body = Text("—", style="dim") - else: - t = Table.grid(padding=(0, 1)) - t.add_column(style="bold green") - t.add_column() - t.add_row("Request", _fmt_cost(last.cost_usd)) - t.add_row("Input", _fmt_tokens(last.prompt_tokens)) - t.add_row("Output", _fmt_tokens(last.completion_tokens)) + left.add_row("route", Text(last.route_name, style="yellow")) + left.add_row("status", _status_text(last.status_code)) + tokens = Text() + tokens.append(_fmt_tokens(last.prompt_tokens)) + tokens.append(" in", style="dim") + tokens.append(" · ", style="dim") + tokens.append(_fmt_tokens(last.completion_tokens), style="green") + tokens.append(" out", style="dim") if last.cached_input_tokens: - t.add_row("Cached", _fmt_tokens(last.cached_input_tokens)) - body = t - 
return Panel(body, title="[bold]Cost[/]", border_style="green", box=SIMPLE) + tokens.append(" · ", style="dim") + tokens.append(_fmt_tokens(last.cached_input_tokens), style="yellow") + tokens.append(" cached", style="dim") + left.add_row("tokens", tokens) + timing = Text() + timing.append("TTFT ", style="dim") + timing.append(_fmt_ms(last.ttft_ms), style=_ttft_style(last.ttft_ms)) + timing.append(" · ", style="dim") + timing.append("lat ", style="dim") + timing.append( + _fmt_ms(last.duration_ms), style=_latency_style(last.duration_ms) + ) + tps = last.tokens_per_sec + if tps: + timing.append(" · ", style="dim") + timing.append(_fmt_tps(tps), style="green") + left.add_row("timing", timing) + left.add_row("cost", Text(_fmt_cost(last.cost_usd), style="green bold")) + # Right: lifetime totals. + right = Table.grid(padding=(0, 1)) + right.add_column(style="dim", no_wrap=True) + right.add_column(no_wrap=True) + right.add_row( + "requests", + Text(f"{stats.count:,}", style="bold"), + ) + if stats.error_count: + err_text = Text() + err_text.append(f"{stats.error_count:,}", style="red bold") + parts: list[str] = [] + if stats.errors_4xx: + parts.append(f"{stats.errors_4xx} 4xx") + if stats.errors_5xx: + parts.append(f"{stats.errors_5xx} 5xx") + if parts: + err_text.append(f" ({' · '.join(parts)})", style="dim") + right.add_row("errors", err_text) + cost_str = _fmt_cost(stats.total_cost_usd) if stats.has_cost else "—" + right.add_row("total cost", Text(cost_str, style="green bold")) + tokens_total = Text() + tokens_total.append(_fmt_tokens(stats.total_input_tokens)) + tokens_total.append(" in", style="dim") + tokens_total.append(" · ", style="dim") + tokens_total.append(_fmt_tokens(stats.total_output_tokens), style="green") + tokens_total.append(" out", style="dim") + right.add_row("tokens", tokens_total) + lat_text = Text() + lat_text.append("p50 ", style="dim") + lat_text.append(_fmt_ms(stats.p50_latency_ms), style=_latency_style(stats.p50_latency_ms)) + lat_text.append(" · ", style="dim") + lat_text.append("p95 ", style="dim") + lat_text.append(_fmt_ms(stats.p95_latency_ms), style=_latency_style(stats.p95_latency_ms)) + lat_text.append(" · ", style="dim") + lat_text.append("p99 ", style="dim") + lat_text.append(_fmt_ms(stats.p99_latency_ms), style=_latency_style(stats.p99_latency_ms)) + right.add_row("latency", lat_text) + ttft_text = Text() + ttft_text.append("p50 ", style="dim") + ttft_text.append(_fmt_ms(stats.p50_ttft_ms), style=_ttft_style(stats.p50_ttft_ms)) + ttft_text.append(" · ", style="dim") + ttft_text.append("p95 ", style="dim") + ttft_text.append(_fmt_ms(stats.p95_ttft_ms), style=_ttft_style(stats.p95_ttft_ms)) + ttft_text.append(" · ", style="dim") + ttft_text.append("p99 ", style="dim") + ttft_text.append(_fmt_ms(stats.p99_ttft_ms), style=_ttft_style(stats.p99_ttft_ms)) + right.add_row("TTFT", ttft_text) + sess = Text() + sess.append(f"{stats.distinct_sessions}") + if stats.current_session: + sess.append(" · current ", style="dim") + sess.append(stats.current_session, style="magenta") + right.add_row("sessions", sess) -def _totals_panel(stats: AggregateStats) -> Panel: - t = Table.grid(padding=(0, 1)) - t.add_column(style="bold magenta") - t.add_column() - t.add_column(style="bold magenta") - t.add_column() - t.add_row( - "Total cost", - _fmt_cost(stats.total_cost_usd), - "Requests", - str(stats.count), + grid.add_row(left, right) + return Panel( + grid, + title="[bold]live LLM traffic[/]", + border_style="cyan", + box=SIMPLE_HEAVY, + padding=(0, 1), ) - t.add_row( - "Input", - 
_fmt_tokens(stats.total_input_tokens), - "Output", - _fmt_tokens(stats.total_output_tokens), - ) - t.add_row( - "Sessions", - str(stats.distinct_sessions), - "Current session", - stats.current_session or "—", - ) - return Panel(t, title="[bold]Totals[/]", border_style="magenta", box=SIMPLE) def _model_rollup_table(rollups: list[ModelRollup]) -> Table: table = Table( - title="Totals by model", + title="by model", + title_justify="left", + title_style="bold dim", + caption="cost via DigitalOcean Gradient catalog", + caption_justify="left", + caption_style="dim italic", box=SIMPLE, header_style="bold", - expand=True, + pad_edge=False, + padding=(0, 1), ) - table.add_column("Model", style="cyan") - table.add_column("Req", justify="right") - table.add_column("Input", justify="right") - table.add_column("Output", justify="right", style="green") - table.add_column("Cache write", justify="right", style="yellow") - table.add_column("Cache read", justify="right", style="yellow") - table.add_column("Cost", justify="right", style="green") + table.add_column("model", style="cyan", no_wrap=True) + table.add_column("req", justify="right") + table.add_column("input", justify="right") + table.add_column("output", justify="right", style="green") + table.add_column("cache wr", justify="right", style="yellow") + table.add_column("cache rd", justify="right", style="yellow") + table.add_column("tok/s", justify="right") + table.add_column("cost", justify="right", style="green") if not rollups: - table.add_row("—", "—", "—", "—", "—", "—", "—") - for r in rollups: table.add_row( - r.model, - str(r.requests), + Text("no requests yet", style="dim italic"), + *(["—"] * 7), + ) + return table + for r in rollups: + cost_cell = _fmt_cost(r.cost_usd) if r.has_cost else "—" + table.add_row( + _truncate_model(r.model), + f"{r.requests:,}", _fmt_tokens(r.input_tokens), _fmt_tokens(r.output_tokens), _fmt_int(r.cache_write), _fmt_int(r.cache_read), - _fmt_cost(r.cost_usd), + _fmt_tps(r.avg_tokens_per_sec), + cost_cell, ) return table -def _route_hit_table(hits: list[tuple[str, int, float]]) -> Table: +def _route_hit_table(hits: list[RouteHit]) -> Table: table = Table( - title="Route hit %", + title="route share", + title_justify="left", + title_style="bold dim", box=SIMPLE, header_style="bold", - expand=True, + pad_edge=False, + padding=(0, 1), ) - table.add_column("Route", style="cyan") - table.add_column("Hits", justify="right") + table.add_column("route", style="cyan") + table.add_column("hits", justify="right") table.add_column("%", justify="right") - for route, n, pct in hits: - table.add_row(route, str(n), f"{pct:.1f}") + table.add_column("p95", justify="right") + table.add_column("err", justify="right") + for h in hits: + err_cell = ( + Text(f"{h.error_count:,}", style="red bold") if h.error_count else "—" + ) + table.add_row( + h.route, + f"{h.hits:,}", + f"{h.pct:5.1f}%", + Text(_fmt_ms(h.p95_latency_ms), style=_latency_style(h.p95_latency_ms)), + err_cell, + ) return table def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table: show_route = any(c.route_name for c in calls) + show_cache = any((c.cached_input_tokens or 0) > 0 for c in calls) + show_rsn = any((c.reasoning_tokens or 0) > 0 for c in calls) + + caption_parts = ["in·new = fresh prompt tokens"] + if show_cache: + caption_parts.append("in·cache = cached read") + if show_rsn: + caption_parts.append("rsn = reasoning") + caption_parts.append("lat = total latency") + table = Table( - title="Recent requests", + title=f"recent · last {min(limit, 
len(calls)) if calls else 0}", + title_justify="left", + title_style="bold dim", + caption=" · ".join(caption_parts), + caption_justify="left", + caption_style="dim italic", box=SIMPLE, header_style="bold", - expand=True, + pad_edge=False, + padding=(0, 1), ) - table.add_column("time") - table.add_column("model", style="cyan") + table.add_column("time", no_wrap=True) + table.add_column("model", style="cyan", no_wrap=True) if show_route: - table.add_column("route", style="yellow") - table.add_column("in", justify="right") - table.add_column("cache", justify="right", style="yellow") + table.add_column("route", style="yellow", no_wrap=True) + table.add_column("in·new", justify="right") + if show_cache: + table.add_column("in·cache", justify="right", style="yellow") table.add_column("out", justify="right", style="green") - table.add_column("rsn", justify="right") - table.add_column("cost", justify="right", style="green") + if show_rsn: + table.add_column("rsn", justify="right") + table.add_column("tok/s", justify="right") table.add_column("TTFT", justify="right") table.add_column("lat", justify="right") - table.add_column("st") + table.add_column("cost", justify="right", style="green") + table.add_column("status") + + if not calls: + cols = len(table.columns) + table.add_row( + Text("waiting for spans…", style="dim italic"), + *(["—"] * (cols - 1)), + ) + return table recent = list(reversed(calls))[:limit] - for c in recent: - status_cell = ( - "ok" - if c.status_code and 200 <= c.status_code < 400 - else str(c.status_code or "—") - ) - row = [ - c.timestamp.strftime("%H:%M:%S"), - c.model, + for idx, c in enumerate(recent): + is_newest = idx == 0 + time_style = "bold white" if is_newest else None + model_style = "bold cyan" if is_newest else "cyan" + row: list[object] = [ + Text(c.timestamp.strftime("%H:%M:%S"), style=time_style) + if time_style + else c.timestamp.strftime("%H:%M:%S"), + Text(_truncate_model(c.model), style=model_style), ] if show_route: row.append(c.route_name or "—") + row.append(_fmt_tokens(c.prompt_tokens)) + if show_cache: + row.append(_fmt_int(c.cached_input_tokens)) + row.append(_fmt_tokens(c.completion_tokens)) + if show_rsn: + row.append(_fmt_int(c.reasoning_tokens)) row.extend( [ - _fmt_tokens(c.prompt_tokens), - _fmt_int(c.cached_input_tokens), - _fmt_tokens(c.completion_tokens), - _fmt_int(c.reasoning_tokens), + _fmt_tps(c.tokens_per_sec), + Text(_fmt_ms(c.ttft_ms), style=_ttft_style(c.ttft_ms)), + Text(_fmt_ms(c.duration_ms), style=_latency_style(c.duration_ms)), _fmt_cost(c.cost_usd), - _fmt_ms(c.ttft_ms), - _fmt_ms(c.duration_ms), - status_cell, + _status_text(c.status_code), ] ) table.add_row(*row) - if not recent: - table.add_row(*(["no requests yet"] + ["—"] * (10 if show_route else 9))) return table -def render(calls: list[LLMCall]) -> Group: +def _last_error(calls: list[LLMCall]) -> LLMCall | None: + for c in reversed(calls): + if c.status_code is not None and c.status_code >= 400: + return c + return None + + +def _http_reason(code: int) -> str: + try: + return HTTPStatus(code).phrase + except ValueError: + return "" + + +def _fmt_ago(ts: datetime) -> str: + # `ts` is produced in collector.py via datetime.now(tz=...), but fall back + # gracefully if a naive timestamp ever sneaks in. 
+    now = datetime.now(tz=ts.tzinfo) if ts.tzinfo else datetime.now()
+    delta = (now - ts).total_seconds()
+    if delta < 0:
+        delta = 0
+    if delta < 60:
+        return f"{int(delta)}s ago"
+    if delta < 3600:
+        return f"{int(delta // 60)}m ago"
+    return f"{int(delta // 3600)}h ago"
+
+
+def _error_banner(call: LLMCall) -> Panel:
+    code = call.status_code or 0
+    border = "red" if code >= 500 else "yellow"
+    header = Text()
+    header.append(f"● {code}", style=f"{border} bold")
+    reason = _http_reason(code)
+    if reason:
+        header.append(f" {reason}", style=border)
+    header.append(" · ", style="dim")
+    header.append(_truncate_model(call.model, 48), style="cyan")
+    if call.route_name:
+        header.append(" · ", style="dim")
+        header.append(call.route_name, style="yellow")
+    header.append(" · ", style="dim")
+    header.append(_fmt_ago(call.timestamp), style="dim")
+    if call.request_id:
+        header.append(" · req ", style="dim")
+        header.append(call.request_id, style="magenta")
+    return Panel(
+        header,
+        title="[bold]last error[/]",
+        title_align="left",
+        border_style=border,
+        box=SIMPLE,
+        padding=(0, 1),
+    )
+
+
+def _footer(stats: AggregateStats) -> Text:
+    waiting = stats.count == 0
+    text = Text()
+    text.append("Ctrl-C ", style="bold")
+    text.append("exit", style="dim")
+    text.append(" · OTLP :4317", style="dim")
+    text.append(" · pricing: DigitalOcean · ", style="dim")
+    if waiting:
+        text.append("waiting for spans", style="yellow")
+        text.append(
+            " — set tracing.opentracing_grpc_endpoint=localhost:4317", style="dim"
+        )
+    else:
+        text.append(f"receiving · {stats.count:,} call(s) buffered", style="green")
+    return text
+
+
+def render(calls: list[LLMCall]) -> Align:
     last = calls[-1] if calls else None
     stats = aggregates(calls)
     rollups = model_rollups(calls)
     hits = route_hits(calls)
-    header = Columns(
-        [_request_panel(last), _cost_panel(last), _totals_panel(stats)],
-        expand=True,
-        equal=True,
-    )
-    parts = [
-        header,
-        _model_rollup_table(rollups),
-    ]
+    parts: list[object] = [_summary_panel(last, stats)]
+    err = _last_error(calls)
+    if err is not None:
+        parts.append(_error_banner(err))
     if hits:
-        parts.append(_route_hit_table(hits))
+        split = Table.grid(padding=(0, 2))
+        split.add_column(no_wrap=False)
+        split.add_column(no_wrap=False)
+        split.add_row(_model_rollup_table(rollups), _route_hit_table(hits))
+        parts.append(split)
+    else:
+        parts.append(_model_rollup_table(rollups))
     parts.append(_recent_table(calls))
-    parts.append(
-        Text(
-            "q quit · c clear · waiting for spans on OTLP :4317 — brightstaff needs "
-            "tracing.opentracing_grpc_endpoint=localhost:4317",
-            style="dim",
-        )
-    )
-    return Group(*parts)
+    parts.append(_footer(stats))
+    # Cap overall width so wide terminals don't stretch the layout into a
+    # mostly-whitespace gap between columns.
+    return Align.left(Group(*parts), width=MAX_WIDTH)
diff --git a/cli/planoai/utils.py b/cli/planoai/utils.py
index 8f73bf18..214fd0a3 100644
--- a/cli/planoai/utils.py
+++ b/cli/planoai/utils.py
@@ -91,7 +91,12 @@ def convert_legacy_listeners(
         "type": "model",
         "port": 12000,
         "address": "0.0.0.0",
-        "timeout": "30s",
+        # LLM streaming responses routinely exceed 30s (extended thinking,
+        # long tool reasoning, large completions). Match the 300s ceiling used
+        # by the direct upstream-provider routes so Envoy doesn't abort streams
+        # with a UT (upstream request timeout) mid-response. Users can override
+        # via their plano_config.yaml `listeners.timeout` field.
+ "timeout": "300s", "model_providers": model_providers or [], } @@ -100,7 +105,7 @@ def convert_legacy_listeners( "type": "prompt", "port": 10000, "address": "0.0.0.0", - "timeout": "30s", + "timeout": "300s", } # Handle None case diff --git a/cli/test/test_obs_pricing.py b/cli/test/test_obs_pricing.py index 95f9a2da..02247d3d 100644 --- a/cli/test/test_obs_pricing.py +++ b/cli/test/test_obs_pricing.py @@ -83,6 +83,49 @@ def test_parse_do_catalog_treats_small_values_as_per_token(): assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7 +def test_anthropic_aliases_match_plano_emitted_names(): + """DO publishes 'anthropic-claude-opus-4.7' and 'anthropic-claude-haiku-4.5'; + Plano emits 'claude-opus-4-7' and 'claude-haiku-4-5-20251001'. Aliases + registered at parse time should bridge the gap.""" + from planoai.obs.pricing import _parse_do_pricing + + sample = { + "data": [ + { + "model_id": "anthropic-claude-opus-4.7", + "pricing": { + "input_price_per_million": 15.0, + "output_price_per_million": 75.0, + }, + }, + { + "model_id": "anthropic-claude-haiku-4.5", + "pricing": { + "input_price_per_million": 1.0, + "output_price_per_million": 5.0, + }, + }, + { + "model_id": "anthropic-claude-4.6-sonnet", + "pricing": { + "input_price_per_million": 3.0, + "output_price_per_million": 15.0, + }, + }, + ] + } + catalog = PricingCatalog(_parse_do_pricing(sample)) + # Family-last shapes Plano emits. + assert catalog.price_for("claude-opus-4-7") is not None + assert catalog.price_for("claude-haiku-4-5") is not None + # Date-suffixed name (Anthropic API style). + assert catalog.price_for("claude-haiku-4-5-20251001") is not None + # Word-order swap: DO has 'claude-4.6-sonnet', Plano emits 'claude-sonnet-4-6'. + assert catalog.price_for("claude-sonnet-4-6") is not None + # Original DO ids still resolve. + assert catalog.price_for("anthropic-claude-opus-4.7") is not None + + def test_parse_do_catalog_divides_large_values_as_per_million(): """A provider that genuinely reports $5-per-million in that field gets divided.""" from planoai.obs.pricing import _parse_do_pricing diff --git a/cli/test/test_obs_render.py b/cli/test/test_obs_render.py index 11f4a1fc..dd598363 100644 --- a/cli/test/test_obs_render.py +++ b/cli/test/test_obs_render.py @@ -94,10 +94,10 @@ def test_route_hits_only_for_routed_calls(): ] hits = route_hits(calls) # Only calls with route names are counted. - assert sum(n for _, n, _ in hits) == 3 - hits_by_name = {name: (n, pct) for name, n, pct in hits} - assert hits_by_name["code"][0] == 2 - assert hits_by_name["summarization"][0] == 1 + assert sum(h.hits for h in hits) == 3 + hits_by_name = {h.route: h for h in hits} + assert hits_by_name["code"].hits == 2 + assert hits_by_name["summarization"].hits == 1 def test_route_hits_empty_when_no_routes(): diff --git a/crates/brightstaff/src/streaming.rs b/crates/brightstaff/src/streaming.rs index 40cbbe7c..03913bdc 100644 --- a/crates/brightstaff/src/streaming.rs +++ b/crates/brightstaff/src/streaming.rs @@ -100,6 +100,12 @@ impl ExtractedUsage { /// Try to pull usage out of an accumulated response body. /// Handles both a single JSON object (non-streaming) and SSE streams where the /// final `data: {...}` event carries the `usage` field. +/// +/// Resolved model is captured independently from usage: many providers (DO +/// Inference Cloud, OpenAI in some modes) emit `model` on every SSE chunk but +/// `usage` only on the terminal one — and that terminal chunk may omit the +/// model field entirely. 
Scanning both fields separately means we still
+/// surface the real upstream model even when usage extraction fails.
 fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
     if buf.is_empty() {
         return ExtractedUsage::default();
     }
@@ -113,11 +119,14 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
         }
     }
 
-    // SSE path: scan from the end for a `data:` line containing a usage object.
     let text = match std::str::from_utf8(buf) {
         Ok(t) => t,
         Err(_) => return ExtractedUsage::default(),
     };
+
+    let mut out = ExtractedUsage::default();
+    let mut found_usage = false;
+
     for line in text.lines().rev() {
         let trimmed = line.trim_start();
         let payload = match trimmed.strip_prefix("data:") {
@@ -127,18 +136,50 @@ fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
         if payload == "[DONE]" || payload.is_empty() {
             continue;
         }
-        if !payload.contains("\"usage\"") {
+
+        let has_model_field = out.resolved_model.is_none() && payload.contains("\"model\"");
+        let has_usage_field = !found_usage && payload.contains("\"usage\"");
+        if !has_model_field && !has_usage_field {
             continue;
         }
-        if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload) {
+
+        let value = match serde_json::from_str::<serde_json::Value>(payload) {
+            Ok(v) => v,
+            Err(_) => continue,
+        };
+
+        if has_usage_field {
             let u = ExtractedUsage::from_json(&value);
-            if !u.is_empty() {
-                return u;
+            if u.prompt_tokens.is_some()
+                || u.completion_tokens.is_some()
+                || u.total_tokens.is_some()
+            {
+                let prior_model = out.resolved_model.take();
+                out = u;
+                if out.resolved_model.is_none() {
+                    out.resolved_model = prior_model;
+                }
+                found_usage = true;
+                if out.resolved_model.is_some() {
+                    break;
+                }
+                continue;
+            }
+        }
+
+        if out.resolved_model.is_none() {
+            if let Some(model) = value.get("model").and_then(|v| v.as_str()) {
+                if !model.is_empty() {
+                    out.resolved_model = Some(model.to_string());
+                    if found_usage {
+                        break;
+                    }
+                }
             }
         }
     }
-    ExtractedUsage::default()
+    out
 }
 
 /// Trait for processing streaming chunks
@@ -634,4 +675,36 @@ data: [DONE]
     fn no_usage_in_body_returns_default() {
         assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty());
     }
+
+    #[test]
+    fn streaming_resolved_model_from_chunk_without_usage() {
+        // DO Inference Cloud often emits model on every chunk and usage only on
+        // the last — and the usage chunk itself omits the model field.
+        let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}
+
+data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":3,\"total_tokens\":10}}
+
+data: [DONE]
+
+";
+        let u = extract_usage_from_bytes(sse);
+        assert_eq!(u.prompt_tokens, Some(7));
+        assert_eq!(u.completion_tokens, Some(3));
+        assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4"));
+    }
+
+    #[test]
+    fn streaming_resolved_model_when_usage_missing() {
+        // Even without any usage chunk, we should still surface the upstream
+        // model so the obs view doesn't fall back to the router alias.
+ let sse = b"data: {\"id\":\"x\",\"model\":\"openai-gpt-5.4\",\"choices\":[{\"delta\":{\"content\":\"hi\"}}]} + +data: [DONE] + +"; + let u = extract_usage_from_bytes(sse); + assert_eq!(u.prompt_tokens, None); + assert_eq!(u.resolved_model.as_deref(), Some("openai-gpt-5.4")); + assert!(!u.is_empty()); + } } diff --git a/crates/hermesllm/src/bin/provider_models.yaml b/crates/hermesllm/src/bin/provider_models.yaml index 22f69a7d..d07e265d 100644 --- a/crates/hermesllm/src/bin/provider_models.yaml +++ b/crates/hermesllm/src/bin/provider_models.yaml @@ -95,6 +95,7 @@ providers: anthropic: - anthropic/claude-sonnet-4-6 - anthropic/claude-opus-4-6 + - anthropic/claude-opus-4-7 - anthropic/claude-opus-4-5-20251101 - anthropic/claude-opus-4-5 - anthropic/claude-haiku-4-5-20251001
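
How the two halves of the pricing alias fix meet, end to end: a minimal runnable
sketch against the helpers in this diff (the date-suffixed model name is a
made-up example, not a real release):

    from planoai.obs.pricing import _expand_aliases, _model_key_candidates

    # Catalog side, once at parse time: the DO id fans out into every
    # name shape we expect to see from Plano.
    catalog_keys = _expand_aliases("anthropic-claude-4.6-sonnet")
    # ...includes "claude-4.6-sonnet", "claude-sonnet-4.6", "claude-sonnet-4-6"

    # Lookup side, per request: strip any "provider/" prefix and the
    # trailing -YYYYMMDD date suffix.
    lookup_keys = _model_key_candidates("claude-sonnet-4-6-20260115")
    # ...["claude-sonnet-4-6-20260115", "claude-sonnet-4-6"]

    # The two sets intersect on "claude-sonnet-4-6", so price_for()
    # resolves without any fuzzy matching at lookup time.
    assert any(k in catalog_keys for k in lookup_keys)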
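The new p50/p95/p99 lines come from `_percentile`, which is nearest-rank
selection over a sorted copy: it rounds an index rather than interpolating, so
every reported figure is an actual observed sample. Worked calls, assuming the
implementation as written above:

    from planoai.obs.render import _percentile

    durations = [120.0, 450.0, 900.0, 3200.0]
    # p50: index round(0.50 * 3) = round(1.5) = 2 (banker's rounding) -> 900.0
    assert _percentile(durations, 50) == 900.0
    # p95: index round(0.95 * 3) = 3 -> the slowest sample
    assert _percentile(durations, 95) == 3200.0
    # No samples yet -> None (the tables show a placeholder instead).
    assert _percentile([], 95) is None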
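`_fmt_cost`'s adaptive precision is easiest to sanity-check with values
spanning its thresholds (expected strings follow directly from the branches
above):

    from planoai.obs.render import _fmt_cost

    assert _fmt_cost(3.8e-05) == "$0.000038"  # < $0.0001: 8 places, zeros stripped
    assert _fmt_cost(0.0042) == "$0.0042"     # < $0.01: 6 places, zeros stripped
    assert _fmt_cost(0.5) == "$0.5000"        # < $1: fixed 4 places
    assert _fmt_cost(1234.5) == "$1,234.50"   # >= $1: grouped, 2 places
    assert _fmt_cost(0.0) == "—"              # zero uses the (overridable) placeholder
    assert _fmt_cost(None) == "—"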
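The streaming.rs rewrite is one reverse scan with two independent "found"
flags, so `model` and `usage` may come from different chunks. A Python
rendition of the same shape; the helper name is invented here and the usage
check is simplified relative to the Rust version:

    import json

    def usage_and_model(sse_text: str):
        usage, model = None, None
        for line in reversed(sse_text.splitlines()):
            stripped = line.lstrip()
            if not stripped.startswith("data:"):
                continue
            payload = stripped[len("data:"):].strip()
            if not payload or payload == "[DONE]":
                continue
            try:
                chunk = json.loads(payload)
            except ValueError:
                continue
            # Newest-first scan: keep the first (i.e. latest) hit per field.
            if usage is None and isinstance(chunk.get("usage"), dict):
                usage = chunk["usage"]
            if model is None and chunk.get("model"):
                model = chunk["model"]
            if usage is not None and model is not None:
                break  # both found; stop early, like the Rust version
        return usage, model

    sse = (
        'data: {"model":"openai-gpt-5.4","choices":[{"delta":{"content":"hi"}}]}\n\n'
        'data: {"usage":{"prompt_tokens":7,"completion_tokens":3}}\n\n'
        'data: [DONE]\n'
    )
    assert usage_and_model(sse) == (
        {"prompt_tokens": 7, "completion_tokens": 3},
        "openai-gpt-5.4",
    )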