plano/cli/planoai/obs_cmd.py

173 lines
5 KiB
Python

"""`planoai obs` — live observability TUI."""
from __future__ import annotations
import logging
import os
import time
import rich_click as click
import yaml
from rich.console import Console
from rich.live import Live
from planoai.consts import PLANO_COLOR
from planoai.obs.collector import (
DEFAULT_CAPACITY,
DEFAULT_GRPC_PORT,
LLMCallStore,
ObsCollector,
)
from planoai.obs.pricing import DEFAULT_PRICING_PROVIDER, PricingCatalog
from planoai.obs.render import render
from planoai.utils import find_config_file
logger = logging.getLogger(__name__)
def _resolve_pricing_source(
config_file: str | None,
provider_override: str | None,
url_override: str | None,
) -> tuple[str, str | None]:
"""Pick the cost pricing source.
Precedence: explicit CLI overrides > the first ``type: cost`` entry in
``model_metrics_sources`` from the Plano config > the DigitalOcean default.
"""
provider = DEFAULT_PRICING_PROVIDER
url: str | None = None
config_path = find_config_file(file=config_file)
if config_path and os.path.exists(config_path):
try:
with open(config_path, "r") as f:
config = yaml.safe_load(f) or {}
sources = config.get("model_metrics_sources") or []
for source in sources:
if isinstance(source, dict) and source.get("type") == "cost":
if source.get("provider"):
provider = str(source["provider"])
if source.get("url"):
url = str(source["url"])
break
except Exception as exc: # noqa: BLE001 — config is optional for obs
logger.warning(
"could not read pricing source from %s: %s", config_path, exc
)
if provider_override:
provider = provider_override
if url_override:
url = url_override
return provider, url
@click.command(name="obs", help="Live observability console for Plano LLM traffic.")
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="OTLP/gRPC port to listen on. Must match the brightstaff tracing endpoint.",
)
@click.option(
"--host",
type=str,
default="0.0.0.0",
show_default=True,
help="Host to bind the OTLP listener.",
)
@click.option(
"--capacity",
type=int,
default=DEFAULT_CAPACITY,
show_default=True,
help="Max LLM calls kept in memory; older calls evicted FIFO.",
)
@click.option(
"--refresh-ms",
type=int,
default=500,
show_default=True,
help="TUI refresh interval.",
)
@click.option(
"--config",
"config_file",
type=str,
default=None,
help="Path to the Plano config to read the pricing source from "
"(defaults to ./config.yaml or ./plano_config.yaml).",
)
@click.option(
"--pricing-provider",
type=click.Choice(["digitalocean", "models.dev"]),
default=None,
help="Override the cost pricing provider (otherwise read from config).",
)
@click.option(
"--pricing-url",
type=str,
default=None,
help="Override the pricing catalog URL (otherwise read from config / provider default).",
)
def obs(
port: int,
host: str,
capacity: int,
refresh_ms: int,
config_file: str | None,
pricing_provider: str | None,
pricing_url: str | None,
) -> None:
console = Console()
provider, url = _resolve_pricing_source(config_file, pricing_provider, pricing_url)
console.print(
f"[bold {PLANO_COLOR}]planoai obs[/] — loading {provider} pricing catalog...",
end="",
)
pricing = PricingCatalog.fetch(provider=provider, url=url)
if len(pricing):
sample = ", ".join(pricing.sample_models(3))
console.print(
f" [green]{len(pricing)} models loaded[/] [dim]({sample}, ...)[/]"
)
else:
console.print(
" [yellow]no pricing loaded[/] — "
f"[dim]cost column will be blank ({provider} catalog unreachable)[/]"
)
store = LLMCallStore(capacity=capacity)
collector = ObsCollector(store=store, pricing=pricing, host=host, port=port)
try:
collector.start()
except OSError as exc:
console.print(f"[red]{exc}[/]")
raise SystemExit(1)
console.print(
f"Listening for OTLP spans on [bold]{host}:{port}[/]. "
"Ensure plano config has [cyan]tracing.opentracing_grpc_endpoint: http://localhost:4317[/] "
"and [cyan]tracing.random_sampling: 100[/] (or run [bold]planoai up[/] "
"with no config — it wires this automatically)."
)
console.print("Press [bold]Ctrl-C[/] to exit.\n")
refresh = max(0.05, refresh_ms / 1000.0)
try:
with Live(
render(store.snapshot()),
console=console,
refresh_per_second=1.0 / refresh,
screen=False,
) as live:
while True:
time.sleep(refresh)
live.update(render(store.snapshot()))
except KeyboardInterrupt:
console.print("\n[dim]obs stopped[/]")
finally:
collector.stop()