rewrite planoai CLI in Rust

Add crates/plano-cli/ as a full Rust rewrite of the Python CLI.
Binary name: planoai. All subcommands ported: up, down, build,
logs, cli-agent, trace, init. Config validation and Tera template
rendering replace the Python config_generator. CI updated to use
cargo test/build instead of Python test jobs.
Adil Hafeez 2026-03-22 22:57:35 +00:00
parent 406fa92802
commit 15b9e8b95c
37 changed files with 4658 additions and 91 deletions


@@ -1,10 +1,9 @@
---
name: build-cli
description: Build and install the Python CLI (planoai). Use after making changes to cli/ code to install locally.
description: Build and install the Rust CLI (planoai). Use after making changes to plano-cli code to install locally.
---
1. `cd cli && uv sync` — ensure dependencies are installed
2. `cd cli && uv tool install --editable .` — install the CLI locally
3. Verify the installation: `cd cli && uv run planoai --help`
1. `cd crates && cargo build --release -p plano-cli` — build the CLI binary
2. Verify the build: `./crates/target/release/planoai --help`
If the build or install fails, diagnose and fix the issues.
If the build fails, diagnose and fix the issues.


@@ -25,33 +25,19 @@ jobs:
- uses: pre-commit/action@v3.0.1
# ──────────────────────────────────────────────
# Plano tools (CLI) tests — no Docker needed
# Plano CLI (Rust) tests — no Docker needed
# ──────────────────────────────────────────────
plano-tools-tests:
plano-cli-tests:
runs-on: ubuntu-latest-m
defaults:
run:
working-directory: ./cli
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
- name: Install uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install plano tools
run: uv sync --extra dev
- name: Sync CLI templates to demos
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
run: uv run python -m planoai.template_sync
- name: Run tests
run: uv run pytest
- name: Run plano-cli tests
working-directory: ./crates
run: cargo test -p plano-cli
# ──────────────────────────────────────────────
# Native mode smoke test — build from source & start natively
@@ -62,32 +48,22 @@ jobs:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.12"
- name: Install uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
targets: wasm32-wasip1
- name: Install planoai CLI
working-directory: ./cli
- name: Build plano CLI and native binaries
working-directory: ./crates
run: |
uv sync
uv tool install .
- name: Build native binaries
run: planoai build
cargo build --release -p plano-cli
cargo build --release -p brightstaff
cargo build --release --target wasm32-wasip1 -p llm_gateway -p prompt_gateway
- name: Start plano natively
env:
OPENAI_API_KEY: test-key-not-used
run: planoai up tests/e2e/config_native_smoke.yaml
run: ./crates/target/release/planoai up tests/e2e/config_native_smoke.yaml
- name: Health check
run: |
@@ -105,7 +81,7 @@ jobs:
- name: Stop plano
if: always()
run: planoai down || true
run: ./crates/target/release/planoai down || true
# ──────────────────────────────────────────────
# Single Docker build — shared by all downstream jobs
@@ -157,13 +133,12 @@ jobs:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
- name: Install planoai
run: pip install -e ./cli
- name: Build plano CLI
working-directory: ./crates
run: cargo build --release -p plano-cli
- name: Validate plano config
run: bash config/validate_plano_config.sh


@@ -16,8 +16,9 @@ cd crates && cargo test --lib
cd crates && cargo fmt --all -- --check
cd crates && cargo clippy --locked --all-targets --all-features -- -D warnings
# Python CLI
cd cli && uv sync && uv run pytest -v
# Rust — plano CLI binary
cd crates && cargo build --release -p plano-cli
cd crates && cargo test -p plano-cli
# JS/TS (Turbo monorepo)
npm run build && npm run lint && npm run typecheck
@@ -47,9 +48,13 @@ Client → Envoy (prompt_gateway.wasm → llm_gateway.wasm) → Agents/LLM Providers
- **common** (lib) — Shared: config, HTTP, routing, rate limiting, tokenizer, PII, tracing
- **hermesllm** (lib) — LLM API translation between providers. Key types: `ProviderId`, `ProviderRequest`, `ProviderResponse`, `ProviderStreamResponse`
### Python CLI (cli/planoai/)
### Plano CLI (crates/plano-cli/)
Entry point: `main.py`. Built with `rich-click`. Commands: `up`, `down`, `build`, `logs`, `trace`, `init`, `cli_agent`, `generate_prompt_targets`.
Rust CLI binary (`planoai`). Built with `clap` v4. Commands: `up`, `down`, `build`, `logs`, `trace`, `init`, `cli-agent`.
### Legacy Python CLI (cli/planoai/) — deprecated
Entry point: `main.py`. Built with `rich-click`. Being replaced by the Rust CLI above.
### Config (config/)
@@ -86,7 +91,7 @@ Code in `prompt_gateway` and `llm_gateway` runs in Envoy's WASM sandbox:
Update version (e.g., `0.4.11` → `0.4.12`) in all of these files:
- `.github/workflows/ci.yml`, `build_filter_image.sh`, `config/validate_plano_config.sh`
- `cli/planoai/__init__.py`, `cli/planoai/consts.py`, `cli/pyproject.toml`
- `crates/plano-cli/Cargo.toml`
- `docs/source/conf.py`, `docs/source/get_started/quickstart.rst`, `docs/source/resources/deployment.rst`
- `apps/www/src/components/Hero.tsx`, `demos/llm_routing/preference_based_routing/README.md`
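The bump itself is a mechanical search-and-replace; a rough shell sketch of that step (version numbers are illustrative, and the `sed -i` syntax assumes GNU sed), demonstrated on a scratch file rather than the real paths above:

```shell
old='0.4.11'
new='0.4.12'
# Work on a scratch copy so nothing in the repo is touched.
tmp=$(mktemp -d)
printf 'version = "%s"\n' "$old" > "$tmp/Cargo.toml"
# Escape the dots so sed matches the old version literally, then substitute in place.
esc=$(printf '%s' "$old" | sed 's/\./\\./g')
sed -i "s/$esc/$new/g" "$tmp/Cargo.toml"
cat "$tmp/Cargo.toml"   # version = "0.4.12"
```

In the repo, the same `sed` invocation would be run once per file in the checklist, followed by a review of the resulting diff.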

crates/Cargo.lock (generated) — 1073 changed lines; diff suppressed because it is too large.


@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["llm_gateway", "prompt_gateway", "common", "brightstaff", "hermesllm"]
members = ["llm_gateway", "prompt_gateway", "common", "brightstaff", "hermesllm", "plano-cli"]


@@ -0,0 +1,70 @@
[package]
name = "plano-cli"
version = "0.4.14"
edition = "2021"
default-run = "planoai"
[[bin]]
name = "planoai"
path = "src/main.rs"
[dependencies]
# CLI framework
clap = { version = "4", features = ["derive"] }
# Templating
tera = "1"
# Config parsing & validation
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9.34"
serde_json = { version = "1.0", features = ["preserve_order"] }
jsonschema = "0.29"
# Terminal UI
console = "0.15"
indicatif = "0.17"
dialoguer = "0.11"
# Error handling
thiserror = "2"
anyhow = "1"
# Async runtime
tokio = { version = "1.44", features = ["full"] }
# HTTP client
reqwest = { version = "0.12", features = ["stream"] }
# Process management
nix = { version = "0.29", features = ["signal", "process"] }
# Archives for binary downloads
flate2 = "1.0"
tar = "0.4"
# Version comparison
semver = "1"
# URL parsing
url = "2"
# Regex for env var extraction
regex = "1"
# gRPC for trace listener
tonic = "0.12"
prost = "0.13"
prost-types = "0.13"
# Reuse workspace crates
common = { version = "0.1.0", path = "../common" }
# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Filesystem helpers
dirs = "5"
which = "7"
atty = "0.2"


@@ -0,0 +1,4 @@
fn main() {
// For now, just rerun if templates change
println!("cargo:rerun-if-changed=templates/");
}


@@ -0,0 +1,105 @@
use std::process::Command;
use anyhow::{bail, Result};
use crate::consts::plano_docker_image;
use crate::utils::{find_repo_root, print_cli_header};
pub async fn run(docker: bool) -> Result<()> {
let dim = console::Style::new().dim();
let red = console::Style::new().red();
let bold = console::Style::new().bold();
if !docker {
print_cli_header();
let repo_root = find_repo_root().ok_or_else(|| {
anyhow::anyhow!(
"Could not find repository root. Make sure you're inside the plano repository."
)
})?;
let crates_dir = repo_root.join("crates");
// Check cargo is available
if which::which("cargo").is_err() {
eprintln!(
"{} {} not found. Install Rust: https://rustup.rs",
red.apply_to("✗"),
bold.apply_to("cargo")
);
std::process::exit(1);
}
// Build WASM plugins
eprintln!(
"{}",
dim.apply_to("Building WASM plugins (wasm32-wasip1)...")
);
let status = Command::new("cargo")
.args([
"build",
"--release",
"--target",
"wasm32-wasip1",
"-p",
"llm_gateway",
"-p",
"prompt_gateway",
])
.current_dir(&crates_dir)
.status()?;
if !status.success() {
bail!("WASM build failed");
}
// Build brightstaff
eprintln!("{}", dim.apply_to("Building brightstaff (native)..."));
let status = Command::new("cargo")
.args(["build", "--release", "-p", "brightstaff"])
.current_dir(&crates_dir)
.status()?;
if !status.success() {
bail!("brightstaff build failed");
}
let wasm_dir = crates_dir.join("target/wasm32-wasip1/release");
let native_dir = crates_dir.join("target/release");
println!("\n{}:", bold.apply_to("Build artifacts"));
println!(" {}", wasm_dir.join("prompt_gateway.wasm").display());
println!(" {}", wasm_dir.join("llm_gateway.wasm").display());
println!(" {}", native_dir.join("brightstaff").display());
} else {
let repo_root =
find_repo_root().ok_or_else(|| anyhow::anyhow!("Could not find repository root."))?;
let dockerfile = repo_root.join("Dockerfile");
if !dockerfile.exists() {
bail!("Dockerfile not found at {}", dockerfile.display());
}
println!("Building plano image from {}...", repo_root.display());
let status = Command::new("docker")
.args([
"build",
"-f",
&dockerfile.to_string_lossy(),
"-t",
&plano_docker_image(),
&repo_root.to_string_lossy(),
"--add-host=host.docker.internal:host-gateway",
])
.status()?;
if !status.success() {
bail!("Docker build failed");
}
println!("plano image built successfully.");
}
Ok(())
}


@@ -0,0 +1,103 @@
use std::collections::HashMap;
use std::process::Command;
use anyhow::{bail, Result};
use crate::consts::PLANO_DOCKER_NAME;
use crate::utils::{find_config_file, is_native_plano_running};
pub async fn run(agent_type: &str, file: Option<String>, path: &str, settings: &str) -> Result<()> {
let native_running = is_native_plano_running();
let docker_running = if !native_running {
crate::docker::container_status(PLANO_DOCKER_NAME).await? == "running"
} else {
false
};
if !native_running && !docker_running {
bail!("Plano is not running. Start Plano first using 'planoai up <config.yaml>' (native or --docker mode).");
}
let plano_config_file = find_config_file(path, file.as_deref());
if !plano_config_file.exists() {
bail!("Config file not found: {}", plano_config_file.display());
}
start_cli_agent(&plano_config_file, agent_type, settings)
}
fn start_cli_agent(
plano_config_path: &std::path::Path,
agent_type: &str,
_settings_json: &str,
) -> Result<()> {
let config_str = std::fs::read_to_string(plano_config_path)?;
let config: serde_yaml::Value = serde_yaml::from_str(&config_str)?;
// Resolve CLI agent endpoint
let (host, port) = resolve_cli_agent_endpoint(&config)?;
let base_url = format!("http://{host}:{port}/v1");
let mut env: HashMap<String, String> = std::env::vars().collect();
match agent_type {
"claude" => {
env.insert("ANTHROPIC_BASE_URL".to_string(), base_url);
// Check for model alias
if let Some(model) = config
.get("model_aliases")
.and_then(|a| a.get("arch"))
.and_then(|a| a.get("claude"))
.and_then(|a| a.get("code"))
.and_then(|a| a.get("small"))
.and_then(|a| a.get("fast"))
.and_then(|a| a.get("target"))
.and_then(|v| v.as_str())
{
env.insert("ANTHROPIC_MODEL".to_string(), model.to_string());
}
let status = Command::new("claude").envs(&env).status()?;
if !status.success() {
std::process::exit(status.code().unwrap_or(1));
}
}
"codex" => {
env.insert("OPENAI_BASE_URL".to_string(), base_url);
let status = Command::new("codex").envs(&env).status()?;
if !status.success() {
std::process::exit(status.code().unwrap_or(1));
}
}
_ => bail!("Unsupported agent type: {agent_type}"),
}
Ok(())
}
fn resolve_cli_agent_endpoint(config: &serde_yaml::Value) -> Result<(String, u16)> {
// Look for model listener (egress_traffic)
if let Some(listeners) = config.get("listeners").and_then(|v| v.as_sequence()) {
for listener in listeners {
let listener_type = listener.get("type").and_then(|v| v.as_str()).unwrap_or("");
if listener_type == "model" {
let host = listener
.get("address")
.and_then(|v| v.as_str())
.unwrap_or("0.0.0.0");
let port = listener
.get("port")
.and_then(|v| v.as_u64())
.unwrap_or(12000) as u16;
return Ok((host.to_string(), port));
}
}
}
// Default
Ok(("0.0.0.0".to_string(), 12000))
}


@@ -0,0 +1,15 @@
use anyhow::Result;
use crate::utils::print_cli_header;
pub async fn run(docker: bool) -> Result<()> {
print_cli_header();
if !docker {
crate::native::runner::stop_native()?;
} else {
crate::docker::stop_container().await?;
}
Ok(())
}


@@ -0,0 +1,132 @@
use std::path::Path;
use anyhow::{bail, Result};
const TEMPLATES: &[(&str, &str, &str)] = &[
(
"sub_agent_orchestration",
"Sub-agent Orchestration",
include_str!("../../templates/sub_agent_orchestration.yaml"),
),
(
"coding_agent_routing",
"Coding Agent Routing",
include_str!("../../templates/coding_agent_routing.yaml"),
),
(
"preference_aware_routing",
"Preference-Aware Routing",
include_str!("../../templates/preference_aware_routing.yaml"),
),
(
"filter_chain_guardrails",
"Filter Chain Guardrails",
include_str!("../../templates/filter_chain_guardrails.yaml"),
),
(
"conversational_state",
"Conversational State",
include_str!("../../templates/conversational_state.yaml"),
),
];
pub async fn run(
template: Option<String>,
clean: bool,
output: Option<String>,
force: bool,
list_templates: bool,
) -> Result<()> {
let bold = console::Style::new().bold();
let dim = console::Style::new().dim();
let green = console::Style::new().green();
let cyan = console::Style::new().cyan();
if list_templates {
println!("\n{}:", bold.apply_to("Available templates"));
for (id, name, _) in TEMPLATES {
println!(" {} - {}", cyan.apply_to(id), name);
}
println!();
return Ok(());
}
let output_path = output.unwrap_or_else(|| "plano_config.yaml".to_string());
let output_path = Path::new(&output_path);
if output_path.exists() && !force {
bail!(
"File {} already exists. Use --force to overwrite.",
output_path.display()
);
}
if clean {
let content = "version: v0.3.0\nlisteners:\n - type: model\n name: egress_traffic\n port: 12000\nmodel_providers: []\n";
std::fs::write(output_path, content)?;
println!(
"{} Created clean config at {}",
green.apply_to("✓"),
output_path.display()
);
return Ok(());
}
if let Some(template_id) = template {
let tmpl = TEMPLATES
.iter()
.find(|(id, _, _)| *id == template_id)
.ok_or_else(|| {
anyhow::anyhow!(
"Unknown template '{}'. Use --list-templates to see available templates.",
template_id
)
})?;
std::fs::write(output_path, tmpl.2)?;
println!(
"{} Created config from template '{}' at {}",
green.apply_to("✓"),
tmpl.1,
output_path.display()
);
// Preview
let lines: Vec<&str> = tmpl.2.lines().take(28).collect();
println!("\n{}:", dim.apply_to("Preview"));
for line in &lines {
println!(" {}", dim.apply_to(line));
}
if tmpl.2.lines().count() > 28 {
println!(" {}", dim.apply_to("..."));
}
return Ok(());
}
// Interactive mode using dialoguer
if !atty::is(atty::Stream::Stdin) {
bail!(
"Interactive mode requires a TTY. Use --template or --clean for non-interactive mode."
);
}
let selections: Vec<&str> = TEMPLATES.iter().map(|(_, name, _)| *name).collect();
let selection = dialoguer::Select::new()
.with_prompt("Choose a template")
.items(&selections)
.default(0)
.interact()?;
let tmpl = &TEMPLATES[selection];
std::fs::write(output_path, tmpl.2)?;
println!(
"\n{} Created config from template '{}' at {}",
green.apply_to("✓"),
tmpl.1,
output_path.display()
);
Ok(())
}


@@ -0,0 +1,10 @@
use anyhow::Result;
pub async fn run(debug: bool, follow: bool, docker: bool) -> Result<()> {
if !docker {
crate::native::runner::native_logs(debug, follow)?;
} else {
crate::docker::stream_logs(debug, follow).await?;
}
Ok(())
}


@@ -0,0 +1,262 @@
pub mod build;
pub mod cli_agent;
pub mod down;
pub mod init;
pub mod logs;
pub mod up;
use clap::{Parser, Subcommand};
use crate::consts::PLANO_VERSION;
const LOGO: &str = r#"
______ _
| ___ \ |
| |_/ / | __ _ _ __ ___
| __/| |/ _` | '_ \ / _ \
| | | | (_| | | | | (_) |
\_| |_|\__,_|_| |_|\___/
"#;
#[derive(Parser)]
#[command(
name = "planoai",
about = "The Delivery Infrastructure for Agentic Apps"
)]
#[command(version = PLANO_VERSION)]
pub struct Cli {
#[command(subcommand)]
pub command: Option<Command>,
}
#[derive(Subcommand)]
pub enum Command {
/// Start Plano
Up {
/// Config file path (positional)
file: Option<String>,
/// Path to the directory containing config.yaml
#[arg(long, default_value = ".")]
path: String,
/// Run Plano in the foreground
#[arg(long)]
foreground: bool,
/// Start a local OTLP trace collector
#[arg(long)]
with_tracing: bool,
/// Port for the OTLP trace collector
#[arg(long, default_value_t = 4317)]
tracing_port: u16,
/// Run Plano inside Docker instead of natively
#[arg(long)]
docker: bool,
},
/// Stop Plano
Down {
/// Stop a Docker-based Plano instance
#[arg(long)]
docker: bool,
},
/// Build Plano from source
Build {
/// Build the Docker image instead of native binaries
#[arg(long)]
docker: bool,
},
/// Stream logs from Plano
Logs {
/// Show detailed debug logs
#[arg(long)]
debug: bool,
/// Follow the logs
#[arg(long)]
follow: bool,
/// Stream logs from a Docker-based Plano instance
#[arg(long)]
docker: bool,
},
/// Start a CLI agent connected to Plano
CliAgent {
/// The type of CLI agent to start
#[arg(value_parser = ["claude", "codex"])]
agent_type: String,
/// Config file path (positional)
file: Option<String>,
/// Path to the directory containing plano_config.yaml
#[arg(long, default_value = ".")]
path: String,
/// Additional settings as JSON string for the CLI agent
#[arg(long, default_value = "{}")]
settings: String,
},
/// Manage distributed traces
Trace {
#[command(subcommand)]
command: TraceCommand,
},
/// Initialize a new Plano configuration
Init {
/// Use a built-in template
#[arg(long)]
template: Option<String>,
/// Create a clean empty config
#[arg(long)]
clean: bool,
/// Output file path
#[arg(long, short)]
output: Option<String>,
/// Overwrite existing files
#[arg(long)]
force: bool,
/// List available templates
#[arg(long)]
list_templates: bool,
},
}
#[derive(Subcommand)]
pub enum TraceCommand {
/// Start the OTLP trace listener
Listen {
/// Host to bind to
#[arg(long, default_value = "0.0.0.0")]
host: String,
/// Port to listen on
#[arg(long, default_value_t = 4317)]
port: u16,
},
/// Stop the trace listener
Down,
/// Show a specific trace
Show {
/// Trace ID to display
trace_id: String,
/// Show verbose span details
#[arg(long)]
verbose: bool,
},
/// Tail recent traces
Tail {
/// Include spans matching these patterns
#[arg(long)]
include_spans: Option<String>,
/// Exclude spans matching these patterns
#[arg(long)]
exclude_spans: Option<String>,
/// Filter by attribute key=value
#[arg(long, name = "KEY=VALUE")]
r#where: Vec<String>,
/// Show traces since (e.g. 10s, 5m, 1h)
#[arg(long)]
since: Option<String>,
/// Show verbose span details
#[arg(long)]
verbose: bool,
},
}
pub async fn run(cli: Cli) -> anyhow::Result<()> {
// Initialize logging
let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(&log_level)),
)
.init();
match cli.command {
None => {
print_logo();
// Print help by re-parsing with --help
let _ = Cli::parse_from(["planoai", "--help"]);
Ok(())
}
Some(Command::Up {
file,
path,
foreground,
with_tracing,
tracing_port,
docker,
}) => up::run(file, path, foreground, with_tracing, tracing_port, docker).await,
Some(Command::Down { docker }) => down::run(docker).await,
Some(Command::Build { docker }) => build::run(docker).await,
Some(Command::Logs {
debug,
follow,
docker,
}) => logs::run(debug, follow, docker).await,
Some(Command::CliAgent {
agent_type,
file,
path,
settings,
}) => cli_agent::run(&agent_type, file, &path, &settings).await,
Some(Command::Trace { command }) => match command {
TraceCommand::Listen { host, port } => crate::trace::listen::run(&host, port).await,
TraceCommand::Down => crate::trace::down::run().await,
TraceCommand::Show { trace_id, verbose } => {
crate::trace::show::run(&trace_id, verbose).await
}
TraceCommand::Tail {
include_spans,
exclude_spans,
r#where,
since,
verbose,
} => {
crate::trace::tail::run(
include_spans.as_deref(),
exclude_spans.as_deref(),
&r#where,
since.as_deref(),
verbose,
)
.await
}
},
Some(Command::Init {
template,
clean,
output,
force,
list_templates,
}) => init::run(template, clean, output, force, list_templates).await,
}
}
fn print_logo() {
let style = console::Style::new().bold().color256(141); // closest to #969FF4
println!("{}", style.apply_to(LOGO));
println!(" The Delivery Infrastructure for Agentic Apps\n");
}


@@ -0,0 +1,174 @@
use std::collections::HashMap;
use std::path::Path;
use anyhow::Result;
use crate::consts::{
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT, DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
};
use crate::utils::{
find_config_file, get_llm_provider_access_keys, is_port_in_use, load_env_file,
print_cli_header, print_missing_keys,
};
pub async fn run(
file: Option<String>,
path: String,
foreground: bool,
with_tracing: bool,
tracing_port: u16,
docker: bool,
) -> Result<()> {
let green = console::Style::new().green();
let red = console::Style::new().red();
let dim = console::Style::new().dim();
let cyan = console::Style::new().cyan();
print_cli_header();
let plano_config_file = find_config_file(&path, file.as_deref());
if !plano_config_file.exists() {
eprintln!(
"{} Config file not found: {}",
red.apply_to("✗"),
dim.apply_to(plano_config_file.display().to_string())
);
std::process::exit(1);
}
// Validate configuration
if !docker {
eprint!("{}", dim.apply_to("Validating configuration..."));
match crate::native::runner::validate_config(&plano_config_file) {
Ok(()) => eprintln!(" {}", green.apply_to("✓")),
Err(e) => {
eprintln!("\n{} Validation failed", red.apply_to("✗"));
eprintln!(" {}", dim.apply_to(format!("{e:#}")));
std::process::exit(1);
}
}
} else {
eprint!("{}", dim.apply_to("Validating configuration (Docker)..."));
match crate::docker::validate_config(&plano_config_file).await {
Ok(()) => eprintln!(" {}", green.apply_to("✓")),
Err(e) => {
eprintln!("\n{} Validation failed", red.apply_to("✗"));
eprintln!(" {}", dim.apply_to(format!("{e:#}")));
std::process::exit(1);
}
}
}
// Set up environment
let default_otel = if docker {
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT
} else {
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT
};
let mut env_stage: HashMap<String, String> = HashMap::new();
env_stage.insert(
"OTEL_TRACING_GRPC_ENDPOINT".to_string(),
default_otel.to_string(),
);
// Check access keys
let access_keys = get_llm_provider_access_keys(&plano_config_file)?;
let access_keys: Vec<String> = access_keys
.into_iter()
.map(|k| k.strip_prefix('$').unwrap_or(&k).to_string())
.collect();
let access_keys_set: std::collections::HashSet<_> = access_keys.into_iter().collect();
let mut missing_keys = Vec::new();
if !access_keys_set.is_empty() {
let app_env_file = if let Some(ref f) = file {
Path::new(f).parent().unwrap_or(Path::new(".")).join(".env")
} else {
Path::new(&path).join(".env")
};
if !app_env_file.exists() {
for key in &access_keys_set {
match std::env::var(key) {
Ok(val) => {
env_stage.insert(key.clone(), val);
}
Err(_) => missing_keys.push(key.clone()),
}
}
} else {
let env_dict = load_env_file(&app_env_file)?;
for key in &access_keys_set {
if let Some(val) = env_dict.get(key.as_str()) {
env_stage.insert(key.clone(), val.clone());
} else {
missing_keys.push(key.clone());
}
}
}
}
if !missing_keys.is_empty() {
print_missing_keys(&missing_keys);
std::process::exit(1);
}
env_stage.insert(
"LOG_LEVEL".to_string(),
std::env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()),
);
// Handle tracing
if with_tracing {
if is_port_in_use(tracing_port) {
eprintln!(
"{} Trace collector already running on port {}",
green.apply_to("✓"),
cyan.apply_to(tracing_port.to_string())
);
} else {
match crate::trace::listen::start_background(tracing_port).await {
Ok(()) => {
eprintln!(
"{} Trace collector listening on {}",
green.apply_to("✓"),
cyan.apply_to(format!("0.0.0.0:{tracing_port}"))
);
}
Err(e) => {
eprintln!(
"{} Failed to start trace collector on port {tracing_port}: {e}",
red.apply_to("✗")
);
std::process::exit(1);
}
}
}
let tracing_host = if docker {
"host.docker.internal"
} else {
"localhost"
};
env_stage.insert(
"OTEL_TRACING_GRPC_ENDPOINT".to_string(),
format!("http://{tracing_host}:{tracing_port}"),
);
}
// Build full env
let mut env: HashMap<String, String> = std::env::vars().collect();
env.remove("PATH");
env.extend(env_stage);
if !docker {
crate::native::runner::start_native(&plano_config_file, &env, foreground, with_tracing)
.await?;
} else {
crate::docker::start_plano(&plano_config_file, &env, foreground).await?;
}
Ok(())
}


@@ -0,0 +1,761 @@
use std::collections::{HashMap, HashSet};
use std::path::Path;
use anyhow::{bail, Result};
use serde_json::json;
use url::Url;
use crate::config::validation::validate_prompt_config;
use crate::consts::DEFAULT_OTEL_TRACING_GRPC_ENDPOINT;
use crate::utils::expand_env_vars;
const SUPPORTED_PROVIDERS_WITH_BASE_URL: &[&str] =
&["azure_openai", "ollama", "qwen", "amazon_bedrock", "plano"];
const SUPPORTED_PROVIDERS_WITHOUT_BASE_URL: &[&str] = &[
"deepseek",
"groq",
"mistral",
"openai",
"gemini",
"anthropic",
"together_ai",
"xai",
"moonshotai",
"zhipu",
];
fn all_supported_providers() -> Vec<&'static str> {
let mut all = Vec::new();
all.extend_from_slice(SUPPORTED_PROVIDERS_WITHOUT_BASE_URL);
all.extend_from_slice(SUPPORTED_PROVIDERS_WITH_BASE_URL);
all
}
/// Get endpoint and port from an endpoint string.
fn get_endpoint_and_port(endpoint: &str, protocol: &str) -> (String, u16) {
if let Some((host, port_str)) = endpoint.rsplit_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
return (host.to_string(), port);
}
}
let port = if protocol == "http" { 80 } else { 443 };
(endpoint.to_string(), port)
}
/// Convert legacy dict-style listeners to array format.
pub fn convert_legacy_listeners(
listeners: &serde_yaml::Value,
model_providers: &serde_yaml::Value,
) -> Result<(Vec<serde_json::Value>, serde_json::Value, serde_json::Value)> {
let mp_json: serde_json::Value = serde_json::to_value(model_providers)?;
let mp_array = if mp_json.is_array() {
mp_json.clone()
} else {
json!([])
};
let mut llm_gateway = json!({
"name": "egress_traffic",
"type": "model",
"port": 12000,
"address": "0.0.0.0",
"timeout": "30s",
"model_providers": mp_array,
});
let mut prompt_gateway = json!({
"name": "ingress_traffic",
"type": "prompt",
"port": 10000,
"address": "0.0.0.0",
"timeout": "30s",
});
if listeners.is_null() {
return Ok((vec![llm_gateway.clone()], llm_gateway, prompt_gateway));
}
// Legacy dict format
if listeners.is_mapping() {
let mut updated = Vec::new();
if let Some(egress) = listeners.get("egress_traffic") {
if let Some(p) = egress.get("port").and_then(|v| v.as_u64()) {
llm_gateway["port"] = json!(p);
}
if let Some(a) = egress.get("address").and_then(|v| v.as_str()) {
llm_gateway["address"] = json!(a);
}
if let Some(t) = egress.get("timeout").and_then(|v| v.as_str()) {
llm_gateway["timeout"] = json!(t);
}
}
if !mp_array.as_array().is_none_or(|a| a.is_empty()) {
llm_gateway["model_providers"] = mp_array;
} else {
bail!("model_providers cannot be empty when using legacy format");
}
updated.push(llm_gateway.clone());
if let Some(ingress) = listeners.get("ingress_traffic") {
if !ingress.is_null() && ingress.is_mapping() {
if let Some(p) = ingress.get("port").and_then(|v| v.as_u64()) {
prompt_gateway["port"] = json!(p);
}
if let Some(a) = ingress.get("address").and_then(|v| v.as_str()) {
prompt_gateway["address"] = json!(a);
}
if let Some(t) = ingress.get("timeout").and_then(|v| v.as_str()) {
prompt_gateway["timeout"] = json!(t);
}
updated.push(prompt_gateway.clone());
}
}
return Ok((updated, llm_gateway, prompt_gateway));
}
// Array format
if let Some(arr) = listeners.as_sequence() {
let mut result: Vec<serde_json::Value> = Vec::new();
let mut model_provider_set = false;
for listener in arr {
let mut l: serde_json::Value = serde_json::to_value(listener)?;
let listener_type = l.get("type").and_then(|v| v.as_str()).unwrap_or("");
if listener_type == "model" {
if model_provider_set {
bail!("Currently only one listener can have model_providers set");
}
l["model_providers"] = mp_array.clone();
model_provider_set = true;
// Merge into llm_gateway defaults
if let Some(obj) = l.as_object() {
for (k, v) in obj {
llm_gateway[k] = v.clone();
}
}
} else if listener_type == "prompt" {
if let Some(obj) = l.as_object() {
for (k, v) in obj {
prompt_gateway[k] = v.clone();
}
}
}
result.push(l);
}
if !model_provider_set {
result.push(llm_gateway.clone());
}
return Ok((result, llm_gateway, prompt_gateway));
}
Ok((vec![llm_gateway.clone()], llm_gateway, prompt_gateway))
}
/// Main config validation and rendering function.
/// Ported from config_generator.py validate_and_render_schema()
pub fn validate_and_render(
config_path: &Path,
schema_path: &Path,
template_path: &Path,
envoy_output_path: &Path,
config_output_path: &Path,
) -> Result<()> {
// Step 1: JSON Schema validation
validate_prompt_config(config_path, schema_path)?;
// Step 2: Load and process config
let config_str = std::fs::read_to_string(config_path)?;
let mut config_yaml: serde_yaml::Value = serde_yaml::from_str(&config_str)?;
let mut inferred_clusters: HashMap<String, serde_json::Value> = HashMap::new();
// Convert legacy llm_providers → model_providers
if config_yaml.get("llm_providers").is_some() {
if config_yaml.get("model_providers").is_some() {
bail!("Please provide either llm_providers or model_providers, not both. llm_providers is deprecated; please use model_providers instead");
}
let providers = config_yaml
.get("llm_providers")
.cloned()
.unwrap_or_default();
config_yaml.as_mapping_mut().unwrap().insert(
serde_yaml::Value::String("model_providers".to_string()),
providers,
);
config_yaml
.as_mapping_mut()
.unwrap()
.remove(serde_yaml::Value::String("llm_providers".to_string()));
}
let listeners_val = config_yaml.get("listeners").cloned().unwrap_or_default();
let model_providers_val = config_yaml
.get("model_providers")
.cloned()
.unwrap_or_default();
let (listeners, llm_gateway, prompt_gateway) =
convert_legacy_listeners(&listeners_val, &model_providers_val)?;
// Update config with processed listeners
let listeners_yaml: serde_yaml::Value =
serde_yaml::from_str(&serde_json::to_string(&listeners)?)?;
config_yaml.as_mapping_mut().unwrap().insert(
serde_yaml::Value::String("listeners".to_string()),
listeners_yaml,
);
// Process endpoints from config
let endpoints_yaml = config_yaml.get("endpoints").cloned().unwrap_or_default();
let mut endpoints: HashMap<String, serde_json::Value> = if endpoints_yaml.is_mapping() {
serde_json::from_str(&serde_json::to_string(&endpoints_yaml)?)?
} else {
HashMap::new()
};
// Process agents and filters → endpoints
let agents = config_yaml
.get("agents")
.and_then(|v| v.as_sequence())
.cloned()
.unwrap_or_default();
let filters = config_yaml
.get("filters")
.and_then(|v| v.as_sequence())
.cloned()
.unwrap_or_default();
let mut agent_id_keys: HashSet<String> = HashSet::new();
let agents_combined: Vec<_> = agents.iter().chain(filters.iter()).collect();
for agent in &agents_combined {
let agent_id = agent.get("id").and_then(|v| v.as_str()).unwrap_or("");
if !agent_id_keys.insert(agent_id.to_string()) {
bail!("Duplicate agent id {agent_id}; please provide a unique id for each agent");
}
let agent_url = agent.get("url").and_then(|v| v.as_str()).unwrap_or("");
if !agent_id.is_empty() && !agent_url.is_empty() {
if let Ok(url) = Url::parse(agent_url) {
if let Some(host) = url.host_str() {
let protocol = url.scheme();
let port = url
.port()
.unwrap_or(if protocol == "http" { 80 } else { 443 });
endpoints.insert(
agent_id.to_string(),
json!({
"endpoint": host,
"port": port,
"protocol": protocol,
}),
);
}
}
}
}
// Override inferred clusters with endpoints
for (name, details) in &endpoints {
let mut cluster = details.clone();
if cluster.get("port").is_none() {
let ep = cluster
.get("endpoint")
.and_then(|v| v.as_str())
.unwrap_or("");
let protocol = cluster
.get("protocol")
.and_then(|v| v.as_str())
.unwrap_or("http");
let (endpoint, port) = get_endpoint_and_port(ep, protocol);
cluster["endpoint"] = json!(endpoint);
cluster["port"] = json!(port);
}
inferred_clusters.insert(name.clone(), cluster);
}
// Validate prompt_targets reference valid endpoints
if let Some(targets) = config_yaml
.get("prompt_targets")
.and_then(|v| v.as_sequence())
{
for target in targets {
if let Some(name) = target
.get("endpoint")
.and_then(|e| e.get("name"))
.and_then(|n| n.as_str())
{
if !inferred_clusters.contains_key(name) {
bail!("Unknown endpoint {name}, please add it in endpoints section in your plano_config.yaml file");
}
}
}
}
// Process tracing config
let mut plano_tracing: serde_json::Value = config_yaml
.get("tracing")
.map(|v| serde_json::to_value(v).unwrap_or_default())
.unwrap_or_else(|| json!({}));
// Resolution order: config yaml > OTEL_TRACING_GRPC_ENDPOINT env var > hardcoded default
let otel_endpoint = plano_tracing
.get("opentracing_grpc_endpoint")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| {
std::env::var("OTEL_TRACING_GRPC_ENDPOINT")
.unwrap_or_else(|_| DEFAULT_OTEL_TRACING_GRPC_ENDPOINT.to_string())
});
// Expand env vars if present
let otel_endpoint = if otel_endpoint.contains('$') {
let expanded = expand_env_vars(&otel_endpoint);
eprintln!("Resolved opentracing_grpc_endpoint to {expanded} after expanding environment variables");
expanded
} else {
otel_endpoint
};
// Validate OTEL endpoint
if !otel_endpoint.is_empty() {
if let Ok(url) = Url::parse(&otel_endpoint) {
if url.scheme() != "http" {
bail!("Invalid opentracing_grpc_endpoint {otel_endpoint}, scheme must be http");
}
let path = url.path();
if !path.is_empty() && path != "/" {
bail!("Invalid opentracing_grpc_endpoint {otel_endpoint}, path must be empty");
}
}
}
plano_tracing["opentracing_grpc_endpoint"] = json!(otel_endpoint);
// Process model providers
let mut updated_model_providers: Vec<serde_json::Value> = Vec::new();
let mut model_provider_name_set: HashSet<String> = HashSet::new();
let mut model_name_keys: HashSet<String> = HashSet::new();
let mut model_usage_name_keys: HashSet<String> = HashSet::new();
let mut llms_with_endpoint: Vec<serde_json::Value> = Vec::new();
let mut llms_with_endpoint_cluster_names: HashSet<String> = HashSet::new();
let all_providers = all_supported_providers();
for listener in &listeners {
let model_providers = match listener.get("model_providers").and_then(|v| v.as_array()) {
Some(mps) if !mps.is_empty() => mps,
_ => continue,
};
for mp in model_providers {
let mut mp = mp.clone();
            // A provider-level `usage` string, when present, is tracked elsewhere;
            // nothing to do for it here.
let mp_name = mp
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if !mp_name.is_empty() && !model_provider_name_set.insert(mp_name.clone()) {
bail!("Duplicate model_provider name {mp_name}, please provide unique name for each model_provider");
}
let model_name = mp
.get("model")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
            // Model names must be "<provider>/<model_id>"; "<provider>/*" is a wildcard.
            let tokens: Vec<&str> = model_name.split('/').collect();
            if tokens.len() < 2 {
                bail!("Invalid model name {model_name}. Please provide model name in the format <provider>/<model_id> or <provider>/* for wildcards.");
            }
            let provider = tokens[0].trim();
            let is_wildcard = tokens.last().map(|s| s.trim()) == Some("*");
            if model_name_keys.contains(&model_name) && !is_wildcard {
                bail!("Duplicate model name {model_name}, please provide unique model name for each model_provider");
            }
            if !is_wildcard {
                model_name_keys.insert(model_name.clone());
            }
            if mp
                .get("name")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .is_empty()
            {
                mp["name"] = json!(model_name);
            }
            model_provider_name_set.insert(
                mp.get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string(),
            );
// Validate wildcard constraints
if is_wildcard {
if mp.get("default").and_then(|v| v.as_bool()).unwrap_or(false) {
bail!("Model {model_name} is configured as default but uses wildcard (*). Default models cannot be wildcards.");
}
if mp
.get("routing_preferences")
.and_then(|v| v.as_array())
.is_some_and(|a| !a.is_empty())
{
bail!("Model {model_name} has routing_preferences but uses wildcard (*). Models with routing preferences cannot be wildcards.");
}
}
// Validate providers requiring base_url
if SUPPORTED_PROVIDERS_WITH_BASE_URL.contains(&provider)
&& mp.get("base_url").and_then(|v| v.as_str()).is_none()
{
bail!("Provider '{provider}' requires 'base_url' to be set for model {model_name}");
}
let model_id = tokens[1..].join("/");
// Handle unsupported providers
let mut provider_str = provider.to_string();
            if !all_providers.contains(&provider) {
                // Applies to both concrete and wildcard models.
                if mp.get("base_url").is_none() || mp.get("provider_interface").is_none() {
                    bail!("Must provide base_url and provider_interface for unsupported provider {provider} for model {model_name}. Supported providers are: {}", all_providers.join(", "));
                }
                provider_str = mp
                    .get("provider_interface")
                    .and_then(|v| v.as_str())
                    .unwrap_or(provider)
                    .to_string();
            } else if mp
                .get("provider_interface")
                .and_then(|v| v.as_str())
                .is_some()
            {
                bail!("Please provide provider interface as part of model name {model_name} using the format <provider>/<model_id>. For example, use 'openai/gpt-3.5-turbo' instead of 'gpt-3.5-turbo'");
            }
// Duplicate model_id check
if !is_wildcard && model_name_keys.contains(&model_id) {
bail!("Duplicate model_id {model_id}, please provide unique model_id for each model_provider");
}
if !is_wildcard {
model_name_keys.insert(model_id.clone());
}
// Validate routing preferences uniqueness
if let Some(prefs) = mp.get("routing_preferences").and_then(|v| v.as_array()) {
for pref in prefs {
if let Some(name) = pref.get("name").and_then(|v| v.as_str()) {
if !model_usage_name_keys.insert(name.to_string()) {
bail!("Duplicate routing preference name \"{name}\", please provide unique name for each routing preference");
}
}
}
}
// Warn if both passthrough_auth and access_key
if mp
.get("passthrough_auth")
.and_then(|v| v.as_bool())
.unwrap_or(false)
&& mp.get("access_key").is_some()
{
let name = mp.get("name").and_then(|v| v.as_str()).unwrap_or("unknown");
eprintln!("WARNING: Model provider '{name}' has both 'passthrough_auth: true' and 'access_key' configured. The access_key will be ignored and the client's Authorization header will be forwarded instead.");
}
mp["model"] = json!(model_id);
mp["provider_interface"] = json!(provider_str);
// Handle provider vs provider_interface
if mp.get("provider").is_some() && mp.get("provider_interface").is_some() {
bail!("Please provide either provider or provider_interface, not both");
}
if let Some(p) = mp.get("provider").cloned() {
mp["provider_interface"] = p;
mp.as_object_mut().unwrap().remove("provider");
}
updated_model_providers.push(mp.clone());
// Handle base_url → endpoint extraction
if let Some(base_url) = mp.get("base_url").and_then(|v| v.as_str()) {
if let Ok(url) = Url::parse(base_url) {
let path = url.path();
if !path.is_empty() && path != "/" {
mp["base_url_path_prefix"] = json!(path);
}
if !["http", "https"].contains(&url.scheme()) {
bail!("Please provide a valid URL with scheme (http/https) in base_url");
}
let protocol = url.scheme();
let port = url
.port()
.unwrap_or(if protocol == "http" { 80 } else { 443 });
let endpoint = url.host_str().unwrap_or("");
mp["endpoint"] = json!(endpoint);
mp["port"] = json!(port);
mp["protocol"] = json!(protocol);
let cluster_name = format!("{provider_str}_{endpoint}");
mp["cluster_name"] = json!(cluster_name);
if llms_with_endpoint_cluster_names.insert(cluster_name) {
llms_with_endpoint.push(mp.clone());
}
}
}
}
}
// Auto-add internal model providers
let overrides_config: serde_json::Value = config_yaml
.get("overrides")
.map(|v| serde_json::to_value(v).unwrap_or_default())
.unwrap_or_else(|| json!({}));
let model_name_set: HashSet<String> = updated_model_providers
.iter()
.filter_map(|mp| {
mp.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
.collect();
// Auto-add arch-router
let router_model = overrides_config
.get("llm_routing_model")
.and_then(|v| v.as_str())
.unwrap_or("Arch-Router");
let router_model_id = if router_model.contains('/') {
router_model.split_once('/').unwrap().1
} else {
router_model
};
if !model_usage_name_keys.is_empty() && !model_name_set.contains(router_model_id) {
updated_model_providers.push(json!({
"name": "arch-router",
"provider_interface": "plano",
"model": router_model_id,
"internal": true,
}));
}
// Always add arch-function
if !model_provider_name_set.contains("arch-function") {
updated_model_providers.push(json!({
"name": "arch-function",
"provider_interface": "plano",
"model": "Arch-Function",
"internal": true,
}));
}
// Auto-add plano-orchestrator
let orch_model = overrides_config
.get("agent_orchestration_model")
.and_then(|v| v.as_str())
.unwrap_or("Plano-Orchestrator");
let orch_model_id = if orch_model.contains('/') {
orch_model.split_once('/').unwrap().1
} else {
orch_model
};
if !model_name_set.contains(orch_model_id) {
updated_model_providers.push(json!({
"name": "plano/orchestrator",
"provider_interface": "plano",
"model": orch_model_id,
"internal": true,
}));
}
// Update config with processed model_providers
let mp_yaml: serde_yaml::Value =
serde_yaml::from_str(&serde_json::to_string(&updated_model_providers)?)?;
config_yaml.as_mapping_mut().unwrap().insert(
serde_yaml::Value::String("model_providers".to_string()),
mp_yaml,
);
// Validate only one listener with model_providers
let mut listeners_with_provider = 0;
for listener in &listeners {
if listener
.get("model_providers")
.and_then(|v| v.as_array())
.is_some()
{
listeners_with_provider += 1;
if listeners_with_provider > 1 {
bail!("Please provide model_providers either under listeners or at root level, not both. Currently we don't support multiple listeners with model_providers");
}
}
}
// Validate input_filters reference valid agent/filter IDs
for listener in &listeners {
if let Some(filters) = listener.get("input_filters").and_then(|v| v.as_array()) {
let listener_name = listener
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
for fc_id in filters {
if let Some(id) = fc_id.as_str() {
if !agent_id_keys.contains(id) {
                        let mut available: Vec<_> = agent_id_keys.iter().cloned().collect();
                        available.sort();
                        bail!("Listener '{listener_name}' references input_filters id '{id}' which is not defined in agents or filters. Available ids: {}", available.join(", "));
}
}
}
}
}
// Validate model aliases
if let Some(aliases) = config_yaml.get("model_aliases") {
if let Some(mapping) = aliases.as_mapping() {
for (alias_key, alias_val) in mapping {
let alias_name = alias_key.as_str().unwrap_or("");
if let Some(target) = alias_val.get("target").and_then(|v| v.as_str()) {
if !model_name_keys.contains(target) {
let mut available: Vec<_> = model_name_keys.iter().cloned().collect();
available.sort();
                        bail!("Model alias '{alias_name}' targets '{target}' which is not defined as a model. Available models: {}", available.join(", "));
}
}
}
}
}
// Generate rendered config strings
let plano_config_string = serde_yaml::to_string(&config_yaml)?;
// Handle agent orchestrator
let use_agent_orchestrator = overrides_config
.get("use_agent_orchestrator")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let agent_orchestrator = if use_agent_orchestrator {
if endpoints.is_empty() {
bail!("Please provide agent orchestrator in the endpoints section in your plano_config.yaml file");
} else if endpoints.len() > 1 {
bail!("Please provide single agent orchestrator in the endpoints section in your plano_config.yaml file");
} else {
Some(endpoints.keys().next().unwrap().clone())
}
} else {
None
};
let upstream_connect_timeout = overrides_config
.get("upstream_connect_timeout")
.and_then(|v| v.as_str())
.unwrap_or("5s");
let upstream_tls_ca_path = overrides_config
.get("upstream_tls_ca_path")
.and_then(|v| v.as_str())
.unwrap_or("/etc/ssl/certs/ca-certificates.crt");
// Render template
let template_filename = template_path
.file_name()
.and_then(|f| f.to_str())
.unwrap_or("envoy.template.yaml");
let mut tera = tera::Tera::default();
let template_content = std::fs::read_to_string(template_path)?;
tera.add_raw_template(template_filename, &template_content)?;
let mut context = tera::Context::new();
context.insert("prompt_gateway_listener", &prompt_gateway);
context.insert("llm_gateway_listener", &llm_gateway);
context.insert("plano_config", &plano_config_string);
context.insert("plano_llm_config", &plano_config_string);
context.insert("plano_clusters", &inferred_clusters);
context.insert("plano_model_providers", &updated_model_providers);
context.insert("plano_tracing", &plano_tracing);
context.insert("local_llms", &llms_with_endpoint);
context.insert("agent_orchestrator", &agent_orchestrator);
context.insert("listeners", &listeners);
context.insert("upstream_connect_timeout", upstream_connect_timeout);
context.insert("upstream_tls_ca_path", upstream_tls_ca_path);
let rendered = tera.render(template_filename, &context)?;
// Write output files
if let Some(parent) = envoy_output_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(envoy_output_path, &rendered)?;
if let Some(parent) = config_output_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(config_output_path, &plano_config_string)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_endpoint_and_port_with_port() {
let (host, port) = get_endpoint_and_port("example.com:8080", "http");
assert_eq!(host, "example.com");
assert_eq!(port, 8080);
}
#[test]
fn test_get_endpoint_and_port_http() {
let (host, port) = get_endpoint_and_port("example.com", "http");
assert_eq!(host, "example.com");
assert_eq!(port, 80);
}
#[test]
fn test_get_endpoint_and_port_https() {
let (host, port) = get_endpoint_and_port("example.com", "https");
assert_eq!(host, "example.com");
assert_eq!(port, 443);
}
}
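
The model-name rules enforced in `validate_and_render` above (names must be `<provider>/<model_id>`; `<provider>/*` marks a wildcard; model ids may themselves contain `/`) can be sketched in isolation. `parse_model_name` is a hypothetical helper for illustration, not a function in this crate:

```rust
// Sketch of the "<provider>/<model_id>" parsing rules, including the
// "<provider>/*" wildcard form. Illustrative only.
fn parse_model_name(name: &str) -> Result<(String, String, bool), String> {
    let tokens: Vec<&str> = name.split('/').collect();
    if tokens.len() < 2 {
        return Err(format!(
            "Invalid model name {name}. Expected <provider>/<model_id> or <provider>/*"
        ));
    }
    let provider = tokens[0].trim().to_string();
    // Everything after the provider is the model id (ids may contain '/').
    let model_id = tokens[1..].join("/");
    let is_wildcard = tokens.last().map(|s| s.trim()) == Some("*");
    Ok((provider, model_id, is_wildcard))
}

fn main() {
    let (provider, model_id, wild) = parse_model_name("openai/gpt-4o").unwrap();
    assert_eq!((provider.as_str(), model_id.as_str(), wild), ("openai", "gpt-4o", false));
    // Ids may contain '/': everything after the provider is the model id.
    let (_, model_id, _) = parse_model_name("anthropic/claude-3/haiku").unwrap();
    assert_eq!(model_id, "claude-3/haiku");
    assert!(parse_model_name("mistral/*").unwrap().2);
    assert!(parse_model_name("gpt-4o").is_err());
    println!("ok");
}
```

The returned wildcard flag is what drives the later restrictions: wildcard models cannot be marked `default` and cannot carry `routing_preferences`.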


@ -0,0 +1,4 @@
pub mod generator;
pub mod validation;
pub use generator::validate_and_render;


@ -0,0 +1,39 @@
use anyhow::{bail, Result};
use std::path::Path;
/// Validate a plano config file against the JSON schema.
pub fn validate_prompt_config(config_path: &Path, schema_path: &Path) -> Result<()> {
let config_str = std::fs::read_to_string(config_path)?;
let schema_str = std::fs::read_to_string(schema_path)?;
let config_yaml: serde_yaml::Value = serde_yaml::from_str(&config_str)?;
let schema_yaml: serde_yaml::Value = serde_yaml::from_str(&schema_str)?;
// Convert to JSON for jsonschema validation
let config_json: serde_json::Value =
serde_json::from_str(&serde_json::to_string(&config_yaml)?)?;
let schema_json: serde_json::Value =
serde_json::from_str(&serde_json::to_string(&schema_yaml)?)?;
let validator = jsonschema::validator_for(&schema_json)
.map_err(|e| anyhow::anyhow!("Invalid schema: {e}"))?;
let errors: Vec<_> = validator.iter_errors(&config_json).collect();
if !errors.is_empty() {
let mut msg = String::new();
for err in &errors {
let path = if err.instance_path.as_str().is_empty() {
"root".to_string()
} else {
err.instance_path.to_string()
};
msg.push_str(&format!(
"{}\n Location: {}\n Value: {}\n",
err, path, err.instance
));
}
bail!("{msg}");
}
Ok(())
}
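
The aggregation pattern in `validate_prompt_config` — collect every schema violation, then `bail!` once with one combined message — can be sketched without the `jsonschema` types. `report` is a simplified stand-in taking (error, instance path, offending value) triples:

```rust
// Simplified shape of the error reporting above: collect every
// violation, then fail once with one aggregated message.
fn report(errors: &[(String, String, String)]) -> Result<(), String> {
    if errors.is_empty() {
        return Ok(());
    }
    let mut msg = String::new();
    for (err, path, value) in errors {
        // An empty instance path means the violation is at the document root.
        let loc = if path.is_empty() { "root" } else { path.as_str() };
        msg.push_str(&format!("{err}\n  Location: {loc}\n  Value: {value}\n"));
    }
    Err(msg)
}

fn main() {
    assert!(report(&[]).is_ok());
    let errs = vec![(
        "\"port\" is not of type \"integer\"".to_string(),
        String::new(),
        "\"abc\"".to_string(),
    )];
    let msg = report(&errs).unwrap_err();
    assert!(msg.contains("Location: root"));
    println!("ok");
}
```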


@ -0,0 +1,43 @@
use std::path::PathBuf;
pub const PLANO_COLOR: &str = "#969FF4";
pub const SERVICE_NAME: &str = "plano";
pub const PLANO_DOCKER_NAME: &str = "plano";
pub const PLANO_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const DEFAULT_OTEL_TRACING_GRPC_ENDPOINT: &str = "http://localhost:4317";
pub const DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT: &str = "http://localhost:4317";
pub const ENVOY_VERSION: &str = "v1.37.0";
pub const PLANO_GITHUB_REPO: &str = "katanemo/archgw";
pub fn plano_docker_image() -> String {
std::env::var("PLANO_DOCKER_IMAGE")
.unwrap_or_else(|_| format!("katanemo/plano:{PLANO_VERSION}"))
}
pub fn plano_home() -> PathBuf {
dirs::home_dir()
.expect("could not determine home directory")
.join(".plano")
}
pub fn plano_run_dir() -> PathBuf {
plano_home().join("run")
}
pub fn plano_bin_dir() -> PathBuf {
plano_home().join("bin")
}
pub fn plano_plugins_dir() -> PathBuf {
plano_home().join("plugins")
}
pub fn native_pid_file() -> PathBuf {
plano_run_dir().join("plano.pid")
}
pub fn plano_release_base_url() -> String {
format!("https://github.com/{PLANO_GITHUB_REPO}/releases/download")
}
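
As a minimal sketch of the override precedence in `plano_docker_image()` — an explicit `PLANO_DOCKER_IMAGE` env var beats the versioned default tag — with an arbitrary version string standing in for `PLANO_VERSION`:

```rust
use std::env;

// Override precedence: the env var, when set, replaces the default tag.
fn docker_image(version: &str) -> String {
    env::var("PLANO_DOCKER_IMAGE")
        .unwrap_or_else(|_| format!("katanemo/plano:{version}"))
}

fn main() {
    env::remove_var("PLANO_DOCKER_IMAGE");
    assert_eq!(docker_image("0.3.1"), "katanemo/plano:0.3.1");
    env::set_var("PLANO_DOCKER_IMAGE", "localhost:5000/plano:dev");
    assert_eq!(docker_image("0.3.1"), "localhost:5000/plano:dev");
    println!("ok");
}
```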


@ -0,0 +1,211 @@
use std::collections::HashMap;
use std::path::Path;
use std::process::Command;
use anyhow::{bail, Result};
use crate::consts::{plano_docker_image, PLANO_DOCKER_NAME};
/// Get Docker container status.
pub async fn container_status(container: &str) -> Result<String> {
let output = Command::new("docker")
.args(["inspect", "--type=container", container])
.output()?;
if !output.status.success() {
return Ok("not found".to_string());
}
let json: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(json[0]["State"]["Status"]
.as_str()
.unwrap_or("")
.to_string())
}
/// Validate config using Docker.
pub async fn validate_config(plano_config_path: &Path) -> Result<()> {
let abs_path = std::fs::canonicalize(plano_config_path)?;
    let volume = format!("{}:/app/plano_config.yaml:ro", abs_path.display());
    let output = Command::new("docker")
        .args([
            "run",
            "--rm",
            "-v",
            &volume,
            "--entrypoint",
            "python",
            &plano_docker_image(),
            "-m",
            "planoai.config_generator",
        ])
        .output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("{}", stderr.trim());
}
Ok(())
}
/// Start Plano in Docker.
pub async fn start_plano(
plano_config_path: &Path,
env: &HashMap<String, String>,
foreground: bool,
) -> Result<()> {
let abs_path = std::fs::canonicalize(plano_config_path)?;
// Prepare config (replace localhost → host.docker.internal)
let config_content = std::fs::read_to_string(&abs_path)?;
let docker_config = if config_content.contains("localhost") {
let replaced = config_content.replace("localhost", "host.docker.internal");
let tmp = std::env::temp_dir().join("plano_config_docker.yaml");
std::fs::write(&tmp, &replaced)?;
tmp
} else {
abs_path.clone()
};
// Get gateway ports
let config: serde_yaml::Value = serde_yaml::from_str(&std::fs::read_to_string(&abs_path)?)?;
let mut gateway_ports = Vec::new();
if let Some(listeners) = config.get("listeners").and_then(|v| v.as_sequence()) {
for listener in listeners {
if let Some(port) = listener.get("port").and_then(|v| v.as_u64()) {
gateway_ports.push(port as u16);
}
}
}
if gateway_ports.is_empty() {
gateway_ports.push(12000);
}
// Build docker run command
let mut docker_args = vec![
"run".to_string(),
"-d".to_string(),
"--name".to_string(),
PLANO_DOCKER_NAME.to_string(),
];
// Port mappings
docker_args.extend(["-p".to_string(), "12001:12001".to_string()]);
docker_args.extend(["-p".to_string(), "19901:9901".to_string()]);
for port in &gateway_ports {
docker_args.extend(["-p".to_string(), format!("{port}:{port}")]);
}
// Volume
docker_args.extend([
"-v".to_string(),
format!("{}:/app/plano_config.yaml:ro", docker_config.display()),
]);
// Environment variables
for (k, v) in env {
docker_args.extend(["-e".to_string(), format!("{k}={v}")]);
}
docker_args.extend([
"--add-host".to_string(),
"host.docker.internal:host-gateway".to_string(),
plano_docker_image(),
]);
let output = Command::new("docker").args(&docker_args).output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("Docker run failed: {}", stderr.trim());
}
// Health check
let green = console::Style::new().green();
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(60);
loop {
let mut all_healthy = true;
for &port in &gateway_ports {
if reqwest::get(&format!("http://localhost:{port}/healthz"))
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
{
continue;
}
all_healthy = false;
}
if all_healthy {
            eprintln!("{} Plano is running (Docker mode)", green.apply_to("✓"));
for &port in &gateway_ports {
eprintln!(" http://localhost:{port}");
}
break;
}
if start.elapsed() > timeout {
bail!("Health check timed out after 60s");
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if foreground {
// Stream logs
let mut child = tokio::process::Command::new("docker")
.args(["logs", "-f", PLANO_DOCKER_NAME])
.spawn()?;
tokio::select! {
_ = child.wait() => {}
_ = tokio::signal::ctrl_c() => {
let _ = child.kill().await;
stop_container().await?;
}
}
}
Ok(())
}
/// Stop Docker container.
pub async fn stop_container() -> Result<()> {
let _ = Command::new("docker")
.args(["stop", PLANO_DOCKER_NAME])
.output();
let _ = Command::new("docker")
.args(["rm", "-f", PLANO_DOCKER_NAME])
.output();
let green = console::Style::new().green();
    eprintln!("{} Plano stopped (Docker mode).", green.apply_to("✓"));
Ok(())
}
/// Stream Docker logs.
pub async fn stream_logs(_debug: bool, follow: bool) -> Result<()> {
let mut args = vec!["logs".to_string()];
if follow {
args.push("-f".to_string());
}
args.push(PLANO_DOCKER_NAME.to_string());
let mut child = tokio::process::Command::new("docker").args(&args).spawn()?;
tokio::select! {
result = child.wait() => {
result?;
}
_ = tokio::signal::ctrl_c() => {
let _ = child.kill().await;
}
}
Ok(())
}
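
The health check in `start_plano` follows a common poll-until-deadline shape: run all checks, succeed once every one passes, give up after the timeout, otherwise sleep and retry. A synchronous stand-in for the tokio/reqwest version, with `wait_until` as an illustrative helper:

```rust
use std::time::{Duration, Instant};

// Poll `check` until it passes or the deadline expires. Mirrors the
// shape of the health-check loop above, minus HTTP and async.
fn wait_until(mut check: impl FnMut() -> bool, timeout: Duration, interval: Duration) -> bool {
    let start = Instant::now();
    loop {
        if check() {
            return true;
        }
        if start.elapsed() > timeout {
            return false;
        }
        std::thread::sleep(interval);
    }
}

fn main() {
    // Succeeds on the third attempt, well inside the deadline.
    let mut attempts = 0;
    let ok = wait_until(
        || {
            attempts += 1;
            attempts >= 3
        },
        Duration::from_secs(1),
        Duration::from_millis(1),
    );
    assert!(ok);
    assert_eq!(attempts, 3);
    println!("ok");
}
```

Note that, as in the Docker runner, the check runs once before the deadline is consulted, so even a zero timeout gets one attempt.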


@ -0,0 +1,22 @@
#![allow(dead_code)]
mod commands;
mod config;
mod consts;
mod docker;
mod native;
mod trace;
mod utils;
mod version;
use clap::Parser;
use commands::Cli;
#[tokio::main]
async fn main() {
let cli = Cli::parse();
if let Err(e) = commands::run(cli).await {
eprintln!("Error: {e:#}");
std::process::exit(1);
}
}


@ -0,0 +1,268 @@
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use anyhow::{bail, Result};
use indicatif::{ProgressBar, ProgressStyle};
use crate::consts::{
plano_bin_dir, plano_plugins_dir, plano_release_base_url, ENVOY_VERSION, PLANO_VERSION,
};
use crate::utils::find_repo_root;
/// Get the platform slug for binary downloads.
fn get_platform_slug() -> Result<&'static str> {
let os = std::env::consts::OS;
let arch = std::env::consts::ARCH;
match (os, arch) {
("linux", "x86_64") => Ok("linux-amd64"),
("linux", "aarch64") => Ok("linux-arm64"),
("macos", "aarch64") => Ok("darwin-arm64"),
("macos", "x86_64") => {
bail!("macOS x86_64 (Intel) is not supported. Pre-built binaries are only available for Apple Silicon (arm64).");
}
_ => bail!(
"Unsupported platform {os}/{arch}. Supported: linux-amd64, linux-arm64, darwin-arm64"
),
}
}
/// Download a file with a progress bar.
async fn download_file(url: &str, dest: &Path, label: &str) -> Result<()> {
let client = reqwest::Client::new();
let resp = client.get(url).send().await?;
if !resp.status().is_success() {
bail!("Download failed: HTTP {}", resp.status());
}
let total = resp.content_length().unwrap_or(0);
let pb = ProgressBar::new(total);
pb.set_style(
ProgressStyle::default_bar()
.template(&format!(
" {label} {{bar:30}} {{percent}}% ({{bytes}}/{{total_bytes}})"
))
.unwrap()
.progress_chars("█░░"),
);
    // The body is buffered in a single read, so the bar jumps straight to
    // completion rather than tracking streamed chunks.
    let bytes = resp.bytes().await?;
    pb.set_position(bytes.len() as u64);
    pb.finish();
println!();
fs::write(dest, &bytes)?;
Ok(())
}
/// Check for locally-built WASM plugins.
fn find_local_wasm_plugins() -> Option<(PathBuf, PathBuf)> {
let repo_root = find_repo_root()?;
let wasm_dir = repo_root.join("crates/target/wasm32-wasip1/release");
let prompt_gw = wasm_dir.join("prompt_gateway.wasm");
let llm_gw = wasm_dir.join("llm_gateway.wasm");
if prompt_gw.exists() && llm_gw.exists() {
Some((prompt_gw, llm_gw))
} else {
None
}
}
/// Check for locally-built brightstaff binary.
fn find_local_brightstaff() -> Option<PathBuf> {
let repo_root = find_repo_root()?;
let path = repo_root.join("crates/target/release/brightstaff");
if path.exists() {
Some(path)
} else {
None
}
}
/// Ensure Envoy binary is available. Returns path to binary.
pub async fn ensure_envoy_binary() -> Result<PathBuf> {
let bin_dir = plano_bin_dir();
let envoy_path = bin_dir.join("envoy");
let version_path = bin_dir.join("envoy.version");
if envoy_path.exists() {
if let Ok(cached) = fs::read_to_string(&version_path) {
if cached.trim() == ENVOY_VERSION {
tracing::info!("Envoy {} (cached)", ENVOY_VERSION);
return Ok(envoy_path);
}
tracing::info!("Envoy version changed, re-downloading...");
}
}
let slug = get_platform_slug()?;
let url = format!(
"https://github.com/tetratelabs/archive-envoy/releases/download/{ENVOY_VERSION}/envoy-{ENVOY_VERSION}-{slug}.tar.xz"
);
fs::create_dir_all(&bin_dir)?;
let tmp_path = bin_dir.join("envoy.tar.xz");
download_file(&url, &tmp_path, &format!("Envoy {ENVOY_VERSION}")).await?;
tracing::info!("Extracting Envoy {}...", ENVOY_VERSION);
// Extract using tar command (tar.xz not well supported by Rust tar crate)
let status = tokio::process::Command::new("tar")
.args([
"xf",
&tmp_path.to_string_lossy(),
"-C",
&bin_dir.to_string_lossy(),
])
.status()
.await?;
if !status.success() {
bail!("Failed to extract Envoy archive");
}
// Find and move the envoy binary
let mut found = false;
for entry in walkdir(&bin_dir)? {
if entry.file_name() == Some(std::ffi::OsStr::new("envoy")) && entry != envoy_path {
fs::copy(&entry, &envoy_path)?;
found = true;
break;
}
}
// Clean up extracted directories
for entry in fs::read_dir(&bin_dir)? {
let entry = entry?;
if entry.file_type()?.is_dir() {
let _ = fs::remove_dir_all(entry.path());
}
}
let _ = fs::remove_file(&tmp_path);
if !found && !envoy_path.exists() {
bail!("Could not find envoy binary in the downloaded archive");
}
fs::set_permissions(&envoy_path, fs::Permissions::from_mode(0o755))?;
fs::write(&version_path, ENVOY_VERSION)?;
Ok(envoy_path)
}
/// Simple recursive file walker.
fn walkdir(dir: &Path) -> Result<Vec<PathBuf>> {
let mut results = Vec::new();
if dir.is_dir() {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
results.extend(walkdir(&path)?);
} else {
results.push(path);
}
}
}
Ok(results)
}
/// Ensure WASM plugins are available. Returns (prompt_gw_path, llm_gw_path).
pub async fn ensure_wasm_plugins() -> Result<(PathBuf, PathBuf)> {
// 1. Local source build
if let Some(local) = find_local_wasm_plugins() {
tracing::info!("Using locally-built WASM plugins");
return Ok(local);
}
// 2. Cached download
let plugins_dir = plano_plugins_dir();
let version_path = plugins_dir.join("wasm.version");
let prompt_gw_path = plugins_dir.join("prompt_gateway.wasm");
let llm_gw_path = plugins_dir.join("llm_gateway.wasm");
if prompt_gw_path.exists() && llm_gw_path.exists() {
if let Ok(cached) = fs::read_to_string(&version_path) {
if cached.trim() == PLANO_VERSION {
tracing::info!("WASM plugins {} (cached)", PLANO_VERSION);
return Ok((prompt_gw_path, llm_gw_path));
}
}
}
// 3. Download
fs::create_dir_all(&plugins_dir)?;
let base = plano_release_base_url();
for (name, dest) in [
("prompt_gateway.wasm", &prompt_gw_path),
("llm_gateway.wasm", &llm_gw_path),
] {
let url = format!("{base}/{PLANO_VERSION}/{name}.gz");
let gz_dest = dest.with_extension("wasm.gz");
download_file(&url, &gz_dest, &format!("{name} ({PLANO_VERSION})")).await?;
// Decompress
tracing::info!("Decompressing {name}...");
let gz_data = fs::read(&gz_dest)?;
let mut decoder = flate2::read::GzDecoder::new(&gz_data[..]);
let mut out = fs::File::create(dest)?;
std::io::copy(&mut decoder, &mut out)?;
let _ = fs::remove_file(&gz_dest);
}
fs::write(&version_path, PLANO_VERSION)?;
Ok((prompt_gw_path, llm_gw_path))
}
/// Ensure brightstaff binary is available. Returns path.
pub async fn ensure_brightstaff_binary() -> Result<PathBuf> {
// 1. Local source build
if let Some(local) = find_local_brightstaff() {
tracing::info!("Using locally-built brightstaff");
return Ok(local);
}
// 2. Cached download
let bin_dir = plano_bin_dir();
let brightstaff_path = bin_dir.join("brightstaff");
let version_path = bin_dir.join("brightstaff.version");
if brightstaff_path.exists() {
if let Ok(cached) = fs::read_to_string(&version_path) {
if cached.trim() == PLANO_VERSION {
tracing::info!("brightstaff {} (cached)", PLANO_VERSION);
return Ok(brightstaff_path);
}
}
}
// 3. Download
let slug = get_platform_slug()?;
let url = format!(
"{}/{PLANO_VERSION}/brightstaff-{slug}.gz",
plano_release_base_url()
);
fs::create_dir_all(&bin_dir)?;
let gz_path = bin_dir.join("brightstaff.gz");
download_file(
&url,
&gz_path,
&format!("brightstaff ({PLANO_VERSION}, {slug})"),
)
.await?;
tracing::info!("Decompressing brightstaff...");
let gz_data = fs::read(&gz_path)?;
let mut decoder = flate2::read::GzDecoder::new(&gz_data[..]);
let mut out = fs::File::create(&brightstaff_path)?;
std::io::copy(&mut decoder, &mut out)?;
let _ = fs::remove_file(&gz_path);
fs::set_permissions(&brightstaff_path, fs::Permissions::from_mode(0o755))?;
fs::write(&version_path, PLANO_VERSION)?;
Ok(brightstaff_path)
}
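
`ensure_envoy_binary`, `ensure_wasm_plugins`, and `ensure_brightstaff_binary` all gate downloads on the same cache test: the artifact exists and a sibling version file matches the wanted version exactly. A minimal sketch, with `is_cached` as an illustrative helper:

```rust
use std::fs;
use std::path::Path;

// Shared cache test: reuse the artifact only when it exists AND its
// sibling version file matches the wanted version (after trimming).
fn is_cached(artifact: &Path, version_file: &Path, want: &str) -> bool {
    artifact.exists()
        && fs::read_to_string(version_file)
            .map(|v| v.trim() == want)
            .unwrap_or(false)
}

fn main() {
    let dir = std::env::temp_dir().join("plano_cache_demo");
    fs::create_dir_all(&dir).unwrap();
    let bin = dir.join("envoy");
    let ver = dir.join("envoy.version");
    fs::write(&bin, b"fake binary").unwrap();
    fs::write(&ver, "v1.37.0\n").unwrap();
    assert!(is_cached(&bin, &ver, "v1.37.0"));
    // A version bump invalidates the cache and forces a re-download.
    assert!(!is_cached(&bin, &ver, "v1.38.0"));
    let _ = fs::remove_dir_all(&dir);
    println!("ok");
}
```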


@ -0,0 +1,2 @@
pub mod binaries;
pub mod runner;


@ -0,0 +1,481 @@
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use crate::config;
use crate::consts::{native_pid_file, plano_run_dir};
use crate::native::binaries;
use crate::utils::{expand_env_vars, find_repo_root, is_pid_alive};
/// Find the config directory containing schema and templates.
fn find_config_dir() -> Result<PathBuf> {
// Check repo root first
if let Some(repo_root) = find_repo_root() {
let config_dir = repo_root.join("config");
if config_dir.is_dir() && config_dir.join("plano_config_schema.yaml").exists() {
return Ok(config_dir);
}
}
// Check if installed alongside the binary
if let Ok(exe) = std::env::current_exe() {
if let Some(parent) = exe.parent() {
let config_dir = parent.join("config");
if config_dir.is_dir() {
return Ok(config_dir);
}
// Check ../config (for bin/plano layout)
if let Some(grandparent) = parent.parent() {
let config_dir = grandparent.join("config");
if config_dir.is_dir() {
return Ok(config_dir);
}
}
}
}
bail!("Could not find config templates. Make sure you're inside the plano repository or have the config directory available.")
}
/// Validate config without starting processes.
pub fn validate_config(plano_config_path: &Path) -> Result<()> {
let config_dir = find_config_dir()?;
let run_dir = plano_run_dir();
fs::create_dir_all(&run_dir)?;
config::validate_and_render(
plano_config_path,
&config_dir.join("plano_config_schema.yaml"),
&config_dir.join("envoy.template.yaml"),
&run_dir.join("envoy.yaml"),
&run_dir.join("plano_config_rendered.yaml"),
)
}
/// Render native config. Returns (envoy_config_path, plano_config_rendered_path).
pub async fn render_native_config(
plano_config_path: &Path,
env: &HashMap<String, String>,
with_tracing: bool,
) -> Result<(PathBuf, PathBuf)> {
let run_dir = plano_run_dir();
fs::create_dir_all(&run_dir)?;
let (prompt_gw_path, llm_gw_path) = binaries::ensure_wasm_plugins().await?;
// If --with-tracing, inject tracing config if not already present
let effective_config_path = if with_tracing {
let content = fs::read_to_string(plano_config_path)?;
let mut config: serde_yaml::Value = serde_yaml::from_str(&content)?;
let tracing = config.as_mapping_mut().and_then(|m| {
m.entry(serde_yaml::Value::String("tracing".to_string()))
.or_insert(serde_yaml::Value::Mapping(serde_yaml::Mapping::new()))
.as_mapping_mut()
});
if let Some(tracing) = tracing {
if !tracing.contains_key(serde_yaml::Value::String("random_sampling".to_string())) {
tracing.insert(
serde_yaml::Value::String("random_sampling".to_string()),
serde_yaml::Value::Number(serde_yaml::Number::from(100)),
);
}
}
let path = run_dir.join("config_with_tracing.yaml");
fs::write(&path, serde_yaml::to_string(&config)?)?;
path
} else {
plano_config_path.to_path_buf()
};
let envoy_config_path = run_dir.join("envoy.yaml");
let plano_config_rendered_path = run_dir.join("plano_config_rendered.yaml");
let config_dir = find_config_dir()?;
// Export env vars into this process so config rendering can resolve them
for (k, v) in env {
std::env::set_var(k, v);
}
config::validate_and_render(
&effective_config_path,
&config_dir.join("plano_config_schema.yaml"),
&config_dir.join("envoy.template.yaml"),
&envoy_config_path,
&plano_config_rendered_path,
)?;
// Post-process envoy.yaml: replace Docker paths with local paths
let mut envoy_content = fs::read_to_string(&envoy_config_path)?;
envoy_content = envoy_content.replace(
"/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm",
&prompt_gw_path.to_string_lossy(),
);
envoy_content = envoy_content.replace(
"/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm",
&llm_gw_path.to_string_lossy(),
);
// Replace /var/log/ with local log directory
let log_dir = run_dir.join("logs");
fs::create_dir_all(&log_dir)?;
envoy_content = envoy_content.replace("/var/log/", &format!("{}/", log_dir.display()));
// Platform-specific CA cert path
if cfg!(target_os = "macos") {
envoy_content =
envoy_content.replace("/etc/ssl/certs/ca-certificates.crt", "/etc/ssl/cert.pem");
}
fs::write(&envoy_config_path, &envoy_content)?;
// Run envsubst-equivalent on both rendered files
for path in [&envoy_config_path, &plano_config_rendered_path] {
let content = fs::read_to_string(path)?;
let expanded = expand_env_vars(&content);
fs::write(path, expanded)?;
}
Ok((envoy_config_path, plano_config_rendered_path))
}
/// Start Envoy and brightstaff natively.
pub async fn start_native(
plano_config_path: &Path,
env: &HashMap<String, String>,
foreground: bool,
with_tracing: bool,
) -> Result<()> {
let pid_file = native_pid_file();
let run_dir = plano_run_dir();
// Stop existing instance
if pid_file.exists() {
tracing::info!("Stopping existing Plano instance...");
stop_native()?;
}
let envoy_path = binaries::ensure_envoy_binary().await?;
binaries::ensure_wasm_plugins().await?;
let brightstaff_path = binaries::ensure_brightstaff_binary().await?;
let (envoy_config_path, plano_config_rendered_path) =
render_native_config(plano_config_path, env, with_tracing).await?;
tracing::info!("Configuration rendered");
let log_dir = run_dir.join("logs");
fs::create_dir_all(&log_dir)?;
let log_level = env.get("LOG_LEVEL").map(|s| s.as_str()).unwrap_or("info");
// Build env for subprocesses
let mut proc_env: HashMap<String, String> = std::env::vars().collect();
proc_env.insert("RUST_LOG".to_string(), log_level.to_string());
proc_env.insert(
"PLANO_CONFIG_PATH_RENDERED".to_string(),
plano_config_rendered_path.to_string_lossy().to_string(),
);
for (k, v) in env {
proc_env.insert(k.clone(), v.clone());
}
// Start brightstaff
let brightstaff_pid = daemon_exec(
&[brightstaff_path.to_string_lossy().to_string()],
&proc_env,
&log_dir.join("brightstaff.log"),
)?;
tracing::info!("Started brightstaff (PID {brightstaff_pid})");
// Start envoy
let envoy_pid = daemon_exec(
&[
envoy_path.to_string_lossy().to_string(),
"-c".to_string(),
envoy_config_path.to_string_lossy().to_string(),
"--component-log-level".to_string(),
format!("wasm:{log_level}"),
"--log-format".to_string(),
"[%Y-%m-%d %T.%e][%l] %v".to_string(),
],
&proc_env,
&log_dir.join("envoy.log"),
)?;
tracing::info!("Started envoy (PID {envoy_pid})");
// Save PIDs
fs::create_dir_all(plano_run_dir())?;
let pids = serde_json::json!({
"envoy_pid": envoy_pid,
"brightstaff_pid": brightstaff_pid,
});
fs::write(&pid_file, serde_json::to_string(&pids)?)?;
// Health check
let gateway_ports = get_gateway_ports(plano_config_path)?;
tracing::info!("Waiting for listeners to become healthy...");
let start = Instant::now();
let timeout = Duration::from_secs(60);
let green = console::Style::new().green();
loop {
let mut all_healthy = true;
for &port in &gateway_ports {
if !health_check_endpoint(&format!("http://localhost:{port}/healthz")).await {
all_healthy = false;
}
}
if all_healthy {
eprintln!("{} Plano is running (native mode)", green.apply_to("✓"));
for &port in &gateway_ports {
eprintln!(" http://localhost:{port}");
}
break;
}
if !is_pid_alive(brightstaff_pid) {
bail!(
"brightstaff exited unexpectedly. Check logs: {}",
log_dir.join("brightstaff.log").display()
);
}
if !is_pid_alive(envoy_pid) {
bail!(
"envoy exited unexpectedly. Check logs: {}",
log_dir.join("envoy.log").display()
);
}
if start.elapsed() > timeout {
stop_native()?;
bail!(
"Health check timed out after 60s. Check logs in: {}",
log_dir.display()
);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
if foreground {
tracing::info!("Running in foreground. Press Ctrl+C to stop.");
tracing::info!("Logs: {}", log_dir.display());
let mut log_files = vec![
log_dir.join("envoy.log").to_string_lossy().to_string(),
log_dir
.join("brightstaff.log")
.to_string_lossy()
.to_string(),
];
// Add access logs
if let Ok(entries) = fs::read_dir(&log_dir) {
for entry in entries.flatten() {
if let Some(name) = entry.file_name().to_str() {
if name.starts_with("access_") && name.ends_with(".log") {
log_files.push(entry.path().to_string_lossy().to_string());
}
}
}
}
let mut tail_args = vec!["tail".to_string(), "-f".to_string()];
tail_args.extend(log_files);
let mut child = tokio::process::Command::new(&tail_args[0])
.args(&tail_args[1..])
.spawn()?;
tokio::select! {
_ = child.wait() => {}
_ = tokio::signal::ctrl_c() => {
tracing::info!("Stopping Plano...");
let _ = child.kill().await;
stop_native()?;
}
}
} else {
tracing::info!("Logs: {}", log_dir.display());
tracing::info!("Run 'plano down' to stop.");
}
Ok(())
}
/// Spawn a detached background process with stdout/stderr redirected to a log file. Returns the child PID.
fn daemon_exec(args: &[String], env: &HashMap<String, String>, log_path: &Path) -> Result<i32> {
use std::process::{Command, Stdio};
let log_file = fs::File::create(log_path)?;
let child = Command::new(&args[0])
.args(&args[1..])
.envs(env)
.stdin(Stdio::null())
.stdout(log_file.try_clone()?)
.stderr(log_file)
.spawn()?;
Ok(child.id() as i32)
}
/// Stop natively-running Envoy and brightstaff processes.
pub fn stop_native() -> Result<()> {
let pid_file = native_pid_file();
if !pid_file.exists() {
tracing::info!("No native Plano instance found (PID file missing).");
return Ok(());
}
let content = fs::read_to_string(&pid_file)?;
let pids: serde_json::Value = serde_json::from_str(&content)?;
let envoy_pid = pids.get("envoy_pid").and_then(|v| v.as_i64());
let brightstaff_pid = pids.get("brightstaff_pid").and_then(|v| v.as_i64());
for (name, pid) in [("envoy", envoy_pid), ("brightstaff", brightstaff_pid)] {
let Some(pid) = pid else { continue };
let pid = pid as i32;
let nix_pid = Pid::from_raw(pid);
match kill(nix_pid, Signal::SIGTERM) {
Ok(()) => {
tracing::info!("Sent SIGTERM to {name} (PID {pid})");
}
Err(nix::errno::Errno::ESRCH) => {
tracing::info!("{name} (PID {pid}) already stopped");
continue;
}
Err(e) => {
tracing::error!("Error stopping {name} (PID {pid}): {e}");
continue;
}
}
// Wait for graceful shutdown
let deadline = Instant::now() + Duration::from_secs(10);
loop {
if Instant::now() > deadline {
let _ = kill(nix_pid, Signal::SIGKILL);
tracing::info!("Sent SIGKILL to {name} (PID {pid})");
break;
}
if !is_pid_alive(pid) {
break;
}
std::thread::sleep(Duration::from_millis(500));
}
}
let _ = fs::remove_file(&pid_file);
let green = console::Style::new().green();
eprintln!("{} Plano stopped (native mode).", green.apply_to("✓"));
Ok(())
}
/// Stream native logs.
pub fn native_logs(debug: bool, follow: bool) -> Result<()> {
let log_dir = plano_run_dir().join("logs");
if !log_dir.is_dir() {
bail!(
"No native log directory found at {}. Is Plano running?",
log_dir.display()
);
}
let mut log_files: Vec<String> = Vec::new();
// Collect access logs
if let Ok(entries) = fs::read_dir(&log_dir) {
let mut access_logs: Vec<_> = entries
.flatten()
.filter(|e| {
e.file_name()
.to_str()
.map(|n| n.starts_with("access_") && n.ends_with(".log"))
.unwrap_or(false)
})
.map(|e| e.path().to_string_lossy().to_string())
.collect();
access_logs.sort();
log_files.extend(access_logs);
}
if debug {
log_files.push(log_dir.join("envoy.log").to_string_lossy().to_string());
log_files.push(
log_dir
.join("brightstaff.log")
.to_string_lossy()
.to_string(),
);
}
// Filter to existing files
log_files.retain(|f| Path::new(f).exists());
if log_files.is_empty() {
bail!("No log files found in {}", log_dir.display());
}
let mut tail_args = vec!["tail".to_string()];
if follow {
tail_args.push("-f".to_string());
}
tail_args.extend(log_files);
let mut child = std::process::Command::new(&tail_args[0])
.args(&tail_args[1..])
.spawn()?;
let _ = child.wait();
Ok(())
}
/// Get gateway ports from config.
fn get_gateway_ports(plano_config_path: &Path) -> Result<Vec<u16>> {
let content = fs::read_to_string(plano_config_path)?;
let config: serde_yaml::Value = serde_yaml::from_str(&content)?;
let mut ports = Vec::new();
if let Some(listeners) = config.get("listeners") {
if let Some(seq) = listeners.as_sequence() {
for listener in seq {
if let Some(port) = listener.get("port").and_then(|v| v.as_u64()) {
ports.push(port as u16);
}
}
} else if let Some(map) = listeners.as_mapping() {
for (_, v) in map {
if let Some(port) = v.get("port").and_then(|v| v.as_u64()) {
ports.push(port as u16);
}
}
}
}
ports.sort();
ports.dedup();
if ports.is_empty() {
ports.push(12000); // default
}
Ok(ports)
}
/// Health check an endpoint.
async fn health_check_endpoint(url: &str) -> bool {
reqwest::get(url)
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}


@@ -0,0 +1,60 @@
use std::fs;
use std::path::PathBuf;
use anyhow::Result;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use crate::consts::plano_run_dir;
pub fn pid_file_path() -> PathBuf {
plano_run_dir().join("trace_listener.pid")
}
pub fn log_file_path() -> PathBuf {
plano_run_dir().join("trace_listener.log")
}
pub fn write_listener_pid(pid: u32) -> Result<()> {
let run_dir = plano_run_dir();
fs::create_dir_all(&run_dir)?;
fs::write(pid_file_path(), pid.to_string())?;
Ok(())
}
pub fn remove_listener_pid() -> Result<()> {
let path = pid_file_path();
if path.exists() {
fs::remove_file(path)?;
}
Ok(())
}
pub fn get_listener_pid() -> Option<u32> {
let content = fs::read_to_string(pid_file_path()).ok()?;
let pid: u32 = content.trim().parse().ok()?;
// Check if alive
if kill(Pid::from_raw(pid as i32), None).is_ok() {
Some(pid)
} else {
None
}
}
pub fn stop_listener_process() -> Result<()> {
if let Some(pid) = get_listener_pid() {
let nix_pid = Pid::from_raw(pid as i32);
let _ = kill(nix_pid, Signal::SIGTERM);
// Brief wait
std::thread::sleep(std::time::Duration::from_millis(500));
// Force kill if still alive
if kill(nix_pid, None).is_ok() {
let _ = kill(nix_pid, Signal::SIGKILL);
}
}
remove_listener_pid()?;
Ok(())
}


@@ -0,0 +1,16 @@
use anyhow::Result;
use crate::trace::daemon;
pub async fn run() -> Result<()> {
let green = console::Style::new().green();
if daemon::get_listener_pid().is_none() {
eprintln!("No trace listener running.");
return Ok(());
}
daemon::stop_listener_process()?;
eprintln!("{} Trace listener stopped.", green.apply_to("✓"));
Ok(())
}


@@ -0,0 +1,58 @@
use anyhow::Result;
use crate::trace::daemon;
use crate::trace::store::TraceStore;
/// Start the trace listener in the foreground.
pub async fn run(host: &str, port: u16) -> Result<()> {
let green = console::Style::new().green();
let cyan = console::Style::new().cyan();
// Check if already running
if let Some(pid) = daemon::get_listener_pid() {
eprintln!(
"{} Trace listener already running (PID {pid})",
green.apply_to("✓")
);
return Ok(());
}
eprintln!(
"{} Starting trace listener on {}",
green.apply_to("✓"),
cyan.apply_to(format!("{host}:{port}"))
);
// Start as a background task in this process
start_background(port).await?;
// Write PID
daemon::write_listener_pid(std::process::id())?;
// Wait forever (until ctrl+c)
tokio::signal::ctrl_c().await?;
daemon::remove_listener_pid()?;
eprintln!("\nTrace listener stopped.");
Ok(())
}
/// Start the trace listener in the background (within the current process).
pub async fn start_background(port: u16) -> Result<()> {
let store = TraceStore::shared();
// TODO: Implement gRPC OTLP listener using tonic
// For now, spawn a placeholder task
let _store = store.clone();
tokio::spawn(async move {
// The actual gRPC server will be implemented here
// using tonic with the OTLP ExportTraceServiceRequest handler
tracing::info!("Trace listener background task started on port {port}");
// Keep running
loop {
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
});
Ok(())
}


@@ -0,0 +1,6 @@
pub mod daemon;
pub mod down;
pub mod listen;
pub mod show;
pub mod store;
pub mod tail;


@@ -0,0 +1,18 @@
use anyhow::{bail, Result};
pub async fn run(trace_id: &str, verbose: bool) -> Result<()> {
// TODO: Connect to trace listener via gRPC and fetch trace
// For now, print a placeholder
println!("Showing trace: {trace_id}");
if verbose {
println!("(verbose mode)");
}
// The full implementation will:
// 1. Connect to the gRPC trace query service
// 2. Fetch the trace by ID
// 3. Build a span tree
// 4. Render it using console styling
bail!("Trace show is not yet fully implemented. The gRPC trace query service needs to be running.")
}


@@ -0,0 +1,118 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
const MAX_TRACES: usize = 50;
const MAX_SPANS_PER_TRACE: usize = 500;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Span {
pub trace_id: String,
pub span_id: String,
pub parent_span_id: Option<String>,
pub name: String,
pub start_time_unix_nano: u64,
pub end_time_unix_nano: u64,
pub status: Option<SpanStatus>,
pub attributes: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SpanStatus {
pub code: i32,
pub message: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Trace {
pub trace_id: String,
pub spans: Vec<Span>,
pub root_span: Option<String>,
pub start_time: u64,
}
pub type SharedTraceStore = Arc<RwLock<TraceStore>>;
#[derive(Debug, Default)]
pub struct TraceStore {
traces: HashMap<String, Trace>,
trace_order: Vec<String>,
/// Maps span_id → group_trace_id (for merging traces via parent_span_id links)
span_to_group: HashMap<String, String>,
}
impl TraceStore {
pub fn new() -> Self {
Self::default()
}
pub fn shared() -> SharedTraceStore {
Arc::new(RwLock::new(Self::new()))
}
pub fn add_spans(&mut self, spans: Vec<Span>) {
for span in spans {
let trace_id = span.trace_id.clone();
// Check if this span belongs to an existing group via parent_span_id
let group_id = span
.parent_span_id
.as_ref()
.and_then(|pid| self.span_to_group.get(pid).cloned())
.unwrap_or_else(|| trace_id.clone());
// Register this span's ID in the group
self.span_to_group
.insert(span.span_id.clone(), group_id.clone());
let trace = self.traces.entry(group_id.clone()).or_insert_with(|| {
self.trace_order.push(group_id.clone());
Trace {
trace_id: group_id.clone(),
spans: Vec::new(),
root_span: None,
start_time: span.start_time_unix_nano,
}
});
// Dedup by span_id
if trace.spans.iter().any(|s| s.span_id == span.span_id) {
continue;
}
// Track root span
if span.parent_span_id.is_none() || span.parent_span_id.as_deref() == Some("") {
trace.root_span = Some(span.span_id.clone());
}
if trace.spans.len() < MAX_SPANS_PER_TRACE {
trace.spans.push(span);
}
}
// Evict oldest traces
while self.trace_order.len() > MAX_TRACES {
if let Some(oldest) = self.trace_order.first().cloned() {
self.trace_order.remove(0);
if let Some(trace) = self.traces.remove(&oldest) {
for span in &trace.spans {
self.span_to_group.remove(&span.span_id);
}
}
}
}
}
pub fn get_traces(&self) -> Vec<&Trace> {
self.trace_order
.iter()
.rev()
.filter_map(|id| self.traces.get(id))
.collect()
}
pub fn get_trace(&self, trace_id: &str) -> Option<&Trace> {
self.traces.get(trace_id)
}
}


@@ -0,0 +1,68 @@
use anyhow::{bail, Result};
pub async fn run(
include_spans: Option<&str>,
exclude_spans: Option<&str>,
where_filters: &[String],
since: Option<&str>,
_verbose: bool,
) -> Result<()> {
// TODO: Connect to trace listener via gRPC and tail traces
// For now, print a placeholder
println!("Tailing traces...");
if let Some(inc) = include_spans {
println!(" include: {inc}");
}
if let Some(exc) = exclude_spans {
println!(" exclude: {exc}");
}
for w in where_filters {
println!(" where: {w}");
}
if let Some(s) = since {
println!(" since: {s}");
}
// The full implementation will:
// 1. Connect to the gRPC trace query service
// 2. Fetch recent traces
// 3. Apply filters
// 4. Render matching traces
bail!("Trace tail is not yet fully implemented. The gRPC trace query service needs to be running.")
}
/// Parse a "since" string like "10s", "5m", "1h", "7d" into seconds.
pub fn parse_since_seconds(since: &str) -> Option<u64> {
let since = since.trim();
if since.is_empty() {
return None;
}
let (num_str, unit) = since.split_at(since.len() - 1);
let num: u64 = num_str.parse().ok()?;
match unit {
"s" => Some(num),
"m" => Some(num * 60),
"h" => Some(num * 3600),
"d" => Some(num * 86400),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_since_seconds() {
assert_eq!(parse_since_seconds("10s"), Some(10));
assert_eq!(parse_since_seconds("5m"), Some(300));
assert_eq!(parse_since_seconds("1h"), Some(3600));
assert_eq!(parse_since_seconds("7d"), Some(604800));
assert_eq!(parse_since_seconds(""), None);
assert_eq!(parse_since_seconds("abc"), None);
}
}


@@ -0,0 +1,237 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::{bail, Result};
use regex::Regex;
/// Find the repository root by looking for Dockerfile + crates + config dirs.
pub fn find_repo_root() -> Option<PathBuf> {
let mut current = std::env::current_dir().ok()?;
loop {
if current.join("Dockerfile").exists()
&& current.join("crates").exists()
&& current.join("config").exists()
{
return Some(current);
}
if current.join(".git").exists() && current.join("crates").exists() {
return Some(current);
}
if !current.pop() {
break;
}
}
None
}
/// Find the appropriate config file path.
pub fn find_config_file(path: &str, file: Option<&str>) -> PathBuf {
if let Some(f) = file {
return PathBuf::from(f)
.canonicalize()
.unwrap_or_else(|_| PathBuf::from(f));
}
let config_yaml = Path::new(path).join("config.yaml");
if config_yaml.exists() {
std::fs::canonicalize(&config_yaml).unwrap_or(config_yaml)
} else {
let plano_config = Path::new(path).join("plano_config.yaml");
std::fs::canonicalize(&plano_config).unwrap_or(plano_config)
}
}
/// Parse a .env file into a HashMap.
pub fn load_env_file(path: &Path) -> Result<HashMap<String, String>> {
let content = std::fs::read_to_string(path)?;
let mut map = HashMap::new();
for line in content.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
map.insert(key.trim().to_string(), value.trim().to_string());
}
}
Ok(map)
}
/// Extract LLM provider access keys from config YAML.
pub fn get_llm_provider_access_keys(config_path: &Path) -> Result<Vec<String>> {
let content = std::fs::read_to_string(config_path)?;
let config: serde_yaml::Value = serde_yaml::from_str(&content)?;
let mut keys = Vec::new();
// Reject configs that set both the legacy `llm_providers` key and its
// replacement `model_providers`
if config.get("llm_providers").is_some() && config.get("model_providers").is_some() {
bail!("Please provide either llm_providers or model_providers, not both.");
}
// Get model_providers from listeners or root
let model_providers = config
.get("model_providers")
.or_else(|| config.get("llm_providers"));
// Check prompt_targets for authorization headers
if let Some(targets) = config.get("prompt_targets").and_then(|v| v.as_sequence()) {
for target in targets {
if let Some(headers) = target
.get("endpoint")
.and_then(|e| e.get("http_headers"))
.and_then(|h| h.as_mapping())
{
for (k, v) in headers {
if let (Some(key), Some(val)) = (k.as_str(), v.as_str()) {
if key.to_lowercase() == "authorization" {
let tokens: Vec<&str> = val.split(' ').collect();
if tokens.len() > 1 {
keys.push(tokens[1].to_string());
} else {
keys.push(val.to_string());
}
}
}
}
}
}
}
// Get listeners to find model_providers
let listeners = config.get("listeners");
let mp_list = if let Some(listeners) = listeners {
// Collect model_providers from listeners
let mut all_mp = Vec::new();
if let Some(seq) = listeners.as_sequence() {
for listener in seq {
if let Some(mps) = listener
.get("model_providers")
.and_then(|v| v.as_sequence())
{
all_mp.extend(mps.iter());
}
}
}
// Also check root model_providers
if let Some(mps) = model_providers.and_then(|v| v.as_sequence()) {
all_mp.extend(mps.iter());
}
all_mp
} else if let Some(mps) = model_providers.and_then(|v| v.as_sequence()) {
mps.iter().collect()
} else {
Vec::new()
};
for mp in &mp_list {
if let Some(key) = mp.get("access_key").and_then(|v| v.as_str()) {
keys.push(key.to_string());
}
}
// Extract env vars from state_storage_v1_responses.connection_string
if let Some(state_storage) = config.get("state_storage_v1_responses") {
if let Some(conn_str) = state_storage
.get("connection_string")
.and_then(|v| v.as_str())
{
let re = Regex::new(r"\$\{?([A-Z_][A-Z0-9_]*)\}?")?;
for cap in re.captures_iter(conn_str) {
keys.push(format!("${}", &cap[1]));
}
}
}
Ok(keys)
}
/// Check if a TCP port is already in use.
pub fn is_port_in_use(port: u16) -> bool {
std::net::TcpListener::bind(("0.0.0.0", port)).is_err()
}
/// Check if the native Plano is running by verifying the PID file.
pub fn is_native_plano_running() -> bool {
let pid_file = crate::consts::native_pid_file();
if !pid_file.exists() {
return false;
}
let content = match std::fs::read_to_string(&pid_file) {
Ok(c) => c,
Err(_) => return false,
};
let pids: serde_json::Value = match serde_json::from_str(&content) {
Ok(v) => v,
Err(_) => return false,
};
let envoy_pid = pids.get("envoy_pid").and_then(|v| v.as_i64());
let brightstaff_pid = pids.get("brightstaff_pid").and_then(|v| v.as_i64());
match (envoy_pid, brightstaff_pid) {
(Some(ep), Some(bp)) => is_pid_alive(ep as i32) && is_pid_alive(bp as i32),
_ => false,
}
}
/// Check if a process is alive using kill(0).
pub fn is_pid_alive(pid: i32) -> bool {
use nix::sys::signal::kill;
use nix::unistd::Pid;
kill(Pid::from_raw(pid), None).is_ok()
}
/// Expand environment variables ($VAR and ${VAR}) in a string.
pub fn expand_env_vars(input: &str) -> String {
let re = Regex::new(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)").unwrap();
re.replace_all(input, |caps: &regex::Captures| {
let var_name = caps
.get(1)
.or_else(|| caps.get(2))
.map(|m| m.as_str())
.unwrap_or("");
std::env::var(var_name).unwrap_or_default()
})
.into_owned()
}
/// Print the CLI header with version.
pub fn print_cli_header() {
let style = console::Style::new().bold().color256(141);
let dim = console::Style::new().dim();
println!(
"\n{} {}\n",
style.apply_to("Plano CLI"),
dim.apply_to(format!("v{}", crate::consts::PLANO_VERSION))
);
}
/// Print missing API keys error.
pub fn print_missing_keys(missing_keys: &[String]) {
let red = console::Style::new().red();
let bold = console::Style::new().bold();
let dim = console::Style::new().dim();
let cyan = console::Style::new().cyan();
println!(
"\n{} {}\n",
red.apply_to("✗"),
red.apply_to("Missing API keys!")
);
for key in missing_keys {
println!(" {} {}", red.apply_to("•"), bold.apply_to(key));
}
println!("\n{}", dim.apply_to("Set the environment variable(s):"));
for key in missing_keys {
println!(
" {}",
cyan.apply_to(format!("export {key}=\"your-api-key\""))
);
}
println!(
"\n{}\n",
dim.apply_to("Or create a .env file in the config directory.")
);
}


@@ -0,0 +1,89 @@
use crate::consts::{PLANO_GITHUB_REPO, PLANO_VERSION};
/// Get the current CLI version.
pub fn get_version() -> &'static str {
PLANO_VERSION
}
/// Fetch the latest version from GitHub releases.
pub async fn get_latest_version() -> Option<String> {
let url = format!("https://api.github.com/repos/{PLANO_GITHUB_REPO}/releases/latest");
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(2))
.build()
.ok()?;
let resp = client
.get(&url)
.header("User-Agent", "plano-cli")
.send()
.await
.ok()?;
let json: serde_json::Value = resp.json().await.ok()?;
let tag = json.get("tag_name")?.as_str()?;
// Strip leading 'v' if present
Some(tag.strip_prefix('v').unwrap_or(tag).to_string())
}
/// Check if current version is outdated.
pub fn check_version_status(current: &str, latest: Option<&str>) -> VersionStatus {
let Some(latest) = latest else {
return VersionStatus {
is_outdated: false,
latest: None,
};
};
let current_parts = parse_version(current);
let latest_parts = parse_version(latest);
VersionStatus {
is_outdated: latest_parts > current_parts,
latest: Some(latest.to_string()),
}
}
pub struct VersionStatus {
pub is_outdated: bool,
pub latest: Option<String>,
}
fn parse_version(v: &str) -> Vec<u32> {
v.split('.')
.filter_map(|s| {
// Handle pre-release suffixes like "1a1"
let numeric: String = s.chars().take_while(|c| c.is_ascii_digit()).collect();
numeric.parse().ok()
})
.collect()
}
/// Maybe check for updates and print a message.
pub async fn maybe_check_updates() {
if std::env::var("PLANO_SKIP_VERSION_CHECK").is_ok() {
return;
}
let current = get_version();
if let Some(latest) = get_latest_version().await {
let status = check_version_status(current, Some(&latest));
if status.is_outdated {
let yellow = console::Style::new().yellow();
let bold = console::Style::new().bold();
let dim = console::Style::new().dim();
println!(
"\n{} {}",
yellow.apply_to("⚠ Update available:"),
bold.apply_to(&latest)
);
println!(
"{}",
dim.apply_to("Run: cargo install plano-cli (or download from GitHub releases)")
);
} else {
let dim = console::Style::new().dim();
println!("{}", dim.apply_to("✓ You're up to date"));
}
}
}


@@ -0,0 +1,41 @@
version: v0.3.0
model_providers:
# OpenAI Models
- model: openai/gpt-5-2025-08-07
access_key: $OPENAI_API_KEY
routing_preferences:
- name: code generation
description: generating new code snippets, functions, or boilerplate based on user prompts or requirements
- model: openai/gpt-4.1-2025-04-14
access_key: $OPENAI_API_KEY
routing_preferences:
- name: code understanding
description: understand and explain existing code snippets, functions, or libraries
# Anthropic Models
- model: anthropic/claude-sonnet-4-5
default: true
access_key: $ANTHROPIC_API_KEY
- model: anthropic/claude-haiku-4-5
access_key: $ANTHROPIC_API_KEY
# Ollama Models
- model: ollama/llama3.1
base_url: http://localhost:11434
# Model aliases - friendly names that map to actual provider names
model_aliases:
# Alias for a small faster Claude model
arch.claude.code.small.fast:
target: claude-haiku-4-5
listeners:
- type: model
name: model_listener
port: 12000
tracing:
random_sampling: 100


@@ -0,0 +1,36 @@
version: v0.3.0
agents:
- id: assistant
url: http://localhost:10510
model_providers:
# OpenAI Models
- model: openai/gpt-5-mini-2025-08-07
access_key: $OPENAI_API_KEY
default: true
# Anthropic Models
- model: anthropic/claude-sonnet-4-20250514
access_key: $ANTHROPIC_API_KEY
listeners:
- type: agent
name: conversation_service
port: 8001
router: plano_orchestrator_v1
agents:
- id: assistant
description: |
A conversational assistant that maintains context across multi-turn
conversations. It can answer follow-up questions, remember previous
context, and provide coherent responses in ongoing dialogues.
# State storage configuration for v1/responses API
# Manages conversation state for multi-turn conversations
state_storage:
# Type: memory | postgres
type: memory
tracing:
random_sampling: 100


@@ -0,0 +1,50 @@
version: v0.3.0
agents:
- id: rag_agent
url: http://rag-agents:10505
filters:
- id: input_guards
url: http://rag-agents:10500
type: http
# type: mcp (default)
# transport: streamable-http (default)
# tool: input_guards (default - same as filter id)
- id: query_rewriter
url: http://rag-agents:10501
type: http
# type: mcp (default)
# transport: streamable-http (default)
# tool: query_rewriter (default - same as filter id)
- id: context_builder
url: http://rag-agents:10502
type: http
model_providers:
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY
default: true
- model: openai/gpt-4o
access_key: $OPENAI_API_KEY
model_aliases:
fast-llm:
target: gpt-4o-mini
smart-llm:
target: gpt-4o
listeners:
- type: agent
name: agent_1
port: 8001
router: plano_orchestrator_v1
agents:
- id: rag_agent
description: virtual assistant for retrieval augmented generation tasks
filter_chain:
- input_guards
- query_rewriter
- context_builder
tracing:
random_sampling: 100


@@ -0,0 +1,27 @@
version: v0.3.0
model_providers:
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY
default: true
- model: openai/gpt-4o
access_key: $OPENAI_API_KEY
routing_preferences:
- name: code understanding
description: understand and explain existing code snippets, functions, or libraries
- model: anthropic/claude-sonnet-4-20250514
access_key: $ANTHROPIC_API_KEY
routing_preferences:
- name: code generation
description: generating new code snippets, functions, or boilerplate based on user prompts or requirements
listeners:
- type: model
name: model_listener
port: 12000
tracing:
random_sampling: 100


@@ -0,0 +1,57 @@
version: v0.3.0
agents:
- id: weather_agent
url: http://langchain-weather-agent:10510
- id: flight_agent
url: http://crewai-flight-agent:10520
model_providers:
- model: openai/gpt-4o
access_key: $OPENAI_API_KEY
default: true
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY # smaller, faster, cheaper model for extracting entities like location
listeners:
- type: agent
name: travel_booking_service
port: 8001
router: plano_orchestrator_v1
agents:
- id: weather_agent
description: |
WeatherAgent is a specialized AI assistant for real-time weather information and forecasts. It provides accurate weather data for any city worldwide using the Open-Meteo API, helping travelers plan their trips with up-to-date weather conditions.
Capabilities:
* Get real-time weather conditions and multi-day forecasts for any city worldwide using Open-Meteo API (free, no API key needed)
* Provides current temperature
* Provides multi-day forecasts
* Provides weather conditions
* Provides sunrise/sunset times
* Provides detailed weather information
* Understands conversation context to resolve location references from previous messages
* Handles weather-related questions including "What's the weather in [city]?", "What's the forecast for [city]?", "How's the weather in [city]?"
* When queries include both weather and other travel questions (e.g., flights, currency), this agent answers ONLY the weather part
- id: flight_agent
description: |
FlightAgent is an AI-powered tool specialized in providing live flight information between airports. It leverages the FlightAware AeroAPI to deliver real-time flight status, gate information, and delay updates.
Capabilities:
* Get live flight information between airports using FlightAware AeroAPI
* Shows real-time flight status
* Shows scheduled/estimated/actual departure and arrival times
* Shows gate and terminal information
* Shows delays
* Shows aircraft type
* Shows flight status
* Automatically resolves city names to airport codes (IATA/ICAO)
* Understands conversation context to infer origin/destination from follow-up questions
* Handles flight-related questions including "What flights go from [city] to [city]?", "Do flights go to [city]?", "Are there direct flights from [city]?"
* When queries include both flight and other travel questions (e.g., weather, currency), this agent answers ONLY the flight part
tracing:
random_sampling: 100