Merge remote-tracking branch 'origin/main' into musa/coding-agents-cli

Made-with: Cursor

# Conflicts:
#	crates/brightstaff/src/handlers/llm.rs
This commit is contained in:
Musa 2026-03-02 13:09:36 -08:00
commit 8f3113557a
No known key found for this signature in database
44 changed files with 3686 additions and 392 deletions

View file

@ -20,8 +20,8 @@ jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
- uses: pre-commit/action@v3.0.1
# ──────────────────────────────────────────────
@ -33,10 +33,10 @@ jobs:
run:
working-directory: ./cli
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
@ -60,7 +60,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Free disk space on runner
run: |
@ -79,16 +79,16 @@ jobs:
load: true
tags: |
${{ env.PLANO_DOCKER_IMAGE }}
${{ env.DOCKER_IMAGE }}:0.4.8
${{ env.DOCKER_IMAGE }}:0.4.9
${{ env.DOCKER_IMAGE }}:latest
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Save image as artifact
run: docker save ${{ env.PLANO_DOCKER_IMAGE }} ${{ env.DOCKER_IMAGE }}:0.4.8 ${{ env.DOCKER_IMAGE }}:latest -o /tmp/plano-image.tar
run: docker save ${{ env.PLANO_DOCKER_IMAGE }} ${{ env.DOCKER_IMAGE }}:0.4.9 ${{ env.DOCKER_IMAGE }}:latest -o /tmp/plano-image.tar
- name: Upload image artifact
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v6
with:
name: plano-image
path: /tmp/plano-image.tar
@ -102,15 +102,15 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -129,10 +129,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -173,7 +173,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Free disk space on runner
run: |
@ -182,7 +182,7 @@ jobs:
docker volume prune -f || true
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -191,12 +191,12 @@ jobs:
run: docker load -i /tmp/plano-image.tar
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install uv
uses: astral-sh/setup-uv@v5
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-dependency-glob: |
@ -223,7 +223,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Free disk space on runner
run: |
@ -232,7 +232,7 @@ jobs:
docker volume prune -f || true
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -241,12 +241,12 @@ jobs:
run: docker load -i /tmp/plano-image.tar
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install uv
uses: astral-sh/setup-uv@v5
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-dependency-glob: |
@ -273,7 +273,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Free disk space on runner
run: |
@ -282,7 +282,7 @@ jobs:
docker volume prune -f || true
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -291,12 +291,12 @@ jobs:
run: docker load -i /tmp/plano-image.tar
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install uv
uses: astral-sh/setup-uv@v5
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-dependency-glob: |
@ -330,15 +330,15 @@ jobs:
working-directory: ./tests/archgw
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -388,15 +388,15 @@ jobs:
runs-on: ubuntu-latest-m
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp
@ -440,15 +440,15 @@ jobs:
runs-on: ubuntu-latest-m
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Download plano image
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: plano-image
path: /tmp

View file

@ -19,7 +19,7 @@ jobs:
runs-on: [linux-arm64]
steps:
- name: Checkout Repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Log in to Docker Hub
uses: docker/login-action@v3
@ -35,7 +35,7 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and Push ARM64 Image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
@ -50,7 +50,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Log in to Docker Hub
uses: docker/login-action@v3
@ -66,7 +66,7 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and Push AMD64 Image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile

View file

@ -18,7 +18,7 @@ jobs:
runs-on: [linux-arm64]
steps:
- name: Checkout Repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Log in to Docker Hub
uses: docker/login-action@v3
@ -42,7 +42,7 @@ jobs:
type=raw,value={{tag}}
- name: Build and Push ARM64 Image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
@ -57,7 +57,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Log in to Docker Hub
uses: docker/login-action@v3
@ -81,7 +81,7 @@ jobs:
type=raw,value={{tag}}
- name: Build and Push AMD64 Image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile

View file

@ -17,15 +17,15 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v7
with:
enable-cache: true

View file

@ -13,11 +13,11 @@ jobs:
steps:
# Check out the code from the repository
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v6
# Set up Docker
- name: Set up Docker
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
# Build and run the Docker container to generate the documentation
- name: Build documentation using Docker
@ -30,7 +30,7 @@ jobs:
# Deploy the docs to GitHub Pages
- name: Deploy to GitHub Pages
uses: peaceiris/actions-gh-pages@v3
uses: peaceiris/actions-gh-pages@v4
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./docs/build/html # Adjust this path based on where the HTML is generated

View file

@ -24,7 +24,7 @@ export function Hero() {
>
<div className="inline-flex flex-wrap items-center gap-1.5 sm:gap-2 px-3 sm:px-4 py-1 rounded-full bg-[rgba(185,191,255,0.4)] border border-[var(--secondary)] shadow backdrop-blur hover:bg-[rgba(185,191,255,0.6)] transition-colors cursor-pointer">
<span className="text-xs sm:text-sm font-medium text-black/65">
v0.4.8
v0.4.9
</span>
<span className="text-xs sm:text-sm font-medium text-black ">

View file

@ -1 +1 @@
docker build -f Dockerfile . -t katanemo/plano -t katanemo/plano:0.4.8
docker build -f Dockerfile . -t katanemo/plano -t katanemo/plano:0.4.9

View file

@ -1,3 +1,3 @@
"""Plano CLI - Intelligent Prompt Gateway."""
__version__ = "0.4.8"
__version__ = "0.4.9"

View file

@ -5,5 +5,5 @@ PLANO_COLOR = "#969FF4"
SERVICE_NAME_ARCHGW = "plano"
PLANO_DOCKER_NAME = "plano"
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.8")
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.9")
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT = "http://host.docker.internal:4317"

View file

@ -2,8 +2,11 @@ import json
import os
import re
import string
import subprocess
import sys
import threading
import time
from http import HTTPStatus
from collections import OrderedDict
from concurrent import futures
from dataclasses import dataclass
@ -22,6 +25,7 @@ from rich.text import Text
from rich.tree import Tree
from planoai.consts import PLANO_COLOR
from planoai import trace_listener_runtime
DEFAULT_GRPC_PORT = 4317
MAX_TRACES = 50
@ -35,7 +39,7 @@ class TraceListenerBindError(RuntimeError):
def _trace_listener_bind_error_message(address: str) -> str:
return (
f"Failed to start OTLP listener on {address}: address is already in use.\n"
"Stop the process using that port or run `planoai trace listen --port <PORT>`."
"Stop the process using that port or run `planoai trace listen`."
)
@ -57,6 +61,25 @@ class TraceSummary:
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
def _is_port_in_use(host: str, port: int) -> bool:
"""Check whether a TCP listener is accepting connections on host:port."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.2)
return s.connect_ex((host, port)) == 0
def _get_listener_pid() -> int | None:
"""Return persisted listener PID if process is alive."""
return trace_listener_runtime.get_listener_pid()
def _stop_background_listener() -> bool:
"""Stop persisted listener process if one is running."""
return trace_listener_runtime.stop_listener_process()
def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]:
parts: list[str] = []
for raw in filter_patterns:
@ -437,8 +460,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
"""gRPC servicer that receives OTLP ExportTraceServiceRequest and
merges incoming spans into the global _TRACE_STORE by trace_id."""
_console = Console(stderr=True)
def Export(self, request, context): # noqa: N802
for resource_spans in request.resource_spans:
service_name = "unknown"
@ -456,27 +477,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
continue
span_dict = _proto_span_to_dict(span, service_name)
_TRACE_STORE.merge_spans(trace_id, [span_dict])
short_id = trace_id[:8]
short_span = span.span_id.hex()[:8]
span_start = (
datetime.fromtimestamp(
span.start_time_unix_nano / 1_000_000_000, tz=timezone.utc
)
.astimezone()
.strftime("%H:%M:%S.%f")[:-3]
)
dur_ns = span.end_time_unix_nano - span.start_time_unix_nano
dur_s = dur_ns / 1_000_000_000
dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".")
dur_str = f"{dur_str}s"
self._console.print(
f"[dim]{span_start}[/dim], "
f"trace=[yellow]{short_id}[/yellow], "
f"span=[yellow]{short_span}[/yellow], "
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
f"[cyan]{span.name}[/cyan] "
f"[dim]({dur_str})[/dim]"
)
return trace_service_pb2.ExportTraceServiceResponse()
@ -499,12 +499,8 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace-collection server.
Returns the running ``grpc.Server``. The caller is responsible
for calling ``server.stop()`` when done.
"""
def _start_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace server."""
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4),
handlers=[_TraceQueryHandler()],
@ -525,38 +521,88 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
return grpc_server
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener and block until interrupted."""
console = Console()
try:
grpc_server = _create_trace_server(host, grpc_port)
except TraceListenerBindError as exc:
raise click.ClickException(str(exc)) from exc
def _serve_trace_listener(host: str, grpc_port: int) -> None:
"""Run the listener loop until process termination."""
# Persist PID immediately after fork, before server startup.
# This ensures the PID file exists even if server initialization fails.
trace_listener_runtime.write_listener_pid(os.getpid())
console.print()
console.print(f"[bold {PLANO_COLOR}]Listening for traces...[/bold {PLANO_COLOR}]")
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print("[dim]Press Ctrl+C to stop.[/dim]")
console.print()
try:
grpc_server = _start_trace_server(host, grpc_port)
grpc_server.wait_for_termination()
except KeyboardInterrupt:
pass
finally:
grpc_server.stop(grace=2)
# Best-effort cleanup; server may not exist if startup failed.
try:
grpc_server.stop(grace=2)
except NameError:
pass
trace_listener_runtime.remove_listener_pid()
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener as a daemon process."""
console = Console()
# Check if the requested port is already in use.
if _is_port_in_use(host, grpc_port):
existing_pid = _get_listener_pid()
if existing_pid:
# If the process PID is known, inform user that our listener is already running.
console.print(
f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})"
)
else:
# If port is taken but no tracked listener PID exists, warn user of unknown conflict.
console.print(
f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process"
)
console.print(f"\n[dim]Check what's using the port:[/dim]")
console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]")
return
# Fork/daemonize and run the trace server in the background.
try:
pid = trace_listener_runtime.daemonize_and_run(
lambda: _serve_trace_listener(host, grpc_port)
)
except OSError as e:
console.print(f"[red]✗[/red] Failed to start trace listener: {e}")
return
if pid is None:
# We're in the child process; daemonize_and_run never returns here.
return
# In the parent process: wait briefly for the background process to bind the port.
time.sleep(0.5) # Give child process time to start and bind to the port.
if _is_port_in_use(host, grpc_port):
# Success: the trace listener started and bound the port.
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(f"[dim]Process ID: {pid}[/dim]")
console.print(
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
)
console.print()
else:
# Failure: trace listener child process did not successfully start.
console.print(f"[red]✗[/red] Failed to start trace listener")
def start_trace_listener_background(
host: str = "0.0.0.0", grpc_port: int = DEFAULT_GRPC_PORT
) -> grpc.Server:
"""Start the trace listener in the background (non-blocking).
Returns the running ``grpc.Server`` so the caller can call
``server.stop()`` later.
"""
return _create_trace_server(host, grpc_port)
"""Start the trace server in-process and return ``grpc.Server`` handle."""
return _start_trace_server(host, grpc_port)
def _span_time_ns(span: dict[str, Any], key: str) -> int:
@ -584,13 +630,13 @@ def _trace_summary(trace: dict[str, Any]) -> TraceSummary:
def _service_color(service: str) -> str:
service = service.lower()
if "inbound" in service:
return "white"
return "#4860fa"
if "outbound" in service:
return "white"
return "#57d9a9"
if "orchestrator" in service:
return PLANO_COLOR
if "routing" in service:
return "magenta"
return "#e3a2fa"
if "agent" in service:
return "cyan"
if "llm" in service:
@ -598,6 +644,63 @@ def _service_color(service: str) -> str:
return "white"
def _error_symbol(status_code: str) -> str:
code = int(status_code) if status_code.isdigit() else 0
if code >= 500:
return "💥" # Server error - something broke
elif code == 429:
return "🚦" # Rate limited
elif code == 404:
return "🔍" # Not found
elif code == 403:
return "🚫" # Forbidden
elif code == 401:
return "🔐" # Unauthorized
elif code >= 400:
return "⚠️" # Client error
else:
return "" # Generic error
def _error_description(status_code: str) -> str:
"""Return a developer-friendly description of the error."""
code = int(status_code) if status_code.isdigit() else 0
if code < 400:
return "Error"
try:
return HTTPStatus(code).phrase
except ValueError:
if code >= 500:
return "Server Error"
return "Client Error"
def _detect_error(span: dict[str, Any]) -> tuple[bool, str, str]:
"""Detect if span has an error and return (has_error, status_code, error_msg).
Returns:
tuple: (has_error, status_code, error_description)
"""
attrs = _attrs(span)
status_code = attrs.get("http.status_code", "")
# Check for non-2xx status codes
if status_code and status_code.isdigit():
code = int(status_code)
if code >= 400:
return True, status_code, _error_description(status_code)
# Check for explicit error attributes
if "error.message" in attrs:
return True, status_code or "unknown", attrs["error.message"]
if "exception.message" in attrs:
return True, status_code or "unknown", attrs["exception.message"]
return False, "", ""
# Attributes to show for inbound/outbound spans when not verbose (trimmed view).
_INBOUND_OUTBOUND_ATTR_KEYS = (
"http.method",
@ -621,10 +724,20 @@ def _trim_attrs_for_display(
def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
# Error attributes always come first
error_priority = [
"http.status_code",
"error.type",
"error.message",
"error.stack",
"exception.type",
"exception.message",
]
# Then regular priority attributes
priority = [
"http.method",
"http.target",
"http.status_code",
"guid:x-request-id",
"request_size",
"response_size",
@ -641,7 +754,10 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
"llm.duration_ms",
"llm.response_bytes",
]
prioritized = [(k, attrs[k]) for k in priority if k in attrs]
# Combine error priority with regular priority
full_priority = error_priority + priority
prioritized = [(k, attrs[k]) for k in full_priority if k in attrs]
prioritized_keys = {k for k, _ in prioritized}
remaining = [(k, v) for k, v in attrs.items() if k not in prioritized_keys]
remaining.sort(key=lambda item: item[0])
@ -649,8 +765,14 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
def _display_attr_value(key: str, value: str) -> str:
if key == "http.status_code" and value != "200":
return f"{value} ⚠️"
if key == "http.status_code":
if value.isdigit():
code = int(value)
if code >= 400:
return f"{value} {_error_symbol(value)}"
elif code >= 200 and code < 300:
return f"{value}"
return value
return value
@ -670,7 +792,7 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
)
spans.sort(key=lambda s: _span_time_ns(s, "startTimeUnixNano"))
tree = Tree("", guide_style="dim")
tree = Tree("", guide_style="dim #5b5a5c bold")
for span in spans:
service = span.get("service", "plano(unknown)")
@ -678,22 +800,52 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
offset_ms = max(
0, (_span_time_ns(span, "startTimeUnixNano") - start_ns) / 1_000_000
)
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="yellow")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white")
# Check for errors in this span
has_error, error_code, error_desc = _detect_error(span)
if has_error:
# Create error banner above the span
error_banner = Text()
error_banner.append(error_desc, style="bright_red")
tree.add(error_banner)
# Style the span label in light red
label = Text(f"{offset_ms:.0f}ms ", style="#ff6b6b")
label.append(service, style="bold #ff6b6b")
if name:
label.append(f" {name}", style="#ff6b6b italic")
else:
# Normal styling
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="#949c99")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white bold italic")
node = tree.add(label)
attrs = _trim_attrs_for_display(_attrs(span), service, verbose)
sorted_items = list(_sorted_attr_items(attrs))
for idx, (key, value) in enumerate(sorted_items):
attr_line = Text()
attr_line.append(f"{key}: ", style="white")
attr_line.append(
_display_attr_value(key, str(value)),
style=f"{PLANO_COLOR}",
)
# attribute key
attr_line.append(f"{key}: ", style="#a4a9aa")
# attribute value
if key == "http.status_code" and value.isdigit():
val_int = int(value)
val_style = "bold red" if val_int >= 400 else "green"
attr_line.append(_display_attr_value(key, str(value)), style=val_style)
elif key in [
"error.message",
"exception.message",
"error.type",
"exception.type",
]:
attr_line.append(_display_attr_value(key, str(value)), style="red")
else:
attr_line.append(
_display_attr_value(key, str(value)), style=f"{PLANO_COLOR} bold"
)
if idx == len(sorted_items) - 1:
attr_line.append("\n")
node.add(attr_line)
@ -904,7 +1056,7 @@ def _run_trace_show(
_build_tree(trace_obj, console, verbose=verbose)
@click.group(invoke_without_command=True)
@click.command()
@click.argument("target", required=False)
@click.option(
"--filter",
@ -950,9 +1102,8 @@ def trace(
verbose,
):
"""Trace requests from the local OTLP listener."""
if ctx.invoked_subcommand:
return
if target == "listen" and not any(
# Handle operational shortcuts when invoked as target values.
has_show_options = any(
[
filter_patterns,
where_filters,
@ -963,9 +1114,20 @@ def trace(
json_out,
verbose,
]
):
)
if target == "listen" and not has_show_options:
_start_trace_listener("0.0.0.0", DEFAULT_GRPC_PORT)
return
if target in ("stop", "down") and not has_show_options:
console = Console()
if _stop_background_listener():
console.print(f"[green]✓[/green] Trace listener stopped")
else:
console.print(f"[dim]No background trace listener running[/dim]")
return
_run_trace_show(
target,
filter_patterns,
@ -977,17 +1139,3 @@ def trace(
json_out,
verbose,
)
@trace.command("listen")
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="gRPC port for receiving OTLP traces.",
)
def trace_listen(host: str, port: int) -> None:
"""Listen for OTLP/gRPC traces."""
_start_trace_listener(host, port)

View file

@ -0,0 +1,127 @@
"""
Trace listener process runtime utilities.
"""
import os
import signal
import time
import logging
from collections.abc import Callable
# Canonical PID file used by `planoai trace listen/down`.
TRACE_LISTENER_PID_PATH = os.path.expanduser("~/.plano/run/trace_listener.pid")
TRACE_LISTENER_LOG_PATH = os.path.expanduser("~/.plano/run/trace_listener.log")
LOGGER = logging.getLogger(__name__)
def write_listener_pid(pid: int) -> None:
"""Persist listener PID for later management commands."""
# Ensure parent directory exists for first-time installs.
os.makedirs(os.path.dirname(TRACE_LISTENER_PID_PATH), exist_ok=True)
with open(TRACE_LISTENER_PID_PATH, "w") as f:
f.write(str(pid))
def remove_listener_pid() -> None:
"""Remove persisted listener PID file if present."""
# Best-effort cleanup; missing file is not an error.
if os.path.exists(TRACE_LISTENER_PID_PATH):
os.remove(TRACE_LISTENER_PID_PATH)
def get_listener_pid() -> int | None:
"""Return listener PID if present and process is alive."""
if not os.path.exists(TRACE_LISTENER_PID_PATH):
return None
try:
# Parse persisted PID.
with open(TRACE_LISTENER_PID_PATH, "r") as f:
pid = int(f.read().strip())
# Signal 0 performs liveness check without sending a real signal.
os.kill(pid, 0)
return pid
except (ValueError, ProcessLookupError, OSError):
# Stale or malformed PID file: clean it up to prevent repeated confusion.
LOGGER.warning(
"Removing stale or malformed trace listener PID file at %s",
TRACE_LISTENER_PID_PATH,
)
remove_listener_pid()
return None
def stop_listener_process(grace_seconds: float = 0.5) -> bool:
"""Stop persisted listener process, returning True if one was stopped."""
pid = get_listener_pid()
if pid is None:
return False
try:
# Try graceful shutdown first.
os.kill(pid, signal.SIGTERM)
# Allow the process a short window to exit cleanly.
time.sleep(grace_seconds)
try:
# If still alive, force terminate.
os.kill(pid, 0)
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
# Already exited after SIGTERM.
pass
remove_listener_pid()
return True
except ProcessLookupError:
# Process disappeared between checks; treat as already stopped.
remove_listener_pid()
return False
def daemonize_and_run(run_forever: Callable[[], None]) -> int | None:
"""
Fork and detach process to create a Unix daemon.
Returns:
- Parent process: child PID (> 0), allowing caller to report startup.
- Child process: never returns; runs callback in daemon context until termination.
Raises:
- OSError: if fork fails (e.g., resource limits exceeded).
"""
# Duplicate current process. Raises OSError if fork fails.
pid = os.fork()
if pid > 0:
# Parent returns child PID to caller.
return pid
# Child: detach from controlling terminal/session.
# This prevents SIGHUP when parent terminal closes and ensures
# the daemon cannot reacquire a controlling terminal.
os.setsid()
# Redirect stdin to /dev/null and stdout/stderr to a persistent log file.
# This keeps the daemon terminal-independent while preserving diagnostics.
os.makedirs(os.path.dirname(TRACE_LISTENER_LOG_PATH), exist_ok=True)
devnull_in = os.open(os.devnull, os.O_RDONLY)
try:
log_fd = os.open(
TRACE_LISTENER_LOG_PATH,
os.O_WRONLY | os.O_CREAT | os.O_APPEND,
0o644,
)
except OSError:
# If logging cannot be initialized, keep running with output discarded.
log_fd = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull_in, 0) # stdin
os.dup2(log_fd, 1) # stdout
os.dup2(log_fd, 2) # stderr
if devnull_in > 2:
os.close(devnull_in)
if log_fd > 2:
os.close(log_fd)
# Run the daemon main loop (expected to block until process termination).
run_forever()
# If callback unexpectedly returns, exit cleanly to avoid returning to parent context.
os._exit(0)

View file

@ -1,6 +1,6 @@
[project]
name = "planoai"
version = "0.4.8"
version = "0.4.9"
description = "Python-based CLI tool to manage Plano."
authors = [{name = "Katanemo Labs, Inc."}]
readme = "README.md"

1133
cli/test/source/failure.json Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,803 @@
{
"traces": [
{
"trace_id": "86f21585168a31a23578d77096cc143b",
"spans": [
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "1d6159b920daf4e9",
"parentSpanId": "c5d6cd3cfb32b551",
"name": "POST archfc.katanemo.dev/v1/chat/completions",
"startTimeUnixNano": "1770937700292451000",
"endTimeUnixNano": "1770937700552403000",
"service": "plano(outbound)",
"attributes": [
{
"key": "node_id",
"value": {
"stringValue": ""
}
},
{
"key": "zone",
"value": {
"stringValue": ""
}
},
{
"key": "guid:x-request-id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.url",
"value": {
"stringValue": "https://archfc.katanemo.dev/v1/chat/completions"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "downstream_cluster",
"value": {
"stringValue": "-"
}
},
{
"key": "user_agent",
"value": {
"stringValue": "-"
}
},
{
"key": "http.protocol",
"value": {
"stringValue": "HTTP/1.1"
}
},
{
"key": "peer.address",
"value": {
"stringValue": "127.0.0.1"
}
},
{
"key": "request_size",
"value": {
"stringValue": "3293"
}
},
{
"key": "response_size",
"value": {
"stringValue": "341"
}
},
{
"key": "component",
"value": {
"stringValue": "proxy"
}
},
{
"key": "upstream_cluster",
"value": {
"stringValue": "arch"
}
},
{
"key": "upstream_cluster.name",
"value": {
"stringValue": "arch"
}
},
{
"key": "http.status_code",
"value": {
"stringValue": "200"
}
},
{
"key": "response_flags",
"value": {
"stringValue": "-"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "4234f793a77a40c8",
"parentSpanId": "445f868c5c36294e",
"name": "routing",
"startTimeUnixNano": "1770937700576995630",
"endTimeUnixNano": "1770937700577104880",
"service": "plano(routing)",
"attributes": [
{
"key": "component",
"value": {
"stringValue": "routing"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "http.target",
"value": {
"stringValue": "/v1/chat/completions"
}
},
{
"key": "model.requested",
"value": {
"stringValue": "openai/gpt-4o-mini"
}
},
{
"key": "model.alias_resolved",
"value": {
"stringValue": "openai/gpt-4o-mini"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(routing)"
}
},
{
"key": "routing.determination_ms",
"value": {
"intValue": "0"
}
},
{
"key": "route.selected_model",
"value": {
"stringValue": "none"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "445f868c5c36294e",
"parentSpanId": "8311d2245d859e71",
"name": "POST /v1/chat/completions openai/gpt-4o-mini",
"startTimeUnixNano": "1770937700576869630",
"endTimeUnixNano": "1770937701151370214",
"service": "plano(llm)",
"attributes": [
{
"key": "component",
"value": {
"stringValue": "llm"
}
},
{
"key": "request_id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "http.path",
"value": {
"stringValue": "/v1/chat/completions"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(llm)"
}
},
{
"key": "llm.temperature",
"value": {
"stringValue": "0.1"
}
},
{
"key": "llm.user_message_preview",
"value": {
"stringValue": "Whats the weather in Seattle?"
}
},
{
"key": "llm.model",
"value": {
"stringValue": "openai/gpt-4o-mini"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(llm)"
}
},
{
"key": "llm.time_to_first_token",
"value": {
"intValue": "572"
}
},
{
"key": "signals.quality",
"value": {
"stringValue": "Good"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "da348b97890a6c9b",
"parentSpanId": "",
"name": "POST /v1/chat/completions",
"startTimeUnixNano": "1770937700183402000",
"endTimeUnixNano": "1770937704394122000",
"service": "plano(inbound)",
"attributes": [
{
"key": "node_id",
"value": {
"stringValue": ""
}
},
{
"key": "zone",
"value": {
"stringValue": ""
}
},
{
"key": "guid:x-request-id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.url",
"value": {
"stringValue": "https://localhost/v1/chat/completions"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "downstream_cluster",
"value": {
"stringValue": "-"
}
},
{
"key": "user_agent",
"value": {
"stringValue": "Python/3.11 aiohttp/3.13.2"
}
},
{
"key": "http.protocol",
"value": {
"stringValue": "HTTP/1.1"
}
},
{
"key": "peer.address",
"value": {
"stringValue": "172.18.0.1"
}
},
{
"key": "request_size",
"value": {
"stringValue": "125"
}
},
{
"key": "response_size",
"value": {
"stringValue": "34401"
}
},
{
"key": "component",
"value": {
"stringValue": "proxy"
}
},
{
"key": "upstream_cluster",
"value": {
"stringValue": "bright_staff"
}
},
{
"key": "upstream_cluster.name",
"value": {
"stringValue": "bright_staff"
}
},
{
"key": "http.status_code",
"value": {
"stringValue": "200"
}
},
{
"key": "response_flags",
"value": {
"stringValue": "-"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "79a116cf7d63602a",
"parentSpanId": "8b6345129425cf4a",
"name": "POST api.openai.com/v1/chat/completions",
"startTimeUnixNano": "1770937702607128000",
"endTimeUnixNano": "1770937704391625000",
"service": "plano(outbound)",
"attributes": [
{
"key": "node_id",
"value": {
"stringValue": ""
}
},
{
"key": "zone",
"value": {
"stringValue": ""
}
},
{
"key": "guid:x-request-id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.url",
"value": {
"stringValue": "https://api.openai.com/v1/chat/completions"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "downstream_cluster",
"value": {
"stringValue": "-"
}
},
{
"key": "user_agent",
"value": {
"stringValue": "AsyncOpenAI/Python 2.17.0"
}
},
{
"key": "http.protocol",
"value": {
"stringValue": "HTTP/1.1"
}
},
{
"key": "peer.address",
"value": {
"stringValue": "127.0.0.1"
}
},
{
"key": "request_size",
"value": {
"stringValue": "1927"
}
},
{
"key": "response_size",
"value": {
"stringValue": "20646"
}
},
{
"key": "component",
"value": {
"stringValue": "proxy"
}
},
{
"key": "upstream_cluster",
"value": {
"stringValue": "openai"
}
},
{
"key": "upstream_cluster.name",
"value": {
"stringValue": "openai"
}
},
{
"key": "http.status_code",
"value": {
"stringValue": "200"
}
},
{
"key": "response_flags",
"value": {
"stringValue": "-"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "60508ba7960d51bc",
"parentSpanId": "445f868c5c36294e",
"name": "POST api.openai.com/v1/chat/completions",
"startTimeUnixNano": "1770937700589205000",
"endTimeUnixNano": "1770937701149191000",
"service": "plano(outbound)",
"attributes": [
{
"key": "node_id",
"value": {
"stringValue": ""
}
},
{
"key": "zone",
"value": {
"stringValue": ""
}
},
{
"key": "guid:x-request-id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.url",
"value": {
"stringValue": "https://api.openai.com/v1/chat/completions"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "downstream_cluster",
"value": {
"stringValue": "-"
}
},
{
"key": "user_agent",
"value": {
"stringValue": "AsyncOpenAI/Python 2.17.0"
}
},
{
"key": "http.protocol",
"value": {
"stringValue": "HTTP/1.1"
}
},
{
"key": "peer.address",
"value": {
"stringValue": "127.0.0.1"
}
},
{
"key": "request_size",
"value": {
"stringValue": "930"
}
},
{
"key": "response_size",
"value": {
"stringValue": "346"
}
},
{
"key": "component",
"value": {
"stringValue": "proxy"
}
},
{
"key": "upstream_cluster",
"value": {
"stringValue": "openai"
}
},
{
"key": "upstream_cluster.name",
"value": {
"stringValue": "openai"
}
},
{
"key": "http.status_code",
"value": {
"stringValue": "200"
}
},
{
"key": "response_flags",
"value": {
"stringValue": "-"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "8311d2245d859e71",
"parentSpanId": "c5d6cd3cfb32b551",
"name": "weather_agent /v1/chat/completions",
"startTimeUnixNano": "1770937700553490130",
"endTimeUnixNano": "1770937704393946299",
"service": "plano(agent)",
"attributes": [
{
"key": "agent_id",
"value": {
"stringValue": "weather_agent"
}
},
{
"key": "message_count",
"value": {
"stringValue": "1"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(agent)"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "9eb8a70a8c167f85",
"parentSpanId": "8b6345129425cf4a",
"name": "routing",
"startTimeUnixNano": "1770937702591610381",
"endTimeUnixNano": "1770937702592150423",
"service": "plano(routing)",
"attributes": [
{
"key": "component",
"value": {
"stringValue": "routing"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "http.target",
"value": {
"stringValue": "/v1/chat/completions"
}
},
{
"key": "model.requested",
"value": {
"stringValue": "openai/gpt-5.2"
}
},
{
"key": "model.alias_resolved",
"value": {
"stringValue": "openai/gpt-5.2"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(routing)"
}
},
{
"key": "routing.determination_ms",
"value": {
"intValue": "0"
}
},
{
"key": "route.selected_model",
"value": {
"stringValue": "none"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "c5d6cd3cfb32b551",
"parentSpanId": "da348b97890a6c9b",
"name": "travel_booking_service",
"startTimeUnixNano": "1770937700188669630",
"endTimeUnixNano": "1770937704393949091",
"service": "plano(orchestrator)",
"attributes": [
{
"key": "component",
"value": {
"stringValue": "orchestrator"
}
},
{
"key": "request_id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "http.path",
"value": {
"stringValue": "/agents/v1/chat/completions"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(orchestrator)"
}
},
{
"key": "selection.listener",
"value": {
"stringValue": "travel_booking_service"
}
},
{
"key": "selection.agent_count",
"value": {
"intValue": "1"
}
},
{
"key": "selection.agents",
"value": {
"stringValue": "weather_agent"
}
},
{
"key": "selection.determination_ms",
"value": {
"stringValue": "264.48"
}
}
]
},
{
"traceId": "86f21585168a31a23578d77096cc143b",
"spanId": "8b6345129425cf4a",
"parentSpanId": "8311d2245d859e71",
"name": "POST /v1/chat/completions openai/gpt-5.2",
"startTimeUnixNano": "1770937702591499256",
"endTimeUnixNano": "1770937704393043174",
"service": "plano(llm)",
"attributes": [
{
"key": "component",
"value": {
"stringValue": "llm"
}
},
{
"key": "request_id",
"value": {
"stringValue": "0e1acd44-41ea-9681-9944-f2f1bec65faf"
}
},
{
"key": "http.method",
"value": {
"stringValue": "POST"
}
},
{
"key": "http.path",
"value": {
"stringValue": "/v1/chat/completions"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(llm)"
}
},
{
"key": "llm.temperature",
"value": {
"stringValue": "0.7"
}
},
{
"key": "llm.user_message_preview",
"value": {
"stringValue": "Whats the weather in Seattle?\n\nWeather data for S..."
}
},
{
"key": "llm.model",
"value": {
"stringValue": "openai/gpt-5.2"
}
},
{
"key": "service.name.override",
"value": {
"stringValue": "plano(llm)"
}
},
{
"key": "llm.time_to_first_token",
"value": {
"intValue": "506"
}
},
{
"key": "signals.quality",
"value": {
"stringValue": "Good"
}
}
]
}
]
}
]
}

View file

@ -1,7 +1,70 @@
import pytest
import rich_click as click
import copy
import json
import re
from pathlib import Path
from planoai import trace_cmd
import pytest
from click.testing import CliRunner
from planoai.trace_cmd import trace
import planoai.trace_cmd as trace_cmd
def _load_success_traces() -> list[dict]:
source_path = Path(__file__).parent / "source" / "success.json"
payload = json.loads(source_path.read_text(encoding="utf-8"))
return payload["traces"]
def _load_failure_traces() -> list[dict]:
source_path = Path(__file__).parent / "source" / "failure.json"
payload = json.loads(source_path.read_text(encoding="utf-8"))
return payload["traces"]
def _build_trace_set() -> list[dict]:
traces = copy.deepcopy(_load_success_traces())
primary = traces[0]
secondary = copy.deepcopy(primary)
secondary["trace_id"] = "1234567890abcdef1234567890abcdef"
for span in secondary.get("spans", []):
span["traceId"] = secondary["trace_id"]
if span.get("startTimeUnixNano", "").isdigit():
span["startTimeUnixNano"] = str(
int(span["startTimeUnixNano"]) - 1_000_000_000
)
if span.get("endTimeUnixNano", "").isdigit():
span["endTimeUnixNano"] = str(int(span["endTimeUnixNano"]) - 1_000_000_000)
return [primary, secondary]
def _json_from_output(output: str) -> dict:
start = output.find("{")
if start == -1:
raise AssertionError(f"No JSON object found in output:\n{output}")
return json.loads(output[start:])
def _plain_output(output: str) -> str:
# Strip ANSI color/style sequences emitted by rich-click in CI terminals.
return re.sub(r"\x1b\[[0-9;]*m", "", output)
@pytest.fixture
def runner() -> CliRunner:
return CliRunner()
@pytest.fixture
def traces() -> list[dict]:
return _build_trace_set()
@pytest.fixture
def failure_traces() -> list[dict]:
return copy.deepcopy(_load_failure_traces())
class _FakeGrpcServer:
@ -12,7 +75,7 @@ class _FakeGrpcServer:
return None
def test_create_trace_server_raises_bind_error(monkeypatch):
def test_start_trace_server_raises_bind_error(monkeypatch):
monkeypatch.setattr(
trace_cmd.grpc, "server", lambda *_args, **_kwargs: _FakeGrpcServer()
)
@ -23,22 +86,305 @@ def test_create_trace_server_raises_bind_error(monkeypatch):
)
with pytest.raises(trace_cmd.TraceListenerBindError) as excinfo:
trace_cmd._create_trace_server("0.0.0.0", 4317)
trace_cmd._start_trace_server("0.0.0.0", 4317)
assert "already in use" in str(excinfo.value)
assert "planoai trace listen --port" in str(excinfo.value)
assert "planoai trace listen" in str(excinfo.value)
def test_start_trace_listener_converts_bind_error_to_click_exception(monkeypatch):
monkeypatch.setattr(
trace_cmd,
"_create_trace_server",
lambda *_args, **_kwargs: (_ for _ in ()).throw(
trace_cmd.TraceListenerBindError("port in use")
),
def test_trace_listen_starts_listener_with_defaults(runner, monkeypatch):
seen = {}
def fake_start(host: str, port: int) -> None:
seen["host"] = host
seen["port"] = port
monkeypatch.setattr(trace_cmd, "_start_trace_listener", fake_start)
result = runner.invoke(trace, ["listen"])
assert result.exit_code == 0, result.output
assert seen == {"host": "0.0.0.0", "port": trace_cmd.DEFAULT_GRPC_PORT}
def test_trace_down_prints_success_when_listener_stopped(runner, monkeypatch):
monkeypatch.setattr(trace_cmd, "_stop_background_listener", lambda: True)
result = runner.invoke(trace, ["down"])
assert result.exit_code == 0, result.output
assert "Trace listener stopped" in result.output
def test_trace_down_prints_no_listener_when_not_running(runner, monkeypatch):
monkeypatch.setattr(trace_cmd, "_stop_background_listener", lambda: False)
result = runner.invoke(trace, ["down"])
assert result.exit_code == 0, result.output
assert "No background trace listener running" in result.output
def test_trace_default_target_uses_last_and_builds_first_trace(
runner, monkeypatch, traces
):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
seen = {}
def fake_build_tree(trace_obj, _console, verbose=False):
seen["trace_id"] = trace_obj["trace_id"]
seen["verbose"] = verbose
monkeypatch.setattr(trace_cmd, "_build_tree", fake_build_tree)
result = runner.invoke(trace, [])
assert result.exit_code == 0, result.output
assert seen["trace_id"] == traces[0]["trace_id"]
assert seen["verbose"] is False
def test_trace_list_any_prints_short_trace_ids(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["--list", "--no-interactive", "any"])
assert result.exit_code == 0, result.output
assert "Trace IDs:" in result.output
assert traces[0]["trace_id"][:8] in result.output
assert traces[1]["trace_id"][:8] in result.output
def test_trace_list_target_conflict_errors(runner, traces, monkeypatch):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["--list", traces[0]["trace_id"]])
assert result.exit_code != 0
assert "Target and --list cannot be used together." in _plain_output(result.output)
def test_trace_json_list_with_limit_outputs_trace_ids(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["--list", "any", "--json", "--limit", "1"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert payload == {"trace_ids": [traces[0]["trace_id"]]}
def test_trace_json_for_short_target_returns_one_trace(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
short_target = traces[0]["trace_id"][:8]
result = runner.invoke(trace, [short_target, "--json"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert len(payload["traces"]) == 1
assert payload["traces"][0]["trace_id"] == traces[0]["trace_id"]
@pytest.mark.parametrize(
("target", "message"),
[
("abc", "Trace ID must be 8 or 32 hex characters."),
("00000000", "Short trace ID must be 8 hex characters."),
("0" * 32, "Trace ID must be 32 hex characters."),
],
)
def test_trace_target_validation_errors(runner, target, message):
result = runner.invoke(trace, [target])
assert result.exit_code != 0
assert message in _plain_output(result.output)
def test_trace_where_invalid_format_errors(runner):
result = runner.invoke(trace, ["any", "--where", "bad-format"])
assert result.exit_code != 0
assert "Invalid --where filter(s): bad-format. Use key=value." in _plain_output(
result.output
)
with pytest.raises(click.ClickException) as excinfo:
trace_cmd._start_trace_listener("0.0.0.0", 4317)
assert "port in use" in str(excinfo.value)
def test_trace_where_unknown_key_errors(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["any", "--where", "not.a.real.key=value"])
assert result.exit_code != 0
assert "Unknown --where key(s): not.a.real.key" in _plain_output(result.output)
def test_trace_where_filters_to_matching_trace(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(
trace, ["any", "--where", "agent_id=weather_agent", "--json"]
)
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert [trace_item["trace_id"] for trace_item in payload["traces"]] == [
traces[0]["trace_id"],
traces[1]["trace_id"],
]
def test_trace_where_and_filters_can_exclude_all(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(
trace,
[
"any",
"--where",
"agent_id=weather_agent",
"--where",
"http.status_code=500",
"--json",
],
)
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert payload == {"traces": []}
def test_trace_filter_restricts_attributes_by_pattern(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["any", "--filter", "http.*", "--json"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
for trace_item in payload["traces"]:
for span in trace_item["spans"]:
for attr in span.get("attributes", []):
assert attr["key"].startswith("http.")
def test_trace_filter_unmatched_warns_and_returns_unfiltered(
runner, monkeypatch, traces
):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["any", "--filter", "not-found-*", "--json"])
assert result.exit_code == 0, result.output
assert (
"Filter key(s) not found: not-found-*. Returning unfiltered traces."
in result.output
)
payload = _json_from_output(result.output)
assert len(payload["traces"]) == len(traces)
def test_trace_since_can_filter_out_old_traces(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
monkeypatch.setattr(trace_cmd.time, "time", lambda: 1_999_999_999.0)
result = runner.invoke(trace, ["any", "--since", "1m", "--json"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert payload == {"traces": []}
def test_trace_negative_limit_errors(runner):
result = runner.invoke(trace, ["any", "--limit", "-1"])
assert result.exit_code != 0
assert "Limit must be greater than or equal to 0." in _plain_output(result.output)
def test_trace_empty_data_prints_no_traces_found(runner, monkeypatch):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: [])
result = runner.invoke(trace, [])
assert result.exit_code == 0, result.output
assert "No traces found." in result.output
def test_trace_invalid_filter_token_errors(runner):
result = runner.invoke(trace, ["any", "--filter", "http.method,"])
assert result.exit_code != 0
assert "Filter contains empty tokens." in _plain_output(result.output)
def test_trace_failure_json_any_contains_all_fixture_trace_ids(
runner, monkeypatch, failure_traces
):
monkeypatch.setattr(
trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(failure_traces)
)
result = runner.invoke(trace, ["any", "--json"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert [item["trace_id"] for item in payload["traces"]] == [
"f7a31829c4b5d6e8a9f0b1c2d3e4f5a6",
"a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
"b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7",
]
@pytest.mark.parametrize(
("status_code", "expected_trace_ids"),
[
("503", ["f7a31829c4b5d6e8a9f0b1c2d3e4f5a6"]),
("429", ["a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6"]),
("500", ["b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7"]),
],
)
def test_trace_failure_where_status_filters_expected_traces(
runner, monkeypatch, failure_traces, status_code, expected_trace_ids
):
monkeypatch.setattr(
trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(failure_traces)
)
result = runner.invoke(
trace, ["any", "--where", f"http.status_code={status_code}", "--json"]
)
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert [item["trace_id"] for item in payload["traces"]] == expected_trace_ids
def test_trace_failure_default_render_shows_service_unavailable_banner(
runner, monkeypatch, failure_traces
):
monkeypatch.setattr(
trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(failure_traces)
)
result = runner.invoke(trace, [])
assert result.exit_code == 0, result.output
assert "Service Unavailable" in result.output
assert "503" in result.output
def test_trace_failure_filter_keeps_http_status_code_attributes(
runner, monkeypatch, failure_traces
):
monkeypatch.setattr(
trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(failure_traces)
)
result = runner.invoke(trace, ["any", "--filter", "http.status_code", "--json"])
assert result.exit_code == 0, result.output
payload = _json_from_output(result.output)
assert payload["traces"], "Expected traces in failure fixture"
for trace_item in payload["traces"]:
for span in trace_item["spans"]:
keys = [attr["key"] for attr in span.get("attributes", [])]
assert set(keys).issubset({"http.status_code"})

View file

@ -390,7 +390,19 @@ properties:
type: boolean
opentracing_grpc_endpoint:
type: string
additionalProperties: false
span_attributes:
type: object
properties:
header_prefixes:
type: array
items:
type: string
static:
type: object
additionalProperties:
type: string
additionalProperties: false
additionalProperties: false
mode:
type: string
enum:
@ -403,7 +415,7 @@ properties:
type: string
model:
type: string
additionalProperties: false
additionalProperties: false
state_storage:
type: object
properties:

View file

@ -5,7 +5,7 @@ failed_files=()
for file in $(find . -name config.yaml -o -name plano_config_full_reference.yaml); do
echo "Validating ${file}..."
touch $(pwd)/${file}_rendered
if ! docker run --rm -v "$(pwd)/${file}:/app/plano_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/plano_config_rendered.yaml:rw" --entrypoint /bin/sh ${PLANO_DOCKER_IMAGE:-katanemo/plano:0.4.8} -c "python -m planoai.config_generator" 2>&1 > /dev/null ; then
if ! docker run --rm -v "$(pwd)/${file}:/app/plano_config.yaml:ro" -v "$(pwd)/${file}_rendered:/app/plano_config_rendered.yaml:rw" --entrypoint /bin/sh ${PLANO_DOCKER_IMAGE:-katanemo/plano:0.4.9} -c "python -m planoai.config_generator" 2>&1 > /dev/null ; then
echo "Validation failed for $file"
failed_files+=("$file")
fi

3
crates/Cargo.lock generated
View file

@ -436,11 +436,14 @@ name = "common"
version = "0.1.0"
dependencies = [
"axum",
"bytes",
"derivative",
"duration-string",
"governor",
"hermesllm",
"hex",
"http-body-util",
"hyper 1.6.0",
"log",
"pretty_assertions",
"proxy-wasm",

View file

@ -2,6 +2,8 @@ use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use common::configuration::SpanAttributes;
use common::errors::BrightStaffError;
use common::llm_providers::LlmProviders;
use hermesllm::apis::OpenAIMessage;
use hermesllm::clients::SupportedAPIsFromClient;
@ -19,17 +21,17 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{operation_component, set_service_name};
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
pub enum AgentFilterChainError {
#[error("Forwarded error: {0}")]
Brightstaff(#[from] BrightStaffError),
#[error("Agent selection error: {0}")]
Selection(#[from] AgentSelectionError),
#[error("Pipeline processing error: {0}")]
Pipeline(#[from] PipelineError),
#[error("Response handling error: {0}")]
Response(#[from] super::response_handler::ResponseError),
#[error("Request parsing error: {0}")]
RequestParsing(#[from] serde_json::Error),
#[error("HTTP error: {0}")]
@ -42,8 +44,11 @@ pub async fn agent_chat(
_: String,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
span_attributes: Arc<Option<SpanAttributes>>,
llm_providers: Arc<RwLock<LlmProviders>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let custom_attrs =
collect_custom_trace_attributes(request.headers(), span_attributes.as_ref().as_ref());
// Extract request_id from headers or generate a new one
let request_id: String = match request
.headers()
@ -76,6 +81,7 @@ pub async fn agent_chat(
listeners,
llm_providers,
request_id,
custom_attrs,
)
.await
{
@ -103,16 +109,15 @@ pub async fn agent_chat(
"agent_response": body
});
let status_code = hyper::StatusCode::from_u16(*status)
.unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR);
let json_string = error_json.to_string();
let mut response =
Response::new(ResponseHandler::create_full_body(json_string));
*response.status_mut() = hyper::StatusCode::from_u16(*status)
.unwrap_or(hyper::StatusCode::BAD_REQUEST);
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
"application/json".parse().unwrap(),
);
return Ok(response);
return Ok(BrightStaffError::ForwardedError {
status_code,
message: json_string,
}
.into_response());
}
// Print detailed error information with full error chain for other errors
@ -145,8 +150,11 @@ pub async fn agent_chat(
// Log the error for debugging
info!(error = %error_json, "structured error info");
// Return JSON error response
Ok(ResponseHandler::create_json_error_response(&error_json))
Ok(BrightStaffError::ForwardedError {
status_code: StatusCode::BAD_REQUEST,
message: error_json.to_string(),
}
.into_response())
}
}
}
@ -161,6 +169,7 @@ async fn handle_agent_chat_inner(
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
request_id: String,
custom_attrs: std::collections::HashMap<String, String>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
@ -183,6 +192,9 @@ async fn handle_agent_chat_inner(
get_active_span(|span| {
span.update_name(listener.name.to_string());
for (key, value) in &custom_attrs {
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
}
});
info!(listener = %listener.name, "handling request");
@ -249,10 +261,7 @@ async fn handle_agent_chat_inner(
None => {
let err_msg = "No model specified in request and no default provider configured";
warn!("{}", err_msg);
let mut bad_request =
Response::new(ResponseHandler::create_full_body(err_msg.to_string()));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
return Ok(BrightStaffError::NoModelSpecified.into_response());
}
}
}
@ -348,6 +357,9 @@ async fn handle_agent_chat_inner(
set_service_name(operation_component::AGENT);
get_active_span(|span| {
span.update_name(format!("{} /v1/chat/completions", agent_name));
for (key, value) in &custom_attrs {
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
}
});
pipeline_processor

View file

@ -5,9 +5,10 @@ use hyper::header::HeaderMap;
use crate::handlers::agent_selector::{AgentSelectionError, AgentSelector};
use crate::handlers::pipeline_processor::PipelineProcessor;
use crate::handlers::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use common::errors::BrightStaffError;
use http_body_util::BodyExt;
use hyper::StatusCode;
/// Integration test that demonstrates the modular agent chat flow
/// This test shows how the three main components work together:
/// 1. AgentSelector - selects the appropriate agents based on orchestration
@ -128,8 +129,24 @@ mod tests {
}
// Test 4: Error Response Creation
let error_response = ResponseHandler::create_bad_request("Test error");
assert_eq!(error_response.status(), hyper::StatusCode::BAD_REQUEST);
let err = BrightStaffError::ModelNotFound("gpt-5-secret".to_string());
let response = err.into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
// Helper to extract body as JSON
let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body["error"]["code"], "ModelNotFound");
assert_eq!(
body["error"]["details"]["rejected_model_id"],
"gpt-5-secret"
);
assert!(body["error"]["message"]
.as_str()
.unwrap()
.contains("gpt-5-secret"));
println!("✅ All modular components working correctly!");
}
@ -148,12 +165,21 @@ mod tests {
AgentSelectionError::ListenerNotFound(_)
));
// Test error response creation
let error_response = ResponseHandler::create_internal_error("Pipeline failed");
assert_eq!(
error_response.status(),
hyper::StatusCode::INTERNAL_SERVER_ERROR
);
let technical_reason = "Database connection timed out";
let err = BrightStaffError::InternalServerError(technical_reason.to_string());
let response = err.into_response();
// --- 1. EXTRACT BYTES ---
let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
// --- 2. DECLARE body_json HERE ---
let body_json: serde_json::Value =
serde_json::from_slice(&body_bytes).expect("Failed to parse JSON body");
// --- 3. USE body_json ---
assert_eq!(body_json["error"]["code"], "InternalServerError");
assert_eq!(body_json["error"]["details"]["reason"], technical_reason);
println!("✅ Error handling working correctly!");
}

View file

@ -1,5 +1,5 @@
use bytes::Bytes;
use common::configuration::ModelAlias;
use common::configuration::{ModelAlias, SpanAttributes};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
@ -8,9 +8,9 @@ use hermesllm::apis::openai_responses::{InputParam, Tool as ResponsesTool};
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
use hermesllm::{ProviderRequest, ProviderRequestType};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use http_body_util::BodyExt;
use hyper::header::{self};
use hyper::{Request, Response, StatusCode};
use hyper::{Request, Response};
use opentelemetry::global;
use opentelemetry::trace::get_active_span;
use opentelemetry_http::HeaderInjector;
@ -28,13 +28,11 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::tracing::{llm as tracing_llm, operation_component, set_service_name};
use crate::tracing::{
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
};
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
use common::errors::BrightStaffError;
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
@ -42,6 +40,7 @@ pub async fn llm_chat(
full_qualified_llm_provider_url: String,
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
span_attributes: Arc<Option<SpanAttributes>>,
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
@ -54,6 +53,8 @@ pub async fn llm_chat(
Some(id) => id,
None => uuid::Uuid::new_v4().to_string(),
};
let custom_attrs =
collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref());
// Create a span with request_id that will be included in all log lines
let request_span = info_span!(
@ -75,6 +76,7 @@ pub async fn llm_chat(
full_qualified_llm_provider_url,
model_aliases,
llm_providers,
custom_attrs,
state_storage,
request_id,
request_path,
@ -91,6 +93,7 @@ async fn llm_chat_inner(
full_qualified_llm_provider_url: String,
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
custom_attrs: HashMap<String, String>,
state_storage: Option<Arc<dyn StateStorage>>,
request_id: String,
request_path: String,
@ -98,6 +101,11 @@ async fn llm_chat_inner(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
// Set service name for LLM operations
set_service_name(operation_component::LLM);
get_active_span(|span| {
for (key, value) in &custom_attrs {
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
}
});
// Extract or generate traceparent - this establishes the trace context for all spans
let traceparent: String = match request_headers
@ -135,10 +143,11 @@ async fn llm_chat_inner(
error = %err,
"failed to parse request as ProviderRequestType"
);
let err_msg = format!("Failed to parse request: {}", err);
let mut bad_request = Response::new(full(err_msg));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
return Ok(BrightStaffError::InvalidRequest(format!(
"Failed to parse request: {}",
err
))
.into_response());
}
};
@ -165,9 +174,7 @@ async fn llm_chat_inner(
None => {
let err_msg = "No model specified in request and no default provider configured";
warn!("{}", err_msg);
let mut bad_request = Response::new(full(err_msg.to_string()));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
return Ok(BrightStaffError::NoModelSpecified.into_response());
}
}
} else {
@ -188,14 +195,8 @@ async fn llm_chat_inner(
.get(&alias_resolved_model)
.is_none()
{
let err_msg = format!(
"Model '{}' not found in configured providers",
alias_resolved_model
);
warn!(model = %alias_resolved_model, "model not found in configured providers");
let mut bad_request = Response::new(full(err_msg));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
return Ok(BrightStaffError::ModelNotFound(alias_resolved_model).into_response());
}
// Handle provider/model slug format (e.g., "openai/gpt-4")
@ -290,13 +291,10 @@ async fn llm_chat_inner(
Err(StateStorageError::NotFound(_)) => {
// Return 409 Conflict when previous_response_id not found
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
let err_msg = format!(
"Conversation state not found for previous_response_id: {}",
prev_resp_id
);
let mut conflict_response = Response::new(full(err_msg));
*conflict_response.status_mut() = StatusCode::CONFLICT;
return Ok(conflict_response);
return Ok(BrightStaffError::ConversationStateNotFound(
prev_resp_id.to_string(),
)
.into_response());
}
Err(e) => {
// Log warning but continue on other storage errors
@ -387,9 +385,11 @@ async fn llm_chat_inner(
model_name: "none".to_string(),
}
} else {
let mut internal_error = Response::new(full(err.message));
*internal_error.status_mut() = err.status_code;
return Ok(internal_error);
return Ok(BrightStaffError::ForwardedError {
status_code: err.status_code,
message: err.message,
}
.into_response());
}
}
};
@ -426,9 +426,7 @@ async fn llm_chat_inner(
None => {
let err_msg = "No configured model can serve OpenAI Responses API requests with non-function tools".to_string();
warn!(request_id = %request_id, error = %err_msg, "capability-aware routing failed");
let mut bad_request = Response::new(full(err_msg));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
return Ok(BrightStaffError::InvalidRequest(err_msg).into_response());
}
}
} else {
@ -486,10 +484,11 @@ async fn llm_chat_inner(
{
Ok(res) => res,
Err(err) => {
let err_msg = format!("Failed to send request: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return Ok(internal_error);
return Ok(BrightStaffError::InternalServerError(format!(
"Failed to send request: {}",
err
))
.into_response());
}
};
@ -546,12 +545,11 @@ async fn llm_chat_inner(
match response.body(streaming_response.body) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
}
Err(err) => Ok(BrightStaffError::InternalServerError(format!(
"Failed to create response: {}",
err
))
.into_response()),
}
}

View file

@ -1,25 +1,17 @@
use bytes::Bytes;
use common::errors::BrightStaffError;
use hermesllm::apis::OpenAIApi;
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
use hermesllm::SseEvent;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::Frame;
use hyper::{Response, StatusCode};
use hyper::Response;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{info, warn, Instrument};
/// Errors that can occur during response handling
#[derive(Debug, thiserror::Error)]
pub enum ResponseError {
#[error("Failed to create response: {0}")]
ResponseCreationFailed(#[from] hyper::http::Error),
#[error("Stream error: {0}")]
StreamError(String),
}
/// Service for handling HTTP responses and streaming
pub struct ResponseHandler;
@ -35,40 +27,6 @@ impl ResponseHandler {
.boxed()
}
/// Create an error response with a given status code and message
pub fn create_error_response(
status: StatusCode,
message: &str,
) -> Response<BoxBody<Bytes, hyper::Error>> {
let mut response = Response::new(Self::create_full_body(message.to_string()));
*response.status_mut() = status;
response
}
/// Create a bad request response
pub fn create_bad_request(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
Self::create_error_response(StatusCode::BAD_REQUEST, message)
}
/// Create an internal server error response
pub fn create_internal_error(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
Self::create_error_response(StatusCode::INTERNAL_SERVER_ERROR, message)
}
/// Create a JSON error response
pub fn create_json_error_response(
error_json: &serde_json::Value,
) -> Response<BoxBody<Bytes, hyper::Error>> {
let json_string = error_json.to_string();
let mut response = Response::new(Self::create_full_body(json_string));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
"application/json".parse().unwrap(),
);
response
}
/// Create a streaming response from a reqwest response.
/// The spawned streaming task is instrumented with both `agent_span` and `orchestrator_span`
/// so their durations reflect the actual time spent streaming to the client.
@ -77,13 +35,13 @@ impl ResponseHandler {
llm_response: reqwest::Response,
agent_span: tracing::Span,
orchestrator_span: tracing::Span,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ResponseError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, BrightStaffError> {
// Copy headers from the original response
let response_headers = llm_response.headers();
let mut response_builder = Response::builder();
let headers = response_builder.headers_mut().ok_or_else(|| {
ResponseError::StreamError("Failed to get mutable headers".to_string())
BrightStaffError::StreamError("Failed to get mutable headers".to_string())
})?;
for (header_name, header_value) in response_headers.iter() {
@ -123,7 +81,7 @@ impl ResponseHandler {
response_builder
.body(stream_body)
.map_err(ResponseError::from)
.map_err(BrightStaffError::from)
}
/// Collect the full response body as a string
@ -136,7 +94,7 @@ impl ResponseHandler {
pub async fn collect_full_response(
&self,
llm_response: reqwest::Response,
) -> Result<String, ResponseError> {
) -> Result<String, BrightStaffError> {
use hermesllm::apis::streaming_shapes::sse::SseStreamIter;
let response_headers = llm_response.headers();
@ -144,10 +102,9 @@ impl ResponseHandler {
.get(hyper::header::CONTENT_TYPE)
.is_some_and(|v| v.to_str().unwrap_or("").contains("text/event-stream"));
let response_bytes = llm_response
.bytes()
.await
.map_err(|e| ResponseError::StreamError(format!("Failed to read response: {}", e)))?;
let response_bytes = llm_response.bytes().await.map_err(|e| {
BrightStaffError::StreamError(format!("Failed to read response: {}", e))
})?;
if is_sse_streaming {
let client_api =
@ -185,7 +142,7 @@ impl ResponseHandler {
} else {
// If not SSE, treat as regular text response
let response_text = String::from_utf8(response_bytes.to_vec()).map_err(|e| {
ResponseError::StreamError(format!("Failed to decode response: {}", e))
BrightStaffError::StreamError(format!("Failed to decode response: {}", e))
})?;
Ok(response_text)
@ -204,42 +161,6 @@ mod tests {
use super::*;
use hyper::StatusCode;
#[test]
fn test_create_bad_request() {
let response = ResponseHandler::create_bad_request("Invalid request");
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[test]
fn test_create_internal_error() {
let response = ResponseHandler::create_internal_error("Server error");
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
fn test_create_error_response() {
let response =
ResponseHandler::create_error_response(StatusCode::NOT_FOUND, "Resource not found");
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[test]
fn test_create_json_error_response() {
let error_json = serde_json::json!({
"error": {
"type": "TestError",
"message": "Test error message"
}
});
let response = ResponseHandler::create_json_error_response(&error_json);
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/json"
);
}
#[tokio::test]
async fn test_create_streaming_response_with_mock() {
use mockito::Server;

View file

@ -114,6 +114,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
));
let model_aliases = Arc::new(plano_config.model_aliases.clone());
let span_attributes = Arc::new(
plano_config
.tracing
.as_ref()
.and_then(|tracing| tracing.span_attributes.clone()),
);
// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in plano_config.yaml
@ -173,6 +179,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -183,6 +190,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let model_aliases = Arc::clone(&model_aliases);
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();
async move {
@ -202,6 +210,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fully_qualified_url,
agents_list,
listeners,
span_attributes,
llm_providers,
)
.with_context(parent_cx)
@ -220,6 +229,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fully_qualified_url,
model_aliases,
llm_providers,
span_attributes,
state_storage,
)
.with_context(parent_cx)

View file

@ -0,0 +1,156 @@
use std::collections::HashMap;
use common::configuration::SpanAttributes;
use common::traces::SpanBuilder;
use hyper::header::HeaderMap;
/// Build the map of custom span attributes for a request.
///
/// Combines two sources:
/// 1. `static_attributes` from configuration (always included when present);
/// 2. attributes derived from request headers whose names start with one of
///    the configured `header_prefixes`.
///
/// The matched prefix is stripped from the header name, leading hyphens are
/// trimmed, and remaining hyphens become dots — e.g. header
/// `x-acme-tenant-id` with prefix `x-acme-` yields key `tenant.id`.
/// Header-derived values are inserted last, so they override static values
/// on key collision.
///
/// Header names arriving via hyper are always lowercase; configured prefixes
/// are therefore lowercased before matching so a config value such as
/// `X-Acme-` does not silently match nothing.
pub fn collect_custom_trace_attributes(
    headers: &HeaderMap,
    span_attributes: Option<&SpanAttributes>,
) -> HashMap<String, String> {
    let mut attributes = HashMap::new();

    let Some(span_attributes) = span_attributes else {
        return attributes;
    };

    // Static attributes first: header-derived ones may override them below.
    if let Some(static_attributes) = span_attributes.static_attributes.as_ref() {
        for (key, value) in static_attributes {
            attributes.insert(key.clone(), value.clone());
        }
    }

    let Some(header_prefixes) = span_attributes.header_prefixes.as_deref() else {
        return attributes;
    };
    if header_prefixes.is_empty() {
        return attributes;
    }

    // hyper normalizes header names to lowercase; lowercase the configured
    // prefixes once so matching is effectively case-insensitive.
    let normalized_prefixes: Vec<String> = header_prefixes
        .iter()
        .map(|prefix| prefix.to_ascii_lowercase())
        .collect();

    for (name, value) in headers.iter() {
        let header_name = name.as_str();

        let Some(prefix) = normalized_prefixes
            .iter()
            .find(|prefix| header_name.starts_with(prefix.as_str()))
        else {
            continue;
        };

        // Skip header values that are not valid visible-ASCII/UTF-8.
        let Some(raw_value) = value.to_str().ok().map(str::trim) else {
            continue;
        };

        let suffix = header_name.strip_prefix(prefix.as_str()).unwrap_or("");
        let suffix_key = suffix.trim_start_matches('-').replace('-', ".");
        // A header that is exactly the prefix produces no usable key.
        if suffix_key.is_empty() {
            continue;
        }

        attributes.insert(suffix_key, raw_value.to_string());
    }

    attributes
}
/// Copy every key/value pair from `attributes` onto the given span builder
/// and return the resulting builder.
pub fn append_span_attributes(
    span_builder: SpanBuilder,
    attributes: &HashMap<String, String>,
) -> SpanBuilder {
    attributes
        .iter()
        .fold(span_builder, |builder, (key, value)| {
            builder.with_attribute(key, value)
        })
}
/// Unit tests for header-derived custom trace attributes.
#[cfg(test)]
mod tests {
    use super::collect_custom_trace_attributes;
    use common::configuration::SpanAttributes;
    use hyper::header::{HeaderMap, HeaderValue};
    use std::collections::HashMap;

    // Headers matching a configured prefix are captured: the prefix is
    // stripped and hyphens become dots; non-matching headers are ignored.
    #[test]
    fn extracts_headers_by_prefix() {
        let mut headers = HeaderMap::new();
        headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
        headers.insert("x-katanemo-user-id", HeaderValue::from_static("usr_789"));
        headers.insert("x-katanemo-admin-level", HeaderValue::from_static("3"));
        headers.insert("x-other-id", HeaderValue::from_static("ignored"));

        let attrs = collect_custom_trace_attributes(
            &headers,
            Some(&SpanAttributes {
                header_prefixes: Some(vec!["x-katanemo-".to_string()]),
                static_attributes: None,
            }),
        );

        assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
        assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
        assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
        assert!(!attrs.contains_key("other.id"));
    }

    // No prefixes configured (None) or an empty prefix list both yield an
    // empty attribute map when no static attributes are set.
    #[test]
    fn returns_empty_when_prefixes_missing_or_empty() {
        let mut headers = HeaderMap::new();
        headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));

        let attrs_none = collect_custom_trace_attributes(
            &headers,
            Some(&SpanAttributes {
                header_prefixes: None,
                static_attributes: None,
            }),
        );
        assert!(attrs_none.is_empty());

        let attrs_empty = collect_custom_trace_attributes(
            &headers,
            Some(&SpanAttributes {
                header_prefixes: Some(Vec::new()),
                static_attributes: None,
            }),
        );
        assert!(attrs_empty.is_empty());
    }

    // Each header is matched against every configured prefix.
    #[test]
    fn supports_multiple_prefixes() {
        let mut headers = HeaderMap::new();
        headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
        headers.insert("x-tenant-user-id", HeaderValue::from_static("usr_789"));

        let attrs = collect_custom_trace_attributes(
            &headers,
            Some(&SpanAttributes {
                header_prefixes: Some(vec!["x-katanemo-".to_string(), "x-tenant-".to_string()]),
                static_attributes: None,
            }),
        );

        assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
        assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
    }

    // When a header-derived key collides with a static attribute key, the
    // header value wins; non-colliding static attributes are preserved.
    #[test]
    fn header_attributes_override_static_attributes() {
        let mut headers = HeaderMap::new();
        headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));

        let mut static_attributes = HashMap::new();
        static_attributes.insert("tenant.id".to_string(), "ten_static".to_string());
        static_attributes.insert("environment".to_string(), "prod".to_string());

        let attrs = collect_custom_trace_attributes(
            &headers,
            Some(&SpanAttributes {
                header_prefixes: Some(vec!["x-katanemo-".to_string()]),
                static_attributes: Some(static_attributes),
            }),
        );

        assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
        assert_eq!(attrs.get("environment"), Some(&"prod".to_string()));
    }
}

View file

@ -1,9 +1,11 @@
mod constants;
mod custom_attributes;
mod service_name_exporter;
pub use constants::{
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
};
pub use custom_attributes::{append_span_attributes, collect_custom_trace_attributes};
pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY};
use opentelemetry::trace::get_active_span;

View file

@ -20,6 +20,9 @@ urlencoding = "2.1.3"
url = "2.5.4"
hermesllm = { version = "0.1.0", path = "../hermesllm" }
serde_with = "3.13.0"
hyper = "1.0"
bytes = "1.0"
http-body-util = "0.1"
[features]
default = []
@ -30,3 +33,6 @@ serde_json = "1.0.64"
serial_test = "3.2"
axum = "0.7"
tokio = { version = "1.44", features = ["sync", "time", "macros", "rt"] }
hyper = { version = "1.0", features = ["full"] }
bytes = "1.0"
http-body-util = "0.1"

View file

@ -92,6 +92,14 @@ pub struct Tracing {
pub trace_arch_internal: Option<bool>,
pub random_sampling: Option<u32>,
pub opentracing_grpc_endpoint: Option<String>,
pub span_attributes: Option<SpanAttributes>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SpanAttributes {
pub header_prefixes: Option<Vec<String>>,
#[serde(rename = "static")]
pub static_attributes: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]

View file

@ -1,9 +1,13 @@
use proxy_wasm::types::Status;
use crate::{api::open_ai::ChatCompletionChunkResponseError, ratelimit};
use bytes::Bytes;
use hermesllm::apis::openai::OpenAIError;
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{Error as HyperError, Response, StatusCode};
use proxy_wasm::types::Status;
use serde_json::json;
use thiserror::Error;
#[derive(thiserror::Error, Debug)]
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Error dispatching HTTP call to `{upstream_name}/{path}`, error: {internal_status:?}")]
DispatchError {
@ -13,7 +17,7 @@ pub enum ClientError {
},
}
#[derive(thiserror::Error, Debug)]
#[derive(Error, Debug)]
pub enum ServerError {
#[error(transparent)]
HttpDispatch(ClientError),
@ -43,3 +47,174 @@ pub enum ServerError {
#[error("error parsing openai message: {0}")]
OpenAIPError(#[from] OpenAIError),
}
// -----------------------------------------------------------------------------
// BrightStaff Errors (Standardized)
// -----------------------------------------------------------------------------
#[derive(Debug, Error)]
pub enum BrightStaffError {
#[error("The requested model '{0}' does not exist")]
ModelNotFound(String),
#[error("No model specified in request and no default provider configured")]
NoModelSpecified,
#[error("Conversation state not found for previous_response_id: {0}")]
ConversationStateNotFound(String),
#[error("Internal server error")]
InternalServerError(String),
#[error("Invalid request")]
InvalidRequest(String),
#[error("{message}")]
ForwardedError {
status_code: StatusCode,
message: String,
},
#[error("Stream error: {0}")]
StreamError(String),
#[error("Failed to create response: {0}")]
ResponseCreationFailed(#[from] hyper::http::Error),
}
impl BrightStaffError {
    /// Convert the error into a complete HTTP response.
    ///
    /// The body is a JSON object of the shape
    /// `{"error": {"code": ..., "message": ..., "details": {...}}}` where
    /// `code` is a stable machine-readable identifier, `message` is the
    /// human-readable Display string, and `details` carries variant-specific
    /// context (e.g. the rejected model id).
    pub fn into_response(self) -> Response<BoxBody<Bytes, HyperError>> {
        // Map each variant to (HTTP status, stable error code, details JSON).
        let (status, code, details) = match &self {
            BrightStaffError::ModelNotFound(model_name) => (
                StatusCode::NOT_FOUND,
                "ModelNotFound",
                json!({ "rejected_model_id": model_name }),
            ),
            BrightStaffError::NoModelSpecified => {
                (StatusCode::BAD_REQUEST, "NoModelSpecified", json!({}))
            }
            BrightStaffError::ConversationStateNotFound(prev_resp_id) => (
                StatusCode::CONFLICT,
                "ConversationStateNotFound",
                json!({ "previous_response_id": prev_resp_id }),
            ),
            BrightStaffError::InternalServerError(reason) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                "InternalServerError",
                // Passing the reason into details for easier debugging
                json!({ "reason": reason }),
            ),
            BrightStaffError::InvalidRequest(reason) => (
                StatusCode::BAD_REQUEST,
                "InvalidRequest",
                json!({ "reason": reason }),
            ),
            // Upstream errors keep their original status code.
            BrightStaffError::ForwardedError {
                status_code,
                message,
            } => (*status_code, "ForwardedError", json!({ "reason": message })),
            BrightStaffError::StreamError(reason) => (
                StatusCode::BAD_REQUEST,
                "StreamError",
                json!({ "reason": reason }),
            ),
            // NOTE(review): BAD_REQUEST here is arguably a 500 — failing to
            // build a response is a server-side fault. The current tests pin
            // 400, so confirm intent before changing.
            BrightStaffError::ResponseCreationFailed(reason) => (
                StatusCode::BAD_REQUEST,
                "ResponseCreationFailed",
                json!({ "reason": reason.to_string() }),
            ),
        };

        let body_json = json!({
            "error": {
                "code": code,
                "message": self.to_string(),
                "details": details
            }
        });

        // 1. Create the concrete body
        let full_body = Full::new(Bytes::from(body_json.to_string()));

        // 2. Convert it to BoxBody
        // We map_err because Full never fails, but BoxBody expects a HyperError
        let boxed_body = full_body
            .map_err(|never| match never {}) // This handles the "Infallible" error type
            .boxed();

        // Building the response can only fail on invalid header/status values,
        // all of which are hard-coded here; fall back to a plain-text body
        // rather than panicking if it ever does.
        Response::builder()
            .status(status)
            .header("content-type", "application/json")
            .body(boxed_body)
            .unwrap_or_else(|_| {
                Response::new(
                    Full::new(Bytes::from("Internal Error"))
                        .map_err(|never| match never {})
                        .boxed(),
                )
            })
    }
}
/// Unit tests for the JSON error-response format produced by
/// `BrightStaffError::into_response`.
#[cfg(test)]
mod tests {
    use super::*;
    use http_body_util::BodyExt; // For .collect().await

    // ModelNotFound: 404 with the model id echoed in both the details
    // object and the human-readable message.
    #[tokio::test]
    async fn test_model_not_found_format() {
        let err = BrightStaffError::ModelNotFound("gpt-5-secret".to_string());
        let response = err.into_response();

        assert_eq!(response.status(), StatusCode::NOT_FOUND);

        // Helper to extract body as JSON
        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
        let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();

        assert_eq!(body["error"]["code"], "ModelNotFound");
        assert_eq!(
            body["error"]["details"]["rejected_model_id"],
            "gpt-5-secret"
        );
        assert!(body["error"]["message"]
            .as_str()
            .unwrap()
            .contains("gpt-5-secret"));
    }

    // ForwardedError: the upstream status code is passed through unchanged.
    #[tokio::test]
    async fn test_forwarded_error_preserves_status() {
        let err = BrightStaffError::ForwardedError {
            status_code: StatusCode::TOO_MANY_REQUESTS,
            message: "Rate limit exceeded on agent side".to_string(),
        };
        let response = err.into_response();

        assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS);

        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
        let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(body["error"]["code"], "ForwardedError");
    }

    // ResponseCreationFailed wraps hyper::http::Error via #[from] and maps
    // to a 400 response.
    #[tokio::test]
    async fn test_hyper_error_wrapping() {
        // Manually trigger a hyper error by creating an invalid URI/Header
        let hyper_err = hyper::http::Response::builder()
            .status(1000) // Invalid status
            .body(())
            .unwrap_err();

        let err = BrightStaffError::ResponseCreationFailed(hyper_err);
        let response = err.into_response();
        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }
}

View file

@ -55,3 +55,6 @@ listeners:
tracing:
random_sampling: 100
span_attributes:
header_prefixes:
- x-acme-

View file

@ -3,9 +3,16 @@
### Travel Agent Chat Completion Request
POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1
Content-Type: application/json
X-Acme-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3
X-Acme-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a
X-Acme-User-Id: usr_19df7e6751b846f9ba026776e3c12abe
X-Acme-Admin-Level: 3
X-Acme-Environment: production
X-Acme-Is-Internal: false
X-Acme-Cost-Center: HD100
{
"model": "gpt-4o",
"model": "gpt-5.2",
"messages": [
{
"role": "user",
@ -20,7 +27,28 @@ Content-Type: application/json
"content": "What is one Alaska flight that goes direct to Atlanta from Seattle?"
}
],
"max_tokens": 1000,
"max_completion_tokens": 1000,
"stream": false,
"temperature": 1.0
}
### Travel Agent Request (prefix mismatch - ignored)
POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1
Content-Type: application/json
X-Other-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3
X-Other-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a
X-Other-User-Id: usr_19df7e6751b846f9ba026776e3c12abe
{
"model": "gpt-5.2",
"messages": [
{
"role": "user",
"content": "What's the weather in Seattle?"
}
],
"max_completion_tokens": 1000,
"stream": false,
"temperature": 1.0
}

View file

@ -15,9 +15,9 @@ Make sure your machine is up to date with [latest version of plano]([url](https:
```bash
(venv) $ planoai up --service plano --foreground
# Or if installed with uv: uvx planoai up --service plano --foreground
2025-05-30 18:00:09,953 - planoai.main - INFO - Starting plano cli version: 0.4.8
2025-05-30 18:00:09,953 - planoai.main - INFO - Starting plano cli version: 0.4.9
2025-05-30 18:00:09,953 - planoai.main - INFO - Validating /Users/adilhafeez/src/intelligent-prompt-gateway/demos/llm_routing/preference_based_routing/config.yaml
2025-05-30 18:00:10,422 - cli.core - INFO - Starting plano gateway, image name: plano, tag: katanemo/plano:0.4.8
2025-05-30 18:00:10,422 - cli.core - INFO - Starting plano gateway, image name: plano, tag: katanemo/plano:0.4.9
2025-05-30 18:00:10,662 - cli.core - INFO - plano status: running, health status: starting
2025-05-30 18:00:11,712 - cli.core - INFO - plano status: running, health status: starting
2025-05-30 18:00:12,761 - cli.core - INFO - plano is running and is healthy!

Binary file not shown.

After

Width:  |  Height:  |  Size: 321 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 127 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

View file

@ -0,0 +1,18 @@
/* Fix: Prevent "Copy code" button label from appearing in clipboard content.
*
* sphinxawesome_theme inserts a copy button inside <pre> elements. When
* clipboard.js selects all children of the <pre> to copy, the button's
* sr-only text ("Copy code") is included in the selection. This listener
* intercepts the copy event and strips that trailing label from the data
* written to the clipboard.
*/
document.addEventListener('copy', function (event) {
  var clipboard = event.clipboardData;
  var selection = window.getSelection();
  if (!clipboard || !selection) { return; }
  // Strip the trailing "Copy code" button label that clipboard.js picks up
  // from the sr-only text inside <pre> elements.
  var original = selection.toString();
  var stripped = original.replace(/\nCopy code\s*$/, '');
  if (stripped !== original) {
    clipboard.setData('text/plain', stripped);
    event.preventDefault();
  }
}, true);

View file

@ -17,7 +17,7 @@ from sphinxawesome_theme.postprocess import Icons
project = "Plano Docs"
copyright = "2025, Katanemo Labs, Inc"
author = "Katanemo Labs, Inc"
release = " v0.4.8"
release = " v0.4.9"
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
@ -116,6 +116,7 @@ html_theme_options = asdict(theme_options)
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ["_static"]
html_css_files = ["css/custom.css"]
html_js_files = ["js/fix-copy.js"]
pygments_style = "lovelace"
pygments_style_dark = "github-dark"

View file

@ -37,7 +37,7 @@ Plano's CLI allows you to manage and interact with the Plano efficiently. To ins
.. code-block:: console
$ uv tool install planoai==0.4.8
$ uv tool install planoai==0.4.9
**Option 2: Install with pip (Traditional)**
@ -45,7 +45,7 @@ Plano's CLI allows you to manage and interact with the Plano efficiently. To ins
$ python -m venv venv
$ source venv/bin/activate # On Windows, use: venv\Scripts\activate
$ pip install planoai==0.4.8
$ pip install planoai==0.4.9
.. _llm_routing_quickstart:
@ -90,7 +90,7 @@ Start Plano:
$ planoai up plano_config.yaml
# Or if installed with uv tool: uvx planoai up plano_config.yaml
2024-12-05 11:24:51,288 - planoai.main - INFO - Starting plano cli version: 0.4.8
2024-12-05 11:24:51,288 - planoai.main - INFO - Starting plano cli version: 0.4.9
2024-12-05 11:24:51,825 - planoai.utils - INFO - Schema validation successful!
2024-12-05 11:24:51,825 - planoai.main - INFO - Starting plano
...

View file

@ -142,6 +142,109 @@ In your observability platform (Jaeger, Grafana Tempo, Datadog, etc.), filter tr
For complete details on all available signals, detection methods, and best practices, see the :doc:`../../concepts/signals` guide.
Custom Span Attributes
-------------------------------------------
Plano can automatically attach **custom span attributes** derived from request headers and **static** attributes
defined in configuration. This lets you stamp
traces with identifiers like workspace, tenant, or user IDs without changing application code or adding
custom instrumentation.
**Why This Is Useful**
- **Tenant-aware debugging**: Filter traces by ``workspace.id`` or ``tenant.id``.
- **Customer-specific visibility**: Attribute performance or errors to a specific customer.
- **Low overhead**: No code changes in agents or clients—just headers.
How It Works
~~~~~~~~~~~~
You configure one or more header prefixes. Any incoming HTTP header whose name starts with one of these
prefixes is captured as a span attribute. You can also provide static attributes that are always injected.
- The **prefix is only for matching**, not the resulting attribute key.
- The attribute key is the header name **with the prefix removed**, then hyphens converted to dots.
.. note::
Custom span attributes are attached to LLM spans when handling ``/v1/...`` requests via ``llm_chat``. For orchestrator requests to ``/agents/...``,
these attributes are added to both the orchestrator selection span and to each agent span created by ``agent_chat``.
**Example**
Configured prefix::
tracing:
span_attributes:
header_prefixes:
- x-katanemo-
Incoming headers::
X-Katanemo-Workspace-Id: ws_123
X-Katanemo-Tenant-Id: ten_456
Resulting span attributes::
workspace.id = "ws_123"
tenant.id = "ten_456"
Configuration
~~~~~~~~~~~~~
Add the prefix list under ``tracing`` in your config:
.. code-block:: yaml
tracing:
random_sampling: 100
span_attributes:
header_prefixes:
- x-katanemo-
static:
environment: production
service.version: "1.0.0"
Static attributes are always injected alongside any header-derived attributes. If a header-derived
attribute key matches a static key, the header value overrides the static value.
You can provide multiple prefixes:
.. code-block:: yaml
tracing:
span_attributes:
header_prefixes:
- x-katanemo-
- x-tenant-
static:
environment: production
service.version: "1.0.0"
Notes and Examples
~~~~~~~~~~~~~~~~~~
- **Prefix must match exactly**: ``katanemo-`` does not match ``x-katanemo-`` headers.
- **Trailing dash is recommended**: Without it, ``x-katanemo`` would match not only intended headers
  like ``x-katanemo-foo`` but also unrelated ones such as ``x-katanemofoo``.
- **Keys are always strings**: Values are captured as string attributes.
**Prefix mismatch example**
Config::
tracing:
span_attributes:
header_prefixes:
- x-katanemo-
Request headers::
X-Other-User-Id: usr_999
Result: no attributes are captured from ``X-Other-User-Id``.
Benefits of Using ``Traceparent`` Headers
-----------------------------------------
@ -497,55 +600,7 @@ tools like AWS X-Ray and Datadog, enhancing observability and facilitating faste
Additional Resources
--------------------
CLI Reference
~~~~~~~~~~~~~
``planoai trace``
Trace requests captured by the local OTLP listener.
**Synopsis**
.. code-block:: console
$ planoai trace [TARGET] [OPTIONS]
**Targets**
- ``last`` (default): show the most recent trace.
- ``any``: allow interactive selection when available.
- ``<trace-id>``: full 32-hex trace ID.
- ``<short-id>``: first 8 hex characters.
**Options**
- ``--filter <pattern>``: limit displayed attributes to matching keys (supports ``*``).
- ``--where <key=value>``: match traces containing a specific attribute (repeatable, AND).
- ``--list``: list trace IDs only.
- ``--no-interactive``: disable interactive prompts/selections.
- ``--limit <n>``: limit the number of traces returned.
- ``--since <window>``: look back window (``5m``, ``2h``, ``1d``).
- ``--json``: output raw JSON instead of formatted output.
- ``--verbose, -v``: show all span attributes. By default, inbound/outbound
spans are displayed in a compact view.
**Environment**
- ``PLANO_TRACE_PORT``: gRPC port used by ``planoai trace`` to query traces
(defaults to ``4317``).
``planoai trace listen``
Start a local OTLP/gRPC listener.
**Synopsis**
.. code-block:: console
$ planoai trace listen [OPTIONS]
**Options**
- ``--host <host>``: bind address (default: ``0.0.0.0``).
- ``--port <port>``: gRPC listener port (default: ``4317``).
For full command documentation (including ``planoai trace`` and all other CLI commands), see :ref:`cli_reference`.
External References
~~~~~~~~~~~~~~~~~~~

View file

@ -165,7 +165,7 @@ Then set the environment variable before running Plano:
./plano
.. warning::
**Special Characters in Passwords**: If your password contains special characters like ``#``, ``@``, or ``&``, you must URL-encode them in the connection string. For example, ``MyPass#123`` becomes ``MyPass%23123``.
**Special Characters in Passwords**: If your password contains special characters like ``#``, ``@``, or ``&``, you must URL-encode them in the connection string. For example, ``P@ss#123`` becomes ``P%40ss%23123``.
Supabase Connection Strings
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -202,14 +202,14 @@ Use the direct connection (port 5432):
state_storage:
type: postgres
connection_string: "postgresql://postgres.myproject:$DB_PASSWORD@aws-0-us-west-2.pooler.supabase.com:5432/postgres"
connection_string: "postgresql://postgres.[YOUR-PROJECT-REF]:$DB_PASSWORD@aws-0-[REGION].pooler.supabase.com:5432/postgres"
Then set the environment variable:
.. code-block:: bash
# If your password is "MyPass#123", encode it as "MyPass%23123"
export DB_PASSWORD="MyPass%23123"
# If your password is "P@ss#123", encode it as "P%40ss%23123"
export DB_PASSWORD="<your-url-encoded-password>"
Troubleshooting
---------------

View file

@ -62,4 +62,5 @@ Built by contributors to the widely adopted `Envoy Proxy <https://www.envoyproxy
resources/tech_overview/tech_overview
resources/deployment
resources/configuration_reference
resources/cli_reference
resources/llms_txt

View file

@ -0,0 +1,302 @@
.. _cli_reference:
CLI Reference
=============
This reference documents the full ``planoai`` command-line interface for day-to-day development, local testing, and operational workflows.
Use this page as the canonical source for command syntax, options, and recommended usage patterns.
Quick Navigation
----------------
- :ref:`cli_reference_global`
- :ref:`cli_reference_up`
- :ref:`cli_reference_down`
- :ref:`cli_reference_build`
- :ref:`cli_reference_logs`
- :ref:`cli_reference_init`
- :ref:`cli_reference_trace`
- :ref:`cli_reference_prompt_targets`
- :ref:`cli_reference_cli_agent`
.. _cli_reference_global:
Global CLI Usage
----------------
**Command**
.. code-block:: console
$ planoai [COMMAND] [OPTIONS]
**Common global options**
- ``--help``: Show the top-level command menu.
- ``--version``: Show installed CLI version and update status.
**Help patterns**
.. code-block:: console
$ planoai --help
$ planoai trace --help
$ planoai init --help
.. figure:: /_static/img/cli-default-command.png
:width: 100%
:alt: planoai default command screenshot
``planoai`` command showing the top-level command menu.
.. _cli_reference_up:
planoai up
----------
Start Plano using a configuration file.
**Synopsis**
.. code-block:: console
$ planoai up [FILE] [--path <dir>] [--foreground] [--with-tracing] [--tracing-port <port>]
**Arguments**
- ``FILE`` (optional): explicit path to config file.
**Options**
- ``--path <dir>``: directory to search for config (default ``.``).
- ``--foreground``: run Plano in foreground.
- ``--with-tracing``: start local OTLP/gRPC trace collector.
- ``--tracing-port <port>``: collector port (default ``4317``).
.. note::
If you use ``--with-tracing``, ensure that the tracing port (``4317`` by default, configurable with ``--tracing-port``) is free and not already in use by Jaeger or any other observability service or process. If the port is occupied, the command will fail to start the trace collector.
**Examples**
.. code-block:: console
$ planoai up config.yaml
$ planoai up --path ./deploy
$ planoai up --with-tracing
$ planoai up --with-tracing --tracing-port 4318
.. _cli_reference_down:
planoai down
------------
Stop Plano (container/process stack managed by the CLI).
**Synopsis**
.. code-block:: console
$ planoai down
.. _cli_reference_build:
planoai build
-------------
Build Plano Docker image from repository source.
**Synopsis**
.. code-block:: console
$ planoai build
.. _cli_reference_logs:
planoai logs
------------
Stream Plano logs.
**Synopsis**
.. code-block:: console
$ planoai logs [--follow] [--debug]
**Options**
- ``--follow``: stream logs continuously.
- ``--debug``: include additional gateway/debug streams.
**Examples**
.. code-block:: console
$ planoai logs
$ planoai logs --follow
$ planoai logs --follow --debug
.. _cli_reference_init:
planoai init
------------
Generate a new ``config.yaml`` using an interactive wizard, built-in templates, or a clean empty file.
**Synopsis**
.. code-block:: console
$ planoai init [--template <id> | --clean] [--output <path>] [--force] [--list-templates]
**Options**
- ``--template <id>``: create config from a built-in template id.
- ``--clean``: create an empty config file.
- ``--output, -o <path>``: output path (default ``config.yaml``).
- ``--force``: overwrite existing output file.
- ``--list-templates``: print available template IDs and exit.
**Examples**
.. code-block:: console
$ planoai init
$ planoai init --list-templates
$ planoai init --template coding_agent_routing
$ planoai init --clean --output ./config/config.yaml
.. figure:: /_static/img/cli-init-command.png
:width: 100%
:alt: planoai init command screenshot
``planoai init --list-templates`` showing built-in starter templates.
.. _cli_reference_trace:
planoai trace
-------------
Inspect request traces from the local OTLP listener.
**Synopsis**
.. code-block:: console
$ planoai trace [TARGET] [OPTIONS]
**Targets**
- ``last`` (default): show most recent trace.
- ``any``: consider all traces (interactive selection when terminal supports it).
- ``listen``: start local OTLP listener.
- ``down``: stop background listener.
- ``<trace-id>``: full 32-hex trace id.
- ``<short-id>``: first 8 hex chars of trace id.
**Display options**
- ``--filter <pattern>``: keep only attribute keys matching the pattern (supports ``*`` glob wildcards).
- ``--where <key=value>``: locate traces containing key/value (repeatable, AND semantics).
- ``--list``: list trace IDs instead of full trace output (use with ``--no-interactive`` to fetch plain-text trace IDs only).
- ``--no-interactive``: disable interactive selection prompts.
- ``--limit <n>``: limit returned traces.
- ``--since <window>``: lookback window such as ``5m``, ``2h``, ``1d``.
- ``--json``: emit JSON payloads.
- ``--verbose``, ``-v``: show full attribute output (disable compact trimming). Useful for debugging internal attributes.
**Listener options (for ``TARGET=listen``)**
- ``--host <host>``: bind host (default ``0.0.0.0``).
- ``--port <port>``: bind port (default ``4317``).
.. note::
When using ``listen``, ensure that the listener port (``4317`` by default, configurable with ``--port``) is free and not already in use by Jaeger or any other observability service or process. If the port is occupied, the command will fail to start the trace collector, and no other service can share the port while the listener is running.
**Environment**
- ``PLANO_TRACE_PORT``: query port used by ``planoai trace`` when reading traces (default ``4317``).
**Examples**
.. code-block:: console
# Start/stop listener
$ planoai trace listen
$ planoai trace down
# Basic inspection
$ planoai trace
$ planoai trace 7f4e9a1c
$ planoai trace 7f4e9a1c0d9d4a0bb9bf5a8a7d13f62a
# Filtering and automation
$ planoai trace --where llm.model=openai/gpt-5.2 --since 30m
$ planoai trace --filter "http.*"
$ planoai trace --list --limit 5
$ planoai trace --where http.status_code=500 --json
.. figure:: /_static/img/cli-trace-command.png
:width: 100%
:alt: planoai trace command screenshot
``planoai trace`` command showing trace inspection and filtering capabilities.
**Operational notes**
- ``--host`` and ``--port`` are valid only when ``TARGET`` is ``listen``.
- ``--list`` cannot be combined with a specific trace-id target.
.. _cli_reference_prompt_targets:
planoai prompt_targets
----------------------
Generate prompt-target metadata from Python methods.
**Synopsis**
.. code-block:: console
$ planoai prompt_targets --file <python-file>
**Options**
- ``--file, --f <python-file>``: required path to a ``.py`` source file.
.. _cli_reference_cli_agent:
planoai cli_agent
-----------------
Start an interactive CLI agent session against a running Plano deployment.
**Synopsis**
.. code-block:: console
$ planoai cli_agent claude [FILE] [--path <dir>] [--settings '<json>']
**Arguments**
- ``type`` (positional, required): the agent type to launch; currently only ``claude`` is supported.
- ``FILE`` (optional): config file path.
**Options**
- ``--path <dir>``: directory containing config file.
- ``--settings <json>``: JSON settings payload for agent startup.

View file

@ -64,8 +64,8 @@ After setting up the database table, configure your application to use Supabase
**Example:**
```bash
# If your password is "MyPass#123", encode it as "MyPass%23123"
export DATABASE_URL="postgresql://postgres.myproject:MyPass%23123@aws-0-us-west-2.pooler.supabase.com:5432/postgres"
# If your password is "P@ss#123", encode it as "P%40ss%23123"
export DATABASE_URL="postgresql://postgres.[YOUR-PROJECT-REF]:<your-url-encoded-password>@aws-0-[REGION].pooler.supabase.com:5432/postgres"
```
### Testing the Connection

View file

@ -25,7 +25,7 @@ Create a ``docker-compose.yml`` file with the following configuration:
# docker-compose.yml
services:
plano:
image: katanemo/plano:0.4.8
image: katanemo/plano:0.4.9
container_name: plano
ports:
- "10000:10000" # ingress (client -> plano)

View file

@ -28,7 +28,7 @@ EXTRACTION_MODEL = "openai/gpt-4o-mini"
# FlightAware AeroAPI configuration
AEROAPI_BASE_URL = "https://aeroapi.flightaware.com/aeroapi"
AEROAPI_KEY = os.getenv("AEROAPI_KEY", "ESVFX7TJLxB7OTuayUv0zTQBryA3tOPr")
AEROAPI_KEY = os.getenv("AEROAPI_KEY")
# HTTP client for API calls
http_client = httpx.AsyncClient(timeout=30.0)