refactor: integrate trace listener management into runtime module and streamline PID handling

This commit is contained in:
Musa 2026-02-13 20:03:57 -08:00
parent 81a62e1eed
commit 5b91703454
2 changed files with 164 additions and 102 deletions

View file

@ -1,7 +1,6 @@
import json
import os
import re
import signal
import string
import subprocess
import sys
@ -25,11 +24,11 @@ from rich.text import Text
from rich.tree import Tree
from planoai.consts import PLANO_COLOR
from planoai import trace_listener_runtime
DEFAULT_GRPC_PORT = 4317
MAX_TRACES = 50
MAX_SPANS_PER_TRACE = 500
TRACE_LISTENER_PID_FILE = os.path.expanduser("~/.plano/run/trace_listener.pid")
@dataclass
@ -59,50 +58,14 @@ def _is_port_in_use(port: int) -> bool:
return s.connect_ex(("127.0.0.1", port)) == 0
def _save_listener_pid(pid: int) -> None:
"""Save the listener process PID to a file."""
os.makedirs(os.path.dirname(TRACE_LISTENER_PID_FILE), exist_ok=True)
with open(TRACE_LISTENER_PID_FILE, "w") as f:
f.write(str(pid))
def _get_listener_pid() -> int | None:
"""Get the saved listener process PID, if it exists and is running."""
if not os.path.exists(TRACE_LISTENER_PID_FILE):
return None
try:
with open(TRACE_LISTENER_PID_FILE, "r") as f:
pid = int(f.read().strip())
# Check if process is still running
os.kill(pid, 0) # Doesn't actually kill, just checks if process exists
return pid
except (ValueError, ProcessLookupError, OSError):
# PID file is invalid or process doesn't exist
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
return None
"""Return persisted listener PID if process is alive."""
return trace_listener_runtime.get_listener_pid()
def _stop_background_listener() -> bool:
"""Stop the background listener process if it's running."""
pid = _get_listener_pid()
if pid is None:
return False
try:
os.kill(pid, signal.SIGTERM)
time.sleep(0.5) # Give it time to shut down gracefully
try:
os.kill(pid, 0) # Check if still running
os.kill(pid, signal.SIGKILL) # Force kill if still alive
except ProcessLookupError:
pass # Already dead
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
return True
except ProcessLookupError:
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
return False
"""Stop persisted listener process if one is running."""
return trace_listener_runtime.stop_listener_process()
def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]:
@ -524,12 +487,8 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace-collection server.
Returns the running ``grpc.Server``. The caller is responsible
for calling ``server.stop()`` when done.
"""
def _start_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace server."""
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4),
handlers=[_TraceQueryHandler()],
@ -542,44 +501,41 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
return grpc_server
def _start_trace_listener_foreground(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener in foreground (blocking).
This is the actual server implementation that blocks until interrupted.
"""
grpc_server = _create_trace_server(host, grpc_port)
# Save PID for background mode
_save_listener_pid(os.getpid())
def _serve_trace_listener(host: str, grpc_port: int) -> None:
"""Run the listener loop until process termination."""
# Persist PID immediately after fork, before server startup.
# This ensures the PID file exists even if server initialization fails.
trace_listener_runtime.write_listener_pid(os.getpid())
try:
grpc_server = _start_trace_server(host, grpc_port)
grpc_server.wait_for_termination()
except KeyboardInterrupt:
pass
finally:
grpc_server.stop(grace=2)
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
# Best-effort cleanup; server may not exist if startup failed.
try:
grpc_server.stop(grace=2)
except NameError:
pass
trace_listener_runtime.remove_listener_pid()
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener.
"""Start the OTLP/gRPC listener as a daemon process."""
Args:
host: Host to bind to
grpc_port: Port to bind to
Starts listener as a background process.
"""
console = Console()
# Check if port is already in use
# Check if the requested port is already in use.
if _is_port_in_use(grpc_port):
existing_pid = _get_listener_pid()
if existing_pid:
# If the process PID is known, inform user that our listener is already running.
console.print(
f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})"
)
else:
# If port is taken but no tracked listener PID exists, warn user of unknown conflict.
console.print(
f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process"
)
@ -587,48 +543,46 @@ def _start_trace_listener(host: str, grpc_port: int) -> None:
console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]")
return
# Fork to background
pid = os.fork()
if pid > 0:
# Parent process
time.sleep(0.5) # Give child time to start
if _is_port_in_use(grpc_port):
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(f"[dim]Process ID: {pid}[/dim]")
console.print(
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
)
console.print()
else:
console.print(f"[red]✗[/red] Failed to start trace listener")
# Fork/daemonize and run the trace server in the background.
try:
pid = trace_listener_runtime.daemonize_and_run(
lambda: _serve_trace_listener(host, grpc_port)
)
except OSError as e:
console.print(f"[red]✗[/red] Failed to start trace listener: {e}")
return
if pid is None:
# We're in the child process; daemonize_and_run never returns here.
return
# In the parent process: wait briefly for the background process to bind the port.
time.sleep(0.5) # Give child process time to start and bind to the port.
if _is_port_in_use(grpc_port):
# Success: the trace listener started and bound the port.
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(f"[dim]Process ID: {pid}[/dim]")
console.print(
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
)
console.print()
else:
# Child process - become daemon
os.setsid() # Create new session
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
if devnull > 2:
os.close(devnull)
_start_trace_listener_foreground(host, grpc_port)
# Failure: trace listener child process did not successfully start.
console.print(f"[red]✗[/red] Failed to start trace listener")
def start_trace_listener_background(
host: str = "0.0.0.0", grpc_port: int = DEFAULT_GRPC_PORT
) -> grpc.Server:
"""Start the trace listener in the background (non-blocking).
Returns the running ``grpc.Server`` so the caller can call
``server.stop()`` later.
"""
return _create_trace_server(host, grpc_port)
"""Start the trace server in-process and return ``grpc.Server`` handle."""
return _start_trace_server(host, grpc_port)
def _span_time_ns(span: dict[str, Any], key: str) -> int:

View file

@ -0,0 +1,108 @@
"""
Trace listener process runtime utilities.
"""
import os
import signal
import time
from collections.abc import Callable
# Canonical PID file used by `planoai trace listen/down`.
TRACE_LISTENER_PID_PATH = os.path.expanduser("~/.plano/run/trace_listener.pid")
def write_listener_pid(pid: int) -> None:
"""Persist listener PID for later management commands."""
# Ensure parent directory exists for first-time installs.
os.makedirs(os.path.dirname(TRACE_LISTENER_PID_PATH), exist_ok=True)
with open(TRACE_LISTENER_PID_PATH, "w") as f:
f.write(str(pid))
def remove_listener_pid() -> None:
"""Remove persisted listener PID file if present."""
# Best-effort cleanup; missing file is not an error.
if os.path.exists(TRACE_LISTENER_PID_PATH):
os.remove(TRACE_LISTENER_PID_PATH)
def get_listener_pid() -> int | None:
"""Return listener PID if present and process is alive."""
if not os.path.exists(TRACE_LISTENER_PID_PATH):
return None
try:
# Parse persisted PID.
with open(TRACE_LISTENER_PID_PATH, "r") as f:
pid = int(f.read().strip())
# Signal 0 performs liveness check without sending a real signal.
os.kill(pid, 0)
return pid
except (ValueError, ProcessLookupError, OSError):
# Stale or malformed PID file: clean it up to prevent repeated confusion.
remove_listener_pid()
return None
def stop_listener_process(grace_seconds: float = 0.5) -> bool:
"""Stop persisted listener process, returning True if one was stopped."""
pid = get_listener_pid()
if pid is None:
return False
try:
# Try graceful shutdown first.
os.kill(pid, signal.SIGTERM)
# Allow the process a short window to exit cleanly.
time.sleep(grace_seconds)
try:
# If still alive, force terminate.
os.kill(pid, 0)
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
# Already exited after SIGTERM.
pass
remove_listener_pid()
return True
except ProcessLookupError:
# Process disappeared between checks; treat as already stopped.
remove_listener_pid()
return False
def daemonize_and_run(run_forever: Callable[[], None]) -> int | None:
"""
Fork and detach process to create a Unix daemon.
Returns:
- Parent process: child PID (> 0), allowing caller to report startup.
- Child process: never returns; runs callback in daemon context until termination.
Raises:
- OSError: if fork fails (e.g., resource limits exceeded).
"""
# Duplicate current process. Raises OSError if fork fails.
pid = os.fork()
if pid > 0:
# Parent returns child PID to caller.
return pid
# Child: detach from controlling terminal/session.
# This prevents SIGHUP when parent terminal closes and ensures
# the daemon cannot reacquire a controlling terminal.
os.setsid()
# Redirect stdin/stdout/stderr to /dev/null so daemon is terminal-independent.
# This prevents broken pipe errors and ensures no output leaks to the parent terminal.
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0) # stdin
os.dup2(devnull, 1) # stdout
os.dup2(devnull, 2) # stderr
if devnull > 2:
os.close(devnull)
# Run the daemon main loop (expected to block until process termination).
run_forever()
# If callback unexpectedly returns, exit cleanly to avoid returning to parent context.
os._exit(0)