From 94355091099fa4721819dd83e52afc1f50adfc24 Mon Sep 17 00:00:00 2001 From: Musa Date: Fri, 13 Feb 2026 16:38:54 -0800 Subject: [PATCH] refactor: simplify trace listener initialization and remove debug mode handling --- cli/planoai/trace_cmd.py | 140 +++++++++++-------------------------- cli/test/test_trace_cmd.py | 31 +++----- 2 files changed, 50 insertions(+), 121 deletions(-) diff --git a/cli/planoai/trace_cmd.py b/cli/planoai/trace_cmd.py index 6452b38e..382980e9 100644 --- a/cli/planoai/trace_cmd.py +++ b/cli/planoai/trace_cmd.py @@ -484,12 +484,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer): """gRPC servicer that receives OTLP ExportTraceServiceRequest and merges incoming spans into the global _TRACE_STORE by trace_id.""" - _console = Console(stderr=True) - - # we have to stream events otherwise the payload is too slow to process - def __init__(self, stream_events: bool = True) -> None: - self._stream_events = stream_events - def Export(self, request, context): # noqa: N802 for resource_spans in request.resource_spans: service_name = "unknown" @@ -507,28 +501,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer): continue span_dict = _proto_span_to_dict(span, service_name) _TRACE_STORE.merge_spans(trace_id, [span_dict]) - short_id = trace_id[:8] - short_span = span.span_id.hex()[:8] - span_start = ( - datetime.fromtimestamp( - span.start_time_unix_nano / 1_000_000_000, tz=timezone.utc - ) - .astimezone() - .strftime("%H:%M:%S.%f")[:-3] - ) - dur_ns = span.end_time_unix_nano - span.start_time_unix_nano - dur_s = dur_ns / 1_000_000_000 - dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".") - dur_str = f"{dur_str}s" - if self._stream_events: - self._console.print( - f"[dim]{span_start}[/dim], " - f"trace=[yellow]{short_id}[/yellow], " - f"span=[yellow]{short_span}[/yellow], " - f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] " - f"[cyan]{span.name}[/cyan] " - f"[dim]({dur_str})[/dim]" - ) return trace_service_pb2.ExportTraceServiceResponse() @@ -551,9 +523,7 @@ class _TraceQueryHandler(grpc.GenericRpcHandler): return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8") -def _create_trace_server( - host: str, grpc_port: int, stream_events: bool = False -) -> grpc.Server: +def _create_trace_server(host: str, grpc_port: int) -> grpc.Server: """Create, bind, and start an OTLP/gRPC trace-collection server. Returns the running ``grpc.Server``. The caller is responsible @@ -564,35 +534,19 @@ def _create_trace_server( handlers=[_TraceQueryHandler()], ) trace_service_pb2_grpc.add_TraceServiceServicer_to_server( - _OTLPTraceServicer(stream_events=stream_events), grpc_server + _OTLPTraceServicer(), grpc_server ) grpc_server.add_insecure_port(f"{host}:{grpc_port}") grpc_server.start() return grpc_server -def _start_trace_listener_foreground( - host: str, grpc_port: int, debug: bool = False -) -> None: +def _start_trace_listener_foreground(host: str, grpc_port: int) -> None: """Start the OTLP/gRPC listener in foreground (blocking). This is the actual server implementation that blocks until interrupted. """ - console = Console() - grpc_server = _create_trace_server(host, grpc_port, stream_events=debug) - - if debug: - console.print() - console.print( - f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]" - ) - console.print( - f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]" - ) - console.print( - "[dim]Debug mode: streaming trace events. Press Ctrl+C to stop.[/dim]" - ) - console.print() + grpc_server = _create_trace_server(host, grpc_port) # Save PID for background mode _save_listener_pid(os.getpid()) @@ -600,21 +554,20 @@ def _start_trace_listener_foreground( try: grpc_server.wait_for_termination() except KeyboardInterrupt: - if debug: - console.print("\n[dim]Stopping trace listener...[/dim]") + pass finally: grpc_server.stop(grace=2) if os.path.exists(TRACE_LISTENER_PID_FILE): os.remove(TRACE_LISTENER_PID_FILE) -def _start_trace_listener(host: str, grpc_port: int, debug: bool = False) -> None: +def _start_trace_listener(host: str, grpc_port: int) -> None: """Start the OTLP/gRPC listener. Args: host: Host to bind to grpc_port: Port to bind to - debug: If True, runs in foreground with event streaming. If False, daemonizes. + Starts listener as a background process. """ console = Console() @@ -633,44 +586,37 @@ def _start_trace_listener(host: str, grpc_port: int, debug: bool = False) -> Non console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]") return - if debug: - # Run in foreground with debug output - _start_trace_listener_foreground(host, grpc_port, debug=True) - else: - # Fork to background - pid = os.fork() - if pid > 0: - # Parent process - time.sleep(0.5) # Give child time to start - if _is_port_in_use(grpc_port): - console.print() - console.print( - f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]" - ) - console.print( - f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]" - ) - console.print(f"[dim]Process ID: {pid}[/dim]") - console.print( - "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]" - ) - console.print( - "[dim]Use [cyan]planoai trace listen --debug[/cyan] to see live events.[/dim]" - ) - console.print() - else: - console.print(f"[red]✗[/red] Failed to start trace listener") - return + # Fork to background + pid = os.fork() + if pid > 0: + # Parent process + time.sleep(0.5) # Give child time to start + if _is_port_in_use(grpc_port): + console.print() + console.print( + f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]" + ) + console.print( + f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]" + ) + console.print(f"[dim]Process ID: {pid}[/dim]") + console.print( + "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]" + ) + console.print() else: - # Child process - become daemon - os.setsid() # Create new session - devnull = os.open(os.devnull, os.O_RDWR) - os.dup2(devnull, 0) - os.dup2(devnull, 1) - os.dup2(devnull, 2) - if devnull > 2: - os.close(devnull) - _start_trace_listener_foreground(host, grpc_port, debug=False) + console.print(f"[red]✗[/red] Failed to start trace listener") + return + else: + # Child process - become daemon + os.setsid() # Create new session + devnull = os.open(os.devnull, os.O_RDWR) + os.dup2(devnull, 0) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + if devnull > 2: + os.close(devnull) + _start_trace_listener_foreground(host, grpc_port) def start_trace_listener_background( @@ -681,7 +627,7 @@ def start_trace_listener_background( Returns the running ``grpc.Server`` so the caller can call ``server.stop()`` later. """ - return _create_trace_server(host, grpc_port, stream_events=False) + return _create_trace_server(host, grpc_port) def _span_time_ns(span: dict[str, Any], key: str) -> int: @@ -1185,11 +1131,6 @@ def _run_trace_show( show_default=True, help="gRPC port for receiving OTLP traces when target is 'listen'.", ) -@click.option( - "--debug", - is_flag=True, - help="When used with target 'listen', keep terminal active and stream trace events.", -) @click.option( "--verbose", "-v", @@ -1209,7 +1150,6 @@ def trace( json_out, host, port, - debug, verbose, ): """Trace requests from the local OTLP listener.""" @@ -1228,11 +1168,9 @@ def trace( ) if target == "listen" and not has_show_options: - _start_trace_listener(host, port, debug=debug) + _start_trace_listener(host, port) return - if debug and target != "listen": - raise click.ClickException("--debug is only valid with target 'listen'.") if (host != "0.0.0.0" or port != DEFAULT_GRPC_PORT) and target != "listen": raise click.ClickException("--host/--port are only valid with target 'listen'.") diff --git a/cli/test/test_trace_cmd.py b/cli/test/test_trace_cmd.py index 374848dd..b3eeec81 100644 --- a/cli/test/test_trace_cmd.py +++ b/cli/test/test_trace_cmd.py @@ -67,40 +67,38 @@ def failure_traces() -> list[dict]: return copy.deepcopy(_load_failure_traces()) -def test_trace_listen_starts_listener_with_debug_and_custom_bind_after_target( +def test_trace_listen_starts_listener_with_custom_bind_after_target( runner, monkeypatch ): seen = {} - def fake_start(host: str, port: int, debug: bool) -> None: + def fake_start(host: str, port: int) -> None: seen["host"] = host seen["port"] = port - seen["debug"] = debug monkeypatch.setattr(trace_cmd, "_start_trace_listener", fake_start) - result = runner.invoke( - trace, ["listen", "--host", "127.0.0.1", "--port", "9876", "--debug"] - ) + result = runner.invoke(trace, ["listen", "--host", "127.0.0.1", "--port", "9876"]) assert result.exit_code == 0, result.output - assert seen == {"host": "127.0.0.1", "port": 9876, "debug": True} + assert seen == {"host": "127.0.0.1", "port": 9876} -def test_trace_listen_starts_listener_with_debug_before_target(runner, monkeypatch): +def test_trace_listen_starts_listener_with_custom_bind_before_target( + runner, monkeypatch +): seen = {} - def fake_start(host: str, port: int, debug: bool) -> None: + def fake_start(host: str, port: int) -> None: seen["host"] = host seen["port"] = port - seen["debug"] = debug monkeypatch.setattr(trace_cmd, "_start_trace_listener", fake_start) - result = runner.invoke(trace, ["--debug", "listen"]) + result = runner.invoke(trace, ["--host", "127.0.0.1", "--port", "9876", "listen"]) assert result.exit_code == 0, result.output - assert seen == {"host": "0.0.0.0", "port": 4317, "debug": True} + assert seen == {"host": "127.0.0.1", "port": 9876} def test_trace_down_prints_success_when_listener_stopped(runner, monkeypatch): @@ -121,13 +119,6 @@ def test_trace_down_prints_no_listener_when_not_running(runner, monkeypatch): assert "No background trace listener running" in result.output -def test_trace_debug_requires_listen_target(runner): - result = runner.invoke(trace, ["--debug", "any"]) - - assert result.exit_code != 0 - assert "--debug is only valid with target 'listen'." in _plain_output(result.output) - - def test_trace_host_port_requires_listen_target(runner): result = runner.invoke(trace, ["--host", "127.0.0.1", "any"]) @@ -159,7 +150,7 @@ def test_trace_default_target_uses_last_and_builds_first_trace( def test_trace_list_any_prints_short_trace_ids(runner, monkeypatch, traces): monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces)) - result = runner.invoke(trace, ["--list", "any"]) + result = runner.invoke(trace, ["--list", "--no-interactive", "any"]) assert result.exit_code == 0, result.output assert "Trace IDs:" in result.output