merge main into model-listener-filter-chain

This commit is contained in:
Adil Hafeez 2026-03-10 06:52:19 +00:00
commit aeb8aa9a54
99 changed files with 5792 additions and 655 deletions

View file

@ -1,3 +1,3 @@
"""Plano CLI - Intelligent Prompt Gateway."""
__version__ = "0.4.8"
__version__ = "0.4.11"

View file

@ -5,5 +5,17 @@ PLANO_COLOR = "#969FF4"
SERVICE_NAME_ARCHGW = "plano"
PLANO_DOCKER_NAME = "plano"
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.8")
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT = "http://host.docker.internal:4317"
PLANO_DOCKER_IMAGE = os.getenv("PLANO_DOCKER_IMAGE", "katanemo/plano:0.4.11")
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317"
# Native mode constants
PLANO_HOME = os.path.join(os.path.expanduser("~"), ".plano")
PLANO_RUN_DIR = os.path.join(PLANO_HOME, "run")
PLANO_BIN_DIR = os.path.join(PLANO_HOME, "bin")
PLANO_PLUGINS_DIR = os.path.join(PLANO_HOME, "plugins")
ENVOY_VERSION = "v1.37.0" # keep in sync with Dockerfile ARG ENVOY_VERSION
NATIVE_PID_FILE = os.path.join(PLANO_RUN_DIR, "plano.pid")
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317"
PLANO_GITHUB_REPO = "katanemo/archgw"
PLANO_RELEASE_BASE_URL = f"https://github.com/{PLANO_GITHUB_REPO}/releases/download"

View file

@ -33,8 +33,11 @@ def _get_gateway_ports(plano_config_file: str) -> list[int]:
with open(plano_config_file) as f:
plano_config_dict = yaml.safe_load(f)
model_providers = plano_config_dict.get("llm_providers") or plano_config_dict.get(
"model_providers"
)
listeners, _, _ = convert_legacy_listeners(
plano_config_dict.get("listeners"), plano_config_dict.get("llm_providers")
plano_config_dict.get("listeners"), model_providers
)
all_ports = [listener.get("port") for listener in listeners]

View file

@ -40,11 +40,35 @@ def docker_remove_container(container: str) -> str:
return result.returncode
def _prepare_docker_config(plano_config_file: str) -> str:
    """Return a config path that is usable from inside a Docker container.

    Configs use ``localhost`` for native-first mode, but a containerized
    gateway must use ``host.docker.internal`` to reach services on the host.
    If the config mentions ``localhost`` at all, write a rewritten copy to a
    temp file and return its path; otherwise return the original path.
    """
    import tempfile

    with open(plano_config_file, "r") as src:
        text = src.read()
    if "localhost" not in text:
        return plano_config_file
    rewritten = text.replace("localhost", "host.docker.internal")
    # delete=False: the temp file must outlive this call — Docker mounts it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".yaml", prefix="plano_config_", delete=False
    ) as tmp:
        tmp.write(rewritten)
    return tmp.name
def docker_start_plano_detached(
plano_config_file: str,
env: dict,
gateway_ports: list[int],
) -> str:
docker_config = _prepare_docker_config(plano_config_file)
env_args = [item for key, value in env.items() for item in ["-e", f"{key}={value}"]]
port_mappings = [
@ -58,7 +82,7 @@ def docker_start_plano_detached(
port_mappings_args = [item for port in port_mappings for item in ("-p", port)]
volume_mappings = [
f"{plano_config_file}:/app/plano_config.yaml:ro",
f"{docker_config}:/app/plano_config.yaml:ro",
]
volume_mappings_args = [
item for volume in volume_mappings for item in ("-v", volume)

View file

@ -30,6 +30,7 @@ from planoai.init_cmd import init as init_cmd
from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_background
from planoai.consts import (
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT,
PLANO_DOCKER_IMAGE,
PLANO_DOCKER_NAME,
)
@ -130,7 +131,13 @@ def main(ctx, version):
@click.command()
def build():
@click.option(
"--docker",
default=False,
help="Build the Docker image instead of native binaries.",
is_flag=True,
)
def build(docker):
"""Build Plano from source. Works from any directory within the repo."""
# Find the repo root
@ -141,6 +148,68 @@ def build():
)
sys.exit(1)
if not docker:
import shutil
crates_dir = os.path.join(repo_root, "crates")
console = _console()
_print_cli_header(console)
if not shutil.which("cargo"):
console.print(
"[red]✗[/red] [bold]cargo[/bold] not found. "
"Install Rust: [cyan]https://rustup.rs[/cyan]"
)
sys.exit(1)
console.print("[dim]Building WASM plugins (wasm32-wasip1)...[/dim]")
try:
subprocess.run(
[
"cargo",
"build",
"--release",
"--target",
"wasm32-wasip1",
"-p",
"llm_gateway",
"-p",
"prompt_gateway",
],
cwd=crates_dir,
check=True,
)
log.info("WASM plugins built")
except subprocess.CalledProcessError as e:
console.print(f"[red]✗[/red] WASM build failed: {e}")
sys.exit(1)
console.print("[dim]Building brightstaff (native)...[/dim]")
try:
subprocess.run(
[
"cargo",
"build",
"--release",
"-p",
"brightstaff",
],
cwd=crates_dir,
check=True,
)
log.info("brightstaff built")
except subprocess.CalledProcessError as e:
console.print(f"[red]✗[/red] brightstaff build failed: {e}")
sys.exit(1)
wasm_dir = os.path.join(crates_dir, "target", "wasm32-wasip1", "release")
native_dir = os.path.join(crates_dir, "target", "release")
console.print(f"\n[bold]Build artifacts:[/bold]")
console.print(f" {os.path.join(wasm_dir, 'prompt_gateway.wasm')}")
console.print(f" {os.path.join(wasm_dir, 'llm_gateway.wasm')}")
console.print(f" {os.path.join(native_dir, 'brightstaff')}")
return
dockerfile_path = os.path.join(repo_root, "Dockerfile")
if not os.path.exists(dockerfile_path):
@ -192,7 +261,13 @@ def build():
help="Port for the OTLP trace collector (default: 4317).",
show_default=True,
)
def up(file, path, foreground, with_tracing, tracing_port):
@click.option(
"--docker",
default=False,
help="Run Plano inside Docker instead of natively.",
is_flag=True,
)
def up(file, path, foreground, with_tracing, tracing_port, docker):
"""Starts Plano."""
from rich.status import Status
@ -209,26 +284,51 @@ def up(file, path, foreground, with_tracing, tracing_port):
)
sys.exit(1)
with Status(
"[dim]Validating configuration[/dim]", spinner="dots", spinner_style="dim"
):
(
validation_return_code,
_,
validation_stderr,
) = docker_validate_plano_schema(plano_config_file)
if not docker:
from planoai.native_runner import native_validate_config
if validation_return_code != 0:
console.print(f"[red]✗[/red] Validation failed")
if validation_stderr:
console.print(f" [dim]{validation_stderr.strip()}[/dim]")
sys.exit(1)
with Status(
"[dim]Validating configuration[/dim]",
spinner="dots",
spinner_style="dim",
):
try:
native_validate_config(plano_config_file)
except SystemExit:
console.print(f"[red]✗[/red] Validation failed")
sys.exit(1)
except Exception as e:
console.print(f"[red]✗[/red] Validation failed")
console.print(f" [dim]{str(e).strip()}[/dim]")
sys.exit(1)
else:
with Status(
"[dim]Validating configuration (Docker)[/dim]",
spinner="dots",
spinner_style="dim",
):
(
validation_return_code,
_,
validation_stderr,
) = docker_validate_plano_schema(plano_config_file)
console.print(f"[green]✓[/green] Configuration valid")
if validation_return_code != 0:
console.print(f"[red]✗[/red] Validation failed")
if validation_stderr:
console.print(f" [dim]{validation_stderr.strip()}[/dim]")
sys.exit(1)
log.info("Configuration valid")
# Set up environment
default_otel = (
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT
if docker
else DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT
)
env_stage = {
"OTEL_TRACING_GRPC_ENDPOINT": DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
"OTEL_TRACING_GRPC_ENDPOINT": default_otel,
}
env = os.environ.copy()
env.pop("PATH", None)
@ -296,13 +396,20 @@ def up(file, path, foreground, with_tracing, tracing_port):
sys.exit(1)
# Update the OTEL endpoint so the gateway sends traces to the right port
env_stage[
"OTEL_TRACING_GRPC_ENDPOINT"
] = f"http://host.docker.internal:{tracing_port}"
tracing_host = "host.docker.internal" if docker else "localhost"
otel_endpoint = f"http://{tracing_host}:{tracing_port}"
env_stage["OTEL_TRACING_GRPC_ENDPOINT"] = otel_endpoint
env.update(env_stage)
try:
start_plano(plano_config_file, env, foreground=foreground)
if not docker:
from planoai.native_runner import start_native
start_native(
plano_config_file, env, foreground=foreground, with_tracing=with_tracing
)
else:
start_plano(plano_config_file, env, foreground=foreground)
# When tracing is enabled but --foreground is not, keep the process
# alive so the OTLP collector continues to receive spans.
@ -320,15 +427,31 @@ def up(file, path, foreground, with_tracing, tracing_port):
@click.command()
def down():
@click.option(
"--docker",
default=False,
help="Stop a Docker-based Plano instance.",
is_flag=True,
)
def down(docker):
"""Stops Plano."""
console = _console()
_print_cli_header(console)
with console.status(
f"[{PLANO_COLOR}]Shutting down Plano...[/{PLANO_COLOR}]", spinner="dots"
):
stop_docker_container()
if not docker:
from planoai.native_runner import stop_native
with console.status(
f"[{PLANO_COLOR}]Shutting down Plano...[/{PLANO_COLOR}]",
spinner="dots",
):
stop_native()
else:
with console.status(
f"[{PLANO_COLOR}]Shutting down Plano (Docker)...[/{PLANO_COLOR}]",
spinner="dots",
):
stop_docker_container()
@click.command()
@ -360,9 +483,21 @@ def generate_prompt_targets(file):
is_flag=True,
)
@click.option("--follow", help="Follow the logs", is_flag=True)
def logs(debug, follow):
@click.option(
"--docker",
default=False,
help="Stream logs from a Docker-based Plano instance.",
is_flag=True,
)
def logs(debug, follow, docker):
"""Stream logs from access logs services."""
if not docker:
from planoai.native_runner import native_logs
native_logs(debug=debug, follow=follow)
return
plano_process = None
try:
if debug:

View file

@ -0,0 +1,325 @@
import gzip
import os
import platform
import shutil
import sys
import tarfile
import tempfile
import planoai
from planoai.consts import (
ENVOY_VERSION,
PLANO_BIN_DIR,
PLANO_PLUGINS_DIR,
PLANO_RELEASE_BASE_URL,
)
from planoai.utils import find_repo_root, getLogger
log = getLogger(__name__)
def _get_platform_slug():
    """Return the binary-download slug (e.g. "linux-amd64") for this host.

    Exits with status 1 on unsupported OS/CPU combinations, printing a
    dedicated hint for Intel Macs (only Apple Silicon builds are published).
    """
    system = platform.system().lower()
    machine = platform.machine().lower()
    supported = {
        ("linux", "x86_64"): "linux-amd64",
        ("linux", "aarch64"): "linux-arm64",
        ("darwin", "arm64"): "darwin-arm64",
    }
    try:
        return supported[(system, machine)]
    except KeyError:
        pass
    if system == "darwin" and machine == "x86_64":
        print(
            "Error: macOS x86_64 (Intel) is not supported. "
            "Pre-built binaries are only available for Apple Silicon (arm64)."
        )
        sys.exit(1)
    print(
        f"Error: Unsupported platform {system}/{machine}. "
        "Supported platforms: linux-amd64, linux-arm64, darwin-arm64"
    )
    sys.exit(1)
def _download_file(url, dest, label=None):
    """Download a file from *url* to *dest* with a progress bar.

    label: display name for the progress line; defaults to dest's basename.
    Exits the process with status 1 on any urllib URLError (network failure,
    HTTP error, bad URL).
    """
    import urllib.request
    import urllib.error

    if label is None:
        label = os.path.basename(dest)
    try:
        # Context manager ensures the response is closed even on write errors.
        with urllib.request.urlopen(url) as response:
            total = int(response.headers.get("Content-Length", 0))
            downloaded = 0
            block_size = 64 * 1024
            with open(dest, "wb") as f:
                while True:
                    chunk = response.read(block_size)
                    if not chunk:
                        break
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Only draw the bar when the server reported a total size.
                    if total > 0:
                        pct = downloaded * 100 // total
                        bar_len = 30
                        # Fix: the fill/empty glyphs were lost — the bar
                        # rendered as an empty string. Restore block glyphs.
                        filled = bar_len * downloaded // total
                        bar = "█" * filled + "░" * (bar_len - filled)
                        mb = downloaded / (1024 * 1024)
                        total_mb = total / (1024 * 1024)
                        print(
                            f"\r {label} {bar} {pct}% ({mb:.1f}/{total_mb:.1f} MB)",
                            end="",
                            flush=True,
                        )
        print()  # newline after progress bar
    except urllib.error.URLError as e:
        print(f"\nError downloading {label}: {e}")
        print(f" URL: {url}")
        print("Please check your internet connection and try again.")
        sys.exit(1)
def ensure_envoy_binary():
    """Download Envoy binary if not already present or version changed. Returns path to binary.

    The binary is cached at PLANO_BIN_DIR/envoy next to an ``envoy.version``
    stamp file; a stamp that differs from the pinned ENVOY_VERSION triggers a
    re-download from the tetratelabs archive-envoy GitHub releases.
    """
    envoy_path = os.path.join(PLANO_BIN_DIR, "envoy")
    version_path = os.path.join(PLANO_BIN_DIR, "envoy.version")
    if os.path.exists(envoy_path) and os.access(envoy_path, os.X_OK):
        # Check if cached binary matches the pinned version
        if os.path.exists(version_path):
            with open(version_path, "r") as f:
                cached_version = f.read().strip()
            if cached_version == ENVOY_VERSION:
                log.info(f"Envoy {ENVOY_VERSION} (cached)")
                return envoy_path
            # Fix: restore the "→" separator lost between the two versions.
            log.info(
                f"Envoy version changed ({cached_version} → {ENVOY_VERSION}), re-downloading..."
            )
        else:
            log.info("Envoy binary found (unknown version, re-downloading...)")
    slug = _get_platform_slug()
    url = (
        f"https://github.com/tetratelabs/archive-envoy/releases/download/"
        f"{ENVOY_VERSION}/envoy-{ENVOY_VERSION}-{slug}.tar.xz"
    )
    os.makedirs(PLANO_BIN_DIR, exist_ok=True)
    with tempfile.NamedTemporaryFile(suffix=".tar.xz", delete=False) as tmp:
        tmp_path = tmp.name
    try:
        _download_file(url, tmp_path, label=f"Envoy {ENVOY_VERSION}")
        log.info(f"Extracting Envoy {ENVOY_VERSION}...")
        with tarfile.open(tmp_path, "r:xz") as tar:
            # Find the envoy binary inside the archive
            envoy_member = None
            for member in tar.getmembers():
                if member.name.endswith("/bin/envoy") or member.name == "bin/envoy":
                    envoy_member = member
                    break
            if envoy_member is None:
                print("Error: Could not find envoy binary in the downloaded archive.")
                print("Archive contents:")
                for member in tar.getmembers():
                    print(f" {member.name}")
                sys.exit(1)
            # Extract just the binary
            f = tar.extractfile(envoy_member)
            if f is None:
                print("Error: Could not extract envoy binary from archive.")
                sys.exit(1)
            with open(envoy_path, "wb") as out:
                out.write(f.read())
        os.chmod(envoy_path, 0o755)
        # Record the version so future runs can detect staleness.
        with open(version_path, "w") as f:
            f.write(ENVOY_VERSION)
        return envoy_path
    finally:
        # Always remove the downloaded archive, even when extraction fails.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
def _find_local_wasm_plugins():
    """Check for WASM plugins built from source. Returns (prompt_gw, llm_gw) or None."""
    repo_root = find_repo_root()
    if not repo_root:
        return None
    wasm_dir = os.path.join(repo_root, "crates", "target", "wasm32-wasip1", "release")
    candidates = [
        os.path.join(wasm_dir, "prompt_gateway.wasm"),
        os.path.join(wasm_dir, "llm_gateway.wasm"),
    ]
    # Both plugins must exist for a local build to be usable.
    if all(os.path.exists(path) for path in candidates):
        return candidates[0], candidates[1]
    return None
def _find_local_brightstaff():
    """Check for brightstaff binary built from source. Returns path or None."""
    repo_root = find_repo_root()
    if not repo_root:
        return None
    candidate = os.path.join(repo_root, "crates", "target", "release", "brightstaff")
    # Must exist AND be executable to count as a usable local build.
    if not (os.path.exists(candidate) and os.access(candidate, os.X_OK)):
        return None
    return candidate
def ensure_wasm_plugins():
    """Find or download WASM plugins. Checks: local build → cached download → fresh download.

    Returns a (prompt_gateway_path, llm_gateway_path) tuple. Downloaded
    plugins are cached under PLANO_PLUGINS_DIR with a ``wasm.version`` stamp.
    """
    # 1. Local source build (inside repo)
    local = _find_local_wasm_plugins()
    if local:
        log.info("Using locally-built WASM plugins")
        return local
    # 2. Cached download
    version = planoai.__version__
    version_path = os.path.join(PLANO_PLUGINS_DIR, "wasm.version")
    prompt_gw_path = os.path.join(PLANO_PLUGINS_DIR, "prompt_gateway.wasm")
    llm_gw_path = os.path.join(PLANO_PLUGINS_DIR, "llm_gateway.wasm")
    if os.path.exists(prompt_gw_path) and os.path.exists(llm_gw_path):
        if os.path.exists(version_path):
            with open(version_path, "r") as f:
                cached_version = f.read().strip()
            if cached_version == version:
                log.info(f"WASM plugins {version} (cached)")
                return prompt_gw_path, llm_gw_path
            # Fix: restore the "→" separator lost between the two versions.
            log.info(
                f"WASM plugins version changed ({cached_version} → {version}), re-downloading..."
            )
        else:
            log.info("WASM plugins found (unknown version, re-downloading...)")
    # 3. Download from GitHub releases (gzipped)
    os.makedirs(PLANO_PLUGINS_DIR, exist_ok=True)
    for name, dest in [
        ("prompt_gateway.wasm", prompt_gw_path),
        ("llm_gateway.wasm", llm_gw_path),
    ]:
        gz_name = f"{name}.gz"
        url = f"{PLANO_RELEASE_BASE_URL}/{version}/{gz_name}"
        gz_dest = dest + ".gz"
        _download_file(url, gz_dest, label=f"{name} ({version})")
        log.info(f"Decompressing {name}...")
        with gzip.open(gz_dest, "rb") as f_in, open(dest, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.unlink(gz_dest)
    # Stamp only after both plugins decompressed successfully.
    with open(version_path, "w") as f:
        f.write(version)
    return prompt_gw_path, llm_gw_path
def ensure_brightstaff_binary():
    """Find or download brightstaff binary. Checks: local build → cached download → fresh download.

    Returns the path to an executable brightstaff. Downloads are cached under
    PLANO_BIN_DIR with a ``brightstaff.version`` stamp.
    """
    # 1. Local source build (inside repo)
    local = _find_local_brightstaff()
    if local:
        log.info("Using locally-built brightstaff")
        return local
    # 2. Cached download
    version = planoai.__version__
    brightstaff_path = os.path.join(PLANO_BIN_DIR, "brightstaff")
    version_path = os.path.join(PLANO_BIN_DIR, "brightstaff.version")
    if os.path.exists(brightstaff_path) and os.access(brightstaff_path, os.X_OK):
        if os.path.exists(version_path):
            with open(version_path, "r") as f:
                cached_version = f.read().strip()
            if cached_version == version:
                log.info(f"brightstaff {version} (cached)")
                return brightstaff_path
            # Fix: restore the "→" separator lost between the two versions.
            log.info(
                f"brightstaff version changed ({cached_version} → {version}), re-downloading..."
            )
        else:
            log.info("brightstaff found (unknown version, re-downloading...)")
    # 3. Download from GitHub releases (gzipped)
    slug = _get_platform_slug()
    filename = f"brightstaff-{slug}.gz"
    # Fix: the URL must reference the platform-specific asset name; it
    # previously contained a literal placeholder and `filename` was unused.
    url = f"{PLANO_RELEASE_BASE_URL}/{version}/{filename}"
    os.makedirs(PLANO_BIN_DIR, exist_ok=True)
    gz_path = brightstaff_path + ".gz"
    _download_file(url, gz_path, label=f"brightstaff ({version}, {slug})")
    log.info("Decompressing brightstaff...")
    with gzip.open(gz_path, "rb") as f_in, open(brightstaff_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.unlink(gz_path)
    os.chmod(brightstaff_path, 0o755)
    with open(version_path, "w") as f:
        f.write(version)
    return brightstaff_path
def find_wasm_plugins():
    """Find WASM plugin files built from source. Returns (prompt_gateway_path, llm_gateway_path).

    Exits with status 1 (and prints guidance) if the repo root or either
    plugin file cannot be found.
    """
    repo_root = find_repo_root()
    if not repo_root:
        print(
            "Error: Could not find repository root. "
            "Make sure you're inside the plano repository."
        )
        sys.exit(1)
    wasm_dir = os.path.join(repo_root, "crates", "target", "wasm32-wasip1", "release")
    prompt_gw = os.path.join(wasm_dir, "prompt_gateway.wasm")
    llm_gw = os.path.join(wasm_dir, "llm_gateway.wasm")
    expected = (
        ("prompt_gateway.wasm", prompt_gw),
        ("llm_gateway.wasm", llm_gw),
    )
    missing = [name for name, path in expected if not os.path.exists(path)]
    if missing:
        print(f"Error: WASM plugins not found: {', '.join(missing)}")
        print(f" Expected at: {wasm_dir}/")
        print(" Run 'planoai build' first to build them.")
        sys.exit(1)
    return prompt_gw, llm_gw
def find_brightstaff_binary():
    """Find the brightstaff binary built from source. Returns path.

    Exits with status 1 (and prints guidance) if the repo root or the binary
    is missing.
    """
    repo_root = find_repo_root()
    if not repo_root:
        print(
            "Error: Could not find repository root. "
            "Make sure you're inside the plano repository."
        )
        sys.exit(1)
    candidate = os.path.join(
        repo_root, "crates", "target", "release", "brightstaff"
    )
    if os.path.exists(candidate):
        return candidate
    print(f"Error: brightstaff binary not found at {candidate}")
    print(" Run 'planoai build' first to build it.")
    sys.exit(1)

View file

@ -0,0 +1,463 @@
import contextlib
import io
import json
import os
import signal
import subprocess
import sys
import time
from planoai.consts import (
NATIVE_PID_FILE,
PLANO_RUN_DIR,
)
from planoai.docker_cli import health_check_endpoint
from planoai.native_binaries import (
ensure_brightstaff_binary,
ensure_envoy_binary,
ensure_wasm_plugins,
)
from planoai.utils import find_repo_root, getLogger
log = getLogger(__name__)
def _find_config_dir():
    """Locate the directory containing plano_config_schema.yaml and envoy.template.yaml.

    Checks package data first (pip-installed), then falls back to the repo
    checkout's ``config/`` directory; exits with status 1 if neither exists.
    """
    import planoai

    pkg_data = os.path.join(os.path.dirname(planoai.__file__), "data")
    schema_in_pkg = os.path.join(pkg_data, "plano_config_schema.yaml")
    if os.path.isdir(pkg_data) and os.path.exists(schema_in_pkg):
        return pkg_data
    repo_root = find_repo_root()
    if repo_root:
        repo_config = os.path.join(repo_root, "config")
        if os.path.isdir(repo_config):
            return repo_config
    print(
        "Error: Could not find config templates. "
        "Make sure you're inside the plano repository or have the planoai package installed."
    )
    sys.exit(1)
@contextlib.contextmanager
def _temporary_env(overrides):
    """Temporarily apply *overrides* to os.environ; restore prior values on exit.

    Variables that did not exist beforehand are removed again; variables that
    did exist get their original value back, even if the body raises.
    """
    previous = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value
def render_native_config(plano_config_file, env, with_tracing=False):
    """Render envoy and plano configs for native mode. Returns (envoy_config_path, plano_config_rendered_path).

    Renders into PLANO_RUN_DIR through the shared config_generator, then
    rewrites Docker-only paths in the rendered envoy.yaml (WASM plugin
    locations, /var/log/, the Linux CA bundle on macOS) so it works when
    Envoy runs directly on the host. Finally expands ``$VAR`` references in
    both rendered files from *env* (an envsubst equivalent).

    with_tracing: when True, force tracing.random_sampling=100 via a derived
        copy of the config (the original file is never modified).
    """
    import yaml

    os.makedirs(PLANO_RUN_DIR, exist_ok=True)
    prompt_gw_path, llm_gw_path = ensure_wasm_plugins()
    # If --with-tracing, inject tracing config if not already present
    effective_config_file = os.path.abspath(plano_config_file)
    if with_tracing:
        with open(plano_config_file, "r") as f:
            config_data = yaml.safe_load(f)
        tracing = config_data.get("tracing", {})
        if not tracing.get("random_sampling"):
            tracing["random_sampling"] = 100
        config_data["tracing"] = tracing
        # Write the tracing-augmented copy; the user's file stays untouched.
        effective_config_file = os.path.join(
            PLANO_RUN_DIR, "config_with_tracing.yaml"
        )
        with open(effective_config_file, "w") as f:
            yaml.dump(config_data, f, default_flow_style=False)
    envoy_config_path = os.path.join(PLANO_RUN_DIR, "envoy.yaml")
    plano_config_rendered_path = os.path.join(
        PLANO_RUN_DIR, "plano_config_rendered.yaml"
    )
    # Set environment variables that config_generator.validate_and_render_schema() reads
    config_dir = _find_config_dir()
    overrides = {
        "PLANO_CONFIG_FILE": effective_config_file,
        "PLANO_CONFIG_SCHEMA_FILE": os.path.join(
            config_dir, "plano_config_schema.yaml"
        ),
        "TEMPLATE_ROOT": config_dir,
        "ENVOY_CONFIG_TEMPLATE_FILE": "envoy.template.yaml",
        "PLANO_CONFIG_FILE_RENDERED": plano_config_rendered_path,
        "ENVOY_CONFIG_FILE_RENDERED": envoy_config_path,
    }
    # Also propagate caller env vars (API keys, OTEL endpoint, etc.)
    for key, value in env.items():
        if key not in overrides:
            overrides[key] = value
    with _temporary_env(overrides):
        from planoai.config_generator import validate_and_render_schema

        # Suppress verbose print output from config_generator
        with contextlib.redirect_stdout(io.StringIO()):
            validate_and_render_schema()
    # Post-process envoy.yaml: replace Docker WASM plugin paths with local paths
    with open(envoy_config_path, "r") as f:
        envoy_content = f.read()
    envoy_content = envoy_content.replace(
        "/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm", prompt_gw_path
    )
    envoy_content = envoy_content.replace(
        "/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm", llm_gw_path
    )
    # Replace /var/log/ paths with local log directory (non-root friendly)
    log_dir = os.path.join(PLANO_RUN_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    envoy_content = envoy_content.replace("/var/log/", log_dir + "/")
    # Replace Linux CA cert path with platform-appropriate path
    import platform

    if platform.system() == "Darwin":
        envoy_content = envoy_content.replace(
            "/etc/ssl/certs/ca-certificates.crt", "/etc/ssl/cert.pem"
        )
    with open(envoy_config_path, "w") as f:
        f.write(envoy_content)
    # Run envsubst-equivalent on both rendered files using the caller's env
    # NOTE(review): os.path.expandvars leaves unknown $VARs untouched rather
    # than emptying them — presumably intentional; confirm against envsubst
    # behavior expected by the templates.
    with _temporary_env(env):
        for filepath in [envoy_config_path, plano_config_rendered_path]:
            with open(filepath, "r") as f:
                content = f.read()
            content = os.path.expandvars(content)
            with open(filepath, "w") as f:
                f.write(content)
    return envoy_config_path, plano_config_rendered_path
def start_native(plano_config_file, env, foreground=False, with_tracing=False):
    """Start Envoy and brightstaff natively.

    Steps: stop any prior instance, fetch/locate the binaries and WASM
    plugins, render configs, daemonize brightstaff then Envoy, persist their
    PIDs to NATIVE_PID_FILE, and poll each listener's /healthz until healthy
    (60s budget). Exits with status 1 — after killing the surviving sibling —
    if either daemon dies or the health check times out.

    foreground: when True, tail the daemon logs until Ctrl+C, then stop both.
    with_tracing: forwarded to render_native_config.
    """
    from planoai.core import _get_gateway_ports

    # Stop any existing instance first
    if os.path.exists(NATIVE_PID_FILE):
        log.info("Stopping existing Plano instance...")
        stop_native()
    envoy_path = ensure_envoy_binary()
    ensure_wasm_plugins()
    brightstaff_path = ensure_brightstaff_binary()
    envoy_config_path, plano_config_rendered_path = render_native_config(
        plano_config_file, env, with_tracing=with_tracing
    )
    log.info("Configuration rendered")
    log_dir = os.path.join(PLANO_RUN_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    log_level = env.get("LOG_LEVEL", "info")
    # Start brightstaff
    brightstaff_env = os.environ.copy()
    brightstaff_env["RUST_LOG"] = log_level
    brightstaff_env["PLANO_CONFIG_PATH_RENDERED"] = plano_config_rendered_path
    # Propagate API keys and other env vars
    for key, value in env.items():
        brightstaff_env[key] = value
    brightstaff_pid = _daemon_exec(
        [brightstaff_path],
        brightstaff_env,
        os.path.join(log_dir, "brightstaff.log"),
    )
    log.info(f"Started brightstaff (PID {brightstaff_pid})")
    # Start envoy (same env as brightstaff so plugins see identical settings)
    envoy_pid = _daemon_exec(
        [
            envoy_path,
            "-c",
            envoy_config_path,
            "--component-log-level",
            f"wasm:{log_level}",
            "--log-format",
            "[%Y-%m-%d %T.%e][%l] %v",
        ],
        brightstaff_env,
        os.path.join(log_dir, "envoy.log"),
    )
    log.info(f"Started envoy (PID {envoy_pid})")
    # Save PIDs so `planoai down` / stop_native can find the daemons later
    os.makedirs(PLANO_RUN_DIR, exist_ok=True)
    with open(NATIVE_PID_FILE, "w") as f:
        json.dump(
            {
                "envoy_pid": envoy_pid,
                "brightstaff_pid": brightstaff_pid,
            },
            f,
        )
    # Health check: poll every listener port until all respond on /healthz
    gateway_ports = _get_gateway_ports(plano_config_file)
    log.info("Waiting for listeners to become healthy...")
    start_time = time.time()
    timeout = 60
    while True:
        all_healthy = True
        for port in gateway_ports:
            if not health_check_endpoint(f"http://localhost:{port}/healthz"):
                all_healthy = False
        if all_healthy:
            log.info("Plano is running (native mode)")
            for port in gateway_ports:
                log.info(f" http://localhost:{port}")
            break
        # Check if processes are still alive; kill the survivor on failure
        if not _is_pid_alive(brightstaff_pid):
            log.error("brightstaff exited unexpectedly")
            log.error(f" Check logs: {os.path.join(log_dir, 'brightstaff.log')}")
            _kill_pid(envoy_pid)
            sys.exit(1)
        if not _is_pid_alive(envoy_pid):
            log.error("envoy exited unexpectedly")
            log.error(f" Check logs: {os.path.join(log_dir, 'envoy.log')}")
            _kill_pid(brightstaff_pid)
            sys.exit(1)
        if time.time() - start_time > timeout:
            log.error(f"Health check timed out after {timeout}s")
            log.error(f" Check logs in: {log_dir}")
            stop_native()
            sys.exit(1)
        time.sleep(1)
    if foreground:
        log.info("Running in foreground. Press Ctrl+C to stop.")
        log.info(f"Logs: {log_dir}")
        try:
            import glob

            # Tail daemon logs plus any access logs that already exist.
            access_logs = sorted(glob.glob(os.path.join(log_dir, "access_*.log")))
            tail_proc = subprocess.Popen(
                [
                    "tail",
                    "-f",
                    os.path.join(log_dir, "envoy.log"),
                    os.path.join(log_dir, "brightstaff.log"),
                ]
                + access_logs,
                stdout=sys.stdout,
                stderr=sys.stderr,
            )
            tail_proc.wait()
        except KeyboardInterrupt:
            # NOTE(review): if Ctrl+C lands before Popen returns, tail_proc is
            # unbound here and this raises NameError — confirm acceptable.
            log.info("Stopping Plano...")
            if tail_proc.poll() is None:
                tail_proc.terminate()
            stop_native()
    else:
        log.info(f"Logs: {log_dir}")
        log.info("Run 'planoai down' to stop.")
def _daemon_exec(args, env, log_path):
    """Start a fully daemonized process via double-fork. Returns the child PID.

    args: argv for the daemon; args[0] is passed directly to os.execve, so it
        must be a path to the executable (no PATH search).
    env: complete environment dict for the daemon.
    log_path: file receiving the daemon's stdout and stderr (truncated).

    The double fork plus setsid detaches the daemon from this process's
    session so it survives the CLI exiting; the grandchild PID travels back
    to the parent through a temp hand-off file in PLANO_RUN_DIR (keyed by the
    intermediate child's PID, which both sides know).

    Raises RuntimeError if the hand-off file does not appear within 5s.
    """
    log_fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
    pid = os.fork()
    if pid > 0:
        # Parent: close our copy of the log fd and wait for intermediate child
        os.close(log_fd)
        os.waitpid(pid, 0)
        # Read the grandchild PID from the hand-off file
        grandchild_pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{pid}")
        deadline = time.time() + 5
        while time.time() < deadline:
            if os.path.exists(grandchild_pid_path):
                with open(grandchild_pid_path, "r") as f:
                    grandchild_pid = int(f.read().strip())
                os.unlink(grandchild_pid_path)
                return grandchild_pid
            time.sleep(0.05)
        raise RuntimeError(f"Timed out waiting for daemon PID from {args[0]}")
    # First child: create new session and fork again
    os.setsid()
    grandchild_pid = os.fork()
    if grandchild_pid > 0:
        # Intermediate child: write grandchild PID and exit
        pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{os.getpid()}")
        with open(pid_path, "w") as f:
            f.write(str(grandchild_pid))
        os._exit(0)
    # Grandchild: this is the actual daemon
    os.dup2(log_fd, 1)  # stdout -> log
    os.dup2(log_fd, 2)  # stderr -> log
    os.close(log_fd)
    # Close stdin
    devnull = os.open(os.devnull, os.O_RDONLY)
    os.dup2(devnull, 0)
    os.close(devnull)
    os.execve(args[0], args, env)
def _is_pid_alive(pid):
    """Return True if a process with *pid* exists (signal-0 probe)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists; we merely lack permission to signal it.
        return True
    return True
def _kill_pid(pid):
    """Best-effort SIGTERM: missing processes and permission errors are ignored."""
    with contextlib.suppress(ProcessLookupError, PermissionError):
        os.kill(pid, signal.SIGTERM)
def stop_native():
    """Stop natively-running Envoy and brightstaff processes.

    Reads both PIDs from NATIVE_PID_FILE, sends each a SIGTERM, waits up to
    10s per process for a graceful exit, then escalates to SIGKILL. The PID
    file is removed afterwards. No-op when the PID file is missing.
    """
    if not os.path.exists(NATIVE_PID_FILE):
        log.info("No native Plano instance found (PID file missing).")
        return
    with open(NATIVE_PID_FILE, "r") as f:
        pids = json.load(f)
    envoy_pid = pids.get("envoy_pid")
    brightstaff_pid = pids.get("brightstaff_pid")
    for name, pid in [("envoy", envoy_pid), ("brightstaff", brightstaff_pid)]:
        if pid is None:
            continue
        try:
            os.kill(pid, signal.SIGTERM)
            log.info(f"Sent SIGTERM to {name} (PID {pid})")
        except ProcessLookupError:
            log.info(f"{name} (PID {pid}) already stopped")
            continue
        except PermissionError:
            log.error(f"Permission denied stopping {name} (PID {pid})")
            continue
        # Wait for graceful shutdown; the while/else falls through to SIGKILL
        # only when the deadline expires without the process disappearing.
        deadline = time.time() + 10
        while time.time() < deadline:
            try:
                os.kill(pid, 0)  # Check if still alive
                time.sleep(0.5)
            except ProcessLookupError:
                break
        else:
            # Still alive after timeout, force kill
            try:
                os.kill(pid, signal.SIGKILL)
                log.info(f"Sent SIGKILL to {name} (PID {pid})")
            except ProcessLookupError:
                pass
    os.unlink(NATIVE_PID_FILE)
    log.info("Plano stopped (native mode).")
def native_validate_config(plano_config_file):
    """Validate config in-process without Docker.

    Runs the shared config_generator render with output routed into
    PLANO_RUN_DIR; any schema problem surfaces as the generator's own
    exception/exit. Stdout noise from the generator is suppressed.
    """
    config_dir = _find_config_dir()
    # Rendered files land in the run dir; we only care about validation here.
    os.makedirs(PLANO_RUN_DIR, exist_ok=True)
    overrides = {
        "PLANO_CONFIG_FILE": os.path.abspath(plano_config_file),
        "PLANO_CONFIG_SCHEMA_FILE": os.path.join(
            config_dir, "plano_config_schema.yaml"
        ),
        "TEMPLATE_ROOT": config_dir,
        "ENVOY_CONFIG_TEMPLATE_FILE": "envoy.template.yaml",
        "PLANO_CONFIG_FILE_RENDERED": os.path.join(
            PLANO_RUN_DIR, "plano_config_rendered.yaml"
        ),
        "ENVOY_CONFIG_FILE_RENDERED": os.path.join(PLANO_RUN_DIR, "envoy.yaml"),
    }
    with _temporary_env(overrides):
        from planoai.config_generator import validate_and_render_schema

        # The generator prints a lot; keep validation quiet.
        with contextlib.redirect_stdout(io.StringIO()):
            validate_and_render_schema()
def native_logs(debug=False, follow=False):
    """Stream logs from native-mode Plano.

    Tails the access logs (plus envoy/brightstaff logs when *debug*), exiting
    with status 1 if the log directory or any log files are missing.
    """
    import glob

    log_dir = os.path.join(PLANO_RUN_DIR, "logs")
    if not os.path.isdir(log_dir):
        log.error(f"No native log directory found at {log_dir}")
        log.error("Is Plano running? Start it with: planoai up <config.yaml>")
        sys.exit(1)
    selected = sorted(glob.glob(os.path.join(log_dir, "access_*.log")))
    if debug:
        selected += [
            os.path.join(log_dir, "envoy.log"),
            os.path.join(log_dir, "brightstaff.log"),
        ]
    # Only tail files that are actually present on disk.
    selected = [path for path in selected if os.path.exists(path)]
    if not selected:
        log.error(f"No log files found in {log_dir}")
        sys.exit(1)
    command = ["tail"] + (["-f"] if follow else []) + selected
    try:
        proc = subprocess.Popen(command, stdout=sys.stdout, stderr=sys.stderr)
        proc.wait()
    except KeyboardInterrupt:
        if proc.poll() is None:
            proc.terminate()

View file

@ -23,7 +23,7 @@ model_providers:
# Ollama Models
- model: ollama/llama3.1
base_url: http://host.docker.internal:11434
base_url: http://localhost:11434
# Model aliases - friendly names that map to actual provider names

View file

@ -2,8 +2,11 @@ import json
import os
import re
import string
import subprocess
import sys
import threading
import time
from http import HTTPStatus
from collections import OrderedDict
from concurrent import futures
from dataclasses import dataclass
@ -22,6 +25,7 @@ from rich.text import Text
from rich.tree import Tree
from planoai.consts import PLANO_COLOR
from planoai import trace_listener_runtime
DEFAULT_GRPC_PORT = 4317
MAX_TRACES = 50
@ -35,7 +39,7 @@ class TraceListenerBindError(RuntimeError):
def _trace_listener_bind_error_message(address: str) -> str:
return (
f"Failed to start OTLP listener on {address}: address is already in use.\n"
"Stop the process using that port or run `planoai trace listen --port <PORT>`."
"Stop the process using that port or run `planoai trace listen`."
)
@ -57,6 +61,25 @@ class TraceSummary:
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
def _is_port_in_use(host: str, port: int) -> bool:
    """Return True when a TCP connect to host:port succeeds within 200 ms."""
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.settimeout(0.2)
        return probe.connect_ex((host, port)) == 0
    finally:
        probe.close()
def _get_listener_pid() -> int | None:
    """Return persisted listener PID if process is alive."""
    # Single source of truth lives in the shared runtime module.
    tracked_pid = trace_listener_runtime.get_listener_pid()
    return tracked_pid
def _stop_background_listener() -> bool:
    """Stop persisted listener process if one is running."""
    # Delegate so the CLI and the daemon share one PID-file protocol.
    stopped = trace_listener_runtime.stop_listener_process()
    return stopped
def _parse_filter_patterns(filter_patterns: tuple[str, ...]) -> list[str]:
parts: list[str] = []
for raw in filter_patterns:
@ -437,8 +460,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
"""gRPC servicer that receives OTLP ExportTraceServiceRequest and
merges incoming spans into the global _TRACE_STORE by trace_id."""
_console = Console(stderr=True)
def Export(self, request, context): # noqa: N802
for resource_spans in request.resource_spans:
service_name = "unknown"
@ -456,27 +477,6 @@ class _OTLPTraceServicer(trace_service_pb2_grpc.TraceServiceServicer):
continue
span_dict = _proto_span_to_dict(span, service_name)
_TRACE_STORE.merge_spans(trace_id, [span_dict])
short_id = trace_id[:8]
short_span = span.span_id.hex()[:8]
span_start = (
datetime.fromtimestamp(
span.start_time_unix_nano / 1_000_000_000, tz=timezone.utc
)
.astimezone()
.strftime("%H:%M:%S.%f")[:-3]
)
dur_ns = span.end_time_unix_nano - span.start_time_unix_nano
dur_s = dur_ns / 1_000_000_000
dur_str = f"{dur_s:.3f}".rstrip("0").rstrip(".")
dur_str = f"{dur_str}s"
self._console.print(
f"[dim]{span_start}[/dim], "
f"trace=[yellow]{short_id}[/yellow], "
f"span=[yellow]{short_span}[/yellow], "
f"[bold {_service_color(service_name)}]{service_name}[/bold {_service_color(service_name)}] "
f"[cyan]{span.name}[/cyan] "
f"[dim]({dur_str})[/dim]"
)
return trace_service_pb2.ExportTraceServiceResponse()
@ -499,12 +499,8 @@ class _TraceQueryHandler(grpc.GenericRpcHandler):
return json.dumps({"traces": traces}, separators=(",", ":")).encode("utf-8")
def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace-collection server.
Returns the running ``grpc.Server``. The caller is responsible
for calling ``server.stop()`` when done.
"""
def _start_trace_server(host: str, grpc_port: int) -> grpc.Server:
"""Create, bind, and start an OTLP/gRPC trace server."""
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4),
handlers=[_TraceQueryHandler()],
@ -525,38 +521,88 @@ def _create_trace_server(host: str, grpc_port: int) -> grpc.Server:
return grpc_server
def _start_trace_listener(host: str, grpc_port: int) -> None:
"""Start the OTLP/gRPC listener and block until interrupted."""
console = Console()
try:
grpc_server = _create_trace_server(host, grpc_port)
except TraceListenerBindError as exc:
raise click.ClickException(str(exc)) from exc
def _serve_trace_listener(host: str, grpc_port: int) -> None:
"""Run the listener loop until process termination."""
# Persist PID immediately after fork, before server startup.
# This ensures the PID file exists even if server initialization fails.
trace_listener_runtime.write_listener_pid(os.getpid())
console.print()
console.print(f"[bold {PLANO_COLOR}]Listening for traces...[/bold {PLANO_COLOR}]")
console.print(
f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
)
console.print("[dim]Press Ctrl+C to stop.[/dim]")
console.print()
try:
grpc_server = _start_trace_server(host, grpc_port)
grpc_server.wait_for_termination()
except KeyboardInterrupt:
pass
finally:
grpc_server.stop(grace=2)
# Best-effort cleanup; server may not exist if startup failed.
try:
grpc_server.stop(grace=2)
except NameError:
pass
trace_listener_runtime.remove_listener_pid()
def _start_trace_listener(host: str, grpc_port: int) -> None:
    """Start the OTLP/gRPC listener as a detached background process.

    Reports status to the console and returns; it never raises on failure.
    Safe to call when a listener is already running (the existing one is
    reported and nothing new is started).
    """
    console = Console()

    # Refuse to start when the requested port is already bound.
    if _is_port_in_use(host, grpc_port):
        existing_pid = _get_listener_pid()
        if existing_pid:
            # Our own tracked listener is already up - nothing to do.
            console.print(
                f"[yellow]⚠[/yellow] Trace listener already running on port [cyan]{grpc_port}[/cyan] (PID: {existing_pid})"
            )
        else:
            # Port is taken by an unknown process; point the user at lsof.
            console.print(
                f"[red]✗[/red] Port [cyan]{grpc_port}[/cyan] is already in use by another process"
            )
            # Fix: these two messages were f-strings without placeholders (F541).
            console.print("\n[dim]Check what's using the port:[/dim]")
            console.print(f"  [cyan]lsof -i :{grpc_port}[/cyan]")
        return

    # Fork/daemonize and run the trace server in the background.
    try:
        pid = trace_listener_runtime.daemonize_and_run(
            lambda: _serve_trace_listener(host, grpc_port)
        )
    except OSError as e:
        console.print(f"[red]✗[/red] Failed to start trace listener: {e}")
        return

    if pid is None:
        # Defensive guard: in the child, daemonize_and_run never returns here.
        return

    # Parent process: wait briefly so the child can bind the port, then verify.
    time.sleep(0.5)
    if _is_port_in_use(host, grpc_port):
        # Success: the trace listener started and bound the port.
        console.print()
        console.print(
            f"[bold {PLANO_COLOR}]Trace listener started[/bold {PLANO_COLOR}]"
        )
        console.print(
            f"[green]●[/green] gRPC (OTLP receiver) on [cyan]{host}:{grpc_port}[/cyan]"
        )
        console.print(f"[dim]Process ID: {pid}[/dim]")
        console.print(
            "[dim]Use [cyan]planoai trace[/cyan] to view collected traces.[/dim]"
        )
        console.print()
    else:
        # Failure: the child died or failed to bind within the grace window.
        console.print("[red]✗[/red] Failed to start trace listener")
def start_trace_listener_background(
host: str = "0.0.0.0", grpc_port: int = DEFAULT_GRPC_PORT
) -> grpc.Server:
"""Start the trace listener in the background (non-blocking).
Returns the running ``grpc.Server`` so the caller can call
``server.stop()`` later.
"""
return _create_trace_server(host, grpc_port)
"""Start the trace server in-process and return ``grpc.Server`` handle."""
return _start_trace_server(host, grpc_port)
def _span_time_ns(span: dict[str, Any], key: str) -> int:
@ -584,13 +630,13 @@ def _trace_summary(trace: dict[str, Any]) -> TraceSummary:
def _service_color(service: str) -> str:
service = service.lower()
if "inbound" in service:
return "white"
return "#4860fa"
if "outbound" in service:
return "white"
return "#57d9a9"
if "orchestrator" in service:
return PLANO_COLOR
if "routing" in service:
return "magenta"
return "#e3a2fa"
if "agent" in service:
return "cyan"
if "llm" in service:
@ -598,6 +644,63 @@ def _service_color(service: str) -> str:
return "white"
def _error_symbol(status_code: str) -> str:
code = int(status_code) if status_code.isdigit() else 0
if code >= 500:
return "💥" # Server error - something broke
elif code == 429:
return "🚦" # Rate limited
elif code == 404:
return "🔍" # Not found
elif code == 403:
return "🚫" # Forbidden
elif code == 401:
return "🔐" # Unauthorized
elif code >= 400:
return "⚠️" # Client error
else:
return "" # Generic error
def _error_description(status_code: str) -> str:
"""Return a developer-friendly description of the error."""
code = int(status_code) if status_code.isdigit() else 0
if code < 400:
return "Error"
try:
return HTTPStatus(code).phrase
except ValueError:
if code >= 500:
return "Server Error"
return "Client Error"
def _detect_error(span: dict[str, Any]) -> tuple[bool, str, str]:
    """Detect whether *span* represents an error.

    Returns:
        tuple: (has_error, status_code, error_description)
    """
    attrs = _attrs(span)
    status = attrs.get("http.status_code", "")

    # HTTP 4xx/5xx responses count as errors.
    if status.isdigit() and int(status) >= 400:
        return True, status, _error_description(status)

    # Otherwise fall back to explicit error/exception attributes
    # (error.message takes precedence over exception.message).
    for attr_key in ("error.message", "exception.message"):
        if attr_key in attrs:
            return True, status or "unknown", attrs[attr_key]

    return False, "", ""
# Attributes to show for inbound/outbound spans when not verbose (trimmed view).
_INBOUND_OUTBOUND_ATTR_KEYS = (
"http.method",
@ -621,10 +724,20 @@ def _trim_attrs_for_display(
def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
# Error attributes always come first
error_priority = [
"http.status_code",
"error.type",
"error.message",
"error.stack",
"exception.type",
"exception.message",
]
# Then regular priority attributes
priority = [
"http.method",
"http.target",
"http.status_code",
"guid:x-request-id",
"request_size",
"response_size",
@ -641,7 +754,10 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
"llm.duration_ms",
"llm.response_bytes",
]
prioritized = [(k, attrs[k]) for k in priority if k in attrs]
# Combine error priority with regular priority
full_priority = error_priority + priority
prioritized = [(k, attrs[k]) for k in full_priority if k in attrs]
prioritized_keys = {k for k, _ in prioritized}
remaining = [(k, v) for k, v in attrs.items() if k not in prioritized_keys]
remaining.sort(key=lambda item: item[0])
@ -649,8 +765,14 @@ def _sorted_attr_items(attrs: dict[str, str]) -> list[tuple[str, str]]:
def _display_attr_value(key: str, value: str) -> str:
if key == "http.status_code" and value != "200":
return f"{value} ⚠️"
if key == "http.status_code":
if value.isdigit():
code = int(value)
if code >= 400:
return f"{value} {_error_symbol(value)}"
elif code >= 200 and code < 300:
return f"{value}"
return value
return value
@ -670,7 +792,7 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
)
spans.sort(key=lambda s: _span_time_ns(s, "startTimeUnixNano"))
tree = Tree("", guide_style="dim")
tree = Tree("", guide_style="dim #5b5a5c bold")
for span in spans:
service = span.get("service", "plano(unknown)")
@ -678,22 +800,52 @@ def _build_tree(trace: dict[str, Any], console: Console, verbose: bool = False)
offset_ms = max(
0, (_span_time_ns(span, "startTimeUnixNano") - start_ns) / 1_000_000
)
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="yellow")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white")
# Check for errors in this span
has_error, error_code, error_desc = _detect_error(span)
if has_error:
# Create error banner above the span
error_banner = Text()
error_banner.append(error_desc, style="bright_red")
tree.add(error_banner)
# Style the span label in light red
label = Text(f"{offset_ms:.0f}ms ", style="#ff6b6b")
label.append(service, style="bold #ff6b6b")
if name:
label.append(f" {name}", style="#ff6b6b italic")
else:
# Normal styling
color = _service_color(service)
label = Text(f"{offset_ms:.0f}ms ", style="#949c99")
label.append(service, style=f"bold {color}")
if name:
label.append(f" {name}", style="dim white bold italic")
node = tree.add(label)
attrs = _trim_attrs_for_display(_attrs(span), service, verbose)
sorted_items = list(_sorted_attr_items(attrs))
for idx, (key, value) in enumerate(sorted_items):
attr_line = Text()
attr_line.append(f"{key}: ", style="white")
attr_line.append(
_display_attr_value(key, str(value)),
style=f"{PLANO_COLOR}",
)
# attribute key
attr_line.append(f"{key}: ", style="#a4a9aa")
# attribute value
if key == "http.status_code" and value.isdigit():
val_int = int(value)
val_style = "bold red" if val_int >= 400 else "green"
attr_line.append(_display_attr_value(key, str(value)), style=val_style)
elif key in [
"error.message",
"exception.message",
"error.type",
"exception.type",
]:
attr_line.append(_display_attr_value(key, str(value)), style="red")
else:
attr_line.append(
_display_attr_value(key, str(value)), style=f"{PLANO_COLOR} bold"
)
if idx == len(sorted_items) - 1:
attr_line.append("\n")
node.add(attr_line)
@ -904,7 +1056,7 @@ def _run_trace_show(
_build_tree(trace_obj, console, verbose=verbose)
@click.group(invoke_without_command=True)
@click.command()
@click.argument("target", required=False)
@click.option(
"--filter",
@ -950,9 +1102,8 @@ def trace(
verbose,
):
"""Trace requests from the local OTLP listener."""
if ctx.invoked_subcommand:
return
if target == "listen" and not any(
# Handle operational shortcuts when invoked as target values.
has_show_options = any(
[
filter_patterns,
where_filters,
@ -963,9 +1114,20 @@ def trace(
json_out,
verbose,
]
):
)
if target == "listen" and not has_show_options:
_start_trace_listener("0.0.0.0", DEFAULT_GRPC_PORT)
return
if target in ("stop", "down") and not has_show_options:
console = Console()
if _stop_background_listener():
console.print(f"[green]✓[/green] Trace listener stopped")
else:
console.print(f"[dim]No background trace listener running[/dim]")
return
_run_trace_show(
target,
filter_patterns,
@ -977,17 +1139,3 @@ def trace(
json_out,
verbose,
)
@trace.command("listen")
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option(
"--port",
type=int,
default=DEFAULT_GRPC_PORT,
show_default=True,
help="gRPC port for receiving OTLP traces.",
)
def trace_listen(host: str, port: int) -> None:
"""Listen for OTLP/gRPC traces."""
_start_trace_listener(host, port)

View file

@ -0,0 +1,127 @@
"""
Trace listener process runtime utilities.
"""
import os
import signal
import time
import logging
from collections.abc import Callable
# Canonical PID file used by `planoai trace listen/down`.
TRACE_LISTENER_PID_PATH = os.path.expanduser("~/.plano/run/trace_listener.pid")
TRACE_LISTENER_LOG_PATH = os.path.expanduser("~/.plano/run/trace_listener.log")
LOGGER = logging.getLogger(__name__)
def write_listener_pid(pid: int) -> None:
    """Persist listener PID for later management commands."""
    # First-time installs may not have the run directory yet.
    pid_dir = os.path.dirname(TRACE_LISTENER_PID_PATH)
    os.makedirs(pid_dir, exist_ok=True)
    with open(TRACE_LISTENER_PID_PATH, "w") as handle:
        handle.write(str(pid))
def remove_listener_pid() -> None:
    """Remove the persisted listener PID file if present.

    Best-effort cleanup: a missing file is not an error. Uses EAFP
    (try/except) instead of exists()+remove() to avoid the race where
    the file disappears between the check and the removal.
    """
    try:
        os.remove(TRACE_LISTENER_PID_PATH)
    except FileNotFoundError:
        pass
def get_listener_pid() -> int | None:
    """Return the listener PID if present and the process is alive.

    Returns None (and removes the PID file) when the file is missing,
    malformed, or refers to a dead process. A PID owned by another user
    (EPERM on signal 0) is treated as alive, not stale.
    """
    if not os.path.exists(TRACE_LISTENER_PID_PATH):
        return None
    try:
        # Parse persisted PID.
        with open(TRACE_LISTENER_PID_PATH, "r") as f:
            pid = int(f.read().strip())
    except (ValueError, OSError):
        _discard_stale_pid_file()
        return None
    try:
        # Signal 0 performs a liveness check without delivering a signal.
        os.kill(pid, 0)
    except PermissionError:
        # EPERM: the process exists but belongs to another user.
        # Original code lumped this into OSError and wrongly deleted a
        # valid PID file; a live process must keep its record.
        return pid
    except OSError:
        # ProcessLookupError and friends: the process is gone.
        _discard_stale_pid_file()
        return None
    return pid


def _discard_stale_pid_file() -> None:
    """Warn about and remove a stale or malformed listener PID file."""
    LOGGER.warning(
        "Removing stale or malformed trace listener PID file at %s",
        TRACE_LISTENER_PID_PATH,
    )
    remove_listener_pid()
def stop_listener_process(grace_seconds: float = 0.5) -> bool:
    """Terminate the persisted listener process.

    Sends SIGTERM, waits *grace_seconds* for a clean exit, then SIGKILLs
    the process if it is still alive. Returns True when a listener was
    stopped, False when none was running.
    """
    pid = get_listener_pid()
    if pid is None:
        # No live listener on record.
        return False

    try:
        # Ask politely first.
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # Vanished between the liveness check and the signal.
        remove_listener_pid()
        return False

    # Give the process a short window to exit on its own.
    time.sleep(grace_seconds)
    try:
        os.kill(pid, 0)  # still alive?
        os.kill(pid, signal.SIGKILL)  # then force it down
    except ProcessLookupError:
        pass  # exited cleanly after SIGTERM

    remove_listener_pid()
    return True
def daemonize_and_run(run_forever: Callable[[], None]) -> int | None:
    """
    Fork and detach the current process to create a Unix daemon.

    Returns:
        - Parent process: child PID (> 0), allowing caller to report startup.
        - Child process: never returns; runs callback in daemon context until termination.

    Raises:
        - OSError: if fork fails (e.g., resource limits exceeded).
    """
    # Duplicate current process. Raises OSError if fork fails.
    pid = os.fork()
    if pid > 0:
        # Parent returns child PID to caller.
        return pid

    # Child: detach from controlling terminal/session.
    # This prevents SIGHUP when the parent terminal closes.
    # NOTE(review): this is a single fork + setsid, so the child remains a
    # session leader and could in principle reacquire a controlling terminal
    # if it ever opens a tty; a classic double-fork would prevent that.
    # Confirm single-fork is acceptable for this daemon.
    os.setsid()

    # Redirect stdin to /dev/null and stdout/stderr to a persistent log file.
    # This keeps the daemon terminal-independent while preserving diagnostics.
    os.makedirs(os.path.dirname(TRACE_LISTENER_LOG_PATH), exist_ok=True)
    devnull_in = os.open(os.devnull, os.O_RDONLY)
    try:
        log_fd = os.open(
            TRACE_LISTENER_LOG_PATH,
            os.O_WRONLY | os.O_CREAT | os.O_APPEND,
            0o644,
        )
    except OSError:
        # If logging cannot be initialized, keep running with output discarded.
        log_fd = os.open(os.devnull, os.O_WRONLY)
    os.dup2(devnull_in, 0)  # stdin
    os.dup2(log_fd, 1)  # stdout
    os.dup2(log_fd, 2)  # stderr
    # Close the originals only if they are not fds 0-2 themselves, since
    # dup2 above has already installed them at the standard descriptors.
    if devnull_in > 2:
        os.close(devnull_in)
    if log_fd > 2:
        os.close(log_fd)

    # Run the daemon main loop (expected to block until process termination).
    run_forever()
    # If callback unexpectedly returns, exit cleanly to avoid returning to parent context.
    os._exit(0)