refactor: simplify trace listener initialization and remove debug mode handling

This commit is contained in:
Musa 2026-02-13 16:38:54 -08:00
parent 8396a86279
commit 9435509109
2 changed files with 50 additions and 121 deletions

View file

@ -484,12 +484,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
"""gRPC servicer that receives OTLP ExportTraceServiceRequest and
merges incoming spans into the global _TRACE_STORE by trace_id."""
_console = Console(stderr=True)
# we have to stream events otherwise the payload is too slow to process
def __init__(self, stream_events: bool = True) -> None:
self._stream_events = stream_events
def Export(self, request, context): # noqa: N802
for resource_spans in request.resource_spans:
service_name = "unknown"
@ -507,28 +501,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
continue
span_dict = _proto_span_to_dict(span, service_name)
_TRACE_STORE.merge_spans(trace_id, [span_dict])
short_id = trace_id[:8]
short_span = span.span_id.hex()[:8]
span_start = (
datetime.fromtimestamp(
span.start_time_unix_nano / 1_000_000_000, tz=timezone.utc
)
.astimezone()
.strftime("%H:%M:%S.%f")[:-3]
)
dur_ns = span.end_time_unix_nano - span.start_time_unix_nano
dur_s = dur_ns / 1_000_000_000
dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".")
dur_str = f"{dur_str}s"
if self._stream_events:
self._console.print(
f"[dim]{span_start}[/dim], "
f"trace=[yellow]{short_id}[/yellow], "
f"span=[yellow]{short_span}[/yellow], "
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
f"[cyan]{span.name}[/cyan] "
f"[dim]({dur_str})[/dim]"
)
return trace_service_pb2.ExportTraceServiceResponse()
@ -551,9 +523,7 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
def _create_trace_server(
host: str, grpc_port: int, stream_events: bool = False
) -> grpc.Server:
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace-collection server.
Returns the running ``grpc.Server``. The caller is responsible
@ -564,35 +534,19 @@ def _create_trace_server(
handlers=[_TraceQueryHandler()],
)
trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
_OTLPTraceServicer(stream_events=stream_events), grpc_server
_OTLPTraceServicer(), grpc_server
)
grpc_server.add_insecure_port(f"{host}:{grpc_port}")
grpc_server.start()
return grpc_server
def _start_trace_listener_foreground(
host: str, grpc_port: int, debug: bool = False
) -> None:
def _start_trace_listener_foreground(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener in foreground (blocking).
This is the actual server implementation that blocks until interrupted.
"""
console = Console()
grpc_server = _create_trace_server(host, grpc_port, stream_events=debug)
if debug:
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(
"[dim]Debug mode: streaming trace events. Press Ctrl+C to stop.[/dim]"
)
console.print()
grpc_server = _create_trace_server(host, grpc_port)
# Save PID for background mode
_save_listener_pid(os.getpid())
@ -600,21 +554,20 @@ def _start_trace_listener_foreground(
try:
grpc_server.wait_for_termination()
except KeyboardInterrupt:
if debug:
console.print("\n[dim]Stopping trace listener...[/dim]")
pass
finally:
grpc_server.stop(grace=2)
if os.path.exists(TRACE_LISTENER_PID_FILE):
os.remove(TRACE_LISTENER_PID_FILE)
def _start_trace_listener(host: str, grpc_port: int, debug: bool = False) -> None:
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener.
Args:
host: Host to bind to
grpc_port: Port to bind to
debug: If True, runs in foreground with event streaming. If False, daemonizes.
Starts listener as a background process.
"""
console = Console()
@ -633,44 +586,37 @@ def _start_trace_listener(host: str, grpc_port: int, debug: bool = False) -> Non
console.print(f" [cyan]lsof -i :{grpc_port}[/cyan]")
return
if debug:
# Run in foreground with debug output
_start_trace_listener_foreground(host, grpc_port, debug=True)
else:
# Fork to background
pid = os.fork()
if pid > 0:
# Parent process
time.sleep(0.5) # Give child time to start
if _is_port_in_use(grpc_port):
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(f"[dim]Process ID: {pid}[/dim]")
console.print(
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
)
console.print(
"[dim]Use [cyan]planoai trace listen --debug[/cyan] to see live events.[/dim]"
)
console.print()
else:
console.print(f"[red]✗[/red] Failed to start trace listener")
return
# Fork to background
pid = os.fork()
if pid > 0:
# Parent process
time.sleep(0.5) # Give child time to start
if _is_port_in_use(grpc_port):
console.print()
console.print(
f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
)
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print(f"[dim]Process ID: {pid}[/dim]")
console.print(
"[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
)
console.print()
else:
# Child process - become daemon
os.setsid() # Create new session
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
if devnull > 2:
os.close(devnull)
_start_trace_listener_foreground(host, grpc_port, debug=False)
console.print(f"[red]✗[/red] Failed to start trace listener")
return
else:
# Child process - become daemon
os.setsid() # Create new session
devnull = os.open(os.devnull, os.O_RDWR)
os.dup2(devnull, 0)
os.dup2(devnull, 1)
os.dup2(devnull, 2)
if devnull > 2:
os.close(devnull)
_start_trace_listener_foreground(host, grpc_port)
def start_trace_listener_background(
@ -681,7 +627,7 @@ def start_trace_listener_background(
Returns the running ``grpc.Server`` so the caller can call
``server.stop()`` later.
"""
return _create_trace_server(host, grpc_port, stream_events=False)
return _create_trace_server(host, grpc_port)
def _span_time_ns(span: dict[str, Any], key: str) -> int:
@ -1185,11 +1131,6 @@ def _run_trace_show(
show_default=True,
help="gRPC port for receiving OTLP traces when target is 'listen'.",
)
@click.option(
"--debug",
is_flag=True,
help="When used with target 'listen', keep terminal active and stream trace events.",
)
@click.option(
"--verbose",
"-v",
@ -1209,7 +1150,6 @@ def trace(
json_out,
host,
port,
debug,
verbose,
):
"""Trace requests from the local OTLP listener."""
@ -1228,11 +1168,9 @@ def trace(
)
if target == "listen" and not has_show_options:
_start_trace_listener(host, port, debug=debug)
_start_trace_listener(host, port)
return
if debug and target != "listen":
raise click.ClickException("--debug is only valid with target 'listen'.")
if (host != "0.0.0.0" or port != DEFAULT_GRPC_PORT) and target != "listen":
raise click.ClickException("--host/--port are only valid with target 'listen'.")

View file

@ -67,40 +67,38 @@ def failure_traces() -> list[dict]:
return copy.deepcopy(_load_failure_traces())
def test_trace_listen_starts_listener_with_debug_and_custom_bind_after_target(
def test_trace_listen_starts_listener_with_custom_bind_after_target(
runner, monkeypatch
):
seen = {}
def fake_start(host: str, port: int, debug: bool) -> None:
def fake_start(host: str, port: int) -> None:
seen["host"] = host
seen["port"] = port
seen["debug"] = debug
monkeypatch.setattr(trace_cmd, "_start_trace_listener", fake_start)
result = runner.invoke(
trace, ["listen", "--host", "127.0.0.1", "--port", "9876", "--debug"]
)
result = runner.invoke(trace, ["listen", "--host", "127.0.0.1", "--port", "9876"])
assert result.exit_code == 0, result.output
assert seen == {"host": "127.0.0.1", "port": 9876, "debug": True}
assert seen == {"host": "127.0.0.1", "port": 9876}
def test_trace_listen_starts_listener_with_debug_before_target(runner, monkeypatch):
def test_trace_listen_starts_listener_with_custom_bind_before_target(
runner, monkeypatch
):
seen = {}
def fake_start(host: str, port: int, debug: bool) -> None:
def fake_start(host: str, port: int) -> None:
seen["host"] = host
seen["port"] = port
seen["debug"] = debug
monkeypatch.setattr(trace_cmd, "_start_trace_listener", fake_start)
result = runner.invoke(trace, ["--debug", "listen"])
result = runner.invoke(trace, ["--host", "127.0.0.1", "--port", "9876", "listen"])
assert result.exit_code == 0, result.output
assert seen == {"host": "0.0.0.0", "port": 4317, "debug": True}
assert seen == {"host": "127.0.0.1", "port": 9876}
def test_trace_down_prints_success_when_listener_stopped(runner, monkeypatch):
@ -121,13 +119,6 @@ def test_trace_down_prints_no_listener_when_not_running(runner, monkeypatch):
assert "No background trace listener running" in result.output
def test_trace_debug_requires_listen_target(runner):
result = runner.invoke(trace, ["--debug", "any"])
assert result.exit_code != 0
assert "--debug is only valid with target 'listen'." in _plain_output(result.output)
def test_trace_host_port_requires_listen_target(runner):
result = runner.invoke(trace, ["--host", "127.0.0.1", "any"])
@ -159,7 +150,7 @@ def test_trace_default_target_uses_last_and_builds_first_trace(
def test_trace_list_any_prints_short_trace_ids(runner, monkeypatch, traces):
monkeypatch.setattr(trace_cmd, "_fetch_traces_raw", lambda: copy.deepcopy(traces))
result = runner.invoke(trace, ["--list", "any"])
result = runner.invoke(trace, ["--list", "--no-interactive", "any"])
assert result.exit_code == 0, result.output
assert "Trace IDs:" in result.output