diff --git a/cli/planoai/trace_cmd.py b/cli/planoai/trace_cmd.py index b3f4cc78..e7d333f5 100644 --- a/cli/planoai/trace_cmd.py +++ b/cli/planoai/trace_cmd.py @@ -1,7 +1,6 @@ import json import os import re -import signal import string import subprocess import sys @@ -25,11 +24,11 @@ from rich.text import Text from rich.tree import Tree from planoai.consts import PLANO_COLOR +from planoai import trace_listener_runtime DEFAULT_GRPC_PORT = 4317 MAX_TRACES = 50 MAX_SPANS_PER_TRACE = 500 -TRACE_LISTENER_PID_FILE = os.path.expanduser("~/.plano/run/trace_listener.pid") @dataclass @@ -59,50 +58,14 @@ def _is_port_in_use(port: int) -> bool: return s.connect_ex(("127.0.0.1", port)) == 0 -def _save_listener_pid(pid: int) -> None: - """Save the listener process PID to a file.""" - os.makedirs(os.path.dirname(TRACE_LISTENER_PID_FILE), exist_ok=True) - with open(TRACE_LISTENER_PID_FILE, "w") as f: - f.write(str(pid)) - - def _get_listener_pid() -> int | None: - """Get the saved listener process PID, if it exists and is running.""" - if not os.path.exists(TRACE_LISTENER_PID_FILE): - return None - try: - with open(TRACE_LISTENER_PID_FILE, "r") as f: - pid = int(f.read().strip()) - # Check if process is still running - os.kill(pid, 0) # Doesn't actually kill, just checks if process exists - return pid - except (ValueError, ProcessLookupError, OSError): - # PID file is invalid or process doesn't exist - if os.path.exists(TRACE_LISTENER_PID_FILE): - os.remove(TRACE_LISTENER_PID_FILE) - return None + """Return persisted listener PID if process is alive.""" + return trace_listener_runtime.get_listener_pid() def _stop_background_listener() -> bool: - """Stop the background listener process if it's running.""" - pid = _get_listener_pid() - if pid is None: - return False - try: - os.kill(pid, signal.SIGTERM) - time.sleep(0.5) # Give it time to shut down gracefully - try: - os.kill(pid, 0) # Check if still running - os.kill(pid, signal.SIGKILL) # Force kill if still alive - except 
ProcessLookupError: - pass # Already dead - if os.path.exists(TRACE_LISTENER_PID_FILE): - os.remove(TRACE_LISTENER_PID_FILE) - return True - except ProcessLookupError: - if os.path.exists(TRACE_LISTENER_PID_FILE): - os.remove(TRACE_LISTENER_PID_FILE) - return False + """Stop persisted listener process if one is running.""" + return trace_listener_runtime.stop_listener_process() def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]: @@ -524,12 +487,8 @@ class _TraceQueryHandler(grpc.GenericRpcHandler): return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8") -def _create_trace_server(host: str, grpc_port: int) -> grpc.Server: - """Create, bind, and start an OTLP/gRPC trace-collection server. - - Returns the running ``grpc.Server``. The caller is responsible - for calling ``server.stop()`` when done. - """ +def _start_trace_server(host: str, grpc_port: int) -> grpc.Server: + """Create, bind, and start an OTLP/gRPC trace server.""" grpc_server = grpc.server( futures.ThreadPoolExecutor(max_workers=4), handlers=[_TraceQueryHandler()], @@ -542,44 +501,41 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server: return grpc_server -def _start_trace_listener_foreground(host: str, grpc_port: int) -> None: - """Start the OTLP/gRPC listener in foreground (blocking). - - This is the actual server implementation that blocks until interrupted. - """ - grpc_server = _create_trace_server(host, grpc_port) - - # Save PID for background mode - _save_listener_pid(os.getpid()) +def _serve_trace_listener(host: str, grpc_port: int) -> None: + """Run the listener loop until process termination.""" + # Persist PID immediately after fork, before server startup. + # This ensures the PID file exists even if server initialization fails. 
+ trace_listener_runtime.write_listener_pid(os.getpid()) try: + grpc_server = _start_trace_server(host, grpc_port) grpc_server.wait_for_termination() except KeyboardInterrupt: pass finally: - grpc_server.stop(grace=2) - if os.path.exists(TRACE_LISTENER_PID_FILE): - os.remove(TRACE_LISTENER_PID_FILE) + # Best-effort cleanup; server may not exist if startup failed. + try: + grpc_server.stop(grace=2) + except NameError: + pass + trace_listener_runtime.remove_listener_pid() def _start_trace_listener(host: str, grpc_port: int) -> None: - """Start the OTLP/gRPC listener. + """Start the OTLP/gRPC listener as a daemon process.""" - Args: - host: Host to bind to - grpc_port: Port to bind to - Starts listener as a background process. - """ console = Console() - # Check if port is already in use + # Check if the requested port is already in use. if _is_port_in_use(grpc_port): existing_pid = _get_listener_pid() if existing_pid: + # If the process PID is known, inform user that our listener is already running. console.print( f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})" ) else: + # If port is taken but no tracked listener PID exists, warn user of unknown conflict. 
console.print( f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process" ) @@ -587,48 +543,46 @@ def _start_trace_listener(host: str, grpc_port: int) -> None: console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]") return - # Fork to background - pid = os.fork() - if pid > 0: - # Parent process - time.sleep(0.5) # Give child time to start - if _is_port_in_use(grpc_port): - console.print() - console.print( - f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]" - ) - console.print( - f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]" - ) - console.print(f"[dim]Process ID: {pid}[/dim]") - console.print( - "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]" - ) - console.print() - else: - console.print(f"[red]✗[/red] Failed to start trace listener") + # Fork/daemonize and run the trace server in the background. + try: + pid = trace_listener_runtime.daemonize_and_run( + lambda: _serve_trace_listener(host, grpc_port) + ) + except OSError as e: + console.print(f"[red]✗[/red] Failed to start trace listener: {e}") return + + if pid is None: + # Defensive guard only: the child never returns from daemonize_and_run, so this branch is not expected to run. + return + + # In the parent process: wait briefly for the background process to bind the port. + time.sleep(0.5) # Give child process time to start and bind to the port. + + if _is_port_in_use(grpc_port): + # Success: the trace listener started and bound the port.
+ console.print() + console.print( + f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]" + ) + console.print( + f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]" + ) + console.print(f"[dim]Process ID: {pid}[/dim]") + console.print( + "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]" + ) + console.print() else: - # Child process - become daemon - os.setsid() # Create new session - devnull = os.open(os.devnull, os.O_RDWR) - os.dup2(devnull, 0) - os.dup2(devnull, 1) - os.dup2(devnull, 2) - if devnull > 2: - os.close(devnull) - _start_trace_listener_foreground(host, grpc_port) + # Failure: trace listener child process did not successfully start. + console.print(f"[red]✗[/red] Failed to start trace listener") def start_trace_listener_background( host: str = "0.0.0.0", grpc_port: int = DEFAULT_GRPC_PORT ) -> grpc.Server: - """Start the trace listener in the background (non-blocking). - - Returns the running ``grpc.Server`` so the caller can call - ``server.stop()`` later. - """ - return _create_trace_server(host, grpc_port) + """Start the trace server in-process and return ``grpc.Server`` handle.""" + return _start_trace_server(host, grpc_port) def _span_time_ns(span: dict[str, Any], key: str) -> int: diff --git a/cli/planoai/trace_listener_runtime.py b/cli/planoai/trace_listener_runtime.py new file mode 100644 index 00000000..5ddeb6ee --- /dev/null +++ b/cli/planoai/trace_listener_runtime.py @@ -0,0 +1,108 @@ +""" +Trace listener process runtime utilities. +""" + +import os +import signal +import time +from collections.abc import Callable + +# Canonical PID file used by `planoai trace listen/down`. +TRACE_LISTENER_PID_PATH = os.path.expanduser("~/.plano/run/trace_listener.pid") + + +def write_listener_pid(pid: int) -> None: + """Persist listener PID for later management commands.""" + # Ensure parent directory exists for first-time installs. 
+ os.makedirs(os.path.dirname(TRACE_LISTENER_PID_PATH), exist_ok=True) + with open(TRACE_LISTENER_PID_PATH, "w") as f: + f.write(str(pid)) + + +def remove_listener_pid() -> None: + """Remove persisted listener PID file if present.""" + # Best-effort cleanup; missing file is not an error. + if os.path.exists(TRACE_LISTENER_PID_PATH): + os.remove(TRACE_LISTENER_PID_PATH) + + +def get_listener_pid() -> int | None: + """Return listener PID if present and process is alive.""" + if not os.path.exists(TRACE_LISTENER_PID_PATH): + return None + + try: + # Parse persisted PID. + with open(TRACE_LISTENER_PID_PATH, "r") as f: + pid = int(f.read().strip()) + # Signal 0 performs liveness check without sending a real signal. + os.kill(pid, 0) + return pid + except (ValueError, ProcessLookupError, OSError): + # Stale or malformed PID file: clean it up to prevent repeated confusion. + remove_listener_pid() + return None + + +def stop_listener_process(grace_seconds: float = 0.5) -> bool: + """Stop persisted listener process, returning True if one was stopped.""" + pid = get_listener_pid() + if pid is None: + return False + + try: + # Try graceful shutdown first. + os.kill(pid, signal.SIGTERM) + # Allow the process a short window to exit cleanly. + time.sleep(grace_seconds) + try: + # If still alive, force terminate. + os.kill(pid, 0) + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + # Already exited after SIGTERM. + pass + remove_listener_pid() + return True + except ProcessLookupError: + # Process disappeared between checks; treat as already stopped. + remove_listener_pid() + return False + + +def daemonize_and_run(run_forever: Callable[[], None]) -> int | None: + """ + Fork and detach process to create a Unix daemon. + + Returns: + - Parent process: child PID (> 0), allowing caller to report startup. + - Child process: never returns; runs callback in daemon context until termination. + + Raises: + - OSError: if fork fails (e.g., resource limits exceeded). 
+ """ + # Duplicate current process. Raises OSError if fork fails. + pid = os.fork() + if pid > 0: + # Parent returns child PID to caller. + return pid + + # Child: detach from controlling terminal/session. + # This prevents SIGHUP when the parent terminal closes. Note: with a single fork + # the daemon is a session leader and could still acquire a controlling terminal. + os.setsid() + + # Redirect stdin/stdout/stderr to /dev/null so daemon is terminal-independent. + # This prevents broken pipe errors and ensures no output leaks to the parent terminal. + devnull = os.open(os.devnull, os.O_RDWR) + os.dup2(devnull, 0) # stdin + os.dup2(devnull, 1) # stdout + os.dup2(devnull, 2) # stderr + if devnull > 2: + os.close(devnull) + + # Run the daemon main loop (expected to block until process termination). + run_forever() + + # If callback unexpectedly returns, exit cleanly to avoid returning to parent context. + os._exit(0)