merge origin/main, add DigitalOcean alongside Vercel and OpenRouter

This commit is contained in:
Spherrrical 2026-04-23 15:13:06 -07:00
commit 013f377ddf
138 changed files with 17041 additions and 3335 deletions

View file

@ -1,3 +1,3 @@
"""Plano CLI - Intelligent Prompt Gateway."""
__version__ = "0.4.19"
__version__ = "0.4.20"

View file

@ -30,6 +30,7 @@ SUPPORTED_PROVIDERS_WITHOUT_BASE_URL = [
"zhipu",
"vercel",
"openrouter",
"digitalocean",
]
SUPPORTED_PROVIDERS = (

View file

@ -5,7 +5,7 @@ PLANO_COLOR = "#969FF4"
SERVICE_NAME_ARCHGW = "plano"
PLANO_DOCKER_NAME = "plano"
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.19")
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.20")
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317"
# Native mode constants

163
cli/planoai/defaults.py Normal file
View file

@ -0,0 +1,163 @@
"""Default config synthesizer for zero-config ``planoai up``.
When the user runs ``planoai up`` in a directory with no ``config.yaml`` /
``plano_config.yaml``, we synthesize a pass-through config that covers the
common LLM providers and auto-wires OTel export to ``localhost:4317`` so
``planoai obs`` works out of the box.
Auth handling:
- If the provider's env var is set, bind ``access_key: $ENV_VAR``.
- Otherwise set ``passthrough_auth: true`` so the client's own Authorization
header is forwarded. No env var is required to start the proxy.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
DEFAULT_LLM_LISTENER_PORT = 12000
# plano_config validation requires an http:// scheme on the OTLP endpoint.
DEFAULT_OTLP_ENDPOINT = "http://localhost:4317"
@dataclass(frozen=True)
class ProviderDefault:
name: str
env_var: str
base_url: str
model_pattern: str
# Only set for providers whose prefix in the model pattern is NOT one of the
# built-in SUPPORTED_PROVIDERS in cli/planoai/config_generator.py. For
# built-ins, the validator infers the interface from the model prefix and
# rejects configs that set this field explicitly.
provider_interface: str | None = None
# Keep ordering stable so synthesized configs diff cleanly across runs.
PROVIDER_DEFAULTS: list[ProviderDefault] = [
ProviderDefault(
name="openai",
env_var="OPENAI_API_KEY",
base_url="https://api.openai.com/v1",
model_pattern="openai/*",
),
ProviderDefault(
name="anthropic",
env_var="ANTHROPIC_API_KEY",
base_url="https://api.anthropic.com/v1",
model_pattern="anthropic/*",
),
ProviderDefault(
name="gemini",
env_var="GEMINI_API_KEY",
base_url="https://generativelanguage.googleapis.com/v1beta",
model_pattern="gemini/*",
),
ProviderDefault(
name="groq",
env_var="GROQ_API_KEY",
base_url="https://api.groq.com/openai/v1",
model_pattern="groq/*",
),
ProviderDefault(
name="deepseek",
env_var="DEEPSEEK_API_KEY",
base_url="https://api.deepseek.com/v1",
model_pattern="deepseek/*",
),
ProviderDefault(
name="mistral",
env_var="MISTRAL_API_KEY",
base_url="https://api.mistral.ai/v1",
model_pattern="mistral/*",
),
# DigitalOcean Gradient is a first-class provider post-#889 — the
# `digitalocean/` model prefix routes to the built-in Envoy cluster, no
# base_url needed at runtime.
ProviderDefault(
name="digitalocean",
env_var="DO_API_KEY",
base_url="https://inference.do-ai.run/v1",
model_pattern="digitalocean/*",
),
]
@dataclass
class DetectionResult:
with_keys: list[ProviderDefault]
passthrough: list[ProviderDefault]
@property
def summary(self) -> str:
parts = []
if self.with_keys:
parts.append("env-keyed: " + ", ".join(p.name for p in self.with_keys))
if self.passthrough:
parts.append("pass-through: " + ", ".join(p.name for p in self.passthrough))
return " | ".join(parts) if parts else "no providers"
def detect_providers(env: dict[str, str] | None = None) -> DetectionResult:
env = env if env is not None else dict(os.environ)
with_keys: list[ProviderDefault] = []
passthrough: list[ProviderDefault] = []
for p in PROVIDER_DEFAULTS:
val = env.get(p.env_var)
if val:
with_keys.append(p)
else:
passthrough.append(p)
return DetectionResult(with_keys=with_keys, passthrough=passthrough)
def synthesize_default_config(
env: dict[str, str] | None = None,
*,
listener_port: int = DEFAULT_LLM_LISTENER_PORT,
otel_endpoint: str = DEFAULT_OTLP_ENDPOINT,
) -> dict:
"""Build a pass-through config dict suitable for validation + envoy rendering.
The returned dict can be dumped to YAML and handed to the existing `planoai up`
pipeline unchanged.
"""
detection = detect_providers(env)
def _entry(p: ProviderDefault, base: dict) -> dict:
row: dict = {"name": p.name, "model": p.model_pattern, "base_url": p.base_url}
if p.provider_interface is not None:
row["provider_interface"] = p.provider_interface
row.update(base)
return row
model_providers: list[dict] = []
for p in detection.with_keys:
model_providers.append(_entry(p, {"access_key": f"${p.env_var}"}))
for p in detection.passthrough:
model_providers.append(_entry(p, {"passthrough_auth": True}))
# No explicit `default: true` entry is synthesized: the plano config
# validator rejects wildcard models as defaults, and brightstaff already
# registers bare model names as lookup keys during wildcard expansion
# (crates/common/src/llm_providers.rs), so `{"model": "gpt-4o-mini"}`
# without a prefix resolves via the openai wildcard without needing
# `default: true`. See discussion on #890.
return {
"version": "v0.4.0",
"listeners": [
{
"name": "llm",
"type": "model",
"port": listener_port,
"address": "0.0.0.0",
}
],
"model_providers": model_providers,
"tracing": {
"random_sampling": 100,
"opentracing_grpc_endpoint": otel_endpoint,
},
}

View file

@ -6,7 +6,13 @@ import sys
import contextlib
import logging
import rich_click as click
import yaml
from planoai import targets
from planoai.defaults import (
DEFAULT_LLM_LISTENER_PORT,
detect_providers,
synthesize_default_config,
)
# Brand color - Plano purple
PLANO_COLOR = "#969FF4"
@ -31,6 +37,7 @@ from planoai.core import (
)
from planoai.init_cmd import init as init_cmd
from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_background
from planoai.obs_cmd import obs as obs_cmd
from planoai.consts import (
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT,
@ -317,7 +324,23 @@ def build(docker):
help="Show detailed startup logs with timestamps.",
is_flag=True,
)
def up(file, path, foreground, with_tracing, tracing_port, docker, verbose):
@click.option(
"--listener-port",
default=DEFAULT_LLM_LISTENER_PORT,
type=int,
show_default=True,
help="Override the LLM listener port when running without a config file. Ignored when a config file is present.",
)
def up(
file,
path,
foreground,
with_tracing,
tracing_port,
docker,
verbose,
listener_port,
):
"""Starts Plano."""
from rich.status import Status
@ -328,12 +351,23 @@ def up(file, path, foreground, with_tracing, tracing_port, docker, verbose):
# Use the utility function to find config file
plano_config_file = find_config_file(path, file)
# Check if the file exists
# Zero-config fallback: when no user config is present, synthesize a
# pass-through config that covers the common LLM providers and
# auto-wires OTel export to ``planoai obs``. See cli/planoai/defaults.py.
if not os.path.exists(plano_config_file):
detection = detect_providers()
cfg_dict = synthesize_default_config(listener_port=listener_port)
default_dir = os.path.expanduser("~/.plano")
os.makedirs(default_dir, exist_ok=True)
synthesized_path = os.path.join(default_dir, "default_config.yaml")
with open(synthesized_path, "w") as fh:
yaml.safe_dump(cfg_dict, fh, sort_keys=False)
plano_config_file = synthesized_path
console.print(
f"[red]✗[/red] Config file not found: [dim]{plano_config_file}[/dim]"
f"[dim]No plano config found; using defaults ({detection.summary}). "
f"Listening on :{listener_port}, tracing -> http://localhost:4317.[/dim]"
)
sys.exit(1)
if not docker:
from planoai.native_runner import native_validate_config
@ -681,6 +715,7 @@ main.add_command(cli_agent)
main.add_command(generate_prompt_targets)
main.add_command(init_cmd, name="init")
main.add_command(trace_cmd, name="trace")
main.add_command(obs_cmd, name="obs")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,6 @@
"""Plano observability console: in-memory live view of LLM traffic."""
from planoai.obs.collector import LLMCall, LLMCallStore, ObsCollector
from planoai.obs.pricing import PricingCatalog
__all__ = ["LLMCall", "LLMCallStore", "ObsCollector", "PricingCatalog"]

View file

@ -0,0 +1,266 @@
"""In-memory collector for LLM calls, fed by OTLP/gRPC spans from brightstaff."""
from __future__ import annotations
import threading
from collections import deque
from concurrent import futures
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterable
import grpc
from opentelemetry.proto.collector.trace.v1 import (
trace_service_pb2,
trace_service_pb2_grpc,
)
DEFAULT_GRPC_PORT = 4317
DEFAULT_CAPACITY = 1000
@dataclass
class LLMCall:
"""One LLM call as reconstructed from a brightstaff LLM span.
Fields default to ``None`` when the underlying span attribute was absent.
"""
request_id: str
timestamp: datetime
model: str
provider: str | None = None
request_model: str | None = None
session_id: str | None = None
route_name: str | None = None
is_streaming: bool | None = None
status_code: int | None = None
prompt_tokens: int | None = None
completion_tokens: int | None = None
total_tokens: int | None = None
cached_input_tokens: int | None = None
cache_creation_tokens: int | None = None
reasoning_tokens: int | None = None
ttft_ms: float | None = None
duration_ms: float | None = None
routing_strategy: str | None = None
routing_reason: str | None = None
cost_usd: float | None = None
@property
def tpt_ms(self) -> float | None:
if self.duration_ms is None or self.completion_tokens in (None, 0):
return None
ttft = self.ttft_ms or 0.0
generate_ms = max(0.0, self.duration_ms - ttft)
if generate_ms <= 0:
return None
return generate_ms / self.completion_tokens
@property
def tokens_per_sec(self) -> float | None:
tpt = self.tpt_ms
if tpt is None or tpt <= 0:
return None
return 1000.0 / tpt
class LLMCallStore:
"""Thread-safe ring buffer of recent LLM calls."""
def __init__(self, capacity: int = DEFAULT_CAPACITY) -> None:
self._capacity = capacity
self._calls: deque[LLMCall] = deque(maxlen=capacity)
self._lock = threading.Lock()
@property
def capacity(self) -> int:
return self._capacity
def add(self, call: LLMCall) -> None:
with self._lock:
self._calls.append(call)
def clear(self) -> None:
with self._lock:
self._calls.clear()
def snapshot(self) -> list[LLMCall]:
with self._lock:
return list(self._calls)
def __len__(self) -> int:
with self._lock:
return len(self._calls)
# Span attribute keys used below are the canonical OTel / Plano keys emitted by
# brightstaff — see crates/brightstaff/src/tracing/constants.rs for the source
# of truth.
def _anyvalue_to_python(value: Any) -> Any: # AnyValue from OTLP
kind = value.WhichOneof("value")
if kind == "string_value":
return value.string_value
if kind == "bool_value":
return value.bool_value
if kind == "int_value":
return value.int_value
if kind == "double_value":
return value.double_value
return None
def _attrs_to_dict(attrs: Iterable[Any]) -> dict[str, Any]:
out: dict[str, Any] = {}
for kv in attrs:
py = _anyvalue_to_python(kv.value)
if py is not None:
out[kv.key] = py
return out
def _maybe_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _maybe_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def span_to_llm_call(
span: Any, service_name: str, pricing: Any | None = None
) -> LLMCall | None:
"""Convert an OTLP span into an LLMCall, or return None if it isn't one.
A span is considered an LLM call iff it carries the ``llm.model`` attribute.
"""
attrs = _attrs_to_dict(span.attributes)
model = attrs.get("llm.model")
if not model:
return None
# Prefer explicit span attributes; fall back to likely aliases.
request_id = next(
(
str(attrs[key])
for key in ("request_id", "http.request_id")
if key in attrs and attrs[key] is not None
),
span.span_id.hex() if span.span_id else "",
)
start_ns = span.start_time_unix_nano or 0
ts = (
datetime.fromtimestamp(start_ns / 1_000_000_000, tz=timezone.utc).astimezone()
if start_ns
else datetime.now().astimezone()
)
call = LLMCall(
request_id=str(request_id),
timestamp=ts,
model=str(model),
provider=(
str(attrs["llm.provider"]) if "llm.provider" in attrs else service_name
),
request_model=(
str(attrs["model.requested"]) if "model.requested" in attrs else None
),
session_id=(
str(attrs["plano.session_id"]) if "plano.session_id" in attrs else None
),
route_name=(
str(attrs["plano.route.name"]) if "plano.route.name" in attrs else None
),
is_streaming=(
bool(attrs["llm.is_streaming"]) if "llm.is_streaming" in attrs else None
),
status_code=_maybe_int(attrs.get("http.status_code")),
prompt_tokens=_maybe_int(attrs.get("llm.usage.prompt_tokens")),
completion_tokens=_maybe_int(attrs.get("llm.usage.completion_tokens")),
total_tokens=_maybe_int(attrs.get("llm.usage.total_tokens")),
cached_input_tokens=_maybe_int(attrs.get("llm.usage.cached_input_tokens")),
cache_creation_tokens=_maybe_int(attrs.get("llm.usage.cache_creation_tokens")),
reasoning_tokens=_maybe_int(attrs.get("llm.usage.reasoning_tokens")),
ttft_ms=_maybe_float(attrs.get("llm.time_to_first_token")),
duration_ms=_maybe_float(attrs.get("llm.duration_ms")),
routing_strategy=(
str(attrs["routing.strategy"]) if "routing.strategy" in attrs else None
),
routing_reason=(
str(attrs["routing.selection_reason"])
if "routing.selection_reason" in attrs
else None
),
)
if pricing is not None:
call.cost_usd = pricing.cost_for_call(call)
return call
class _ObsServicer(trace_service_pb2_grpc.TraceServiceServicer):
def __init__(self, store: LLMCallStore, pricing: Any | None) -> None:
self._store = store
self._pricing = pricing
def Export(self, request, context): # noqa: N802 — gRPC generated name
for resource_spans in request.resource_spans:
service_name = "unknown"
for attr in resource_spans.resource.attributes:
if attr.key == "service.name":
val = _anyvalue_to_python(attr.value)
if val is not None:
service_name = str(val)
break
for scope_spans in resource_spans.scope_spans:
for span in scope_spans.spans:
call = span_to_llm_call(span, service_name, self._pricing)
if call is not None:
self._store.add(call)
return trace_service_pb2.ExportTraceServiceResponse()
@dataclass
class ObsCollector:
"""Owns the OTLP/gRPC server and the in-memory LLMCall ring buffer."""
store: LLMCallStore = field(default_factory=LLMCallStore)
pricing: Any | None = None
host: str = "0.0.0.0"
port: int = DEFAULT_GRPC_PORT
_server: grpc.Server | None = field(default=None, init=False, repr=False)
def start(self) -> None:
if self._server is not None:
return
server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
_ObsServicer(self.store, self.pricing), server
)
address = f"{self.host}:{self.port}"
bound = server.add_insecure_port(address)
if bound == 0:
raise OSError(
f"Failed to bind OTLP listener on {address}: port already in use. "
"Stop tracing via `planoai trace down` or pick another port with --port."
)
server.start()
self._server = server
def stop(self, grace: float = 2.0) -> None:
if self._server is not None:
self._server.stop(grace)
self._server = None

321
cli/planoai/obs/pricing.py Normal file
View file

@ -0,0 +1,321 @@
"""DigitalOcean Gradient pricing catalog for the obs console.
Ported loosely from ``crates/brightstaff/src/router/model_metrics.rs::fetch_do_pricing``.
Single-source: one fetch at startup, cached for the life of the process.
"""
from __future__ import annotations
import logging
import re
import threading
from dataclasses import dataclass
from typing import Any
import requests
DEFAULT_PRICING_URL = "https://api.digitalocean.com/v2/gen-ai/models/catalog"
FETCH_TIMEOUT_SECS = 5.0
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class ModelPrice:
"""Input/output $/token rates. Token counts are multiplied by these."""
input_per_token_usd: float
output_per_token_usd: float
cached_input_per_token_usd: float | None = None
class PricingCatalog:
"""In-memory pricing lookup keyed by model id.
DO's catalog uses ids like ``openai-gpt-5.4``; Plano's resolved model names
may arrive as ``do/openai-gpt-5.4`` or bare ``openai-gpt-5.4``. We strip the
leading provider prefix when looking up.
"""
def __init__(self, prices: dict[str, ModelPrice] | None = None) -> None:
self._prices: dict[str, ModelPrice] = prices or {}
self._lock = threading.Lock()
def __len__(self) -> int:
with self._lock:
return len(self._prices)
def sample_models(self, n: int = 5) -> list[str]:
with self._lock:
return list(self._prices.keys())[:n]
@classmethod
def fetch(cls, url: str = DEFAULT_PRICING_URL) -> "PricingCatalog":
"""Fetch pricing from DO's catalog endpoint. On failure, returns an
empty catalog (cost column will be blank).
The catalog endpoint is public no auth required, no signup so
``planoai obs`` gets cost data on first run out of the box.
"""
try:
resp = requests.get(url, timeout=FETCH_TIMEOUT_SECS)
resp.raise_for_status()
data = resp.json()
except Exception as exc: # noqa: BLE001 — best-effort; never fatal
logger.warning(
"DO pricing fetch failed: %s; cost column will be blank.",
exc,
)
return cls()
prices = _parse_do_pricing(data)
if not prices:
# Dump the first entry's raw shape so we can see which fields DO
# actually returned — helps when the catalog adds new fields or
# the response doesn't match our parser.
import json as _json
sample_items = _coerce_items(data)
sample = sample_items[0] if sample_items else data
logger.warning(
"DO pricing response had no parseable entries; cost column "
"will be blank. Sample entry: %s",
_json.dumps(sample, default=str)[:400],
)
return cls(prices)
def price_for(self, model_name: str | None) -> ModelPrice | None:
if not model_name:
return None
with self._lock:
# Try the full name first, then stripped prefix, then lowercased variants.
for candidate in _model_key_candidates(model_name):
hit = self._prices.get(candidate)
if hit is not None:
return hit
return None
def cost_for_call(self, call: Any) -> float | None:
"""Compute USD cost for an LLMCall. Returns None when pricing is unknown."""
price = self.price_for(getattr(call, "model", None)) or self.price_for(
getattr(call, "request_model", None)
)
if price is None:
return None
prompt = int(getattr(call, "prompt_tokens", 0) or 0)
completion = int(getattr(call, "completion_tokens", 0) or 0)
cached = int(getattr(call, "cached_input_tokens", 0) or 0)
# Cached input tokens are priced separately at the cached rate when known;
# otherwise they're already counted in prompt tokens at the regular rate.
fresh_prompt = prompt
if price.cached_input_per_token_usd is not None and cached:
fresh_prompt = max(0, prompt - cached)
cost_cached = cached * price.cached_input_per_token_usd
else:
cost_cached = 0.0
cost = (
fresh_prompt * price.input_per_token_usd
+ completion * price.output_per_token_usd
+ cost_cached
)
return round(cost, 6)
_DATE_SUFFIX_RE = re.compile(r"-\d{8}$")
_PROVIDER_PREFIXES = ("anthropic", "openai", "google", "meta", "cohere", "mistral")
_ANTHROPIC_FAMILIES = {"opus", "sonnet", "haiku"}
def _model_key_candidates(model_name: str) -> list[str]:
"""Lookup-side variants of a Plano-emitted model name.
Plano resolves names like ``claude-haiku-4-5-20251001``; the catalog stores
them as ``anthropic-claude-haiku-4.5``. We strip the date suffix and the
``provider/`` prefix here; the catalog itself registers the dash/dot and
family-order aliases at parse time (see :func:`_expand_aliases`).
"""
base = model_name.strip()
out = [base]
if "/" in base:
out.append(base.split("/", 1)[1])
for k in list(out):
stripped = _DATE_SUFFIX_RE.sub("", k)
if stripped != k:
out.append(stripped)
out.extend([v.lower() for v in list(out)])
seen: set[str] = set()
uniq = []
for key in out:
if key not in seen:
seen.add(key)
uniq.append(key)
return uniq
def _expand_aliases(model_id: str) -> set[str]:
"""Catalog-side variants of a DO model id.
DO publishes Anthropic models under ids like ``anthropic-claude-opus-4.7``
or ``anthropic-claude-4.6-sonnet`` while Plano emits ``claude-opus-4-7`` /
``claude-sonnet-4-6``. Generate a set covering provider-prefix stripping,
dashdot in version segments, and familyversion word order so a single
catalog entry matches every name shape we'll see at lookup.
"""
aliases: set[str] = set()
def add(name: str) -> None:
if not name:
return
aliases.add(name)
aliases.add(name.lower())
add(model_id)
base = model_id
head, _, rest = base.partition("-")
if head.lower() in _PROVIDER_PREFIXES and rest:
add(rest)
base = rest
for key in list(aliases):
if "." in key:
add(key.replace(".", "-"))
parts = base.split("-")
if len(parts) >= 3 and parts[0].lower() == "claude":
rest_parts = parts[1:]
for i, p in enumerate(rest_parts):
if p.lower() in _ANTHROPIC_FAMILIES:
others = rest_parts[:i] + rest_parts[i + 1 :]
if not others:
break
family_last = "claude-" + "-".join(others) + "-" + p
family_first = "claude-" + p + "-" + "-".join(others)
add(family_last)
add(family_first)
add(family_last.replace(".", "-"))
add(family_first.replace(".", "-"))
break
return aliases
def _parse_do_pricing(data: Any) -> dict[str, ModelPrice]:
"""Parse DO catalog response into a ModelPrice map keyed by model id.
DO's shape (as of 2026-04):
{
"data": [
{"model_id": "openai-gpt-5.4",
"pricing": {"input_price_per_million": 5.0,
"output_price_per_million": 15.0}},
...
]
}
Older/alternate shapes are also accepted (flat top-level fields, or the
``id``/``model``/``name`` key).
"""
prices: dict[str, ModelPrice] = {}
items = _coerce_items(data)
for item in items:
model_id = (
item.get("model_id")
or item.get("id")
or item.get("model")
or item.get("name")
)
if not model_id:
continue
# DO nests rates under `pricing`; try that first, then fall back to
# top-level fields for alternate response shapes.
sources = [item]
if isinstance(item.get("pricing"), dict):
sources.insert(0, item["pricing"])
input_rate = _extract_rate_from_sources(
sources,
["input_per_token", "input_token_price", "price_input"],
["input_price_per_million", "input_per_million", "input_per_mtok"],
)
output_rate = _extract_rate_from_sources(
sources,
["output_per_token", "output_token_price", "price_output"],
["output_price_per_million", "output_per_million", "output_per_mtok"],
)
cached_rate = _extract_rate_from_sources(
sources,
[
"cached_input_per_token",
"cached_input_token_price",
"prompt_cache_read_per_token",
],
[
"cached_input_price_per_million",
"cached_input_per_million",
"cached_input_per_mtok",
],
)
if input_rate is None or output_rate is None:
continue
# Treat 0-rate entries as "unknown" so cost falls back to `—` rather
# than showing a misleading $0.0000. DO's catalog sometimes omits
# rates for promo/open-weight models.
if input_rate == 0 and output_rate == 0:
continue
price = ModelPrice(
input_per_token_usd=input_rate,
output_per_token_usd=output_rate,
cached_input_per_token_usd=cached_rate,
)
for alias in _expand_aliases(str(model_id)):
prices.setdefault(alias, price)
return prices
def _coerce_items(data: Any) -> list[dict]:
if isinstance(data, list):
return [x for x in data if isinstance(x, dict)]
if isinstance(data, dict):
for key in ("data", "models", "pricing", "items"):
val = data.get(key)
if isinstance(val, list):
return [x for x in val if isinstance(x, dict)]
return []
def _extract_rate_from_sources(
sources: list[dict],
per_token_keys: list[str],
per_million_keys: list[str],
) -> float | None:
"""Return a per-token rate in USD, or None if unknown.
Some DO catalog responses put per-token values under a field whose name
says ``_per_million`` (e.g. ``input_price_per_million: 5E-8`` that's
$5e-8 per token, not per million). Heuristic: values < 1 are already
per-token (real per-million rates are ~0.1 to ~100); values >= 1 are
treated as per-million and divided by 1,000,000.
"""
for src in sources:
for key in per_token_keys:
if key in src and src[key] is not None:
try:
return float(src[key])
except (TypeError, ValueError):
continue
for key in per_million_keys:
if key in src and src[key] is not None:
try:
v = float(src[key])
except (TypeError, ValueError):
continue
if v >= 1:
return v / 1_000_000
return v
return None

634
cli/planoai/obs/render.py Normal file
View file

@ -0,0 +1,634 @@
"""Rich TUI renderer for the observability console."""
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from http import HTTPStatus
from rich.align import Align
from rich.box import SIMPLE, SIMPLE_HEAVY
from rich.console import Group
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
MAX_WIDTH = 160
from planoai.obs.collector import LLMCall
@dataclass
class AggregateStats:
count: int
total_cost_usd: float
total_input_tokens: int
total_output_tokens: int
distinct_sessions: int
current_session: str | None
p50_latency_ms: float | None = None
p95_latency_ms: float | None = None
p99_latency_ms: float | None = None
p50_ttft_ms: float | None = None
p95_ttft_ms: float | None = None
p99_ttft_ms: float | None = None
error_count: int = 0
errors_4xx: int = 0
errors_5xx: int = 0
has_cost: bool = False
@dataclass
class ModelRollup:
model: str
requests: int
input_tokens: int
output_tokens: int
cache_write: int
cache_read: int
cost_usd: float
has_cost: bool = False
avg_tokens_per_sec: float | None = None
def _percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
s = sorted(values)
k = max(0, min(len(s) - 1, int(round((pct / 100.0) * (len(s) - 1)))))
return s[k]
def aggregates(calls: list[LLMCall]) -> AggregateStats:
total_cost = sum((c.cost_usd or 0.0) for c in calls)
total_input = sum(int(c.prompt_tokens or 0) for c in calls)
total_output = sum(int(c.completion_tokens or 0) for c in calls)
session_ids = {c.session_id for c in calls if c.session_id}
current = next(
(c.session_id for c in reversed(calls) if c.session_id is not None), None
)
durations = [c.duration_ms for c in calls if c.duration_ms is not None]
ttfts = [c.ttft_ms for c in calls if c.ttft_ms is not None]
errors_4xx = sum(
1 for c in calls if c.status_code is not None and 400 <= c.status_code < 500
)
errors_5xx = sum(
1 for c in calls if c.status_code is not None and c.status_code >= 500
)
has_cost = any(c.cost_usd is not None for c in calls)
return AggregateStats(
count=len(calls),
total_cost_usd=total_cost,
total_input_tokens=total_input,
total_output_tokens=total_output,
distinct_sessions=len(session_ids),
current_session=current,
p50_latency_ms=_percentile(durations, 50),
p95_latency_ms=_percentile(durations, 95),
p99_latency_ms=_percentile(durations, 99),
p50_ttft_ms=_percentile(ttfts, 50),
p95_ttft_ms=_percentile(ttfts, 95),
p99_ttft_ms=_percentile(ttfts, 99),
error_count=errors_4xx + errors_5xx,
errors_4xx=errors_4xx,
errors_5xx=errors_5xx,
has_cost=has_cost,
)
def model_rollups(calls: list[LLMCall]) -> list[ModelRollup]:
buckets: dict[str, dict[str, float | int | bool]] = {}
tps_samples: dict[str, list[float]] = {}
for c in calls:
key = c.model
b = buckets.setdefault(
key,
{
"requests": 0,
"input": 0,
"output": 0,
"cache_write": 0,
"cache_read": 0,
"cost": 0.0,
"has_cost": False,
},
)
b["requests"] = int(b["requests"]) + 1
b["input"] = int(b["input"]) + int(c.prompt_tokens or 0)
b["output"] = int(b["output"]) + int(c.completion_tokens or 0)
b["cache_write"] = int(b["cache_write"]) + int(c.cache_creation_tokens or 0)
b["cache_read"] = int(b["cache_read"]) + int(c.cached_input_tokens or 0)
b["cost"] = float(b["cost"]) + (c.cost_usd or 0.0)
if c.cost_usd is not None:
b["has_cost"] = True
tps = c.tokens_per_sec
if tps is not None:
tps_samples.setdefault(key, []).append(tps)
rollups: list[ModelRollup] = []
for model, b in buckets.items():
samples = tps_samples.get(model)
avg_tps = (sum(samples) / len(samples)) if samples else None
rollups.append(
ModelRollup(
model=model,
requests=int(b["requests"]),
input_tokens=int(b["input"]),
output_tokens=int(b["output"]),
cache_write=int(b["cache_write"]),
cache_read=int(b["cache_read"]),
cost_usd=float(b["cost"]),
has_cost=bool(b["has_cost"]),
avg_tokens_per_sec=avg_tps,
)
)
rollups.sort(key=lambda r: (r.cost_usd, r.requests), reverse=True)
return rollups
@dataclass
class RouteHit:
route: str
hits: int
pct: float
p95_latency_ms: float | None
error_count: int
def route_hits(calls: list[LLMCall]) -> list[RouteHit]:
counts: Counter[str] = Counter()
per_route_latency: dict[str, list[float]] = {}
per_route_errors: dict[str, int] = {}
for c in calls:
if not c.route_name:
continue
counts[c.route_name] += 1
if c.duration_ms is not None:
per_route_latency.setdefault(c.route_name, []).append(c.duration_ms)
if c.status_code is not None and c.status_code >= 400:
per_route_errors[c.route_name] = per_route_errors.get(c.route_name, 0) + 1
total = sum(counts.values())
if total == 0:
return []
return [
RouteHit(
route=r,
hits=n,
pct=(n / total) * 100.0,
p95_latency_ms=_percentile(per_route_latency.get(r, []), 95),
error_count=per_route_errors.get(r, 0),
)
for r, n in counts.most_common()
]
def _fmt_cost(v: float | None, *, zero: str = "") -> str:
if v is None:
return ""
if v == 0:
return zero
if abs(v) < 0.0001:
return f"${v:.8f}".rstrip("0").rstrip(".")
if abs(v) < 0.01:
return f"${v:.6f}".rstrip("0").rstrip(".")
if abs(v) < 1:
return f"${v:.4f}"
return f"${v:,.2f}"
def _fmt_ms(v: float | None) -> str:
if v is None:
return ""
if v >= 1000:
return f"{v / 1000:.1f}s"
return f"{v:.0f}ms"
def _fmt_int(v: int | None) -> str:
if v is None or v == 0:
return ""
return f"{v:,}"
def _fmt_tokens(v: int | None) -> str:
if v is None:
return ""
return f"{v:,}"
def _fmt_tps(v: float | None) -> str:
if v is None or v <= 0:
return ""
if v >= 100:
return f"{v:.0f}/s"
return f"{v:.1f}/s"
def _latency_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 500:
return "green"
if v < 2000:
return "yellow"
return "red"
def _ttft_style(v: float | None) -> str:
if v is None:
return "dim"
if v < 300:
return "green"
if v < 1000:
return "yellow"
return "red"
def _truncate_model(name: str, limit: int = 32) -> str:
if len(name) <= limit:
return name
return name[: limit - 1] + ""
def _status_text(code: int | None) -> Text:
if code is None:
return Text("", style="dim")
if 200 <= code < 300:
return Text("● ok", style="green")
if 300 <= code < 400:
return Text(f"{code}", style="yellow")
if 400 <= code < 500:
return Text(f"{code}", style="yellow bold")
return Text(f"{code}", style="red bold")
def _summary_panel(last: LLMCall | None, stats: AggregateStats) -> Panel:
# Content-sized columns with a fixed gutter keep the two blocks close
# together instead of stretching across the full terminal on wide screens.
grid = Table.grid(padding=(0, 4))
grid.add_column(no_wrap=True)
grid.add_column(no_wrap=True)
# Left: latest request snapshot.
left = Table.grid(padding=(0, 1))
left.add_column(style="dim", no_wrap=True)
left.add_column(no_wrap=True)
if last is None:
left.add_row("latest", Text("waiting for spans…", style="dim italic"))
else:
model_text = Text(_truncate_model(last.model, 48), style="bold cyan")
if last.is_streaming:
model_text.append(" ⟳ stream", style="dim")
left.add_row("model", model_text)
if last.request_model and last.request_model != last.model:
left.add_row(
"requested", Text(_truncate_model(last.request_model, 48), style="cyan")
)
if last.route_name:
left.add_row("route", Text(last.route_name, style="yellow"))
left.add_row("status", _status_text(last.status_code))
tokens = Text()
tokens.append(_fmt_tokens(last.prompt_tokens))
tokens.append(" in", style="dim")
tokens.append(" · ", style="dim")
tokens.append(_fmt_tokens(last.completion_tokens), style="green")
tokens.append(" out", style="dim")
if last.cached_input_tokens:
tokens.append(" · ", style="dim")
tokens.append(_fmt_tokens(last.cached_input_tokens), style="yellow")
tokens.append(" cached", style="dim")
left.add_row("tokens", tokens)
timing = Text()
timing.append("TTFT ", style="dim")
timing.append(_fmt_ms(last.ttft_ms), style=_ttft_style(last.ttft_ms))
timing.append(" · ", style="dim")
timing.append("lat ", style="dim")
timing.append(_fmt_ms(last.duration_ms), style=_latency_style(last.duration_ms))
tps = last.tokens_per_sec
if tps:
timing.append(" · ", style="dim")
timing.append(_fmt_tps(tps), style="green")
left.add_row("timing", timing)
left.add_row("cost", Text(_fmt_cost(last.cost_usd), style="green bold"))
# Right: lifetime totals.
right = Table.grid(padding=(0, 1))
right.add_column(style="dim", no_wrap=True)
right.add_column(no_wrap=True)
right.add_row(
"requests",
Text(f"{stats.count:,}", style="bold"),
)
if stats.error_count:
err_text = Text()
err_text.append(f"{stats.error_count:,}", style="red bold")
parts: list[str] = []
if stats.errors_4xx:
parts.append(f"{stats.errors_4xx} 4xx")
if stats.errors_5xx:
parts.append(f"{stats.errors_5xx} 5xx")
if parts:
err_text.append(f" ({' · '.join(parts)})", style="dim")
right.add_row("errors", err_text)
cost_str = _fmt_cost(stats.total_cost_usd) if stats.has_cost else ""
right.add_row("total cost", Text(cost_str, style="green bold"))
tokens_total = Text()
tokens_total.append(_fmt_tokens(stats.total_input_tokens))
tokens_total.append(" in", style="dim")
tokens_total.append(" · ", style="dim")
tokens_total.append(_fmt_tokens(stats.total_output_tokens), style="green")
tokens_total.append(" out", style="dim")
right.add_row("tokens", tokens_total)
lat_text = Text()
lat_text.append("p50 ", style="dim")
lat_text.append(
_fmt_ms(stats.p50_latency_ms), style=_latency_style(stats.p50_latency_ms)
)
lat_text.append(" · ", style="dim")
lat_text.append("p95 ", style="dim")
lat_text.append(
_fmt_ms(stats.p95_latency_ms), style=_latency_style(stats.p95_latency_ms)
)
lat_text.append(" · ", style="dim")
lat_text.append("p99 ", style="dim")
lat_text.append(
_fmt_ms(stats.p99_latency_ms), style=_latency_style(stats.p99_latency_ms)
)
right.add_row("latency", lat_text)
ttft_text = Text()
ttft_text.append("p50 ", style="dim")
ttft_text.append(_fmt_ms(stats.p50_ttft_ms), style=_ttft_style(stats.p50_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p95 ", style="dim")
ttft_text.append(_fmt_ms(stats.p95_ttft_ms), style=_ttft_style(stats.p95_ttft_ms))
ttft_text.append(" · ", style="dim")
ttft_text.append("p99 ", style="dim")
ttft_text.append(_fmt_ms(stats.p99_ttft_ms), style=_ttft_style(stats.p99_ttft_ms))
right.add_row("TTFT", ttft_text)
sess = Text()
sess.append(f"{stats.distinct_sessions}")
if stats.current_session:
sess.append(" · current ", style="dim")
sess.append(stats.current_session, style="magenta")
right.add_row("sessions", sess)
grid.add_row(left, right)
return Panel(
grid,
title="[bold]live LLM traffic[/]",
border_style="cyan",
box=SIMPLE_HEAVY,
padding=(0, 1),
)
def _model_rollup_table(rollups: list[ModelRollup]) -> Table:
table = Table(
title="by model",
title_justify="left",
title_style="bold dim",
caption="cost via DigitalOcean Gradient catalog",
caption_justify="left",
caption_style="dim italic",
box=SIMPLE,
header_style="bold",
pad_edge=False,
padding=(0, 1),
)
table.add_column("model", style="cyan", no_wrap=True)
table.add_column("req", justify="right")
table.add_column("input", justify="right")
table.add_column("output", justify="right", style="green")
table.add_column("cache wr", justify="right", style="yellow")
table.add_column("cache rd", justify="right", style="yellow")
table.add_column("tok/s", justify="right")
table.add_column("cost", justify="right", style="green")
if not rollups:
table.add_row(
Text("no requests yet", style="dim italic"),
*([""] * 7),
)
return table
for r in rollups:
cost_cell = _fmt_cost(r.cost_usd) if r.has_cost else ""
table.add_row(
_truncate_model(r.model),
f"{r.requests:,}",
_fmt_tokens(r.input_tokens),
_fmt_tokens(r.output_tokens),
_fmt_int(r.cache_write),
_fmt_int(r.cache_read),
_fmt_tps(r.avg_tokens_per_sec),
cost_cell,
)
return table
def _route_hit_table(hits: list[RouteHit]) -> Table:
table = Table(
title="route share",
title_justify="left",
title_style="bold dim",
box=SIMPLE,
header_style="bold",
pad_edge=False,
padding=(0, 1),
)
table.add_column("route", style="cyan")
table.add_column("hits", justify="right")
table.add_column("%", justify="right")
table.add_column("p95", justify="right")
table.add_column("err", justify="right")
for h in hits:
err_cell = (
Text(f"{h.error_count:,}", style="red bold") if h.error_count else ""
)
table.add_row(
h.route,
f"{h.hits:,}",
f"{h.pct:5.1f}%",
Text(_fmt_ms(h.p95_latency_ms), style=_latency_style(h.p95_latency_ms)),
err_cell,
)
return table
def _recent_table(calls: list[LLMCall], limit: int = 15) -> Table:
show_route = any(c.route_name for c in calls)
show_cache = any((c.cached_input_tokens or 0) > 0 for c in calls)
show_rsn = any((c.reasoning_tokens or 0) > 0 for c in calls)
caption_parts = ["in·new = fresh prompt tokens"]
if show_cache:
caption_parts.append("in·cache = cached read")
if show_rsn:
caption_parts.append("rsn = reasoning")
caption_parts.append("lat = total latency")
table = Table(
title=f"recent · last {min(limit, len(calls)) if calls else 0}",
title_justify="left",
title_style="bold dim",
caption=" · ".join(caption_parts),
caption_justify="left",
caption_style="dim italic",
box=SIMPLE,
header_style="bold",
pad_edge=False,
padding=(0, 1),
)
table.add_column("time", no_wrap=True)
table.add_column("model", style="cyan", no_wrap=True)
if show_route:
table.add_column("route", style="yellow", no_wrap=True)
table.add_column("in·new", justify="right")
if show_cache:
table.add_column("in·cache", justify="right", style="yellow")
table.add_column("out", justify="right", style="green")
if show_rsn:
table.add_column("rsn", justify="right")
table.add_column("tok/s", justify="right")
table.add_column("TTFT", justify="right")
table.add_column("lat", justify="right")
table.add_column("cost", justify="right", style="green")
table.add_column("status")
if not calls:
cols = len(table.columns)
table.add_row(
Text("waiting for spans…", style="dim italic"),
*([""] * (cols - 1)),
)
return table
recent = list(reversed(calls))[:limit]
for idx, c in enumerate(recent):
is_newest = idx == 0
time_style = "bold white" if is_newest else None
model_style = "bold cyan" if is_newest else "cyan"
row: list[object] = [
(
Text(c.timestamp.strftime("%H:%M:%S"), style=time_style)
if time_style
else c.timestamp.strftime("%H:%M:%S")
),
Text(_truncate_model(c.model), style=model_style),
]
if show_route:
row.append(c.route_name or "")
row.append(_fmt_tokens(c.prompt_tokens))
if show_cache:
row.append(_fmt_int(c.cached_input_tokens))
row.append(_fmt_tokens(c.completion_tokens))
if show_rsn:
row.append(_fmt_int(c.reasoning_tokens))
row.extend(
[
_fmt_tps(c.tokens_per_sec),
Text(_fmt_ms(c.ttft_ms), style=_ttft_style(c.ttft_ms)),
Text(_fmt_ms(c.duration_ms), style=_latency_style(c.duration_ms)),
_fmt_cost(c.cost_usd),
_status_text(c.status_code),
]
)
table.add_row(*row)
return table
def _last_error(calls: list[LLMCall]) -> LLMCall | None:
for c in reversed(calls):
if c.status_code is not None and c.status_code >= 400:
return c
return None
def _http_reason(code: int) -> str:
try:
return HTTPStatus(code).phrase
except ValueError:
return ""
def _fmt_ago(ts: datetime) -> str:
# `ts` is produced in collector.py via datetime.now(tz=...), but fall back
# gracefully if a naive timestamp ever sneaks in.
now = datetime.now(tz=ts.tzinfo) if ts.tzinfo else datetime.now()
delta = (now - ts).total_seconds()
if delta < 0:
delta = 0
if delta < 60:
return f"{int(delta)}s ago"
if delta < 3600:
return f"{int(delta // 60)}m ago"
return f"{int(delta // 3600)}h ago"
def _error_banner(call: LLMCall) -> Panel:
code = call.status_code or 0
border = "red" if code >= 500 else "yellow"
header = Text()
header.append(f"{code}", style=f"{border} bold")
reason = _http_reason(code)
if reason:
header.append(f" {reason}", style=border)
header.append(" · ", style="dim")
header.append(_truncate_model(call.model, 48), style="cyan")
if call.route_name:
header.append(" · ", style="dim")
header.append(call.route_name, style="yellow")
header.append(" · ", style="dim")
header.append(_fmt_ago(call.timestamp), style="dim")
if call.request_id:
header.append(" · req ", style="dim")
header.append(call.request_id, style="magenta")
return Panel(
header,
title="[bold]last error[/]",
title_align="left",
border_style=border,
box=SIMPLE,
padding=(0, 1),
)
def _footer(stats: AggregateStats) -> Text:
waiting = stats.count == 0
text = Text()
text.append("Ctrl-C ", style="bold")
text.append("exit", style="dim")
text.append(" · OTLP :4317", style="dim")
text.append(" · pricing: DigitalOcean ", style="dim")
if waiting:
text.append("waiting for spans", style="yellow")
text.append(
" — set tracing.opentracing_grpc_endpoint=localhost:4317", style="dim"
)
else:
text.append(f"receiving · {stats.count:,} call(s) buffered", style="green")
return text
def render(calls: list[LLMCall]) -> Align:
last = calls[-1] if calls else None
stats = aggregates(calls)
rollups = model_rollups(calls)
hits = route_hits(calls)
parts: list[object] = [_summary_panel(last, stats)]
err = _last_error(calls)
if err is not None:
parts.append(_error_banner(err))
if hits:
split = Table.grid(padding=(0, 2))
split.add_column(no_wrap=False)
split.add_column(no_wrap=False)
split.add_row(_model_rollup_table(rollups), _route_hit_table(hits))
parts.append(split)
else:
parts.append(_model_rollup_table(rollups))
parts.append(_recent_table(calls))
parts.append(_footer(stats))
# Cap overall width so wide terminals don't stretch the layout into a
# mostly-whitespace gap between columns.
return Align.left(Group(*parts), width=MAX_WIDTH)

99
cli/planoai/obs_cmd.py Normal file
View file

@ -0,0 +1,99 @@
"""`planoai obs` — live observability TUI."""
from __future__ import annotations
import time
import rich_click as click
from rich.console import Console
from rich.live import Live
from planoai.consts import PLANO_COLOR
from planoai.obs.collector import (
DEFAULT_CAPACITY,
DEFAULT_GRPC_PORT,
LLMCallStore,
ObsCollector,
)
from planoai.obs.pricing import PricingCatalog
from planoai.obs.render import render
@click.command(name="obs", help="Live observability console for Plano LLM traffic.")
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="OTLP/gRPC port to listen on. Must match the brightstaff tracing endpoint.",
)
@click.option(
"--host",
type=str,
default="0.0.0.0",
show_default=True,
help="Host to bind the OTLP listener.",
)
@click.option(
"--capacity",
type=int,
default=DEFAULT_CAPACITY,
show_default=True,
help="Max LLM calls kept in memory; older calls evicted FIFO.",
)
@click.option(
"--refresh-ms",
type=int,
default=500,
show_default=True,
help="TUI refresh interval.",
)
def obs(port: int, host: str, capacity: int, refresh_ms: int) -> None:
console = Console()
console.print(
f"[bold {PLANO_COLOR}]planoai obs[/] — loading DO pricing catalog...",
end="",
)
pricing = PricingCatalog.fetch()
if len(pricing):
sample = ", ".join(pricing.sample_models(3))
console.print(
f" [green]{len(pricing)} models loaded[/] [dim]({sample}, ...)[/]"
)
else:
console.print(
" [yellow]no pricing loaded[/] — "
"[dim]cost column will be blank (DO catalog unreachable)[/]"
)
store = LLMCallStore(capacity=capacity)
collector = ObsCollector(store=store, pricing=pricing, host=host, port=port)
try:
collector.start()
except OSError as exc:
console.print(f"[red]{exc}[/]")
raise SystemExit(1)
console.print(
f"Listening for OTLP spans on [bold]{host}:{port}[/]. "
"Ensure plano config has [cyan]tracing.opentracing_grpc_endpoint: http://localhost:4317[/] "
"and [cyan]tracing.random_sampling: 100[/] (or run [bold]planoai up[/] "
"with no config — it wires this automatically)."
)
console.print("Press [bold]Ctrl-C[/] to exit.\n")
refresh = max(0.05, refresh_ms / 1000.0)
try:
with Live(
render(store.snapshot()),
console=console,
refresh_per_second=1.0 / refresh,
screen=False,
) as live:
while True:
time.sleep(refresh)
live.update(render(store.snapshot()))
except KeyboardInterrupt:
console.print("\n[dim]obs stopped[/]")
finally:
collector.stop()

View file

@ -61,7 +61,7 @@ def configure_rich_click(plano_color: str) -> None:
},
{
"name": "Observability",
"commands": ["trace"],
"commands": ["trace", "obs"],
},
{
"name": "Utilities",

View file

@ -91,7 +91,12 @@ def convert_legacy_listeners(
"type": "model",
"port": 12000,
"address": "0.0.0.0",
"timeout": "30s",
# LLM streaming responses routinely exceed 30s (extended thinking,
# long tool reasoning, large completions). Match the 300s ceiling
# used by the direct upstream-provider routes so Envoy doesn't
# abort streams with UT mid-response. Users can override via their
# plano_config.yaml `listeners.timeout` field.
"timeout": "300s",
"model_providers": model_providers or [],
}
@ -100,7 +105,7 @@ def convert_legacy_listeners(
"type": "prompt",
"port": 10000,
"address": "0.0.0.0",
"timeout": "30s",
"timeout": "300s",
}
# Handle None case

View file

@ -1,6 +1,6 @@
[project]
name = "planoai"
version = "0.4.19"
version = "0.4.20"
description = "Python-based CLI tool to manage Plano."
authors = [{name = "Katanemo Labs, Inc."}]
readme = "README.md"

86
cli/test/test_defaults.py Normal file
View file

@ -0,0 +1,86 @@
from pathlib import Path
import jsonschema
import yaml
from planoai.defaults import (
PROVIDER_DEFAULTS,
detect_providers,
synthesize_default_config,
)
_SCHEMA_PATH = Path(__file__).parents[2] / "config" / "plano_config_schema.yaml"
def _schema() -> dict:
return yaml.safe_load(_SCHEMA_PATH.read_text())
def test_zero_env_vars_produces_pure_passthrough():
cfg = synthesize_default_config(env={})
assert cfg["version"] == "v0.4.0"
assert cfg["listeners"][0]["port"] == 12000
for provider in cfg["model_providers"]:
assert provider.get("passthrough_auth") is True
assert "access_key" not in provider
# No provider should be marked default in pure pass-through mode.
assert provider.get("default") is not True
# All known providers should be listed.
names = {p["name"] for p in cfg["model_providers"]}
assert "digitalocean" in names
assert "openai" in names
assert "anthropic" in names
def test_env_keys_promote_providers_to_env_keyed():
cfg = synthesize_default_config(
env={"OPENAI_API_KEY": "sk-1", "DO_API_KEY": "do-1"}
)
by_name = {p["name"]: p for p in cfg["model_providers"]}
assert by_name["openai"].get("access_key") == "$OPENAI_API_KEY"
assert by_name["openai"].get("passthrough_auth") is None
assert by_name["digitalocean"].get("access_key") == "$DO_API_KEY"
# Unset env keys remain pass-through.
assert by_name["anthropic"].get("passthrough_auth") is True
def test_no_default_is_synthesized():
# Bare model names resolve via brightstaff's wildcard expansion registering
# bare keys, so the synthesizer intentionally never sets `default: true`.
cfg = synthesize_default_config(
env={"OPENAI_API_KEY": "sk-1", "ANTHROPIC_API_KEY": "a-1"}
)
assert not any(p.get("default") is True for p in cfg["model_providers"])
def test_listener_port_is_configurable():
cfg = synthesize_default_config(env={}, listener_port=11000)
assert cfg["listeners"][0]["port"] == 11000
def test_detection_summary_strings():
det = detect_providers(env={"OPENAI_API_KEY": "sk", "DO_API_KEY": "d"})
summary = det.summary
assert "env-keyed" in summary and "openai" in summary and "digitalocean" in summary
assert "pass-through" in summary
def test_tracing_block_points_at_local_console():
cfg = synthesize_default_config(env={})
tracing = cfg["tracing"]
assert tracing["opentracing_grpc_endpoint"] == "http://localhost:4317"
# random_sampling is a percentage in the plano config — 100 = every span.
assert tracing["random_sampling"] == 100
def test_synthesized_config_validates_against_schema():
cfg = synthesize_default_config(env={"OPENAI_API_KEY": "sk"})
jsonschema.validate(cfg, _schema())
def test_provider_defaults_digitalocean_is_configured():
by_name = {p.name: p for p in PROVIDER_DEFAULTS}
assert "digitalocean" in by_name
assert by_name["digitalocean"].env_var == "DO_API_KEY"
assert by_name["digitalocean"].base_url == "https://inference.do-ai.run/v1"
assert by_name["digitalocean"].model_pattern == "digitalocean/*"

View file

@ -0,0 +1,145 @@
import time
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from planoai.obs.collector import LLMCall, LLMCallStore, span_to_llm_call
def _mk_attr(key: str, value):
v = MagicMock()
if isinstance(value, bool):
v.WhichOneof.return_value = "bool_value"
v.bool_value = value
elif isinstance(value, int):
v.WhichOneof.return_value = "int_value"
v.int_value = value
elif isinstance(value, float):
v.WhichOneof.return_value = "double_value"
v.double_value = value
else:
v.WhichOneof.return_value = "string_value"
v.string_value = str(value)
kv = MagicMock()
kv.key = key
kv.value = v
return kv
def _mk_span(
attrs: dict, start_ns: int | None = None, span_id_hex: str = "ab"
) -> MagicMock:
span = MagicMock()
span.attributes = [_mk_attr(k, v) for k, v in attrs.items()]
span.start_time_unix_nano = start_ns or int(time.time() * 1_000_000_000)
span.span_id.hex.return_value = span_id_hex
return span
def test_span_without_llm_model_is_ignored():
span = _mk_span({"http.method": "POST"})
assert span_to_llm_call(span, "plano(llm)") is None
def test_span_with_full_llm_attrs_produces_call():
span = _mk_span(
{
"llm.model": "openai-gpt-5.4",
"model.requested": "router:software-engineering",
"plano.session_id": "sess-abc",
"plano.route.name": "software-engineering",
"llm.is_streaming": False,
"llm.duration_ms": 1234,
"llm.time_to_first_token": 210,
"llm.usage.prompt_tokens": 100,
"llm.usage.completion_tokens": 50,
"llm.usage.total_tokens": 150,
"llm.usage.cached_input_tokens": 30,
"llm.usage.cache_creation_tokens": 5,
"llm.usage.reasoning_tokens": 200,
"http.status_code": 200,
"request_id": "req-42",
}
)
call = span_to_llm_call(span, "plano(llm)")
assert call is not None
assert call.request_id == "req-42"
assert call.model == "openai-gpt-5.4"
assert call.request_model == "router:software-engineering"
assert call.session_id == "sess-abc"
assert call.route_name == "software-engineering"
assert call.is_streaming is False
assert call.duration_ms == 1234.0
assert call.ttft_ms == 210.0
assert call.prompt_tokens == 100
assert call.completion_tokens == 50
assert call.total_tokens == 150
assert call.cached_input_tokens == 30
assert call.cache_creation_tokens == 5
assert call.reasoning_tokens == 200
assert call.status_code == 200
def test_pricing_lookup_attaches_cost():
class StubPricing:
def cost_for_call(self, call):
# Simple: 2 * prompt + 3 * completion, in cents
return 0.02 * (call.prompt_tokens or 0) + 0.03 * (
call.completion_tokens or 0
)
span = _mk_span(
{
"llm.model": "do/openai-gpt-5.4",
"llm.usage.prompt_tokens": 10,
"llm.usage.completion_tokens": 2,
}
)
call = span_to_llm_call(span, "plano(llm)", pricing=StubPricing())
assert call is not None
assert call.cost_usd == pytest.approx(0.26)
def test_tpt_and_tokens_per_sec_derived():
call = LLMCall(
request_id="x",
timestamp=datetime.now(tz=timezone.utc),
model="m",
duration_ms=1000,
ttft_ms=200,
completion_tokens=80,
)
# (1000 - 200) / 80 = 10ms per token => 100 tokens/sec
assert call.tpt_ms == 10.0
assert call.tokens_per_sec == 100.0
def test_tpt_returns_none_when_no_completion_tokens():
call = LLMCall(
request_id="x",
timestamp=datetime.now(tz=timezone.utc),
model="m",
duration_ms=1000,
ttft_ms=200,
completion_tokens=0,
)
assert call.tpt_ms is None
assert call.tokens_per_sec is None
def test_store_evicts_fifo_at_capacity():
store = LLMCallStore(capacity=3)
now = datetime.now(tz=timezone.utc)
for i in range(5):
store.add(
LLMCall(
request_id=f"r{i}",
timestamp=now,
model="m",
)
)
snap = store.snapshot()
assert len(snap) == 3
assert [c.request_id for c in snap] == ["r2", "r3", "r4"]

View file

@ -0,0 +1,146 @@
from datetime import datetime, timezone
from planoai.obs.collector import LLMCall
from planoai.obs.pricing import ModelPrice, PricingCatalog
def _call(model: str, prompt: int, completion: int, cached: int = 0) -> LLMCall:
return LLMCall(
request_id="r",
timestamp=datetime.now(tz=timezone.utc),
model=model,
prompt_tokens=prompt,
completion_tokens=completion,
cached_input_tokens=cached,
)
def test_lookup_matches_bare_and_prefixed():
prices = {
"openai-gpt-5.4": ModelPrice(
input_per_token_usd=0.000001, output_per_token_usd=0.000002
)
}
catalog = PricingCatalog(prices)
assert catalog.price_for("openai-gpt-5.4") is not None
# do/openai-gpt-5.4 should resolve after stripping the provider prefix.
assert catalog.price_for("do/openai-gpt-5.4") is not None
assert catalog.price_for("unknown-model") is None
def test_cost_computation_without_cache():
prices = {
"m": ModelPrice(input_per_token_usd=0.000001, output_per_token_usd=0.000002)
}
cost = PricingCatalog(prices).cost_for_call(_call("m", 1000, 500))
assert cost == 0.002 # 1000 * 1e-6 + 500 * 2e-6
def test_cost_computation_with_cached_discount():
prices = {
"m": ModelPrice(
input_per_token_usd=0.000001,
output_per_token_usd=0.000002,
cached_input_per_token_usd=0.0000001,
)
}
# 800 fresh @ 1e-6 = 8e-4; 200 cached @ 1e-7 = 2e-5; 500 out @ 2e-6 = 1e-3
cost = PricingCatalog(prices).cost_for_call(_call("m", 1000, 500, cached=200))
assert cost == round(0.0008 + 0.00002 + 0.001, 6)
def test_empty_catalog_returns_none():
assert PricingCatalog().cost_for_call(_call("m", 100, 50)) is None
def test_parse_do_catalog_treats_small_values_as_per_token():
"""DO's real catalog uses per-token values under the `_per_million` key
(e.g. 5E-8 for GPT-oss-20b). We treat values < 1 as already per-token."""
from planoai.obs.pricing import _parse_do_pricing
sample = {
"data": [
{
"model_id": "openai-gpt-oss-20b",
"pricing": {
"input_price_per_million": 5e-8,
"output_price_per_million": 4.5e-7,
},
},
{
"model_id": "openai-gpt-oss-120b",
"pricing": {
"input_price_per_million": 1e-7,
"output_price_per_million": 7e-7,
},
},
]
}
prices = _parse_do_pricing(sample)
# Values < 1 are assumed to already be per-token — no extra division.
assert prices["openai-gpt-oss-20b"].input_per_token_usd == 5e-8
assert prices["openai-gpt-oss-20b"].output_per_token_usd == 4.5e-7
assert prices["openai-gpt-oss-120b"].input_per_token_usd == 1e-7
def test_anthropic_aliases_match_plano_emitted_names():
"""DO publishes 'anthropic-claude-opus-4.7' and 'anthropic-claude-haiku-4.5';
Plano emits 'claude-opus-4-7' and 'claude-haiku-4-5-20251001'. Aliases
registered at parse time should bridge the gap."""
from planoai.obs.pricing import _parse_do_pricing
sample = {
"data": [
{
"model_id": "anthropic-claude-opus-4.7",
"pricing": {
"input_price_per_million": 15.0,
"output_price_per_million": 75.0,
},
},
{
"model_id": "anthropic-claude-haiku-4.5",
"pricing": {
"input_price_per_million": 1.0,
"output_price_per_million": 5.0,
},
},
{
"model_id": "anthropic-claude-4.6-sonnet",
"pricing": {
"input_price_per_million": 3.0,
"output_price_per_million": 15.0,
},
},
]
}
catalog = PricingCatalog(_parse_do_pricing(sample))
# Family-last shapes Plano emits.
assert catalog.price_for("claude-opus-4-7") is not None
assert catalog.price_for("claude-haiku-4-5") is not None
# Date-suffixed name (Anthropic API style).
assert catalog.price_for("claude-haiku-4-5-20251001") is not None
# Word-order swap: DO has 'claude-4.6-sonnet', Plano emits 'claude-sonnet-4-6'.
assert catalog.price_for("claude-sonnet-4-6") is not None
# Original DO ids still resolve.
assert catalog.price_for("anthropic-claude-opus-4.7") is not None
def test_parse_do_catalog_divides_large_values_as_per_million():
"""A provider that genuinely reports $5-per-million in that field gets divided."""
from planoai.obs.pricing import _parse_do_pricing
sample = {
"data": [
{
"model_id": "mystery-model",
"pricing": {
"input_price_per_million": 5.0, # > 1 → treated as per-million
"output_price_per_million": 15.0,
},
},
]
}
prices = _parse_do_pricing(sample)
assert prices["mystery-model"].input_per_token_usd == 5.0 / 1_000_000
assert prices["mystery-model"].output_per_token_usd == 15.0 / 1_000_000

106
cli/test/test_obs_render.py Normal file
View file

@ -0,0 +1,106 @@
from datetime import datetime, timedelta, timezone
from planoai.obs.collector import LLMCall
from planoai.obs.render import aggregates, model_rollups, route_hits
def _call(
model: str,
ts: datetime,
prompt=0,
completion=0,
cost=None,
route=None,
session=None,
cache_read=0,
cache_write=0,
):
return LLMCall(
request_id="r",
timestamp=ts,
model=model,
prompt_tokens=prompt,
completion_tokens=completion,
cached_input_tokens=cache_read,
cache_creation_tokens=cache_write,
cost_usd=cost,
route_name=route,
session_id=session,
)
def test_aggregates_sum_and_session_counts():
now = datetime.now(tz=timezone.utc).astimezone()
calls = [
_call(
"m1",
now - timedelta(seconds=50),
prompt=10,
completion=5,
cost=0.001,
session="s1",
),
_call(
"m2",
now - timedelta(seconds=40),
prompt=20,
completion=10,
cost=0.002,
session="s1",
),
_call(
"m1",
now - timedelta(seconds=30),
prompt=30,
completion=15,
cost=0.003,
session="s2",
),
]
stats = aggregates(calls)
assert stats.count == 3
assert stats.total_cost_usd == 0.006
assert stats.total_input_tokens == 60
assert stats.total_output_tokens == 30
assert stats.distinct_sessions == 2
assert stats.current_session == "s2"
def test_rollups_split_by_model_and_cache():
now = datetime.now(tz=timezone.utc).astimezone()
calls = [
_call(
"m1", now, prompt=10, completion=5, cost=0.001, cache_write=3, cache_read=7
),
_call("m1", now, prompt=20, completion=10, cost=0.002, cache_read=1),
_call("m2", now, prompt=30, completion=15, cost=0.004),
]
rollups = model_rollups(calls)
by_model = {r.model: r for r in rollups}
assert by_model["m1"].requests == 2
assert by_model["m1"].input_tokens == 30
assert by_model["m1"].cache_write == 3
assert by_model["m1"].cache_read == 8
assert by_model["m2"].input_tokens == 30
def test_route_hits_only_for_routed_calls():
now = datetime.now(tz=timezone.utc).astimezone()
calls = [
_call("m", now, route="code"),
_call("m", now, route="code"),
_call("m", now, route="summarization"),
_call("m", now), # no route
]
hits = route_hits(calls)
# Only calls with route names are counted.
assert sum(h.hits for h in hits) == 3
hits_by_name = {h.route: h for h in hits}
assert hits_by_name["code"].hits == 2
assert hits_by_name["summarization"].hits == 1
def test_route_hits_empty_when_no_routes():
now = datetime.now(tz=timezone.utc).astimezone()
calls = [_call("m", now), _call("m", now)]
assert route_hits(calls) == []

2
cli/uv.lock generated
View file

@ -337,7 +337,7 @@ wheels = [
[[package]]
name = "planoai"
version = "0.4.18"
version = "0.4.20"
source = { editable = "." }
dependencies = [
{ name = "click" },