mirror of
https://github.com/katanemo/plano.git
synced 2026-04-26 17:26:26 +02:00
add support for background trace collection and tracing output (#749)
* feat: add trace listener process management and foreground mode * docs: add CLI reference documentation and update index * fix: test coverage failing * refactor: simplify trace listener initialization and remove debug mode handling * docs: add CLI command screenshots to reference documentation * fix: update trace listener PID file path * refactor: integrate trace listener management into runtime module and streamline PID handling * adjusting trace command for feedback on PR
This commit is contained in:
parent
54bc8e5e52
commit
ed64230833
11 changed files with 2965 additions and 153 deletions
|
|
@ -2,8 +2,11 @@ import json
|
|||
import os
|
||||
import re
|
||||
import string
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from http import HTTPStatus
|
||||
from collections import OrderedDict
|
||||
from concurrent import futures
|
||||
from dataclasses import dataclass
|
||||
|
|
@ -22,6 +25,7 @@ from rich.text import Text
|
|||
from rich.tree import Tree
|
||||
|
||||
from planoai.consts import PLANO_COLOR
|
||||
from planoai import trace_listener_runtime
|
||||
|
||||
DEFAULT_GRPC_PORT = 4317
|
||||
MAX_TRACES = 50
|
||||
|
|
@ -35,7 +39,7 @@ class TraceListenerBindError(RuntimeError):
|
|||
def _trace_listener_bind_error_message(address: str) -> str:
|
||||
return (
|
||||
f"Failed to start OTLP listener on {address}: address is already in use.\n"
|
||||
"Stop the process using that port or run `planoai trace listen --port <PORT>`."
|
||||
"Stop the process using that port or run `planoai trace listen`."
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -57,6 +61,25 @@ class TraceSummary:
|
|||
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
def _is_port_in_use(host: str, port: int) -> bool:
|
||||
"""Check whether a TCP listener is accepting connections on host:port."""
|
||||
import socket
|
||||
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.settimeout(0.2)
|
||||
return s.connect_ex((host, port)) == 0
|
||||
|
||||
|
||||
def _get_listener_pid() -> int | None:
|
||||
"""Return persisted listener PID if process is alive."""
|
||||
return trace_listener_runtime.get_listener_pid()
|
||||
|
||||
|
||||
def _stop_background_listener() -> bool:
|
||||
"""Stop persisted listener process if one is running."""
|
||||
return trace_listener_runtime.stop_listener_process()
|
||||
|
||||
|
||||
def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]:
|
||||
parts: list[str] = []
|
||||
for raw in filter_patterns:
|
||||
|
|
@ -437,8 +460,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
|
|||
"""gRPC servicer that receives OTLP ExportTraceServiceRequest and
|
||||
merges incoming spans into the global _TRACE_STORE by trace_id."""
|
||||
|
||||
_console = Console(stderr=True)
|
||||
|
||||
def Export(self, request, context): # noqa: N802
|
||||
for resource_spans in request.resource_spans:
|
||||
service_name = "unknown"
|
||||
|
|
@ -456,27 +477,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
|
|||
continue
|
||||
span_dict = _proto_span_to_dict(span, service_name)
|
||||
_TRACE_STORE.merge_spans(trace_id, [span_dict])
|
||||
short_id = trace_id[:8]
|
||||
short_span = span.span_id.hex()[:8]
|
||||
span_start = (
|
||||
datetime.fromtimestamp(
|
||||
span.start_time_unix_nano / 1_000_000_000, tz=timezone.utc
|
||||
)
|
||||
.astimezone()
|
||||
.strftime("%H:%M:%S.%f")[:-3]
|
||||
)
|
||||
dur_ns = span.end_time_unix_nano - span.start_time_unix_nano
|
||||
dur_s = dur_ns / 1_000_000_000
|
||||
dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".")
|
||||
dur_str = f"{dur_str}s"
|
||||
self._console.print(
|
||||
f"[dim]{span_start}[/dim], "
|
||||
f"trace=[yellow]{short_id}[/yellow], "
|
||||
f"span=[yellow]{short_span}[/yellow], "
|
||||
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
|
||||
f"[cyan]{span.name}[/cyan] "
|
||||
f"[dim]({dur_str})[/dim]"
|
||||
)
|
||||
|
||||
return trace_service_pb2.ExportTraceServiceResponse()
|
||||
|
||||
|
|
@ -499,12 +499,8 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
|
|||
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
|
||||
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
|
||||
"""Create, bind, and start an OTLP/gRPC trace-collection server.
|
||||
|
||||
Returns the running ``grpc.Server``. The caller is responsible
|
||||
for calling ``server.stop()`` when done.
|
||||
"""
|
||||
def _start_trace_server(host: str, grpc_port: int) -> grpc.Server:
|
||||
"""Create, bind, and start an OTLP/gRPC trace server."""
|
||||
grpc_server = grpc.server(
|
||||
futures.ThreadPoolExecutor(max_workers=4),
|
||||
handlers=[_TraceQueryHandler()],
|
||||
|
|
@ -525,38 +521,88 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
|
|||
return grpc_server
|
||||
|
||||
|
||||
def _start_trace_listener(host: str, grpc_port: int) -> None:
|
||||
"""Start the OTLP/gRPC listener and block until interrupted."""
|
||||
console = Console()
|
||||
try:
|
||||
grpc_server = _create_trace_server(host, grpc_port)
|
||||
except TraceListenerBindError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
def _serve_trace_listener(host: str, grpc_port: int) -> None:
|
||||
"""Run the listener loop until process termination."""
|
||||
# Persist PID immediately after fork, before server startup.
|
||||
# This ensures the PID file exists even if server initialization fails.
|
||||
trace_listener_runtime.write_listener_pid(os.getpid())
|
||||
|
||||
console.print()
|
||||
console.print(f"[bold {PLANO_COLOR}]Listening for traces...[/bold {PLANO_COLOR}]")
|
||||
console.print(
|
||||
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
|
||||
)
|
||||
console.print("[dim]Press Ctrl+C to stop.[/dim]")
|
||||
console.print()
|
||||
try:
|
||||
grpc_server = _start_trace_server(host, grpc_port)
|
||||
grpc_server.wait_for_termination()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
grpc_server.stop(grace=2)
|
||||
# Best-effort cleanup; server may not exist if startup failed.
|
||||
try:
|
||||
grpc_server.stop(grace=2)
|
||||
except NameError:
|
||||
pass
|
||||
trace_listener_runtime.remove_listener_pid()
|
||||
|
||||
|
||||
def _start_trace_listener(host: str, grpc_port: int) -> None:
|
||||
"""Start the OTLP/gRPC listener as a daemon process."""
|
||||
|
||||
console = Console()
|
||||
|
||||
# Check if the requested port is already in use.
|
||||
if _is_port_in_use(host, grpc_port):
|
||||
existing_pid = _get_listener_pid()
|
||||
if existing_pid:
|
||||
# If the process PID is known, inform user that our listener is already running.
|
||||
console.print(
|
||||
f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})"
|
||||
)
|
||||
else:
|
||||
# If port is taken but no tracked listener PID exists, warn user of unknown conflict.
|
||||
console.print(
|
||||
f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process"
|
||||
)
|
||||
console.print(f"\n[dim]Check what's using the port:[/dim]")
|
||||
console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]")
|
||||
return
|
||||
|
||||
# Fork/daemonize and run the trace server in the background.
|
||||
try:
|
||||
pid = trace_listener_runtime.daemonize_and_run(
|
||||
lambda: _serve_trace_listener(host, grpc_port)
|
||||
)
|
||||
except OSError as e:
|
||||
console.print(f"[red]✗[/red] Failed to start trace listener: {e}")
|
||||
return
|
||||
|
||||
if pid is None:
|
||||
# We're in the child process; daemonize_and_run never returns here.
|
||||
return
|
||||
|
||||
# In the parent process: wait briefly for the background process to bind the port.
|
||||
time.sleep(0.5) # Give child process time to start and bind to the port.
|
||||
|
||||
if _is_port_in_use(host, grpc_port):
|
||||
# Success: the trace listener started and bound the port.
|
||||
console.print()
|
||||
console.print(
|
||||
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
|
||||
)
|
||||
console.print(
|
||||
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
|
||||
)
|
||||
console.print(f"[dim]Process ID: {pid}[/dim]")
|
||||
console.print(
|
||||
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
|
||||
)
|
||||
console.print()
|
||||
else:
|
||||
# Failure: trace listener child process did not successfully start.
|
||||
console.print(f"[red]✗[/red] Failed to start trace listener")
|
||||
|
||||
|
||||
def start_trace_listener_background(
|
||||
host: str = "0.0.0.0", grpc_port: int = DEFAULT_GRPC_PORT
|
||||
) -> grpc.Server:
|
||||
"""Start the trace listener in the background (non-blocking).
|
||||
|
||||
Returns the running ``grpc.Server`` so the caller can call
|
||||
``server.stop()`` later.
|
||||
"""
|
||||
return _create_trace_server(host, grpc_port)
|
||||
"""Start the trace server in-process and return ``grpc.Server`` handle."""
|
||||
return _start_trace_server(host, grpc_port)
|
||||
|
||||
|
||||
def _span_time_ns(span: dict[str, Any], key: str) -> int:
|
||||
|
|
@ -584,13 +630,13 @@ def _trace_summary(trace: dict[str, Any]) -> TraceSummary:
|
|||
def _service_color(service: str) -> str:
|
||||
service = service.lower()
|
||||
if "inbound" in service:
|
||||
return "white"
|
||||
return "#4860fa"
|
||||
if "outbound" in service:
|
||||
return "white"
|
||||
return "#57d9a9"
|
||||
if "orchestrator" in service:
|
||||
return PLANO_COLOR
|
||||
if "routing" in service:
|
||||
return "magenta"
|
||||
return "#e3a2fa"
|
||||
if "agent" in service:
|
||||
return "cyan"
|
||||
if "llm" in service:
|
||||
|
|
@ -598,6 +644,63 @@ def _service_color(service: str) -> str:
|
|||
return "white"
|
||||
|
||||
|
||||
def _error_symbol(status_code: str) -> str:
|
||||
code = int(status_code) if status_code.isdigit() else 0
|
||||
|
||||
if code >= 500:
|
||||
return "💥" # Server error - something broke
|
||||
elif code == 429:
|
||||
return "🚦" # Rate limited
|
||||
elif code == 404:
|
||||
return "🔍" # Not found
|
||||
elif code == 403:
|
||||
return "🚫" # Forbidden
|
||||
elif code == 401:
|
||||
return "🔐" # Unauthorized
|
||||
elif code >= 400:
|
||||
return "⚠️" # Client error
|
||||
else:
|
||||
return "❓" # Generic error
|
||||
|
||||
|
||||
def _error_description(status_code: str) -> str:
|
||||
"""Return a developer-friendly description of the error."""
|
||||
code = int(status_code) if status_code.isdigit() else 0
|
||||
|
||||
if code < 400:
|
||||
return "Error"
|
||||
try:
|
||||
return HTTPStatus(code).phrase
|
||||
except ValueError:
|
||||
if code >= 500:
|
||||
return "Server Error"
|
||||
return "Client Error"
|
||||
|
||||
|
||||
def _detect_error(span: dict[str, Any]) -> tuple[bool, str, str]:
|
||||
"""Detect if span has an error and return (has_error, status_code, error_msg).
|
||||
|
||||
Returns:
|
||||
tuple: (has_error, status_code, error_description)
|
||||
"""
|
||||
attrs = _attrs(span)
|
||||
status_code = attrs.get("http.status_code", "")
|
||||
|
||||
# Check for non-2xx status codes
|
||||
if status_code and status_code.isdigit():
|
||||
code = int(status_code)
|
||||
if code >= 400:
|
||||
return True, status_code, _error_description(status_code)
|
||||
|
||||
# Check for explicit error attributes
|
||||
if "error.message" in attrs:
|
||||
return True, status_code or "unknown", attrs["error.message"]
|
||||
if "exception.message" in attrs:
|
||||
return True, status_code or "unknown", attrs["exception.message"]
|
||||
|
||||
return False, "", ""
|
||||
|
||||
|
||||
# Attributes to show for inbound/outbound spans when not verbose (trimmed view).
|
||||
_INBOUND_OUTBOUND_ATTR_KEYS = (
|
||||
"http.method",
|
||||
|
|
@ -621,10 +724,20 @@ def _trim_attrs_for_display(
|
|||
|
||||
|
||||
def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
|
||||
# Error attributes always come first
|
||||
error_priority = [
|
||||
"http.status_code",
|
||||
"error.type",
|
||||
"error.message",
|
||||
"error.stack",
|
||||
"exception.type",
|
||||
"exception.message",
|
||||
]
|
||||
|
||||
# Then regular priority attributes
|
||||
priority = [
|
||||
"http.method",
|
||||
"http.target",
|
||||
"http.status_code",
|
||||
"guid:x-request-id",
|
||||
"request_size",
|
||||
"response_size",
|
||||
|
|
@ -641,7 +754,10 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
|
|||
"llm.duration_ms",
|
||||
"llm.response_bytes",
|
||||
]
|
||||
prioritized = [(k, attrs[k]) for k in priority if k in attrs]
|
||||
|
||||
# Combine error priority with regular priority
|
||||
full_priority = error_priority + priority
|
||||
prioritized = [(k, attrs[k]) for k in full_priority if k in attrs]
|
||||
prioritized_keys = {k for k, _ in prioritized}
|
||||
remaining = [(k, v) for k, v in attrs.items() if k not in prioritized_keys]
|
||||
remaining.sort(key=lambda item: item[0])
|
||||
|
|
@ -649,8 +765,14 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
|
|||
|
||||
|
||||
def _display_attr_value(key: str, value: str) -> str:
|
||||
if key == "http.status_code" and value != "200":
|
||||
return f"{value} ⚠️"
|
||||
if key == "http.status_code":
|
||||
if value.isdigit():
|
||||
code = int(value)
|
||||
if code >= 400:
|
||||
return f"{value} {_error_symbol(value)}"
|
||||
elif code >= 200 and code < 300:
|
||||
return f"{value}"
|
||||
return value
|
||||
return value
|
||||
|
||||
|
||||
|
|
@ -670,7 +792,7 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
|
|||
)
|
||||
|
||||
spans.sort(key=lambda s: _span_time_ns(s, "startTimeUnixNano"))
|
||||
tree = Tree("", guide_style="dim")
|
||||
tree = Tree("", guide_style="dim #5b5a5c bold")
|
||||
|
||||
for span in spans:
|
||||
service = span.get("service", "plano(unknown)")
|
||||
|
|
@ -678,22 +800,52 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
|
|||
offset_ms = max(
|
||||
0, (_span_time_ns(span, "startTimeUnixNano") - start_ns) / 1_000_000
|
||||
)
|
||||
color = _service_color(service)
|
||||
label = Text(f"{offset_ms:.0f}ms ", style="yellow")
|
||||
label.append(service, style=f"bold {color}")
|
||||
if name:
|
||||
label.append(f" {name}", style="dim white")
|
||||
|
||||
# Check for errors in this span
|
||||
has_error, error_code, error_desc = _detect_error(span)
|
||||
|
||||
if has_error:
|
||||
# Create error banner above the span
|
||||
error_banner = Text()
|
||||
error_banner.append(error_desc, style="bright_red")
|
||||
tree.add(error_banner)
|
||||
|
||||
# Style the span label in light red
|
||||
label = Text(f"{offset_ms:.0f}ms ", style="#ff6b6b")
|
||||
label.append(service, style="bold #ff6b6b")
|
||||
if name:
|
||||
label.append(f" {name}", style="#ff6b6b italic")
|
||||
else:
|
||||
# Normal styling
|
||||
color = _service_color(service)
|
||||
label = Text(f"{offset_ms:.0f}ms ", style="#949c99")
|
||||
label.append(service, style=f"bold {color}")
|
||||
if name:
|
||||
label.append(f" {name}", style="dim white bold italic")
|
||||
|
||||
node = tree.add(label)
|
||||
attrs = _trim_attrs_for_display(_attrs(span), service, verbose)
|
||||
sorted_items = list(_sorted_attr_items(attrs))
|
||||
for idx, (key, value) in enumerate(sorted_items):
|
||||
attr_line = Text()
|
||||
attr_line.append(f"{key}: ", style="white")
|
||||
attr_line.append(
|
||||
_display_attr_value(key, str(value)),
|
||||
style=f"{PLANO_COLOR}",
|
||||
)
|
||||
# attribute key
|
||||
attr_line.append(f"{key}: ", style="#a4a9aa")
|
||||
# attribute value
|
||||
if key == "http.status_code" and value.isdigit():
|
||||
val_int = int(value)
|
||||
val_style = "bold red" if val_int >= 400 else "green"
|
||||
attr_line.append(_display_attr_value(key, str(value)), style=val_style)
|
||||
elif key in [
|
||||
"error.message",
|
||||
"exception.message",
|
||||
"error.type",
|
||||
"exception.type",
|
||||
]:
|
||||
attr_line.append(_display_attr_value(key, str(value)), style="red")
|
||||
else:
|
||||
attr_line.append(
|
||||
_display_attr_value(key, str(value)), style=f"{PLANO_COLOR} bold"
|
||||
)
|
||||
if idx == len(sorted_items) - 1:
|
||||
attr_line.append("\n")
|
||||
node.add(attr_line)
|
||||
|
|
@ -904,7 +1056,7 @@ def _run_trace_show(
|
|||
_build_tree(trace_obj, console, verbose=verbose)
|
||||
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@click.command()
|
||||
@click.argument("target", required=False)
|
||||
@click.option(
|
||||
"--filter",
|
||||
|
|
@ -950,9 +1102,8 @@ def trace(
|
|||
verbose,
|
||||
):
|
||||
"""Trace requests from the local OTLP listener."""
|
||||
if ctx.invoked_subcommand:
|
||||
return
|
||||
if target == "listen" and not any(
|
||||
# Handle operational shortcuts when invoked as target values.
|
||||
has_show_options = any(
|
||||
[
|
||||
filter_patterns,
|
||||
where_filters,
|
||||
|
|
@ -963,9 +1114,20 @@ def trace(
|
|||
json_out,
|
||||
verbose,
|
||||
]
|
||||
):
|
||||
)
|
||||
|
||||
if target == "listen" and not has_show_options:
|
||||
_start_trace_listener("0.0.0.0", DEFAULT_GRPC_PORT)
|
||||
return
|
||||
|
||||
if target in ("stop", "down") and not has_show_options:
|
||||
console = Console()
|
||||
if _stop_background_listener():
|
||||
console.print(f"[green]✓[/green] Trace listener stopped")
|
||||
else:
|
||||
console.print(f"[dim]No background trace listener running[/dim]")
|
||||
return
|
||||
|
||||
_run_trace_show(
|
||||
target,
|
||||
filter_patterns,
|
||||
|
|
@ -977,17 +1139,3 @@ def trace(
|
|||
json_out,
|
||||
verbose,
|
||||
)
|
||||
|
||||
|
||||
@trace.command("listen")
|
||||
@click.option("--host", default="0.0.0.0", show_default=True)
|
||||
@click.option(
|
||||
"--port",
|
||||
type=int,
|
||||
default=DEFAULT_GRPC_PORT,
|
||||
show_default=True,
|
||||
help="gRPC port for receiving OTLP traces.",
|
||||
)
|
||||
def trace_listen(host: str, port: int) -> None:
|
||||
"""Listen for OTLP/gRPC traces."""
|
||||
_start_trace_listener(host, port)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue