feat: add trace listener process management and foreground mode

This commit is contained in:
Musa 2026-02-12 15:34:47 -08:00
parent 3c8e899de3
commit 6131589277
5 changed files with 2676 additions and 62 deletions

View file

@ -1,7 +1,10 @@
import json
import os
import re
import signal
import string
import subprocess
import sys
import threading
import time
from collections import OrderedDict
@ -26,6 +29,7 @@ from planoai.consts import PLANO_COLOR
DEFAULT_GRPC_PORT = 4317
MAX_TRACES = 50
MAX_SPANS_PER_TRACE = 500
TRACE_LISTENER_PID_FILE = os.path.expanduser("~/.plano_trace_listener.pid")
@dataclass
@ -46,6 +50,60 @@ class TraceSummary:
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
def _is_port_in_use(port: int) -> bool:
    """Report whether a TCP listener on localhost accepts connections on ``port``.

    A throwaway IPv4 client socket tries to connect to ``127.0.0.1`` with a
    0.2 second timeout; ``connect_ex`` returning 0 means a listener answered.
    """
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.settimeout(0.2)
        status = probe.connect_ex(("127.0.0.1", port))
    finally:
        # Always release the descriptor, even if the probe itself raised.
        probe.close()
    return status == 0
def _save_listener_pid(pid: int) -> None:
    """Record ``pid`` as decimal text in the listener PID file.

    The file at ``TRACE_LISTENER_PID_FILE`` is opened in write mode, so any
    previous content is replaced.
    """
    pid_text = str(pid)
    with open(TRACE_LISTENER_PID_FILE, "w") as pid_file:
        pid_file.write(pid_text)
def _get_listener_pid() -> int | None:
"""Get the saved listener process PID, if it exists and is running."""
if not os.path.exists(TRACE_LISTENER_PID_FILE):
return None
try:
with open(TRACE_LISTENER_PID_FILE, "r") as f:
pid = int(f.read().strip())
# Check if process is still running
os.kill(pid, 0) # Doesn't actually kill, just checks if process exists
return pid
except (ValueError, ProcessLookupError, OSError):
# PID file is invalid or process doesn't exist
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
return None
def _stop_background_listener() -> bool:
    """Terminate the recorded background listener, if one is running.

    Returns:
        ``True`` when SIGTERM was delivered to the recorded process (followed
        by SIGKILL if it was still alive half a second later); ``False`` when
        no live listener is recorded or the process vanished before SIGTERM
        could be sent.
    """

    def _discard_pid_file() -> None:
        if os.path.exists(TRACE_LISTENER_PID_FILE):
            os.remove(TRACE_LISTENER_PID_FILE)

    listener_pid = _get_listener_pid()
    if listener_pid is None:
        return False

    try:
        os.kill(listener_pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal.
        _discard_pid_file()
        return False

    time.sleep(0.5)  # Grace period for a clean shutdown
    try:
        os.kill(listener_pid, 0)  # Raises if the process is already gone
        os.kill(listener_pid, signal.SIGKILL)  # Still alive: force it
    except ProcessLookupError:
        pass  # Exited on its own

    _discard_pid_file()
    return True
def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]:
parts: list[str] = []
for raw in filter_patterns:
@ -428,6 +486,10 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
_console = Console(stderr=True)
# we have to stream events otherwise the payload is too slow to process
def __init__(self, stream_events: bool = True) -> None:
    """Store the event-streaming flag for this servicer.

    Args:
        stream_events: When true, ``Export`` prints a formatted console line
            (start time, trace/span ids, service, span name, duration) while
            processing spans; when false that output is suppressed.
    """
    self._stream_events = stream_events
def Export(self, request, context): # noqa: N802
for resource_spans in request.resource_spans:
service_name = "unknown"
@ -458,14 +520,15 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
dur_s = dur_ns / 1_000_000_000
dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".")
dur_str = f"{dur_str}s"
self._console.print(
f"[dim]{span_start}[/dim], "
f"trace=[yellow]{short_id}[/yellow], "
f"span=[yellow]{short_span}[/yellow], "
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
f"[cyan]{span.name}[/cyan] "
f"[dim]({dur_str})[/dim]"
)
if self._stream_events:
self._console.print(
f"[dim]{span_start}[/dim], "
f"trace=[yellow]{short_id}[/yellow], "
f"span=[yellow]{short_span}[/yellow], "
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
f"[cyan]{span.name}[/cyan] "
f"[dim]({dur_str})[/dim]"
)
return trace_service_pb2.ExportTraceServiceResponse()
@ -488,7 +551,9 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
def _create_trace_server(
host: str, grpc_port: int, stream_events: bool = False
) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace-collection server.
Returns the running ``grpc.Server``. The caller is responsible
@ -499,31 +564,113 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
handlers=[_TraceQueryHandler()],
)
trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
_OTLPTraceServicer(), grpc_server
_OTLPTraceServicer(stream_events=stream_events), grpc_server
)
grpc_server.add_insecure_port(f"{host}:{grpc_port}")
grpc_server.start()
return grpc_server
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener and block until interrupted."""
console = Console()
grpc_server = _create_trace_server(host, grpc_port)
def _start_trace_listener_foreground(
host: str, grpc_port: int, debug: bool = False
) -> None:
"""Start the OTLP/gRPC listener in foreground (blocking).
This is the actual server implementation that blocks until interrupted.
"""
console = Console()
grpc_server = _create_trace_server(host, grpc_port, stream_events=debug)
if debug:
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(
"[dim]Debug mode: streaming trace events. Press Ctrl+C to stop.[/dim]"
)
console.print()
# Save PID for background mode
_save_listener_pid(os.getpid())
console.print()
console.print(f"[bold {PLANO_COLOR}]Listening for traces...[/bold {PLANO_COLOR}]")
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print("[dim]Press Ctrl+C to stop.[/dim]")
console.print()
try:
grpc_server.wait_for_termination()
except KeyboardInterrupt:
pass
if debug:
console.print("\n[dim]Stopping trace listener...[/dim]")
finally:
grpc_server.stop(grace=2)
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
def _wait_for_listener_startup(
    child_pid: int,
    grpc_port: int,
    timeout: float = 3.0,
    poll_interval: float = 0.1,
) -> bool:
    """Poll until the forked listener accepts connections, exits, or times out."""
    deadline = time.monotonic() + timeout
    while True:
        if _is_port_in_use(grpc_port):
            return True
        try:
            reaped_pid, _ = os.waitpid(child_pid, os.WNOHANG)
        except ChildProcessError:
            # The child is no longer waitable, so it cannot come up anymore.
            reaped_pid = child_pid
        if reaped_pid == child_pid:
            return False  # Child died before binding the port
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


def _start_trace_listener(host: str, grpc_port: int, debug: bool = False) -> None:
    """Start the OTLP/gRPC listener.

    If ``grpc_port`` already accepts connections on localhost, nothing is
    started: a message reports either the recorded listener PID or that another
    process owns the port.

    Args:
        host: Host to bind to.
        grpc_port: Port to bind to.
        debug: If True, run in the foreground with event streaming. If False,
            fork a detached child that runs the listener and return in the
            parent once startup has been confirmed or has failed.
    """
    console = Console()

    # Check if port is already in use
    if _is_port_in_use(grpc_port):
        existing_pid = _get_listener_pid()
        if existing_pid:
            console.print(
                f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})"
            )
        else:
            console.print(
                f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process"
            )
            console.print("\n[dim]Check what's using the port:[/dim]")
            console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]")
        return

    if debug:
        # Run in foreground with debug output
        _start_trace_listener_foreground(host, grpc_port, debug=True)
        return

    # Fork to background
    pid = os.fork()
    if pid > 0:
        # Parent process: wait for the child to bind instead of sleeping a
        # fixed interval, so slow startups are not misreported as failures.
        if _wait_for_listener_startup(pid, grpc_port):
            console.print()
            console.print(
                f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
            )
            console.print(
                f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
            )
            console.print(f"[dim]Process ID: {pid}[/dim]")
            console.print(
                "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
            )
            console.print(
                "[dim]Use [cyan]planoai trace listen --debug[/cyan] to see live events.[/dim]"
            )
            console.print()
        else:
            console.print("[red]✗[/red] Failed to start trace listener")
        return

    # Child process - become daemon. It must never return into the caller's
    # stack: that would re-run the parent's CLI teardown and exit handlers in
    # the child, so always leave through os._exit().
    exit_code = 1
    try:
        os.setsid()  # Create new session, detaching from the terminal
        devnull = os.open(os.devnull, os.O_RDWR)
        os.dup2(devnull, 0)
        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        if devnull > 2:
            os.close(devnull)
        _start_trace_listener_foreground(host, grpc_port, debug=False)
        exit_code = 0
    finally:
        os._exit(exit_code)
def start_trace_listener_background(
@ -534,7 +681,7 @@ def start_trace_listener_background(
Returns the running ``grpc.Server`` so the caller can call
``server.stop()`` later.
"""
return _create_trace_server(host, grpc_port)
return _create_trace_server(host, grpc_port, stream_events=False)
def _span_time_ns(span: dict[str, Any], key: str) -> int:
@ -562,13 +709,13 @@ def _trace_summary(trace: dict[str, Any]) -> TraceSummary:
def _service_color(service: str) -> str:
service = service.lower()
if "inbound" in service:
return "white"
return "#4860fa"
if "outbound" in service:
return "white"
return "#57d9a9"
if "orchestrator" in service:
return PLANO_COLOR
if "routing" in service:
return "magenta"
return "#e3a2fa"
if "agent" in service:
return "cyan"
if "llm" in service:
@ -576,6 +723,79 @@ def _service_color(service: str) -> str:
return "white"
def _error_symbol(status_code: str) -> str:
    """Return an emoji classifying an HTTP status code.

    Args:
        status_code: Status code as text. Anything that is not a plain decimal
            number is treated as code 0.

    Returns:
        A symbol for 5xx, 429, 404, 403, 401 or any other 4xx code, and an
        empty string for everything else.
    """
    # ``str.isdigit`` also accepts characters such as "²" that ``int`` rejects
    # with ValueError; ``isdecimal`` matches exactly what ``int`` can parse.
    code = int(status_code) if status_code.isdecimal() else 0
    if code >= 500:
        return "💥"  # Server error - something broke

    specific_symbols = {
        429: "🚦",  # Rate limited
        404: "🔍",  # Not found
        403: "🚫",  # Forbidden
        401: "🔐",  # Unauthorized
    }
    if code in specific_symbols:
        return specific_symbols[code]
    if code >= 400:
        return "⚠️"  # Client error
    return ""  # Not an error status (or unparseable)
def _error_description(status_code: str) -> str:
    """Return a developer-friendly description of an HTTP error status.

    Args:
        status_code: Status code as text. Anything that is not a plain decimal
            number is treated as code 0.

    Returns:
        A specific phrase for well-known codes, "Server Error" for other 5xx
        codes, "Client Error" for other 4xx codes, and "Error" otherwise.
    """
    # ``str.isdigit`` also accepts characters such as "²" that ``int`` rejects
    # with ValueError; ``isdecimal`` matches exactly what ``int`` can parse.
    code = int(status_code) if status_code.isdecimal() else 0

    known_descriptions = {
        500: "Internal Server Error",
        502: "Bad Gateway",
        503: "Service Unavailable",
        504: "Gateway Timeout",
        429: "Rate Limit Exceeded",
        404: "Not Found",
        403: "Forbidden - Access Denied",
        401: "Unauthorized - Auth Required",
        400: "Bad Request",
    }
    description = known_descriptions.get(code)
    if description is not None:
        return description
    if code >= 500:
        return "Server Error"
    if code >= 400:
        return "Client Error"
    return "Error"
def _detect_error(span: dict[str, Any]) -> tuple[bool, str, str]:
    """Detect whether a span represents an error.

    A span counts as an error when its ``http.status_code`` attribute is a
    decimal number >= 400, or when it carries an ``error.message`` or
    ``exception.message`` attribute.

    Returns:
        tuple: ``(has_error, status_code, error_description)``. For
        message-based errors the status code falls back to ``"unknown"`` when
        the span has none. Non-error spans yield ``(False, "", "")``.
    """
    attrs = _attrs(span)
    status_code = attrs.get("http.status_code", "")

    # 4xx/5xx status codes. ``isdecimal`` (not ``isdigit``) guards the int()
    # conversion: ``isdigit`` accepts characters like "²" that int() rejects.
    if status_code and status_code.isdecimal() and int(status_code) >= 400:
        return True, status_code, _error_description(status_code)

    # Explicit error attributes, in order of precedence.
    for message_key in ("error.message", "exception.message"):
        if message_key in attrs:
            return True, status_code or "unknown", attrs[message_key]

    return False, "", ""
# Attributes to show for inbound/outbound spans when not verbose (trimmed view).
_INBOUND_OUTBOUND_ATTR_KEYS = (
"http.method",
@ -599,10 +819,20 @@ def _trim_attrs_for_display(
def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
# Error attributes always come first
error_priority = [
"http.status_code",
"error.type",
"error.message",
"error.stack",
"exception.type",
"exception.message",
]
# Then regular priority attributes
priority = [
"http.method",
"http.target",
"http.status_code",
"guid:x-request-id",
"request_size",
"response_size",
@ -619,7 +849,10 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
"llm.duration_ms",
"llm.response_bytes",
]
prioritized = [(k, attrs[k]) for k in priority if k in attrs]
# Combine error priority with regular priority
full_priority = error_priority + priority
prioritized = [(k, attrs[k]) for k in full_priority if k in attrs]
prioritized_keys = {k for k, _ in prioritized}
remaining = [(k, v) for k, v in attrs.items() if k not in prioritized_keys]
remaining.sort(key=lambda item: item[0])
@ -627,8 +860,14 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
def _display_attr_value(key: str, value: str) -> str:
if key == "http.status_code" and value != "200":
return f"{value} ⚠️"
if key == "http.status_code":
if value.isdigit():
code = int(value)
if code >= 400:
return f"{value} {_error_symbol(value)}"
elif code >= 200 and code < 300:
return f"{value}"
return value
return value
@ -648,7 +887,7 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
)
spans.sort(key=lambda s: _span_time_ns(s, "startTimeUnixNano"))
tree = Tree("", guide_style="dim")
tree = Tree("", guide_style="dim #5b5a5c bold")
for span in spans:
service = span.get("service", "plano(unknown)")
@ -656,22 +895,52 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
offset_ms = max(
0, (_span_time_ns(span, "startTimeUnixNano") - start_ns) / 1_000_000
)
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="yellow")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white")
# Check for errors in this span
has_error, error_code, error_desc = _detect_error(span)
if has_error:
# Create error banner above the span
error_banner = Text()
error_banner.append(error_desc, style="bright_red")
tree.add(error_banner)
# Style the span label in light red
label = Text(f"{offset_ms:.0f}ms ", style="#ff6b6b")
label.append(service, style="bold #ff6b6b")
if name:
label.append(f" {name}", style="#ff6b6b italic")
else:
# Normal styling
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="#949c99")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white bold italic")
node = tree.add(label)
attrs = _trim_attrs_for_display(_attrs(span), service, verbose)
sorted_items = list(_sorted_attr_items(attrs))
for idx, (key, value) in enumerate(sorted_items):
attr_line = Text()
attr_line.append(f"{key}: ", style="white")
attr_line.append(
_display_attr_value(key, str(value)),
style=f"{PLANO_COLOR}",
)
# attribute key
attr_line.append(f"{key}: ", style="#a4a9aa")
# attribute value
if key == "http.status_code" and value.isdigit():
val_int = int(value)
val_style = "bold red" if val_int >= 400 else "green"
attr_line.append(_display_attr_value(key, str(value)), style=val_style)
elif key in [
"error.message",
"exception.message",
"error.type",
"exception.type",
]:
attr_line.append(_display_attr_value(key, str(value)), style="red")
else:
attr_line.append(
_display_attr_value(key, str(value)), style=f"{PLANO_COLOR} bold"
)
if idx == len(sorted_items) - 1:
attr_line.append("\n")
node.add(attr_line)
@ -882,7 +1151,7 @@ def _run_trace_show(
_build_tree(trace_obj, console, verbose=verbose)
@click.group(invoke_without_command=True)
@click.command()
@click.argument("target", required=False)
@click.option(
"--filter",
@ -908,6 +1177,19 @@ def _run_trace_show(
@click.option("--limit", type=int, default=None, help="Limit results.")
@click.option("--since", default=None, help="Look back window (e.g. 5m, 2h, 1d).")
@click.option("--json", "json_out", is_flag=True, help="Output raw JSON.")
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="gRPC port for receiving OTLP traces when target is 'listen'.",
)
@click.option(
"--debug",
is_flag=True,
help="When used with target 'listen', keep terminal active and stream trace events.",
)
@click.option(
"--verbose",
"-v",
@ -925,12 +1207,14 @@ def trace(
limit,
since,
json_out,
host,
port,
debug,
verbose,
):
"""Trace requests from the local OTLP listener."""
if ctx.invoked_subcommand:
return
if target == "listen" and not any(
# Handle operational shortcuts when invoked as target values.
has_show_options = any(
[
filter_patterns,
where_filters,
@ -941,9 +1225,25 @@ def trace(
json_out,
verbose,
]
):
_start_trace_listener("0.0.0.0", DEFAULT_GRPC_PORT)
)
if target == "listen" and not has_show_options:
_start_trace_listener(host, port, debug=debug)
return
if debug and target != "listen":
raise click.ClickException("--debug is only valid with target 'listen'.")
if (host != "0.0.0.0" or port != DEFAULT_GRPC_PORT) and target != "listen":
raise click.ClickException("--host/--port are only valid with target 'listen'.")
if target in ("stop", "down") and not has_show_options:
console = Console()
if _stop_background_listener():
console.print(f"[green]✓[/green] Trace listener stopped")
else:
console.print(f"[dim]No background trace listener running[/dim]")
return
_run_trace_show(
target,
filter_patterns,
@ -955,17 +1255,3 @@ def trace(
json_out,
verbose,
)
@trace.command("listen")
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="gRPC port for receiving OTLP traces.",
)
def trace_listen(host: str, port: int) -> None:
"""Listen for OTLP/gRPC traces."""
_start_trace_listener(host, port)