This commit is contained in:
Musa 2026-05-31 00:22:25 +08:00 committed by GitHub
commit 3b68edc813
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 3784 additions and 5 deletions

View file

@ -65,7 +65,10 @@ COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy
WORKDIR /app
RUN pip install --no-cache-dir uv
# Pin uv to >=0.11.11; older versions bundle rustls-webpki 0.103.10 which is
# flagged by GHSA-82j2-j2ch-gfr8 (DoS via panic on malformed CRL BIT STRING).
# uv 0.11.11+ ships rustls-webpki 0.103.13.
RUN pip install --no-cache-dir 'uv>=0.11.11'
COPY cli/pyproject.toml ./
COPY cli/uv.lock ./

View file

@ -39,11 +39,64 @@ CHATGPT_API_BASE = "https://chatgpt.com/backend-api/codex"
CHATGPT_DEFAULT_ORIGINATOR = "codex_cli_rs"
CHATGPT_DEFAULT_USER_AGENT = "codex_cli_rs/0.0.0 (Unknown 0; unknown) unknown"
# Local-only bridge that runs Claude Code CLI as a subprocess. Hosted by
# brightstaff on this loopback address; the Python CLI auto-fills the matching
# provider fields below and tells the launcher to enable the bridge.
CLAUDE_CLI_DEFAULT_BASE_URL = "http://127.0.0.1:14001"
CLAUDE_CLI_DEFAULT_LISTEN_ADDR = "127.0.0.1:14001"
CLAUDE_CLI_DEFAULT_NAME = "claude-cli/*"
CLAUDE_CLI_DEFAULT_ACCESS_KEY_PLACEHOLDER = "claude-cli-local"
SUPPORTED_PROVIDERS = (
SUPPORTED_PROVIDERS_WITHOUT_BASE_URL + SUPPORTED_PROVIDERS_WITH_BASE_URL
)
def _is_claude_cli_provider(model_provider):
"""Return True iff this provider entry refers to the local claude-cli
bridge. Triggered by any of `model`, `name`, or `provider_interface`
matching the `claude-cli/...` namespace.
"""
model = (model_provider.get("model") or "").strip()
name = (model_provider.get("name") or "").strip()
interface = (model_provider.get("provider_interface") or "").strip()
return (
model.startswith("claude-cli/")
or name.startswith("claude-cli/")
or interface == "claude-cli"
)
def _apply_claude_cli_autofill(model_provider):
    """Complete a ``claude-cli`` provider entry in place.

    For entries recognised by ``_is_claude_cli_provider`` this fills in any
    of ``name``, ``provider_interface`` and ``base_url`` that are missing or
    empty, and adds a placeholder ``access_key`` unless the entry already
    has one or sets ``passthrough_auth``. Values the user supplied are never
    overwritten.

    Returns True iff the entry was treated as a claude-cli provider, False
    (leaving the entry untouched) otherwise.
    """
    if not _is_claude_cli_provider(model_provider):
        return False

    # (field, fallback) pairs applied only when the field is absent/empty.
    # The name falls back to the model string, then to the default name.
    fallbacks = (
        ("name", model_provider.get("model") or CLAUDE_CLI_DEFAULT_NAME),
        ("provider_interface", "claude-cli"),
        ("base_url", CLAUDE_CLI_DEFAULT_BASE_URL),
    )
    for field, fallback in fallbacks:
        if not model_provider.get(field):
            model_provider[field] = fallback

    # Entries using passthrough_auth are left without an access key; every
    # other entry gets the placeholder when none was provided.
    has_auth = model_provider.get("access_key") or model_provider.get(
        "passthrough_auth"
    )
    if not has_auth:
        model_provider["access_key"] = CLAUDE_CLI_DEFAULT_ACCESS_KEY_PLACEHOLDER
    return True
def get_endpoint_and_port(endpoint, protocol):
endpoint_tokens = endpoint.split(":")
if len(endpoint_tokens) > 1:
@ -329,6 +382,12 @@ def validate_and_render_schema():
name = listener.get("name", None)
for model_provider in listener.get("model_providers", []):
# Auto-fill the implicit fields for `claude-cli/*` providers
# before the rest of the loop runs validation. This makes
# `model_providers: [{model: claude-cli/*}]` a fully-formed
# entry by the time we reach the wildcard checks below.
_apply_claude_cli_autofill(model_provider)
if model_provider.get("usage", None):
llms_with_usage.append(model_provider["name"])
if model_provider.get("name") in model_provider_name_set:

View file

@ -13,6 +13,7 @@ PLANO_HOME = os.path.join(os.path.expanduser("~"), ".plano")
PLANO_RUN_DIR = os.path.join(PLANO_HOME, "run")
PLANO_BIN_DIR = os.path.join(PLANO_HOME, "bin")
PLANO_PLUGINS_DIR = os.path.join(PLANO_HOME, "plugins")
PLANO_STATE_DIR = os.path.join(PLANO_HOME, "state")
ENVOY_VERSION = "v1.37.0" # keep in sync with Dockerfile ARG ENVOY_VERSION
NATIVE_PID_FILE = os.path.join(PLANO_RUN_DIR, "plano.pid")
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317"

View file

@ -0,0 +1,297 @@
"""Detect local-agent provider entries in a Plano config and warn the
operator that the host is about to spawn a local CLI binary with the same
filesystem, shell, and network capabilities as the user running planoai.
Local-agent providers (e.g. ``claude-cli``) are an entirely different
trust class from stateless network LLM providers (``openai``,
``anthropic``, ``gemini``, ...): the bridge runs inside brightstaff and
shells out to a local binary for every request, so a misconfigured
production deployment would expose the host to whatever the spawned
agent can do — which, for tools like Claude Code, is "anything the
operator can do at the shell".
This module is intentionally additive and side-effect free until the
caller invokes :func:`maybe_warn_local_agent_providers`. The set of
known local-agent provider interfaces lives in
:data:`LOCAL_AGENT_PROVIDER_INTERFACES`; adding a future entry (codex,
chatgpt-cli, opencode, hermes, ...) is a one-line change.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable
from rich.console import Console
from rich.panel import Panel
from planoai.consts import PLANO_STATE_DIR
# Provider interfaces whose runtime spawns a local CLI subprocess with
# host filesystem / shell access. The string here is matched against the
# config's ``provider_interface`` field (and the legacy ``provider``
# field) AND against the ``<prefix>/...`` in ``model:`` and ``name:``
# fields, so configs that rely on the Python-side autofill
# (``model: claude-cli/*`` only) are still detected before that autofill
# runs.
#
# Add new entries here as additional local-agent bridges are implemented
# (e.g. a future ``codex-cli`` or ``chatgpt-cli`` bridge that spawns the
# Codex CLI). This is the *only* line that needs to change to extend the
# warning's coverage.
LOCAL_AGENT_PROVIDER_INTERFACES: tuple[str, ...] = ("claude-cli",)
# Persistent ack lives next to the rest of the per-user planoai state
# (run/, bin/, plugins/, ...). Operators can ``rm`` this file to undo.
ACK_FILE_PATH = os.path.join(PLANO_STATE_DIR, "local_agent_ack.json")
# Env-var fallback for the ``--ack-local-agents`` CLI flag. Truthy values
# are 1/true/yes/on (case-insensitive, surrounding whitespace ignored);
# everything else is treated as unset.
ACK_ENV_VAR = "PLANO_ACK_LOCAL_AGENTS"
# Public docs page. The Sphinx source lives at
# ``docs/source/resources/local_agent_providers.rst`` and is published to
# https://docs.planoai.dev (CNAME at ``docs/CNAME``).
DOCS_LEARN_MORE = "https://docs.planoai.dev/resources/local_agent_providers.html"
@dataclass(frozen=True)
class LocalAgentProvider:
    """Immutable record of one provider entry that maps to a local-agent bridge.

    The dataclass is frozen, so instances cannot be modified after
    construction and compare equal field-by-field.
    """

    # Canonical local-agent interface key; this is what ack persistence
    # is keyed on.
    interface: str
    # Name of the provider entry as taken from the config.
    name: str
    # Model string of the provider entry as taken from the config.
    model: str
def _truthy_env(value: str | None) -> bool:
if not value:
return False
return value.strip().lower() in {"1", "true", "yes", "on"}
def _interface_for_entry(entry: dict) -> str | None:
    """Return the canonical local-agent interface name for ``entry``.

    Matching is intentionally permissive so that minimally-configured
    entries — i.e. just ``model: claude-cli/*`` — are still detected. For
    each known interface, an entry matches when its ``provider_interface``
    or legacy ``provider`` field equals the interface, or when its ``model``
    or ``name`` starts with ``<interface>/``. The first matching interface
    (in ``LOCAL_AGENT_PROVIDER_INTERFACES`` order) is returned.

    Returns ``None`` when ``entry`` is not a dict or matches no interface.
    """
    if not isinstance(entry, dict):
        return None

    def _text(key: str) -> str:
        # Coerce through ``str`` so a non-string YAML scalar (e.g.
        # ``provider: 1``) degrades to "no match" instead of raising on
        # ``.strip()``; absent/None/empty values become "".
        return str(entry.get(key) or "").strip()

    declared = {_text("provider_interface"), _text("provider")}
    namespaced = (_text("model"), _text("name"))

    for interface in LOCAL_AGENT_PROVIDER_INTERFACES:
        if interface in declared:
            return interface
        prefix = f"{interface}/"
        if any(value.startswith(prefix) for value in namespaced):
            return interface
    return None
def detect_local_agent_providers(config: dict) -> list[LocalAgentProvider]:
    """Collect every top-level provider entry that is a local-agent provider.

    The provider list is read from ``model_providers`` when that key holds a
    list; otherwise the legacy ``llm_providers`` key is used when *it* holds
    a list. Any other shape (missing key, ``None``, a scalar, a string) is
    treated as "no providers" rather than raising, so a malformed config
    never crashes the warning. Each entry is classified by
    ``_interface_for_entry`` (``provider_interface`` / ``provider`` equality
    or a ``<interface>/`` prefix on ``model`` / ``name``).

    Returns the matches in declaration order. ``name`` falls back to the
    model string and then to the interface when the entry has no name.
    """
    if not isinstance(config, dict):
        return []

    providers: list = []
    for key in ("model_providers", "llm_providers"):
        candidate = config.get(key)
        if isinstance(candidate, list):
            providers = candidate
            break

    found: list[LocalAgentProvider] = []
    for entry in providers:
        interface = _interface_for_entry(entry)
        if interface is None:
            continue
        model = str(entry.get("model") or "").strip()
        name = str(entry.get("name") or "").strip() or model or interface
        found.append(LocalAgentProvider(interface=interface, name=name, model=model))
    return found
def _interfaces_in(providers: Iterable[LocalAgentProvider]) -> set[str]:
return {p.interface for p in providers}
def load_acknowledged_interfaces(ack_path: str = ACK_FILE_PATH) -> set[str]:
    """Read the ack file and return the set of acknowledged interfaces.

    A missing, unreadable or malformed file is treated as "no ack" and
    yields an empty set instead of raising, so a damaged ack file degrades
    to "warn again". The file must be a JSON object whose ``acknowledged``
    key is a list; only the string items of that list are returned.
    """
    try:
        with open(ack_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError (bad JSON) and
        # UnicodeDecodeError (bytes that are not valid UTF-8); either one
        # means the file carries no usable acknowledgement.
        return set()
    if not isinstance(data, dict):
        return set()
    raw = data.get("acknowledged")
    if not isinstance(raw, list):
        return set()
    return {item for item in raw if isinstance(item, str)}
def write_acknowledgement(
    interfaces: Iterable[str],
    ack_path: str = ACK_FILE_PATH,
) -> set[str]:
    """Persist ``interfaces`` to the ack file, merged with what is on disk.

    The existing acknowledgements are loaded first (an unreadable file
    counts as empty), unioned with ``interfaces`` and written back as a JSON
    object with a sorted ``acknowledged`` list and a UTC ``ack_at``
    timestamp. The parent directory is created when needed.

    Returns the full acknowledged set after the write. Raises ``OSError``
    if the directory or file cannot be written.
    """
    merged = load_acknowledged_interfaces(ack_path) | set(interfaces)
    payload = {
        "acknowledged": sorted(merged),
        "ack_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    }
    # ``os.path.dirname`` is "" for a bare filename, and ``os.makedirs("")``
    # raises; only create a directory when the path actually names one.
    parent = os.path.dirname(ack_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(ack_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, sort_keys=True)
        f.write("\n")
    return merged
def _render_panel(
    console: Console,
    pending: list[LocalAgentProvider],
) -> None:
    """Print the compact reminder panel for the ``pending`` providers.

    Callers must ensure ``pending`` is non-empty. The title names the
    distinct interface(s); the body holds a one-sentence capability summary,
    a "Learn more" link to the published docs and the command that hides the
    panel. Wording is singular or plural depending on how many distinct
    interfaces are pending.
    """
    # Local import: only this function needs the markup escaper.
    from rich.markup import escape

    interfaces = sorted({p.interface for p in pending})
    interfaces_csv = ", ".join(interfaces)

    # Show user-set names parenthetically, but skip ``<interface>/...``
    # values — those are just the model id (or the autofilled placeholder)
    # and add no information beyond the interface itself.
    namespace_prefixes = tuple(
        f"{iface}/" for iface in LOCAL_AGENT_PROVIDER_INTERFACES
    )
    extra_names = sorted(
        {
            p.name
            for p in pending
            if p.name
            and p.name != p.interface
            and not p.name.startswith(namespace_prefixes)
        }
    )
    # Names come from the user's config; escape them so characters such as
    # ``[`` are printed literally instead of being parsed as Rich markup.
    names_suffix = (
        f" [dim]({', '.join(escape(n) for n in extra_names)})[/dim]"
        if extra_names
        else ""
    )

    if len(interfaces) > 1:
        description, pronoun = "are local-agent providers", "they spawn"
    else:
        description, pronoun = "is a local-agent provider", "it spawns"

    body = (
        f"[bold]{interfaces_csv}[/bold]{names_suffix} {description} — "
        f"{pronoun} a CLI subprocess that runs as you (full filesystem and shell "
        f"access). For local development only.\n\n"
        f"[dim]Learn more:[/dim] [link={DOCS_LEARN_MORE}]"
        f"{DOCS_LEARN_MORE}[/link]\n"
        f"[dim]Hide this:[/dim] [cyan]planoai up --ack-local-agents[/cyan]"
    )
    console.print(
        Panel(
            body,
            title=f"⚠ Local-agent provider detected ({interfaces_csv})",
            title_align="left",
            border_style="yellow",
            padding=(0, 2),
        )
    )
def maybe_warn_local_agent_providers(
    config: dict,
    console: Console,
    *,
    ack_flag: bool = False,
    ack_path: str = ACK_FILE_PATH,
    env: dict | None = None,
) -> bool:
    """Warn about local-agent providers in ``config`` when appropriate.

    Returns ``True`` iff the warning panel was rendered.

    Decision order:

    1. No local-agent providers in ``config`` → nothing is printed.
    2. ``ack_flag`` is set **or** the :data:`ACK_ENV_VAR` variable in
       ``env`` is truthy → the ack file is written/updated to cover every
       detected interface, one confirmation line is printed, no panel.
    3. The ack file already covers every detected interface → a single dim
       line is printed, no panel.
    4. Otherwise → the panel is rendered for the un-acknowledged
       interfaces only.

    ``env`` defaults to ``os.environ`` when not supplied.
    """
    if env is None:
        env = os.environ

    detected = detect_local_agent_providers(config)
    if not detected:
        return False

    env_requests_ack = _truthy_env(env.get(ACK_ENV_VAR))
    detected_interfaces = _interfaces_in(detected)
    detected_csv = ", ".join(sorted(detected_interfaces))

    if ack_flag or env_requests_ack:
        write_acknowledgement(detected_interfaces, ack_path=ack_path)
        console.print(
            f"[green]✓[/green] Acknowledged local-agent provider: "
            f"[bold]{detected_csv}[/bold] [dim](won't warn again)[/dim]"
        )
        return False

    already_acked = load_acknowledged_interfaces(ack_path)
    pending = [p for p in detected if p.interface not in already_acked]
    if pending:
        _render_panel(console, pending)
        return True

    # Everything was acknowledged earlier: no panel, but one dim line keeps
    # the suppression visible in the output.
    console.print(f"[dim]local-agent provider: {detected_csv} (acknowledged)[/dim]")
    return False
# Names exported by ``from planoai.local_agent_warning import *``; the
# underscore-prefixed helpers defined in this module are not listed.
__all__ = [
    "ACK_ENV_VAR",
    "ACK_FILE_PATH",
    "DOCS_LEARN_MORE",
    "LOCAL_AGENT_PROVIDER_INTERFACES",
    "LocalAgentProvider",
    "detect_local_agent_providers",
    "load_acknowledged_interfaces",
    "maybe_warn_local_agent_providers",
    "write_acknowledgement",
]

View file

@ -39,6 +39,7 @@ from planoai.init_cmd import init as init_cmd
from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_background
from planoai.chatgpt_cmd import chatgpt as chatgpt_cmd
from planoai.obs_cmd import obs as obs_cmd
from planoai.local_agent_warning import maybe_warn_local_agent_providers
from planoai.consts import (
DEFAULT_OTEL_TRACING_GRPC_ENDPOINT,
DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT,
@ -354,6 +355,18 @@ def build(docker):
show_default=True,
help="Override the LLM listener port when running without a config file. Ignored when a config file is present.",
)
@click.option(
"--ack-local-agents",
"ack_local_agents",
default=False,
is_flag=True,
help=(
"Acknowledge that local-agent providers (e.g. claude-cli/*) spawn a "
"local CLI binary with full host filesystem and shell access. Writes "
"an ack file so the warning is suppressed on future runs. Equivalent "
"to setting PLANO_ACK_LOCAL_AGENTS=1."
),
)
def up(
file,
path,
@ -363,6 +376,7 @@ def up(
docker,
verbose,
listener_port,
ack_local_agents,
):
"""Starts Plano."""
from rich.status import Status
@ -444,6 +458,15 @@ def up(
with open(plano_config_file, "r") as f:
plano_config = yaml.safe_load(f)
# Warn about local-agent providers (e.g. claude-cli/*) that spawn a
# local CLI binary with full host filesystem and shell access. Fires
# exactly once per `planoai up` invocation; --ack-local-agents (or
# PLANO_ACK_LOCAL_AGENTS=1) writes a persistent ack so the warning
# only re-appears for newly-introduced local-agent interfaces.
maybe_warn_local_agent_providers(
plano_config or {}, console, ack_flag=ack_local_agents
)
# Inject ChatGPT tokens from ~/.plano/chatgpt/auth.json if any provider needs them
_inject_chatgpt_tokens_if_needed(plano_config, env, console)

View file

@ -22,6 +22,61 @@ from planoai.utils import find_repo_root, getLogger
log = getLogger(__name__)
CLAUDE_CLI_DEFAULT_LISTEN_ADDR = "127.0.0.1:14001"
# Env vars the user can set to customize the bridge. We always honor a
# pre-set CLAUDE_CLI_LISTEN_ADDR (so power users can move the listener)
# but otherwise inject the default whenever a claude-cli provider is
# detected in the rendered config.
CLAUDE_CLI_PASSTHROUGH_ENV = (
"CLAUDE_CLI_LISTEN_ADDR",
"CLAUDE_CLI_BIN",
"CLAUDE_CLI_PERMISSION_MODE",
"CLAUDE_CLI_SESSION_TTL_SECS",
"CLAUDE_CLI_WATCHDOG_SECS",
"CLAUDE_CLI_MAX_SESSIONS",
)
def _needs_claude_cli_runtime(plano_config_rendered_path) -> bool:
    """Report whether the rendered config uses the claude-cli bridge.

    Loads the YAML file at ``plano_config_rendered_path`` and returns True
    when any entry under its top-level ``model_providers`` key has
    ``provider_interface`` equal to ``claude-cli``. A missing file, an empty
    document or an absent/empty provider list all yield False.
    """
    import yaml

    try:
        with open(plano_config_rendered_path, "r") as handle:
            rendered = yaml.safe_load(handle) or {}
    except FileNotFoundError:
        return False

    # ``or {}`` / ``or []`` tolerate null entries and a null provider list.
    return any(
        (provider or {}).get("provider_interface") == "claude-cli"
        for provider in rendered.get("model_providers") or []
    )
def _apply_claude_cli_env(brightstaff_env, plano_config_rendered_path):
    """Prepare ``brightstaff_env`` for the claude-cli bridge when required.

    Does nothing and returns False unless the rendered config declares a
    claude-cli provider. Otherwise ``brightstaff_env`` is updated in place:

    1. Every ``CLAUDE_CLI_*`` variable listed in
       ``CLAUDE_CLI_PASSTHROUGH_ENV`` that is set in ``os.environ`` and not
       already present in ``brightstaff_env`` is copied over.
    2. ``CLAUDE_CLI_LISTEN_ADDR`` falls back to the default address when it
       is still unset or empty.

    The passthrough runs first so that a listen address set only in the
    caller's process environment is honored instead of being shadowed by
    the injected default. Returns True when the bridge was enabled.
    """
    if not _needs_claude_cli_runtime(plano_config_rendered_path):
        return False

    for key in CLAUDE_CLI_PASSTHROUGH_ENV:
        if key in os.environ and key not in brightstaff_env:
            brightstaff_env[key] = os.environ[key]

    if not brightstaff_env.get("CLAUDE_CLI_LISTEN_ADDR"):
        brightstaff_env["CLAUDE_CLI_LISTEN_ADDR"] = CLAUDE_CLI_DEFAULT_LISTEN_ADDR

    log.info(
        "claude-cli bridge enabled: brightstaff will listen on %s",
        brightstaff_env["CLAUDE_CLI_LISTEN_ADDR"],
    )
    return True
def _find_config_dir():
"""Locate the directory containing plano_config_schema.yaml and envoy.template.yaml.
@ -197,6 +252,11 @@ def start_native(
for key, value in env.items():
brightstaff_env[key] = value
# Enable the claude-cli bridge if the rendered config asks for it. Done
# after `env.items()` is merged so user-set CLAUDE_CLI_* env vars take
# precedence over the auto-injected defaults.
_apply_claude_cli_env(brightstaff_env, plano_config_rendered_path)
brightstaff_pid = _daemon_exec(
[brightstaff_path],
brightstaff_env,

View file

@ -3,8 +3,11 @@ import pytest
import yaml
from unittest import mock
from planoai.config_generator import (
validate_and_render_schema,
CLAUDE_CLI_DEFAULT_BASE_URL,
_apply_claude_cli_autofill,
_is_claude_cli_provider,
migrate_inline_routing_preferences,
validate_and_render_schema,
)
@ -795,3 +798,64 @@ model_providers:
migrate_inline_routing_preferences(config_yaml)
assert config_yaml["version"] == "v0.5.0"
def test_claude_cli_autofill_wildcard_provider():
    """A bare ``claude-cli/*`` entry gets every implicit field filled in."""
    entry = {"model": "claude-cli/*"}
    assert _is_claude_cli_provider(entry) is True
    assert _apply_claude_cli_autofill(entry) is True
    expected = {
        "name": "claude-cli/*",
        "provider_interface": "claude-cli",
        "base_url": CLAUDE_CLI_DEFAULT_BASE_URL,
        "access_key": "claude-cli-local",
        # ``model`` must come through unchanged: the wildcard is expanded
        # downstream and the user's intent has to be preserved.
        "model": "claude-cli/*",
    }
    for field, value in expected.items():
        assert entry[field] == value
def test_claude_cli_autofill_specific_model():
    """A concrete ``claude-cli/<model>`` entry is autofilled like the wildcard."""
    entry = {"model": "claude-cli/sonnet", "default": True}
    handled = _apply_claude_cli_autofill(entry)
    assert handled is True
    assert entry["name"] == "claude-cli/sonnet"
    assert entry["provider_interface"] == "claude-cli"
    assert entry["base_url"] == CLAUDE_CLI_DEFAULT_BASE_URL
    # Unrelated pre-existing keys such as ``default`` are left alone.
    assert entry["default"] is True
def test_claude_cli_autofill_does_not_override_user_fields():
    """Values the user wrote explicitly survive the autofill untouched."""
    user_fields = {
        "name": "custom-name",
        "base_url": "http://192.0.2.10:9000",
        "access_key": "do-not-touch",
    }
    entry = {"model": "claude-cli/*", **user_fields}
    assert _apply_claude_cli_autofill(entry) is True
    for field in ("name", "base_url", "access_key"):
        assert entry[field] == user_fields[field]
    # The one missing field is still injected.
    assert entry["provider_interface"] == "claude-cli"
def test_claude_cli_autofill_skips_non_matching_providers():
    """Entries outside the claude-cli namespace are neither matched nor mutated."""
    entry = {"model": "openai/gpt-4o"}
    matched = _is_claude_cli_provider(entry)
    applied = _apply_claude_cli_autofill(entry)
    assert matched is False
    assert applied is False
    assert "provider_interface" not in entry
def test_claude_cli_autofill_passthrough_auth_skips_access_key():
    """With ``passthrough_auth`` set, no placeholder access key is injected."""
    entry = {"model": "claude-cli/*", "passthrough_auth": True}
    handled = _apply_claude_cli_autofill(entry)
    assert handled is True
    assert "access_key" not in entry
    assert entry["passthrough_auth"] is True
def test_claude_cli_autofill_detects_via_provider_interface_only():
    """An explicit ``provider_interface`` alone is enough to trigger autofill."""
    entry = {"model": "sonnet", "provider_interface": "claude-cli"}
    assert _is_claude_cli_provider(entry) is True
    assert _apply_claude_cli_autofill(entry) is True
    # The missing name is taken from the model string.
    assert entry["name"] == "sonnet"
    assert entry["base_url"] == CLAUDE_CLI_DEFAULT_BASE_URL

View file

@ -0,0 +1,324 @@
"""Tests for the local-agent provider warning, ack persistence, and the
detection logic that decides whether to fire it."""
from __future__ import annotations
import io
import json
import pytest
from rich.console import Console
from planoai import local_agent_warning as law
def _make_console() -> tuple[Console, io.StringIO]:
    """Build a plain-text Rich console together with the buffer it writes to.

    ``force_terminal=False`` and ``color_system=None`` keep ANSI escapes out
    of the captured text so substring assertions stay readable; the wide
    ``width`` stops the panel from soft-wrapping text mid-keyword.
    """
    sink = io.StringIO()
    plain_console = Console(
        file=sink,
        force_terminal=False,
        color_system=None,
        width=140,
    )
    return plain_console, sink
# ---------------------------------------------------------------------------
# detection
# ---------------------------------------------------------------------------
def test_detects_claude_cli_via_model_prefix():
    """Only the ``claude-cli/`` model entry is reported, not the network one."""
    entries = [{"model": "claude-cli/sonnet"}, {"model": "openai/gpt-4o"}]
    detected = law.detect_local_agent_providers({"model_providers": entries})
    assert [p.interface for p in detected] == ["claude-cli"]
    assert detected[0].model == "claude-cli/sonnet"
def test_detects_claude_cli_via_explicit_provider_interface():
    """An explicit ``provider_interface`` is detected and the user's name kept."""
    entry = {"name": "local-claude", "provider_interface": "claude-cli", "model": "x"}
    detected = law.detect_local_agent_providers({"model_providers": [entry]})
    assert [p.interface for p in detected] == ["claude-cli"]
    assert detected[0].name == "local-claude"
def test_detects_claude_cli_via_legacy_provider_field():
    """The legacy ``provider`` field is honored as an interface declaration."""
    entry = {"provider": "claude-cli", "model": "x"}
    detected = law.detect_local_agent_providers({"model_providers": [entry]})
    assert [p.interface for p in detected] == ["claude-cli"]
def test_detects_via_legacy_llm_providers_key():
    """Providers listed under the legacy ``llm_providers`` key are scanned too."""
    legacy_config = {"llm_providers": [{"model": "claude-cli/opus"}]}
    detected = law.detect_local_agent_providers(legacy_config)
    assert [p.interface for p in detected] == ["claude-cli"]
def test_no_false_positive_for_network_providers():
    """Stateless network providers never count as local-agent providers."""
    network_models = (
        "openai/gpt-4o",
        "anthropic/claude-3-5-sonnet",
        "gemini/gemini-2.5-pro",
        "chatgpt/gpt-5",  # network ChatGPT subscription, not a CLI
        "vercel/some-model",
    )
    config = {"model_providers": [{"model": model} for model in network_models]}
    assert law.detect_local_agent_providers(config) == []
def test_no_false_positive_for_anthropic_claude_models():
    """The word "claude" alone must not match; the prefix has to be ``claude-cli/``."""
    entry = {"model": "anthropic/claude-3-5-sonnet-20241022"}
    detected = law.detect_local_agent_providers({"model_providers": [entry]})
    assert detected == []
def test_empty_or_malformed_config_is_safe():
    """Empty, null and wrongly-typed configs all yield an empty result."""
    malformed_configs = (
        {},
        {"model_providers": None},
        {"model_providers": "not-a-list"},
        None,  # e.g. what an empty yaml file loads as
    )
    for config in malformed_configs:
        assert law.detect_local_agent_providers(config) == []
def test_multiple_entries_same_interface_collapse_in_warning_set():
    """Two entries of one interface are both found but share a single interface key."""
    entries = [
        {"model": "claude-cli/sonnet", "name": "fast"},
        {"model": "claude-cli/opus", "name": "slow"},
    ]
    detected = law.detect_local_agent_providers({"model_providers": entries})
    assert len(detected) == 2
    assert {p.interface for p in detected} == {"claude-cli"}
# ---------------------------------------------------------------------------
# ack file
# ---------------------------------------------------------------------------
def test_load_ack_returns_empty_when_missing(tmp_path):
    """A non-existent ack file reads as an empty acknowledgement set."""
    missing_file = tmp_path / "ack.json"
    loaded = law.load_acknowledged_interfaces(str(missing_file))
    assert loaded == set()
@pytest.mark.parametrize(
    "contents",
    [
        "{not valid json",
        "[]",  # not a dict
        '{"acknowledged": "claude-cli"}',  # not a list
        '{"acknowledged": [1, 2, 3]}',  # not strings
    ],
)
def test_load_ack_handles_malformed_files(tmp_path, contents):
    """Malformed ack contents degrade to "no ack" instead of raising."""
    ack_file = tmp_path / "ack.json"
    ack_file.write_text(contents, encoding="utf-8")
    loaded = law.load_acknowledged_interfaces(str(ack_file))
    assert loaded == set()
def test_write_ack_creates_state_dir(tmp_path):
    """Writing an ack creates missing parent directories and a valid payload."""
    ack_file = tmp_path / "fresh" / "deeper" / "ack.json"
    result = law.write_acknowledgement(["claude-cli"], ack_path=str(ack_file))
    assert result == {"claude-cli"}
    assert ack_file.exists()
    on_disk = json.loads(ack_file.read_text(encoding="utf-8"))
    assert on_disk["acknowledged"] == ["claude-cli"]
    assert on_disk["ack_at"]
def test_write_ack_merges_with_existing(tmp_path):
    """A second write adds to, rather than replaces, earlier acknowledgements."""
    ack_file = str(tmp_path / "ack.json")
    law.write_acknowledgement(["claude-cli"], ack_path=ack_file)
    combined = law.write_acknowledgement(["future-cli"], ack_path=ack_file)
    assert combined == {"claude-cli", "future-cli"}
    with open(ack_file, encoding="utf-8") as fh:
        on_disk = json.loads(fh.read())
    assert on_disk["acknowledged"] == ["claude-cli", "future-cli"]
# ---------------------------------------------------------------------------
# maybe_warn_local_agent_providers
# ---------------------------------------------------------------------------
def test_no_panel_when_no_local_agent_providers(tmp_path):
    """A config without local-agent providers prints nothing at all."""
    console, captured = _make_console()
    network_only = {"model_providers": [{"model": "openai/gpt-4o"}]}
    rendered = law.maybe_warn_local_agent_providers(
        network_only,
        console,
        ack_path=str(tmp_path / "ack.json"),
        env={},
    )
    assert rendered is False
    assert captured.getvalue() == ""
def test_panel_fires_for_unacked_claude_cli(tmp_path):
    """Without any ack, a claude-cli provider renders the warning panel."""
    console, captured = _make_console()
    rendered = law.maybe_warn_local_agent_providers(
        {"model_providers": [{"model": "claude-cli/sonnet"}]},
        console,
        ack_path=str(tmp_path / "ack.json"),
        env={},
    )
    text = captured.getvalue()
    assert rendered is True
    # Stable substrings only — exact wording is deliberately not pinned.
    assert "claude-cli" in text
    assert any(marker in text for marker in ("Local-agent", "local-agent"))
    for required in ("Learn more", "--ack-local-agents", "docs.planoai.dev"):
        assert required in text
    # The compact panel must not leak the ack file path; the ``rm``
    # instruction lives in the docs page behind "Learn more".
    assert "local_agent_ack.json" not in text
def test_panel_suppressed_when_ack_covers_interface(tmp_path):
    """An existing ack for the interface suppresses the panel.

    The previous ``"Panel" not in out`` check could never fail because the
    rendered text never contains the class name. This version asserts on
    strings that only the panel emits.
    """
    ack = tmp_path / "ack.json"
    law.write_acknowledgement(["claude-cli"], ack_path=str(ack))
    console, buf = _make_console()
    fired = law.maybe_warn_local_agent_providers(
        {"model_providers": [{"model": "claude-cli/sonnet"}]},
        console,
        ack_path=str(ack),
        env={},
    )
    assert fired is False
    out = buf.getvalue()
    # Neither the panel title nor its dismiss hint may be present.
    assert "Local-agent provider detected" not in out
    assert "--ack-local-agents" not in out
    # A short info line still names the interface so the suppression is
    # discoverable in the output.
    assert "claude-cli" in out
def test_new_unacked_interface_re_triggers(tmp_path, monkeypatch):
    """A newly added, un-acknowledged interface re-triggers the panel.

    Simulates a future with two local-agent interfaces where the user has
    acknowledged only one of them: the panel must fire and its title must
    name only the un-acknowledged interface.
    """
    monkeypatch.setattr(
        law, "LOCAL_AGENT_PROVIDER_INTERFACES", ("claude-cli", "future-cli")
    )
    ack = tmp_path / "ack.json"
    law.write_acknowledgement(["claude-cli"], ack_path=str(ack))
    console, buf = _make_console()
    fired = law.maybe_warn_local_agent_providers(
        {
            "model_providers": [
                {"model": "claude-cli/sonnet"},
                {"model": "future-cli/whatever"},
            ]
        },
        console,
        ack_path=str(ack),
        env={},
    )
    output = buf.getvalue()
    assert fired is True
    assert "future-cli" in output
    # The title lists pending interfaces only, so it must name the new
    # interface and must not re-list the already-acknowledged one.
    title_lines = [
        line
        for line in output.splitlines()
        if "Local-agent provider detected" in line
    ]
    assert title_lines
    for line in title_lines:
        assert "future-cli" in line
        assert "claude-cli" not in line
def test_ack_flag_writes_file_and_suppresses_panel(tmp_path):
    """``ack_flag=True`` persists the ack and prints a confirmation, no panel."""
    ack_file = tmp_path / "ack.json"
    console, captured = _make_console()
    rendered = law.maybe_warn_local_agent_providers(
        {"model_providers": [{"model": "claude-cli/sonnet"}]},
        console,
        ack_flag=True,
        ack_path=str(ack_file),
        env={},
    )
    assert rendered is False
    assert ack_file.exists()
    on_disk = json.loads(ack_file.read_text(encoding="utf-8"))
    assert "claude-cli" in on_disk["acknowledged"]
    text = captured.getvalue()
    for expected in ("Acknowledged", "claude-cli"):
        assert expected in text
@pytest.mark.parametrize("env_value", ["1", "true", "TRUE", "yes", "on"])
def test_ack_env_var_truthy_values(tmp_path, env_value):
    """Every truthy spelling of the env var acts like the ack flag."""
    ack_file = tmp_path / "ack.json"
    console, _unused = _make_console()
    config = {"model_providers": [{"model": "claude-cli/sonnet"}]}
    rendered = law.maybe_warn_local_agent_providers(
        config,
        console,
        ack_path=str(ack_file),
        env={law.ACK_ENV_VAR: env_value},
    )
    assert rendered is False
    assert ack_file.exists()
@pytest.mark.parametrize("env_value", ["", "0", "false", "no", "off", "maybe"])
def test_ack_env_var_falsy_values_still_warn(tmp_path, env_value):
    """Non-truthy env values neither write an ack nor suppress the panel."""
    ack_file = tmp_path / "ack.json"
    console, captured = _make_console()
    config = {"model_providers": [{"model": "claude-cli/sonnet"}]}
    rendered = law.maybe_warn_local_agent_providers(
        config,
        console,
        ack_path=str(ack_file),
        env={law.ACK_ENV_VAR: env_value},
    )
    assert rendered is True
    assert not ack_file.exists()
    assert "claude-cli" in captured.getvalue()
def test_malformed_ack_falls_back_to_warning(tmp_path):
    """An unparsable ack file is ignored and the panel is shown again."""
    ack_file = tmp_path / "ack.json"
    ack_file.write_text("{not json", encoding="utf-8")
    console, captured = _make_console()
    config = {"model_providers": [{"model": "claude-cli/sonnet"}]}
    rendered = law.maybe_warn_local_agent_providers(
        config,
        console,
        ack_path=str(ack_file),
        env={},
    )
    assert rendered is True
    assert "claude-cli" in captured.getvalue()
def test_single_panel_when_multiple_local_agent_entries(tmp_path):
    """Several entries of one interface produce exactly one panel.

    The warning fires once per ``planoai up`` invocation, while still
    listing each user-chosen provider name.
    """
    console, captured = _make_console()
    entries = [
        {"model": "claude-cli/sonnet", "name": "fast"},
        {"model": "claude-cli/opus", "name": "slow"},
    ]
    rendered = law.maybe_warn_local_agent_providers(
        {"model_providers": entries},
        console,
        ack_path=str(tmp_path / "ack.json"),
        env={},
    )
    assert rendered is True
    text = captured.getvalue()
    # One header only, but both names appear in the listing.
    assert text.count("Local-agent provider detected") == 1
    for provider_name in ("fast", "slow"):
        assert provider_name in text

View file

@ -0,0 +1,112 @@
"""Unit tests for the claude-cli env wiring in native_runner.py."""
import os
import textwrap
from planoai.native_runner import (
CLAUDE_CLI_DEFAULT_LISTEN_ADDR,
_apply_claude_cli_env,
_needs_claude_cli_runtime,
)
def _write(path, body):
path.write_text(textwrap.dedent(body).lstrip())
return str(path)
# A rendered config whose provider declares ``provider_interface: claude-cli``
# must be reported as needing the claude-cli runtime.
def test_needs_claude_cli_runtime_detects_provider(tmp_path):
rendered = _write(
tmp_path / "rendered.yaml",
"""
version: v0.4.0
listeners: []
model_providers:
- name: claude-cli/*
model: '*'
provider_interface: claude-cli
base_url: http://127.0.0.1:14001
""",
)
assert _needs_claude_cli_runtime(rendered) is True
def test_needs_claude_cli_runtime_skips_other_providers(tmp_path):
    """A config with only a non-claude-cli provider does not need the bridge."""
    config_yaml = """
version: v0.4.0
model_providers:
- name: openai/gpt-4o
model: gpt-4o
provider_interface: openai
"""
    config_path = _write(tmp_path / "rendered.yaml", config_yaml)
    assert _needs_claude_cli_runtime(config_path) is False
def test_needs_claude_cli_runtime_handles_missing_file(tmp_path):
    """A path that does not exist is reported as not needing the bridge."""
    missing_config = tmp_path / "does-not-exist.yaml"
    assert _needs_claude_cli_runtime(str(missing_config)) is False
def test_apply_claude_cli_env_injects_default_addr(tmp_path, monkeypatch):
    """Without any override, the default listen address is put into the env."""
    config_yaml = """
model_providers:
- provider_interface: claude-cli
model: '*'
"""
    config_path = _write(tmp_path / "rendered.yaml", config_yaml)
    # Make sure nothing from the developer's shell leaks into the test.
    for variable in ("CLAUDE_CLI_LISTEN_ADDR", "CLAUDE_CLI_BIN"):
        monkeypatch.delenv(variable, raising=False)

    child_env = {}
    applied = _apply_claude_cli_env(child_env, config_path)

    assert applied is True
    assert child_env["CLAUDE_CLI_LISTEN_ADDR"] == CLAUDE_CLI_DEFAULT_LISTEN_ADDR
def test_apply_claude_cli_env_honors_user_override(tmp_path, monkeypatch):
    """A listen address already present in the env dict is left untouched."""
    custom_addr = "127.0.0.1:25000"
    config_yaml = """
model_providers:
- provider_interface: claude-cli
model: '*'
"""
    config_path = _write(tmp_path / "rendered.yaml", config_yaml)
    monkeypatch.delenv("CLAUDE_CLI_LISTEN_ADDR", raising=False)

    child_env = {"CLAUDE_CLI_LISTEN_ADDR": custom_addr}
    applied = _apply_claude_cli_env(child_env, config_path)

    assert applied is True
    assert child_env["CLAUDE_CLI_LISTEN_ADDR"] == custom_addr
def test_apply_claude_cli_env_passes_through_user_env(tmp_path, monkeypatch):
    """``CLAUDE_CLI_*`` settings from the process environment reach the env dict."""
    config_yaml = """
model_providers:
- provider_interface: claude-cli
model: '*'
"""
    config_path = _write(tmp_path / "rendered.yaml", config_yaml)
    monkeypatch.delenv("CLAUDE_CLI_LISTEN_ADDR", raising=False)
    user_settings = {
        "CLAUDE_CLI_BIN": "/usr/local/bin/claude-test",
        "CLAUDE_CLI_PERMISSION_MODE": "default",
    }
    for variable, value in user_settings.items():
        monkeypatch.setenv(variable, value)

    child_env = {}
    applied = _apply_claude_cli_env(child_env, config_path)

    assert applied is True
    assert child_env["CLAUDE_CLI_BIN"] == "/usr/local/bin/claude-test"
    assert child_env["CLAUDE_CLI_PERMISSION_MODE"] == "default"
def test_apply_claude_cli_env_noop_for_other_configs(tmp_path):
    """A config without a claude-cli provider leaves the env dict alone."""
    config_yaml = """
model_providers:
- provider_interface: openai
model: gpt-4o
"""
    config_path = _write(tmp_path / "rendered.yaml", config_yaml)

    child_env = {}
    applied = _apply_claude_cli_env(child_env, config_path)

    assert applied is False
    assert "CLAUDE_CLI_LISTEN_ADDR" not in child_env

View file

@ -184,6 +184,7 @@ properties:
enum:
- plano
- claude
- claude-cli
- deepseek
- groq
- mistral
@ -242,6 +243,7 @@ properties:
enum:
- plano
- claude
- claude-cli
- deepseek
- groq
- mistral

View file

@ -18,8 +18,16 @@ stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
[program:brightstaff]
# CLAUDE_CLI_LISTEN_ADDR is set automatically when the rendered config has at
# least one provider with `provider_interface: claude-cli` (the Python config
# generator auto-fills that field for any `model: claude-cli/*` entry). The
# bridge listener stays off otherwise — matches native_runner.py behavior.
command=sh -c "\
while [ ! -f /tmp/config_ready ]; do echo '[brightstaff] Waiting for config generation...'; sleep 0.5; done && \
if grep -q 'provider_interface: claude-cli' /app/plano_config_rendered.env_sub.yaml 2>/dev/null; then \
export CLAUDE_CLI_LISTEN_ADDR=${CLAUDE_CLI_LISTEN_ADDR:-127.0.0.1:14001}; \
echo '[brightstaff] claude-cli bridge enabled on '$CLAUDE_CLI_LISTEN_ADDR; \
fi; \
RUST_LOG=${LOG_LEVEL:-info} \
PLANO_CONFIG_PATH_RENDERED=/app/plano_config_rendered.env_sub.yaml \
/app/brightstaff 2>&1 | \

1
crates/Cargo.lock generated
View file

@ -4106,6 +4106,7 @@ dependencies = [
"getrandom 0.4.2",
"js-sys",
"serde_core",
"sha1_smol",
"wasm-bindgen",
]

View file

@ -60,7 +60,7 @@ time = { version = "0.3", features = ["formatting", "macros"] }
tracing = "0.1"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
uuid = { version = "1.0", features = ["v4", "v5", "serde"] }
[dev-dependencies]
mockito = "1.0"

View file

@ -0,0 +1,22 @@
//! Bridge that exposes the local `claude` CLI as an Anthropic Messages API
//! endpoint on a localhost port, allowing it to be used as just another
//! `model_provider` in Plano.
//!
//! Wire-up:
//! - `process` — spawns and manages the `claude -p --output-format stream-json
//! --input-format stream-json` subprocess.
//! - `session` — keys long-lived processes by session id (header or hash) and
//! enforces idle TTL / cap.
//! - `server` — hyper listener that speaks `POST /v1/messages` and bridges
//! between Anthropic SSE and the CLI's NDJSON.
//!
//! Translation between the two wire formats lives in
//! `hermesllm::apis::claude_cli`; this module only owns runtime concerns.
pub mod process;
pub mod server;
pub mod session;
pub use process::{ClaudeCliConfig, ClaudeProcess, ProcessError};
pub use server::run_listener;
pub use session::{SessionManager, SessionManagerConfig, SESSION_HEADER};

View file

@ -0,0 +1,388 @@
//! Manages the lifetime of one `claude -p` child process for a single
//! conversation session. Spawning, env scrubbing, NDJSON line reading and the
//! per-line watchdog all live here. Translation between Anthropic Messages
//! and stream-json lives in `hermesllm::apis::claude_cli`.
use std::process::Stdio;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use hermesllm::apis::claude_cli::{parse_ndjson_line, ClaudeCliEvent, ClaudeCliInputEvent};
use thiserror::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{mpsc, Mutex, OwnedMutexGuard};
use tokio::time::{self, Instant};
use tracing::{debug, info, warn};
/// Settings for a single `ClaudeProcess`. The defaults follow the OpenClaw
/// reference configuration: `bypassPermissions`, a ~120 s watchdog window and
/// a ~10 min idle TTL.
#[derive(Debug, Clone)]
pub struct ClaudeCliConfig {
    /// Name or path of the `claude` binary (resolved via `$PATH`).
    pub binary: String,
    /// Argument for `--permission-mode`. The CLI accepts `default`,
    /// `acceptEdits`, `plan`, `auto`, `dontAsk`, `bypassPermissions`.
    pub permission_mode: String,
    /// Idle session TTL: once a session has gone this long without a request
    /// the session manager kills its child.
    pub session_ttl: Duration,
    /// Per-line watchdog: when no NDJSON line shows up for this long during a
    /// turn, the child is killed. Restarted on every line (not every byte).
    pub watchdog: Duration,
}

impl Default for ClaudeCliConfig {
    /// Build the reference configuration described on the struct.
    fn default() -> Self {
        const SESSION_TTL_SECS: u64 = 10 * 60;
        const WATCHDOG_SECS: u64 = 2 * 60;

        ClaudeCliConfig {
            binary: String::from("claude"),
            permission_mode: String::from("bypassPermissions"),
            session_ttl: Duration::from_secs(SESSION_TTL_SECS),
            watchdog: Duration::from_secs(WATCHDOG_SECS),
        }
    }
}
/// Errors produced while interacting with the child process.
#[derive(Debug, Error)]
pub enum ProcessError {
    /// Launching the configured binary failed; carries the binary name and
    /// the underlying I/O error.
    #[error("failed to spawn `{binary}`: {source}")]
    Spawn {
        binary: String,
        #[source]
        source: std::io::Error,
    },
    /// Writing or flushing a turn's JSONL events to the child's stdin failed.
    #[error("failed to write to claude stdin: {0}")]
    StdinWrite(#[source] std::io::Error),
    /// The child is gone: either its stdin handle was already taken, or the
    /// stdout event channel closed before the turn finished.
    #[error("claude process exited unexpectedly")]
    ExitedEarly,
    /// `Command::spawn` succeeded but a piped stdio handle was already taken
    /// by the time we asked for it. Should be unreachable given we set
    /// `Stdio::piped()` immediately before spawn; surfaced as its own variant
    /// so callers can tell it apart from a real "exited early".
    #[error("claude child is missing piped {which} after spawn")]
    MissingStdio { which: &'static str },
    /// No event arrived within the configured watchdog window; carries that
    /// window.
    #[error("claude watchdog fired after {0:?} of silence")]
    WatchdogTimeout(Duration),
    /// A stdin event could not be serialized to JSON.
    #[error("failed to serialize stdin payload: {0}")]
    Serialize(#[from] serde_json::Error),
    /// Another turn currently holds this session's event receiver.
    #[error("turn already in progress for this session")]
    TurnInProgress,
}
/// Reduce a model name to the alias / id understood by the CLI's `--model`
/// flag. Names registered through the `claude-cli/*` wildcard carry a
/// `claude-cli/` prefix, which is removed; bare names (e.g. `sonnet`) are
/// returned unchanged.
pub fn normalize_model_arg(model: &str) -> &str {
    match model.strip_prefix("claude-cli/") {
        Some(bare) => bare,
        None => model,
    }
}
/// Environment variables that must be removed before exec'ing `claude` so the
/// child uses its own login keychain rather than picking up server-side
/// credentials. The list mirrors the OpenClaw scrub list.
const SCRUB_ENV_PREFIXES: &[&str] = &["ANTHROPIC_", "CLAUDE_CODE_", "OTEL_"];

/// Snapshot the current process environment minus every variable whose name
/// starts with one of [`SCRUB_ENV_PREFIXES`].
///
/// Reads the environment through `std::env::vars_os` rather than
/// `std::env::vars`: the latter panics as soon as any key or value is not
/// valid Unicode, which would take down the spawn path because of an
/// unrelated variable. Pairs are returned as `OsString`s so such variables are
/// handed to the child unmodified.
fn scrubbed_env_for_spawn() -> Vec<(std::ffi::OsString, std::ffi::OsString)> {
    std::env::vars_os()
        .filter(|(key, _)| {
            // Lossy conversion is only used for the prefix test; the scrub
            // prefixes are ASCII, so they survive it unchanged.
            let key = key.to_string_lossy();
            !SCRUB_ENV_PREFIXES
                .iter()
                .any(|prefix| key.starts_with(prefix))
        })
        .collect()
}
/// One running `claude -p` subprocess plus the channels we use to talk to it.
/// Each `ClaudeProcess` is owned by exactly one session.
pub struct ClaudeProcess {
    /// Handle to the child process. Becomes `None` once `shutdown` has taken
    /// it to kill and reap the child.
    child: Mutex<Option<Child>>,
    /// The child's piped stdin, used to write user-turn JSONL. Becomes `None`
    /// once `shutdown` has taken (and thereby dropped) it.
    stdin: Mutex<Option<ChildStdin>>,
    /// The receiver of `ClaudeCliEvent`s parsed from the child's stdout.
    /// Wrapped in `Arc<Mutex>` so a `TurnStream` can hold an owned guard for
    /// the duration of one turn (which serializes turns within a session).
    event_rx: Arc<Mutex<mpsc::Receiver<ClaudeCliEvent>>>,
    /// Settings this process was spawned with; the watchdog duration is
    /// copied into every `TurnStream`.
    config: ClaudeCliConfig,
    /// Last time a request was served on this session — used by the session
    /// manager to enforce the idle TTL. Held under a sync mutex because the
    /// critical section is one read/write of a `Copy` value with no `.await`,
    /// which keeps `SessionManager` callers from holding the session-map lock
    /// across an async hop.
    last_used: StdMutex<Instant>,
    /// Brightstaff-internal identifier — a deterministic UUID v5 derived from
    /// the conversation prefix (or supplied by the client header). Stable
    /// across retries so the manager can route follow-up turns to this same
    /// child. NEVER passed to `claude` itself.
    pub session_id: String,
    /// Per-spawn random UUID v4 passed to `claude --session-id`. Always fresh
    /// so we never collide with on-disk state (`~/.claude/projects/...`)
    /// from a previous run of the same conversation. Also stamped onto every
    /// stdin JSONL event so the CLI can verify the turn matches its session.
    cli_session_id: String,
}
impl ClaudeProcess {
/// Spawn a new child for `session_id`. The first turn for a new session
/// should be the user's Anthropic request body — see
/// [`ClaudeProcess::send_user_turn`] for that.
///
/// * `model` — model name; a leading `claude-cli/` is stripped before it is
///   passed to `--model`.
/// * `system_prompt` — when present, appended to the CLI's own system prompt.
/// * `cwd` — when present, the working directory of the child.
/// * `config` — binary, permission mode and timing settings.
///
/// Two background tasks are started: one parses the child's stdout NDJSON
/// into the event channel, the other forwards non-empty stderr lines to the
/// log. Returns [`ProcessError::Spawn`] if the binary cannot be launched and
/// [`ProcessError::MissingStdio`] if a piped handle is unavailable.
pub async fn spawn(
    session_id: String,
    model: &str,
    system_prompt: Option<&str>,
    cwd: Option<&std::path::Path>,
    config: ClaudeCliConfig,
) -> Result<Arc<Self>, ProcessError> {
    // Always hand the CLI a brand-new UUID. `--no-session-persistence`
    // does NOT actually prevent Claude Code from writing
    // `~/.claude/projects/<workspace>/<id>.jsonl` — it only blocks
    // resumability — so re-using our deterministic `session_id` would
    // collide with any prior run of the same conversation and the CLI
    // would exit with `Session ID ... is already in use`.
    let cli_session_id = uuid::Uuid::new_v4().to_string();
    let mut cmd = Command::new(&config.binary);
    cmd.arg("-p")
        .arg("--output-format")
        .arg("stream-json")
        .arg("--input-format")
        .arg("stream-json")
        .arg("--verbose")
        .arg("--include-partial-messages")
        .arg("--permission-mode")
        .arg(&config.permission_mode)
        .arg("--model")
        .arg(normalize_model_arg(model))
        .arg("--session-id")
        .arg(&cli_session_id)
        .arg("--no-session-persistence");
    if let Some(prompt) = system_prompt {
        // Append (don't replace) so Claude Code's built-in system prompt
        // — which carries tool definitions — is preserved.
        cmd.arg("--append-system-prompt").arg(prompt);
    }
    if let Some(dir) = cwd {
        cmd.current_dir(dir);
    }
    // Start from an empty environment and re-add only the variables that
    // survive the scrub list.
    cmd.env_clear();
    for (k, v) in scrubbed_env_for_spawn() {
        cmd.env(k, v);
    }
    // All three stdio streams are piped; `kill_on_drop` makes sure the child
    // does not outlive its handle.
    cmd.stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);
    let mut child = cmd.spawn().map_err(|e| ProcessError::Spawn {
        binary: config.binary.clone(),
        source: e,
    })?;
    let stdin = child
        .stdin
        .take()
        .ok_or(ProcessError::MissingStdio { which: "stdin" })?;
    let stdout = child
        .stdout
        .take()
        .ok_or(ProcessError::MissingStdio { which: "stdout" })?;
    let stderr = child
        .stderr
        .take()
        .ok_or(ProcessError::MissingStdio { which: "stderr" })?;
    // Bounded channel — backpressure if the consumer is slow, but large
    // enough that bursts of small text deltas do not block stdout drain.
    let (tx, rx) = mpsc::channel::<ClaudeCliEvent>(256);
    let session_for_log = session_id.clone();
    // Stdout reader: one NDJSON line -> one `ClaudeCliEvent` on the channel.
    tokio::spawn(async move {
        let mut reader = BufReader::new(stdout).lines();
        loop {
            match reader.next_line().await {
                Ok(Some(line)) => {
                    // `parse_ndjson_line` returning `None` means there is
                    // nothing to forward for this line.
                    if let Some(parsed) = parse_ndjson_line(&line) {
                        match parsed {
                            Ok(ev) => {
                                // A send error means the receiver is gone;
                                // stop reading.
                                if tx.send(ev).await.is_err() {
                                    break;
                                }
                            }
                            Err(err) => {
                                // Unparseable lines are logged and skipped;
                                // the reader keeps going.
                                warn!(
                                    session = %session_for_log,
                                    error = %err,
                                    line = %line,
                                    "failed to parse claude NDJSON line"
                                );
                            }
                        }
                    }
                }
                Ok(None) => {
                    debug!(session = %session_for_log, "claude stdout closed");
                    break;
                }
                Err(err) => {
                    warn!(
                        session = %session_for_log,
                        error = %err,
                        "claude stdout read error"
                    );
                    break;
                }
            }
        }
    });
    let session_for_stderr = session_id.clone();
    // Stderr reader: surface every non-blank line in the log until the stream
    // ends or a read fails.
    tokio::spawn(async move {
        let mut reader = BufReader::new(stderr).lines();
        while let Ok(Some(line)) = reader.next_line().await {
            if !line.trim().is_empty() {
                warn!(session = %session_for_stderr, line = %line, "claude stderr");
            }
        }
    });
    info!(
        session = %session_id,
        cli_session = %cli_session_id,
        model = %normalize_model_arg(model),
        "spawned claude-cli"
    );
    Ok(Arc::new(Self {
        child: Mutex::new(Some(child)),
        stdin: Mutex::new(Some(stdin)),
        event_rx: Arc::new(Mutex::new(rx)),
        config,
        last_used: StdMutex::new(Instant::now()),
        session_id,
        cli_session_id,
    }))
}
/// Returns the UUID given to `claude --session-id` at launch. Every stdin
/// JSONL event has to be stamped with this id so the CLI accepts the turn as
/// part of its current session — see [`Self::session_id`] for why it differs
/// from the brightstaff session id.
pub fn cli_session_id(&self) -> &str {
    self.cli_session_id.as_str()
}
/// Write the user-turn JSONL events to the child's stdin and return a
/// stream that yields parsed CLI events for this turn until the terminal
/// `result` event (or watchdog) ends it.
///
/// Holds an exclusive lock on the event receiver for the duration of the
/// turn, so concurrent calls return [`ProcessError::TurnInProgress`].
///
/// Other errors: [`ProcessError::ExitedEarly`] when stdin has already been
/// taken by `shutdown`, [`ProcessError::Serialize`] when an event cannot be
/// encoded, and [`ProcessError::StdinWrite`] when writing or flushing fails.
/// Events are written one at a time, so a failure part-way through leaves the
/// earlier events already written.
///
/// NOTE(review): events left unread by a turn that ended early (watchdog
/// timeout or a dropped `TurnStream`) stay queued in the channel and would be
/// the first thing the next turn reads — confirm whether a drain is needed.
pub async fn send_user_turn(
    &self,
    events: &[ClaudeCliInputEvent],
) -> Result<TurnStream, ProcessError> {
    // Sync lock + Copy value; never held across an `.await`. A poisoned lock
    // is recovered rather than skipped, matching `last_used()`, so the idle
    // reaper never sees a stale timestamp for a session that is in use.
    *self
        .last_used
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = Instant::now();

    // Claim the event receiver for the lifetime of this turn.
    let rx_guard = Arc::clone(&self.event_rx)
        .try_lock_owned()
        .map_err(|_| ProcessError::TurnInProgress)?;

    let mut stdin_guard = self.stdin.lock().await;
    let stdin = stdin_guard.as_mut().ok_or(ProcessError::ExitedEarly)?;
    for ev in events {
        // One JSON document per line (JSONL).
        let mut bytes = serde_json::to_vec(ev)?;
        bytes.push(b'\n');
        stdin
            .write_all(&bytes)
            .await
            .map_err(ProcessError::StdinWrite)?;
    }
    stdin.flush().await.map_err(ProcessError::StdinWrite)?;

    Ok(TurnStream {
        rx: rx_guard,
        watchdog: self.config.watchdog,
        done: false,
    })
}
/// Timestamp of the most recent activity, consumed by the session manager's
/// reaper. Kept synchronous: the lock protects a single `Instant` and the
/// critical section has no `.await`, so callers never hold an async lock
/// across an await point because of it.
pub fn last_used(&self) -> Instant {
    // A poisoned lock still yields the stored value: degrade gracefully
    // instead of aborting.
    match self.last_used.lock() {
        Ok(guard) => *guard,
        Err(poisoned) => *poisoned.into_inner(),
    }
}
/// Kill the child and reap it. Calling this more than once is safe: later
/// calls find the child and stdin slots already empty.
pub async fn shutdown(&self) {
    // The child lock stays held while the process is killed and reaped.
    let mut child_slot = self.child.lock().await;
    if let Some(mut child) = child_slot.take() {
        let _ = child.start_kill();
        let _ = child.wait().await;
    }
    drop(child_slot);

    // Dropping stdin signals the child if it survived `start_kill`.
    drop(self.stdin.lock().await.take());
}
}
/// Stream of CLI events belonging to one user turn. It produces events until
/// the terminal `result` event is seen (or the watchdog fires). The owned
/// receiver lock is released when the stream is dropped, which lets the next
/// turn start.
pub struct TurnStream {
    /// Exclusive claim on the session's event receiver.
    rx: OwnedMutexGuard<mpsc::Receiver<ClaudeCliEvent>>,
    /// Longest allowed gap between two events.
    watchdog: Duration,
    /// Set once the turn has ended, successfully or not.
    done: bool,
}

impl TurnStream {
    /// Fetch the next CLI event for this turn, bounded by the per-line
    /// watchdog.
    ///
    /// Yields `Ok(Some(event))` for every event up to and including the
    /// terminal `result` event, and `Ok(None)` on any call after the turn has
    /// ended. Fails with [`ProcessError::ExitedEarly`] when the event channel
    /// closes and with [`ProcessError::WatchdogTimeout`] when no event
    /// arrives in time; both also end the turn.
    pub async fn next(&mut self) -> Result<Option<ClaudeCliEvent>, ProcessError> {
        if self.done {
            return Ok(None);
        }

        let received = time::timeout(self.watchdog, self.rx.recv()).await;
        let event = match received {
            Ok(Some(event)) => event,
            Ok(None) => {
                self.done = true;
                return Err(ProcessError::ExitedEarly);
            }
            Err(_elapsed) => {
                self.done = true;
                return Err(ProcessError::WatchdogTimeout(self.watchdog));
            }
        };

        // The `result` event is still handed to the caller; it just closes
        // the stream for subsequent calls.
        if let ClaudeCliEvent::Result { .. } = &event {
            self.done = true;
        }
        Ok(Some(event))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Prefixed names lose `claude-cli/`; bare names pass through.
    #[test]
    fn normalize_model_arg_strips_prefix() {
        let cases = [
            ("claude-cli/sonnet", "sonnet"),
            ("claude-cli/claude-opus-4-7", "claude-opus-4-7"),
            ("sonnet", "sonnet"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_model_arg(input), expected);
        }
    }

    // Note: cannot mutate process env in unit tests safely since tests run
    // in parallel; spawn integration tests cover env behavior end-to-end via
    // the fake_claude.sh fixture.
}

View file

@ -0,0 +1,344 @@
//! HTTP server fronting the claude-cli bridge. Speaks Anthropic Messages API
//! (`POST /v1/messages`) on a localhost port; everything inside this module
//! delegates to `hermesllm::apis::claude_cli` for translation and to
//! `super::session::SessionManager` for subprocess lifecycle.
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use bytes::Bytes;
use hermesllm::apis::anthropic::MessagesRequest;
use hermesllm::apis::claude_cli::{
cli_error_to_anthropic_error_body, cli_event_to_messages_stream_event,
collect_to_messages_response, extract_system_prompt, messages_request_to_stdin_payload,
synthetic_message_start, ClaudeCliEvent,
};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::header::{self, HeaderValue};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, warn};
use super::session::{SessionManager, SESSION_HEADER};
/// Run the claude-cli bridge listener until `shutdown` resolves.
///
/// Binds `addr` and serves every accepted connection over HTTP/1 on its own
/// task, routing each request through `handle`. The future returns `Err`
/// only when the initial bind fails. Accept errors are logged and retried
/// after a short pause; they never end the loop. Once `shutdown` completes,
/// the manager drains all active sessions and the future resolves to
/// `Ok(())`.
pub async fn run_listener<F>(
    addr: SocketAddr,
    manager: Arc<SessionManager>,
    shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    // Pause after a failed `accept`. Without it a persistent error (for
    // example running out of file descriptors) makes `accept` fail
    // immediately on every iteration and the loop spins at full CPU.
    const ACCEPT_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_millis(50);

    let listener = TcpListener::bind(addr).await?;
    info!(%addr, "claude-cli bridge listening");
    let manager_for_shutdown = Arc::clone(&manager);
    tokio::pin!(shutdown);
    loop {
        tokio::select! {
            accept = listener.accept() => {
                let (stream, peer) = match accept {
                    Ok(s) => s,
                    Err(err) => {
                        warn!(error = ?err, "claude-cli accept error");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                        continue;
                    }
                };
                debug!(peer = ?peer, "claude-cli accepted connection");
                let manager = Arc::clone(&manager);
                let io = TokioIo::new(stream);
                // One task per connection; each request on it gets its own
                // handle to the shared session manager.
                tokio::task::spawn(async move {
                    let svc = service_fn(move |req| {
                        let manager = Arc::clone(&manager);
                        async move { handle(req, manager).await }
                    });
                    if let Err(err) = http1::Builder::new().serve_connection(io, svc).await {
                        warn!(error = ?err, "claude-cli connection error");
                    }
                });
            }
            _ = &mut shutdown => {
                info!("claude-cli bridge shutting down");
                manager_for_shutdown.shutdown_all().await;
                return Ok(());
            }
        }
    }
}
/// Serve one HTTP request on the bridge.
///
/// Routes: `GET /healthz` answers `200 ok`; `POST /v1/messages` runs one
/// turn against the session's `claude` process; everything else is `404`.
///
/// For `/v1/messages` the body is parsed as an Anthropic `MessagesRequest`
/// (`400` on read or parse failure), the session is resolved and its process
/// fetched or spawned (`502` on failure), and the request is written to the
/// process as a user turn (`400` if the stdin payload cannot be built, `502`
/// if it cannot be sent). A streaming request is answered with an SSE body;
/// otherwise the whole turn is collected into one JSON response.
///
/// Every failure is reported as an HTTP response; the `hyper::Error` arm of
/// the return type is never produced by this function.
async fn handle(
    req: Request<Incoming>,
    manager: Arc<SessionManager>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, hyper::Error> {
    let path = req.uri().path();
    let method = req.method();
    if method == Method::GET && path == "/healthz" {
        return Ok(text_response(StatusCode::OK, "ok"));
    }
    if method != Method::POST || path != "/v1/messages" {
        return Ok(text_response(StatusCode::NOT_FOUND, "not found"));
    }
    // Pull out the optional session header up front so we can drop the
    // request after consuming the body.
    let session_header = req
        .headers()
        .get(SESSION_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string());
    // Buffer the whole request body before parsing.
    let body_bytes = match req.collect().await {
        Ok(c) => c.to_bytes(),
        Err(err) => {
            warn!(error = %err, "failed to read claude-cli request body");
            return Ok(json_error(StatusCode::BAD_REQUEST, "failed to read body"));
        }
    };
    let parsed: MessagesRequest = match serde_json::from_slice(&body_bytes) {
        Ok(p) => p,
        Err(err) => {
            warn!(error = %err, "failed to parse Anthropic MessagesRequest");
            return Ok(json_error(
                StatusCode::BAD_REQUEST,
                &format!("invalid Anthropic MessagesRequest: {err}"),
            ));
        }
    };
    let session_id = SessionManager::resolve_session_id(session_header.as_deref(), &parsed);
    let system_prompt = extract_system_prompt(&parsed);
    // No working directory is passed (`None`), so a newly spawned child does
    // not get an explicit `current_dir`.
    let process = match manager
        .get_or_spawn(&session_id, &parsed.model, system_prompt.as_deref(), None)
        .await
    {
        Ok(p) => p,
        Err(err) => {
            error!(session = %session_id, error = %err, "failed to spawn claude-cli");
            return Ok(json_error(
                StatusCode::BAD_GATEWAY,
                &format!("failed to spawn claude-cli: {err}"),
            ));
        }
    };
    // Stamp stdin events with the CLI's per-spawn UUID, NOT our deterministic
    // brightstaff session id. The CLI rejects the turn if the two disagree.
    let stdin_payload =
        match messages_request_to_stdin_payload(&parsed, Some(process.cli_session_id())) {
            Ok(p) => p,
            Err(err) => {
                warn!(error = %err, "failed to build claude-cli stdin payload");
                return Ok(json_error(
                    StatusCode::BAD_REQUEST,
                    &format!("failed to build claude-cli stdin payload: {err}"),
                ));
            }
        };
    // A missing `stream` field means a non-streaming request.
    let streaming = parsed.stream.unwrap_or(false);
    let model = parsed.model.clone();
    // Every `send_user_turn` failure, including a turn already being in
    // progress on this session, is reported as 502.
    let mut turn = match process.send_user_turn(&stdin_payload).await {
        Ok(t) => t,
        Err(err) => {
            error!(session = %session_id, error = %err, "failed to send user turn");
            return Ok(json_error(
                StatusCode::BAD_GATEWAY,
                &format!("failed to send user turn: {err}"),
            ));
        }
    };
    if streaming {
        Ok(stream_response(turn, model, session_id))
    } else {
        // Drain the entire turn before answering.
        let mut events: Vec<ClaudeCliEvent> = Vec::new();
        loop {
            match turn.next().await {
                Ok(Some(ev)) => events.push(ev),
                Ok(None) => break,
                Err(err) => {
                    // Watchdog timeout or early exit: events collected so far
                    // are discarded and the error is returned instead.
                    warn!(session = %session_id, error = %err, "claude-cli turn failed");
                    let body = cli_error_to_anthropic_error_body(&err.to_string());
                    return Ok(json_response(StatusCode::BAD_GATEWAY, &body));
                }
            }
        }
        match collect_to_messages_response(&model, events) {
            Ok(resp) => Ok(json_response(StatusCode::OK, &resp)),
            Err(err) => {
                let body = cli_error_to_anthropic_error_body(&err.to_string());
                Ok(json_response(StatusCode::BAD_GATEWAY, &body))
            }
        }
    }
}
/// Build the SSE response for a streaming turn.
///
/// A background task pulls events from `turn`, translates them into
/// Anthropic stream events and pushes them as SSE frames into a bounded
/// channel that backs the response body. The response itself (status `200`,
/// `text/event-stream`) is returned immediately. The task stops when the
/// turn ends, when the turn fails (after emitting an `error` frame), or when
/// the body receiver has gone away.
///
/// `model` and `session_id` are used for the synthetic `message_start` frame;
/// `session_id` is also attached to log lines.
fn stream_response(
    mut turn: super::process::TurnStream,
    model: String,
    session_id: String,
) -> Response<BoxBody<Bytes, Infallible>> {
    let (tx, rx) = mpsc::channel::<Result<Frame<Bytes>, Infallible>>(64);
    tokio::spawn(async move {
        use hermesllm::apis::anthropic::MessagesStreamEvent;
        // Some short turns skip MessageStart; emit a synthetic one so the
        // client always sees a complete stream.
        let mut emitted_message_start = false;
        loop {
            let ev = match turn.next().await {
                Ok(Some(ev)) => ev,
                Ok(None) => break,
                Err(err) => {
                    // The status line has already been sent, so a failure can
                    // only be reported in-band as an SSE `error` event.
                    warn!(session = %session_id, error = %err, "claude-cli streaming turn failed");
                    let body = cli_error_to_anthropic_error_body(&err.to_string());
                    let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string());
                    let frame = Frame::data(format_sse("error", &payload));
                    let _ = tx.send(Ok(frame)).await;
                    break;
                }
            };
            // Synthesize a MessageStart frame the first time we see anything
            // that advances the stream (StreamEvent or Result) and isn't
            // already a MessageStart. Untranslated events (System/Assistant/
            // User/Unknown) don't trigger synthesis — we silently skip them
            // and wait for the real or synthetic start later.
            let is_message_start = matches!(
                &ev,
                ClaudeCliEvent::StreamEvent {
                    event: MessagesStreamEvent::MessageStart { .. }
                }
            );
            let advances_stream = matches!(
                &ev,
                ClaudeCliEvent::StreamEvent { .. } | ClaudeCliEvent::Result { .. }
            );
            if !emitted_message_start && advances_stream {
                if !is_message_start {
                    let synthetic = synthetic_message_start(&model, Some(&session_id));
                    if let Some(frame) = sse_frame_for_event(&synthetic) {
                        // A send error means the response body was dropped.
                        if tx.send(Ok(frame)).await.is_err() {
                            break;
                        }
                    }
                }
                emitted_message_start = true;
            }
            // Forward the event itself when it has an Anthropic equivalent.
            if let Some(translated) = cli_event_to_messages_stream_event(&ev) {
                if let Some(frame) = sse_frame_for_event(&translated) {
                    if tx.send(Ok(frame)).await.is_err() {
                        break;
                    }
                }
            }
            // `Result` is terminal: append an `error` frame when the CLI
            // flagged the turn as failed, then stop either way.
            if let ClaudeCliEvent::Result {
                is_error, result, ..
            } = &ev
            {
                if *is_error {
                    let msg = result
                        .clone()
                        .unwrap_or_else(|| "claude-cli returned an error".to_string());
                    let body = cli_error_to_anthropic_error_body(&msg);
                    let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string());
                    let frame = Frame::data(format_sse("error", &payload));
                    if tx.send(Ok(frame)).await.is_err() {
                        break;
                    }
                }
                break;
            }
        }
    });
    let body = StreamBody::new(ReceiverStream::new(rx));
    let mut resp = Response::new(body.boxed());
    *resp.status_mut() = StatusCode::OK;
    resp.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("text/event-stream"),
    );
    resp.headers_mut()
        .insert(header::CACHE_CONTROL, HeaderValue::from_static("no-cache"));
    // NOTE(review): presumably there to stop a reverse proxy from buffering
    // the stream — confirm against the deployment.
    resp.headers_mut()
        .insert("X-Accel-Buffering", HeaderValue::from_static("no"));
    resp
}
/// Encode one Anthropic stream event as an SSE data frame. The SSE `event:`
/// name is chosen from the variant and the `data:` payload is the event's
/// JSON form. Returns `None` when the event cannot be serialized.
fn sse_frame_for_event(
    event: &hermesllm::apis::anthropic::MessagesStreamEvent,
) -> Option<Frame<Bytes>> {
    use hermesllm::apis::anthropic::MessagesStreamEvent as Ev;

    let payload = match serde_json::to_string(event) {
        Ok(json) => json,
        Err(_) => return None,
    };
    let name = match event {
        Ev::MessageStart { .. } => "message_start",
        Ev::ContentBlockStart { .. } => "content_block_start",
        Ev::ContentBlockDelta { .. } => "content_block_delta",
        Ev::ContentBlockStop { .. } => "content_block_stop",
        Ev::MessageDelta { .. } => "message_delta",
        Ev::MessageStop => "message_stop",
        Ev::Ping => "ping",
    };
    Some(Frame::data(format_sse(name, &payload)))
}
/// Render one Server-Sent Events record: an `event:` line, a `data:` line and
/// the blank line that terminates the record.
fn format_sse(event: &str, data: &str) -> Bytes {
    // 16 = len("event: ") + len("\ndata: ") + len("\n\n")
    let mut record = String::with_capacity(event.len() + data.len() + 16);
    record.push_str("event: ");
    record.push_str(event);
    record.push_str("\ndata: ");
    record.push_str(data);
    record.push_str("\n\n");
    Bytes::from(record)
}
/// Build a response with the given status whose body is `body` serialized as
/// JSON and whose content type is `application/json`. If serialization
/// fails the body falls back to `{}`.
fn json_response<T: serde::Serialize>(
    status: StatusCode,
    body: &T,
) -> Response<BoxBody<Bytes, Infallible>> {
    let payload = match serde_json::to_vec(body) {
        Ok(encoded) => encoded,
        Err(_) => b"{}".to_vec(),
    };
    // `Full` cannot fail; the empty match converts its uninhabited error
    // type into the one the boxed body expects.
    let boxed = Full::new(Bytes::from(payload))
        .map_err(|never| match never {})
        .boxed();

    let mut resp = Response::new(boxed);
    resp.headers_mut().insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("application/json"),
    );
    *resp.status_mut() = status;
    resp
}
/// Wrap `message` in an Anthropic-style error body and return it as a JSON
/// response with the given status.
fn json_error(status: StatusCode, message: &str) -> Response<BoxBody<Bytes, Infallible>> {
    json_response(status, &cli_error_to_anthropic_error_body(message))
}
/// Build a `text/plain` response with the given status and a static body.
fn text_response(
    status: StatusCode,
    message: &'static str,
) -> Response<BoxBody<Bytes, Infallible>> {
    let payload = Bytes::from_static(message.as_bytes());
    let boxed = Full::new(payload).map_err(|never| match never {}).boxed();

    let mut resp = Response::new(boxed);
    resp.headers_mut()
        .insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
    *resp.status_mut() = status;
    resp
}

View file

@ -0,0 +1,323 @@
//! Session manager for the claude-cli bridge. Maps a stable session id (taken
//! from a client-provided header or hashed from the conversation prefix) to a
//! long-lived `ClaudeProcess`. Enforces an idle TTL and a hard cap on the
//! number of concurrent sessions.
use std::collections::HashMap;
use std::sync::Arc;
use hermesllm::apis::anthropic::{
MessagesContentBlock, MessagesMessageContent, MessagesRequest, MessagesRole,
MessagesSystemPrompt,
};
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info};
use super::process::{ClaudeCliConfig, ClaudeProcess, ProcessError};
/// Optional client header that pins a request to a specific session id.
pub const SESSION_HEADER: &str = "x-arch-claude-cli-session";
/// Default cap. The bridge is local and per-developer; this is a guard
/// against runaway memory if a client bug churns through unique session ids.
pub const DEFAULT_MAX_SESSIONS: usize = 64;
/// Settings for the session manager.
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Session count at which `get_or_spawn` evicts the least-recently-used
    /// session before adding a new one.
    pub max_sessions: usize,
    /// Settings handed to every process the manager spawns.
    pub process: ClaudeCliConfig,
}

impl Default for SessionManagerConfig {
    /// `DEFAULT_MAX_SESSIONS` sessions, each with the default process config.
    fn default() -> Self {
        let process = ClaudeCliConfig::default();
        SessionManagerConfig {
            max_sessions: DEFAULT_MAX_SESSIONS,
            process,
        }
    }
}
/// Holds active `ClaudeProcess` handles keyed by session id.
pub struct SessionManager {
    /// Session id -> process. Guarded by an async mutex; `get_or_spawn`
    /// holds it across a spawn.
    inner: Mutex<HashMap<String, Arc<ClaudeProcess>>>,
    /// Session cap plus the per-process settings (idle TTL, watchdog, ...).
    config: SessionManagerConfig,
}
impl SessionManager {
pub fn new(config: SessionManagerConfig) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(HashMap::new()),
config,
})
}
/// Pick (or fabricate) the session id for a given request.
///
/// Strategy (in order):
/// 1. Honor the `x-arch-claude-cli-session` header if it's a non-empty
/// valid UUID-shaped string.
/// 2. Otherwise hash `(model, system_prompt_text, first_user_message_text)`
/// and produce a deterministic UUID-shaped id so retries of the same
/// conversation reuse the same process.
pub fn resolve_session_id(client_header: Option<&str>, req: &MessagesRequest) -> String {
if let Some(raw) = client_header {
let trimmed = raw.trim();
if !trimmed.is_empty() {
// Accept any opaque token; the CLI requires UUID format, so
// we hash unknown shapes into one.
if uuid::Uuid::parse_str(trimmed).is_ok() {
return trimmed.to_string();
}
return uuid_from_seed(trimmed);
}
}
// Build a deterministic seed from (model, system_prompt, first user
// message) so a retried conversation lands on the same session.
let mut seed = String::new();
seed.push_str(&req.model);
seed.push('\u{1f}');
if let Some(system) = &req.system {
seed.push_str(&system_text(system));
}
seed.push('\u{1f}');
if let Some(first) = first_user_message_text(req) {
seed.push_str(&first);
}
uuid_from_seed(&seed)
}
/// Get the existing session's process or spawn a new one.
///
/// Idle sessions are reaped first. If `session_id` is already registered its
/// process is returned as-is, and `model`, `system_prompt` and `cwd` are not
/// applied to it. Otherwise a new process is spawned with those arguments
/// and registered. When the map is at `max_sessions`, the
/// least-recently-used session is shut down first to make room.
///
/// Returns the spawn error unchanged if the child cannot be started; in that
/// case nothing is registered for `session_id`.
pub async fn get_or_spawn(
    &self,
    session_id: &str,
    model: &str,
    system_prompt: Option<&str>,
    cwd: Option<&std::path::Path>,
) -> Result<Arc<ClaudeProcess>, ProcessError> {
    // Reap idle sessions on the read path so we don't need a separate
    // background task for the common one-developer-one-laptop deployment.
    self.evict_idle().await;

    let mut map = self.inner.lock().await;
    if let Some(existing) = map.get(session_id) {
        debug!(session = %session_id, "reusing claude-cli session");
        return Ok(Arc::clone(existing));
    }

    // At the cap: take the LRU victim out of the map so its slot is free.
    let victim = if map.len() >= self.config.max_sessions {
        lru_session_id(&map).and_then(|key| map.remove(&key).map(|proc| (key, proc)))
    } else {
        None
    };

    if let Some((victim_key, victim_proc)) = victim {
        // Killing a child can take a tick, so the map lock is released while
        // the victim shuts down. The cap can drift by one if a concurrent
        // task registers a session in that window; the next reap catches it.
        drop(map);
        info!(session = %victim_key, "evicting LRU claude-cli session to make room");
        victim_proc.shutdown().await;

        // Another request for the same session may have registered a process
        // while the lock was released. Re-check so that process is reused
        // instead of being overwritten (and orphaned) by a second spawn.
        map = self.inner.lock().await;
        if let Some(existing) = map.get(session_id) {
            debug!(session = %session_id, "reusing claude-cli session");
            return Ok(Arc::clone(existing));
        }
    }

    // The map lock is held across the spawn so two callers resolving the
    // same id cannot both create a process.
    let process = ClaudeProcess::spawn(
        session_id.to_string(),
        model,
        system_prompt,
        cwd,
        self.config.process.clone(),
    )
    .await?;
    map.insert(session_id.to_string(), Arc::clone(&process));
    Ok(process)
}
/// Drop and kill all sessions. Called on graceful shutdown.
pub async fn shutdown_all(&self) {
let mut map = self.inner.lock().await;
let drained: Vec<_> = map.drain().collect();
drop(map);
info!(count = drained.len(), "draining claude-cli sessions");
for (_, proc) in drained {
proc.shutdown().await;
}
}
/// Remove and shut down every session whose last use is older than the
/// configured `session_ttl`. A zero TTL disables idle eviction entirely.
async fn evict_idle(&self) {
    let ttl = self.config.process.session_ttl;
    if ttl.is_zero() {
        return;
    }
    let now = Instant::now();
    // Pull the expired entries out of the map in one critical section.
    // `last_used()` is a sync call, so nothing here awaits while the guard
    // is held; the (potentially slow) shutdowns happen after it is dropped.
    let expired: Vec<(String, Arc<ClaudeProcess>)> = {
        let mut sessions = self.inner.lock().await;
        let mut stale_ids: Vec<String> = Vec::new();
        for (id, session) in sessions.iter() {
            if now.duration_since(session.last_used()) > ttl {
                stale_ids.push(id.clone());
            }
        }
        let mut removed = Vec::with_capacity(stale_ids.len());
        for id in stale_ids {
            if let Some(session) = sessions.remove(&id) {
                removed.push((id, session));
            }
        }
        removed
    };
    for (id, session) in expired {
        info!(session = %id, "evicting idle claude-cli session");
        session.shutdown().await;
    }
}
}
/// Return the id of the session with the oldest `last_used` timestamp, or
/// `None` when the map is empty. Synchronous because
/// `ClaudeProcess::last_used` is synchronous.
fn lru_session_id(map: &HashMap<String, Arc<ClaudeProcess>>) -> Option<String> {
    let (_, oldest_id) = map
        .iter()
        .map(|(id, session)| (session.last_used(), id))
        .min_by_key(|&(stamp, _)| stamp)?;
    Some(oldest_id.clone())
}
/// Return the text of the first `user` message in the request, or `None`
/// when the request contains no user message.
///
/// For block content only the text blocks contribute; they are joined with
/// newlines and every other block kind is ignored.
fn first_user_message_text(req: &MessagesRequest) -> Option<String> {
    let first_user = req
        .messages
        .iter()
        .find(|candidate| candidate.role == MessagesRole::User)?;
    let text = match &first_user.content {
        MessagesMessageContent::Single(single) => single.clone(),
        MessagesMessageContent::Blocks(blocks) => {
            let mut parts: Vec<&str> = Vec::new();
            for block in blocks {
                if let MessagesContentBlock::Text { text, .. } = block {
                    parts.push(text.as_str());
                }
            }
            parts.join("\n")
        }
    };
    Some(text)
}
/// Flatten a system prompt to plain text: a single-string prompt is copied
/// as is; a block prompt keeps only its text blocks, joined by newlines.
fn system_text(system: &MessagesSystemPrompt) -> String {
    let blocks = match system {
        MessagesSystemPrompt::Single(single) => return single.clone(),
        MessagesSystemPrompt::Blocks(blocks) => blocks,
    };
    let mut parts: Vec<&str> = Vec::new();
    for block in blocks {
        if let MessagesContentBlock::Text { text, .. } = block {
            parts.push(text.as_str());
        }
    }
    parts.join("\n")
}
/// Map an arbitrary seed string to a deterministic UUIDv5 string.
///
/// The `claude` CLI only accepts a valid UUID for `--session-id`. v5
/// (SHA-1 based) keeps the mapping stable across Rust toolchain versions,
/// which `DefaultHasher` does not promise. The OID namespace is used
/// because the seed is neither a DNS name nor a URL.
fn uuid_from_seed(seed: &str) -> String {
    let namespace = &uuid::Uuid::NAMESPACE_OID;
    let derived = uuid::Uuid::new_v5(namespace, seed.as_bytes());
    derived.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;
    use hermesllm::apis::anthropic::MessagesMessage;

    /// Build a minimal single-turn streaming request for the session-id
    /// resolution tests.
    fn req(model: &str, user: &str, system: Option<&str>) -> MessagesRequest {
        let user_turn = MessagesMessage {
            role: MessagesRole::User,
            content: MessagesMessageContent::Single(user.to_string()),
        };
        let system_prompt = system.map(|s| MessagesSystemPrompt::Single(s.to_string()));
        MessagesRequest {
            model: model.to_string(),
            messages: vec![user_turn],
            max_tokens: 1024,
            container: None,
            mcp_servers: None,
            system: system_prompt,
            metadata: None,
            service_tier: None,
            thinking: None,
            temperature: None,
            top_p: None,
            top_k: None,
            stream: Some(true),
            stop_sequences: None,
            tools: None,
            tool_choice: None,
        }
    }

    /// A header value that already is a UUID must be passed through untouched.
    #[test]
    fn header_uuid_is_used_as_is() {
        let header = "550e8400-e29b-41d4-a716-446655440000";
        let request = req("sonnet", "hi", None);
        assert_eq!(
            SessionManager::resolve_session_id(Some(header), &request),
            header
        );
    }

    /// A non-UUID header is mapped to a valid UUID, and the mapping is stable.
    #[test]
    fn header_non_uuid_is_normalized_to_uuid() {
        let request = req("sonnet", "hi", None);
        let first = SessionManager::resolve_session_id(Some("my-token"), &request);
        assert!(uuid::Uuid::parse_str(&first).is_ok());
        let second = SessionManager::resolve_session_id(Some("my-token"), &request);
        assert_eq!(first, second);
    }

    /// An empty header behaves exactly like a missing one.
    #[test]
    fn empty_header_falls_back_to_hash() {
        let request = req("sonnet", "hi", Some("you are helpful"));
        let from_empty = SessionManager::resolve_session_id(Some(""), &request);
        assert!(uuid::Uuid::parse_str(&from_empty).is_ok());
        let from_missing = SessionManager::resolve_session_id(None, &request);
        assert_eq!(from_empty, from_missing);
    }

    /// The request-derived id is repeatable and depends on both the user
    /// text and the model.
    #[test]
    fn hash_is_stable_across_repeats_and_distinct_across_inputs() {
        let baseline = req("sonnet", "hello", None);
        let same_again = req("sonnet", "hello", None);
        let other_text = req("sonnet", "different", None);
        let other_model = req("opus", "hello", None);

        let baseline_id = SessionManager::resolve_session_id(None, &baseline);
        assert_eq!(
            baseline_id,
            SessionManager::resolve_session_id(None, &same_again)
        );
        assert_ne!(
            baseline_id,
            SessionManager::resolve_session_id(None, &other_text)
        );
        assert_ne!(
            baseline_id,
            SessionManager::resolve_session_id(None, &other_model)
        );
    }
}

View file

@ -1,4 +1,5 @@
pub mod agents;
pub mod claude_cli;
pub mod debug;
pub mod function_calling;
pub mod llm;

View file

@ -4,6 +4,7 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use brightstaff::app_state::AppState;
use brightstaff::handlers::agents::orchestrator::agent_chat;
use brightstaff::handlers::claude_cli::{self, SessionManager, SessionManagerConfig};
use brightstaff::handlers::debug;
use brightstaff::handlers::empty;
use brightstaff::handlers::function_calling::function_calling_chat_handler;
@ -37,6 +38,7 @@ use opentelemetry::trace::FutureExt;
use opentelemetry_http::HeaderExtractor;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{env, fs};
use tokio::net::TcpListener;
use tokio::sync::RwLock;
@ -592,6 +594,60 @@ async fn run_server(state: Arc<AppState>) -> Result<(), Box<dyn std::error::Erro
Ok(())
}
// ---------------------------------------------------------------------------
// claude-cli bridge wiring
// ---------------------------------------------------------------------------
/// Build the bridge listen address and [`SessionManagerConfig`] from
/// environment variables.
///
/// Returns `None` when `CLAUDE_CLI_LISTEN_ADDR` is unset, signaling that the
/// bridge should not start at all (zero-cost when no claude-cli provider
/// exists). It also returns `None`, after logging a warning, when the
/// variable is set but is not a valid socket address.
///
/// Starts from `SessionManagerConfig::default()` and only overrides fields
/// for which the corresponding env var is both set and parses successfully.
/// This keeps the defaults in one place (the `Default` impls) so they can't
/// drift between this function and the library types. A numeric override
/// that is set but does not parse keeps the default and is logged, so a
/// typo'd value does not pass unnoticed.
///
/// Recognized overrides: `CLAUDE_CLI_BIN`, `CLAUDE_CLI_PERMISSION_MODE`,
/// `CLAUDE_CLI_SESSION_TTL_SECS`, `CLAUDE_CLI_WATCHDOG_SECS`,
/// `CLAUDE_CLI_MAX_SESSIONS`.
fn claude_cli_config_from_env() -> Option<(std::net::SocketAddr, SessionManagerConfig)> {
    let addr_str = env::var("CLAUDE_CLI_LISTEN_ADDR").ok()?;
    let addr: std::net::SocketAddr = match addr_str.parse() {
        Ok(a) => a,
        Err(err) => {
            warn!(
                value = %addr_str,
                error = %err,
                "invalid CLAUDE_CLI_LISTEN_ADDR — claude-cli bridge disabled"
            );
            return None;
        }
    };
    let mut cfg = SessionManagerConfig::default();
    if let Ok(s) = env::var("CLAUDE_CLI_BIN") {
        cfg.process.binary = s;
    }
    if let Ok(s) = env::var("CLAUDE_CLI_PERMISSION_MODE") {
        cfg.process.permission_mode = s;
    }
    if let Some(secs) = claude_cli_env_override::<u64>("CLAUDE_CLI_SESSION_TTL_SECS") {
        cfg.process.session_ttl = Duration::from_secs(secs);
    }
    if let Some(secs) = claude_cli_env_override::<u64>("CLAUDE_CLI_WATCHDOG_SECS") {
        cfg.process.watchdog = Duration::from_secs(secs);
    }
    if let Some(n) = claude_cli_env_override::<usize>("CLAUDE_CLI_MAX_SESSIONS") {
        cfg.max_sessions = n;
    }
    Some((addr, cfg))
}

/// Read and parse one optional claude-cli env override. Returns `None` when
/// the variable is unset (or not valid Unicode) or when it fails to parse;
/// the parse failure is logged so the fallback to the default is visible.
fn claude_cli_env_override<T>(name: &str) -> Option<T>
where
    T: std::str::FromStr,
    T::Err: std::fmt::Display,
{
    let raw = env::var(name).ok()?;
    match raw.parse::<T>() {
        Ok(value) => Some(value),
        Err(err) => {
            warn!(
                var = %name,
                value = %raw,
                error = %err,
                "ignoring unparseable claude-cli env override — keeping default"
            );
            None
        }
    }
}
// ---------------------------------------------------------------------------
// Entry point
// ---------------------------------------------------------------------------
@ -603,7 +659,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
bs_metrics::init();
info!("loaded plano_config.yaml");
let state = Arc::new(init_app_state(&config).await?);
run_server(state).await
// Optional claude-cli bridge listener. Started iff CLAUDE_CLI_LISTEN_ADDR
// is set in the environment (the Python CLI sets this when it detects a
// `model: claude-cli/*` provider entry).
let bridge_handle = if let Some((addr, cfg)) = claude_cli_config_from_env() {
let manager = SessionManager::new(cfg);
let shutdown = async {
let _ = tokio::signal::ctrl_c().await;
};
Some(tokio::spawn(async move {
if let Err(err) = claude_cli::run_listener(addr, manager, shutdown).await {
warn!(error = ?err, "claude-cli bridge listener exited with error");
}
}))
} else {
None
};
let result = run_server(state).await;
if let Some(handle) = bridge_handle {
// Ctrl-C already triggered the bridge's own shutdown; join briefly to
// give in-flight session drains a chance to finish.
let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
}
result
}
#[cfg(test)]

View file

@ -0,0 +1,204 @@
//! Integration test for the claude-cli bridge. Spins up the listener with a
//! fake `claude` shell script that emits a canned NDJSON sequence, then
//! verifies both the streaming SSE and non-streaming JSON code paths produce
//! the expected Anthropic Messages output.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use brightstaff::handlers::claude_cli::{
self, ClaudeCliConfig, SessionManager, SessionManagerConfig,
};
use serde_json::{json, Value};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
/// Absolute path of the fake `claude` script fixture, resolved against the
/// crate's manifest directory at compile time.
fn fake_claude_path() -> PathBuf {
    let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    path.extend(["tests", "fixtures", "fake_claude.sh"]);
    path
}
/// Ask the OS for an unused loopback port by binding port 0, then release
/// it and hand the address back.
///
/// The probe listener is closed before the caller binds, so another process
/// could in principle claim the port in between.
async fn pick_free_addr() -> std::net::SocketAddr {
    let probe = TcpListener::bind("127.0.0.1:0").await.unwrap();
    // `probe` is dropped when this function returns, freeing the port.
    probe.local_addr().unwrap()
}
/// Handle to a bridge listener running on a background task for the
/// duration of one test.
struct BridgeFixture {
    /// Address the listener was asked to bind; tests build request URLs from it.
    addr: std::net::SocketAddr,
    /// Sender that triggers the listener's shutdown future. `Option` so it
    /// can be taken exactly once by `stop` or `Drop`.
    shutdown: Option<oneshot::Sender<()>>,
    /// Join handle of the spawned listener task. `Option` so it can be taken
    /// exactly once by `stop` or `Drop`.
    handle: Option<tokio::task::JoinHandle<()>>,
}
impl BridgeFixture {
    /// Start the bridge listener on a free loopback port, backed by the
    /// fake `claude` script, and wait until it accepts TCP connections.
    ///
    /// # Panics
    ///
    /// Panics if the listener is still unreachable after 50 probes spaced
    /// 20 ms apart. Failing here gives a clear cause instead of an opaque
    /// request failure later in the test.
    async fn start() -> Self {
        let addr = pick_free_addr().await;
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let manager = SessionManager::new(SessionManagerConfig {
            max_sessions: 4,
            process: ClaudeCliConfig {
                binary: fake_claude_path().to_string_lossy().to_string(),
                permission_mode: "bypassPermissions".to_string(),
                session_ttl: Duration::from_secs(60),
                watchdog: Duration::from_secs(5),
            },
        });
        let manager_for_listener = Arc::clone(&manager);
        let handle = tokio::spawn(async move {
            // Resolves when `stop`/`Drop` sends on the channel, or when the
            // sender is dropped.
            let shutdown = async move {
                let _ = shutdown_rx.await;
            };
            if let Err(err) = claude_cli::run_listener(addr, manager_for_listener, shutdown).await {
                eprintln!("listener exited with error: {err}");
            }
        });
        // Wait for the listener to bind. Loop until we can connect.
        let mut ready = false;
        for _ in 0..50 {
            if tokio::net::TcpStream::connect(addr).await.is_ok() {
                ready = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert!(
            ready,
            "claude-cli bridge listener never accepted a connection on {addr}"
        );
        Self {
            addr,
            shutdown: Some(shutdown_tx),
            handle: Some(handle),
        }
    }

    /// Signal shutdown and wait up to three seconds for the listener task
    /// to finish. A timeout or join error is ignored.
    async fn stop(mut self) {
        if let Some(tx) = self.shutdown.take() {
            let _ = tx.send(());
        }
        if let Some(h) = self.handle.take() {
            let _ = tokio::time::timeout(Duration::from_secs(3), h).await;
        }
    }
}
/// Fallback cleanup for tests that panic before reaching `stop().await`.
/// `Drop` cannot `.await`, so this fires the shutdown signal (if it has not
/// been sent yet) and aborts the listener task instead of joining it.
impl Drop for BridgeFixture {
    fn drop(&mut self) {
        let pending_signal = self.shutdown.take();
        let listener_task = self.handle.take();
        if let Some(signal) = pending_signal {
            // The receiver may already be gone; that is fine.
            let _ = signal.send(());
        }
        if let Some(task) = listener_task {
            task.abort();
        }
    }
}
/// Minimal Anthropic Messages request body with a single user turn;
/// `stream` selects the SSE or the JSON response path.
fn anthropic_request(stream: bool) -> Value {
    let user_turn = json!({"role": "user", "content": "say hi"});
    json!({
        "model": "claude-cli/sonnet",
        "max_tokens": 64,
        "stream": stream,
        "messages": [user_turn]
    })
}
#[tokio::test]
async fn streaming_request_emits_anthropic_sse() {
let fixture = BridgeFixture::start().await;
let url = format!("http://{}/v1/messages", fixture.addr);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&anthropic_request(true))
.send()
.await
.expect("send request");
assert_eq!(resp.status(), 200);
let ct = resp
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
assert!(
ct.starts_with("text/event-stream"),
"expected text/event-stream, got {ct}"
);
let body = resp.text().await.expect("read body");
// SSE event names should mirror Anthropic's wire format, in order.
let events: Vec<&str> = body
.lines()
.filter_map(|l| l.strip_prefix("event: "))
.collect();
assert_eq!(
events,
vec![
"message_start",
"content_block_start",
"content_block_delta",
"content_block_delta",
"content_block_stop",
"message_delta",
"message_stop",
],
"unexpected SSE event sequence:\n{body}"
);
// The two text deltas should reconstruct "Hello, world!".
let mut combined = String::new();
for line in body.lines() {
if let Some(payload) = line.strip_prefix("data: ") {
if let Ok(v) = serde_json::from_str::<Value>(payload) {
if v.get("type").and_then(|t| t.as_str()) == Some("content_block_delta") {
if let Some(text) = v
.get("delta")
.and_then(|d| d.get("text"))
.and_then(|t| t.as_str())
{
combined.push_str(text);
}
}
}
}
}
assert_eq!(combined, "Hello, world!");
fixture.stop().await;
}
/// A non-streaming request must return one aggregated Anthropic `message`
/// whose text and usage match the fake CLI's canned turn.
#[tokio::test]
async fn non_streaming_request_returns_messages_response() {
    let fixture = BridgeFixture::start().await;
    let endpoint = format!("http://{}/v1/messages", fixture.addr);
    let response = reqwest::Client::new()
        .post(&endpoint)
        .json(&anthropic_request(false))
        .send()
        .await
        .expect("send request");
    assert_eq!(response.status(), 200);

    let body: Value = response.json().await.expect("parse json");

    // Envelope fields.
    assert_eq!(body["type"], "message");
    assert_eq!(body["role"], "assistant");
    assert_eq!(body["stop_reason"], "end_turn");

    // Token accounting.
    let usage = &body["usage"];
    assert_eq!(usage["input_tokens"], 3);
    assert_eq!(usage["output_tokens"], 4);

    // Exactly one text block holding the full reply.
    let content = body["content"].as_array().expect("content array");
    assert_eq!(content.len(), 1);
    let only_block = &content[0];
    assert_eq!(only_block["type"], "text");
    assert_eq!(only_block["text"], "Hello, world!");

    fixture.stop().await;
}

View file

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Stand-in for the real `claude` CLI used by the brightstaff integration test.
# Reads stdin (so it does not exit early when the bridge writes the user
# JSONL turn) and emits a canned `--output-format stream-json` NDJSON
# sequence that mirrors a one-turn "Hello, world!" response.
#
# All CLI flags are accepted and ignored; only the NDJSON output matters for
# the bridge-side translation.
set -euo pipefail

# Drain any stdin the parent writes so it does not see EPIPE.
#
# A plain `( cat > /dev/null ) &` does not do that: job control is off in a
# script, so bash gives a background command /dev/null as its stdin and the
# drain would hit EOF at once without ever reading the parent's pipe.
# Duplicating the real stdin onto fd 3 and reading from that fd keeps the
# drain attached to the pipe the parent writes to.
{ cat <&3 > /dev/null & } 3<&0
DRAIN_PID=$!
# Stop the drain when the script exits; it may already be gone.
trap 'kill ${DRAIN_PID} 2>/dev/null || true' EXIT

cat <<'EOF'
{"type":"system","subtype":"init","session_id":"fake-session","model":"sonnet","cwd":"/tmp","tools":[]}
{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_fake","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":3,"output_tokens":0}}}}
{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", world!"}}}
{"type":"stream_event","event":{"type":"content_block_stop","index":0}}
{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":0,"output_tokens":4}}}
{"type":"stream_event","event":{"type":"message_stop"}}
{"type":"result","subtype":"success","is_error":false,"duration_ms":12,"num_turns":1,"result":"Hello, world!","total_cost_usd":0.0001,"usage":{"input_tokens":3,"output_tokens":4},"session_id":"fake-session"}
EOF

View file

@ -400,6 +400,10 @@ pub enum LlmProviderType {
Vercel,
#[serde(rename = "openrouter")]
OpenRouter,
/// Claude Code CLI invoked as a local subprocess. The bridge runs inside
/// brightstaff (`CLAUDE_CLI_LISTEN_ADDR`) and exposes Anthropic Messages.
#[serde(rename = "claude-cli")]
ClaudeCli,
}
impl Display for LlmProviderType {
@ -425,6 +429,7 @@ impl Display for LlmProviderType {
LlmProviderType::DigitalOcean => write!(f, "digitalocean"),
LlmProviderType::Vercel => write!(f, "vercel"),
LlmProviderType::OpenRouter => write!(f, "openrouter"),
LlmProviderType::ClaudeCli => write!(f, "claude-cli"),
}
}
}
@ -772,6 +777,7 @@ mod test {
for (yaml_value, expected) in [
("vercel", LlmProviderType::Vercel),
("openrouter", LlmProviderType::OpenRouter),
("claude-cli", LlmProviderType::ClaudeCli),
] {
let parsed: LlmProviderType =
serde_yaml::from_str(yaml_value).expect("variant should deserialize");

View file

@ -0,0 +1,952 @@
//! Translation between Anthropic Messages API and Claude Code CLI's
//! `--output-format stream-json` / `--input-format stream-json` wire format.
//!
//! Claude Code CLI is invoked as a subprocess by `brightstaff` with flags such
//! as `claude -p --output-format stream-json --input-format stream-json
//! --include-partial-messages --verbose`. Each line on stdout is one JSON event
//! (NDJSON), and each line on stdin is a user-message JSON. This module owns
//! the pure (no-I/O) types and conversions; the runtime layer in brightstaff
//! does the actual spawning and streaming.
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use serde_with::skip_serializing_none;
use thiserror::Error;
use uuid::Uuid;
use crate::apis::anthropic::{
MessagesContentBlock, MessagesContentDelta, MessagesMessageContent, MessagesMessageDelta,
MessagesRequest, MessagesResponse, MessagesRole, MessagesStopReason, MessagesStreamEvent,
MessagesStreamMessage, MessagesSystemPrompt, MessagesUsage,
};
/// Errors produced by translation between Anthropic Messages and Claude Code
/// stream-json.
#[derive(Debug, Error)]
pub enum ClaudeCliTranslationError {
    /// The CLI's terminal `result` event had `is_error == true`. `message`
    /// is the event's `result` text, or a generic fallback when it was
    /// absent.
    #[error("Claude CLI returned an error: {message}")]
    CliError { message: String },
    /// A `serde_json` failure, convertible via `?` thanks to `#[from]`.
    // NOTE(review): nothing in this module's visible code constructs this
    // variant; presumably the runtime layer uses it when writing stdin.
    #[error("Failed to serialize stdin payload: {0}")]
    SerializeStdin(#[from] serde_json::Error),
    /// The event sequence ended without a `result` event.
    #[error("Claude CLI stream ended before a terminal `result` event")]
    UnexpectedEnd,
}
// ---------------------------------------------------------------------------
// Wire types — output (Claude CLI -> us)
// ---------------------------------------------------------------------------
/// One line of NDJSON emitted on stdout by `claude -p --output-format
/// stream-json`. The CLI tags variants with a top-level `type` field, and
/// `system`/`result` carry an additional `subtype`.
///
/// Variant names map to snake_case `type` values (`StreamEvent` is
/// `"stream_event"`); any other `type` value deserializes to `Unknown`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClaudeCliEvent {
    /// `type=system` events. The actual classification lives in `subtype`
    /// (e.g. `init`, `api_retry`, `rate_limit_event`). We keep the raw fields
    /// rather than enumerating subtypes so a new CLI release that adds a
    /// subtype does not break parsing.
    System {
        #[serde(default)]
        subtype: Option<String>,
        #[serde(default)]
        session_id: Option<String>,
        #[serde(default)]
        model: Option<String>,
        #[serde(default)]
        cwd: Option<String>,
        // Every field not named above (e.g. `tools`) is collected here.
        #[serde(flatten)]
        extra: Value,
    },
    /// A complete assistant message (emitted after the corresponding
    /// `stream_event` deltas finish). Useful for non-streaming consumers.
    Assistant { message: ClaudeCliAssistantMessage },
    /// A complete user message echoed back (when `--replay-user-messages` is
    /// set). We currently ignore these in translation but keep the variant so
    /// stray events do not cause deserialization failures.
    User {
        #[serde(default)]
        message: Value,
    },
    /// Wrapped Anthropic SSE event. The CLI re-emits the raw streaming-API
    /// shape here when `--include-partial-messages` is enabled.
    StreamEvent { event: MessagesStreamEvent },
    /// Terminal event marking the end of one CLI turn. `is_error == true`
    /// means the underlying API call failed; `result` typically holds the
    /// final assistant text or an error message.
    Result {
        #[serde(default)]
        subtype: Option<String>,
        // Defaults to `false` when the field is missing.
        #[serde(default)]
        is_error: bool,
        #[serde(default)]
        duration_ms: Option<u64>,
        #[serde(default)]
        num_turns: Option<u32>,
        #[serde(default)]
        result: Option<String>,
        #[serde(default)]
        total_cost_usd: Option<f64>,
        #[serde(default)]
        usage: Option<ClaudeCliUsage>,
        #[serde(default)]
        session_id: Option<String>,
    },
    /// Catch-all for events the CLI may add in the future. We surface them in
    /// logs but do not translate them to Anthropic events.
    #[serde(other)]
    Unknown,
}
/// Subset of the Anthropic message shape the CLI emits inside `assistant`
/// events. `content` is a list of [`ClaudeCliContentBlock`]s, so known
/// Anthropic blocks are decoded while unrecognized ones are kept as raw
/// JSON instead of failing the whole message. Every field is optional on
/// the wire; a missing `content` becomes an empty list.
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaudeCliAssistantMessage {
    pub id: Option<String>,
    #[serde(default)]
    pub model: Option<String>,
    #[serde(default)]
    pub role: Option<String>,
    #[serde(default)]
    pub content: Vec<ClaudeCliContentBlock>,
    #[serde(default)]
    pub stop_reason: Option<String>,
    #[serde(default)]
    pub stop_sequence: Option<String>,
    #[serde(default)]
    pub usage: Option<ClaudeCliUsage>,
}
/// The CLI's `assistant.message.content[]` entries are a subset of Anthropic's
/// content blocks. This is an *untagged* enum: deserialization first tries
/// `MessagesContentBlock` and, if that fails, falls back to keeping the
/// entry as raw JSON. Variant order therefore matters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ClaudeCliContentBlock {
    /// Anthropic-shaped content block (text, tool_use, thinking, ...).
    Anthropic(MessagesContentBlock),
    /// Anything we do not recognize is preserved as raw JSON so parsing the
    /// surrounding message still succeeds. Note that
    /// `collect_to_messages_response` only copies `Anthropic` blocks into
    /// its output; `Unknown` blocks are currently skipped there.
    Unknown(Value),
}
/// Token accounting as reported by the CLI in `assistant` and `result`
/// events. Every counter is optional on the wire and is omitted again on
/// serialization when `None`.
#[skip_serializing_none]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClaudeCliUsage {
    #[serde(default)]
    pub input_tokens: Option<u32>,
    #[serde(default)]
    pub output_tokens: Option<u32>,
    #[serde(default)]
    pub cache_creation_input_tokens: Option<u32>,
    #[serde(default)]
    pub cache_read_input_tokens: Option<u32>,
}
impl From<ClaudeCliUsage> for MessagesUsage {
fn from(u: ClaudeCliUsage) -> Self {
MessagesUsage {
input_tokens: u.input_tokens.unwrap_or(0),
output_tokens: u.output_tokens.unwrap_or(0),
cache_creation_input_tokens: u.cache_creation_input_tokens,
cache_read_input_tokens: u.cache_read_input_tokens,
}
}
}
// ---------------------------------------------------------------------------
// Wire types — input (us -> Claude CLI)
// ---------------------------------------------------------------------------
/// One line of NDJSON written to the CLI's stdin when invoked with
/// `--input-format stream-json`. Serialized with a top-level `type` tag in
/// snake_case, so the only variant is emitted as `"type": "user"`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClaudeCliInputEvent {
    /// A single user turn.
    User {
        message: ClaudeCliUserMessage,
        /// The session id assigned by the CLI on first turn. Optional on the
        /// first message; required (and must match) on subsequent turns.
        /// Left out of the JSON entirely when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        session_id: Option<String>,
    },
}
/// The `message` payload of a [`ClaudeCliInputEvent::User`] line.
#[derive(Debug, Clone, Serialize)]
pub struct ClaudeCliUserMessage {
    /// Message role; `messages_request_to_stdin_payload` always sets `"user"`.
    pub role: &'static str,
    /// Either a JSON string (plain-text content) or a JSON array of content
    /// blocks, as produced by `message_content_to_cli_value`.
    pub content: Value,
}
// ---------------------------------------------------------------------------
// Conversions
// ---------------------------------------------------------------------------
/// Convert a `MessagesRequest` into the stdin events for the CLI: one
/// `user` event per user message, in request order. Callers can replay the
/// whole list on first spawn or send only the last entry to a hot session.
///
/// Non-user messages are skipped because the CLI tracks assistant turns
/// itself. When `session_id` is given it is attached to every event so the
/// CLI can verify the turn belongs to the expected session.
///
/// The visible code path never fails; the `Result` is part of the signature.
pub fn messages_request_to_stdin_payload(
    req: &MessagesRequest,
    session_id: Option<&str>,
) -> Result<Vec<ClaudeCliInputEvent>, ClaudeCliTranslationError> {
    let events: Vec<ClaudeCliInputEvent> = req
        .messages
        .iter()
        .filter(|turn| turn.role == MessagesRole::User)
        .map(|turn| ClaudeCliInputEvent::User {
            message: ClaudeCliUserMessage {
                role: "user",
                content: message_content_to_cli_value(&turn.content),
            },
            session_id: session_id.map(str::to_string),
        })
        .collect();
    Ok(events)
}
/// Produce the value for `--append-system-prompt` when spawning the CLI for
/// this request, or `None` if the request carries no system prompt.
///
/// A block-style prompt is flattened to its text blocks joined by newlines;
/// non-text blocks are ignored.
pub fn extract_system_prompt(req: &MessagesRequest) -> Option<String> {
    let system = req.system.as_ref()?;
    let prompt = match system {
        MessagesSystemPrompt::Single(text) => text.clone(),
        MessagesSystemPrompt::Blocks(blocks) => {
            let mut parts: Vec<&str> = Vec::new();
            for block in blocks {
                if let MessagesContentBlock::Text { text, .. } = block {
                    parts.push(text.as_str());
                }
            }
            parts.join("\n")
        }
    };
    Some(prompt)
}
/// Turn message content into the JSON value placed in the stdin event:
/// plain text becomes a JSON string, block content becomes a JSON array.
fn message_content_to_cli_value(content: &MessagesMessageContent) -> Value {
    match content {
        MessagesMessageContent::Single(text) => Value::String(text.clone()),
        // Keep the block array structured so tool_result / image blocks
        // cross the stdin boundary intact. A serialization failure degrades
        // to an empty array rather than an error.
        MessagesMessageContent::Blocks(blocks) => match serde_json::to_value(blocks) {
            Ok(encoded) => encoded,
            Err(_) => Value::Array(Vec::new()),
        },
    }
}
/// Map one CLI event to the Anthropic `MessagesStreamEvent` it wraps.
///
/// Only `stream_event` lines have an SSE counterpart; for everything else
/// (CLI-internal `system` notifications, the terminal `result`,
/// unrecognized variants, ...) this returns `None`.
pub fn cli_event_to_messages_stream_event(ev: &ClaudeCliEvent) -> Option<MessagesStreamEvent> {
    if let ClaudeCliEvent::StreamEvent { event } = ev {
        Some(event.clone())
    } else {
        None
    }
}
/// Aggregate a sequence of CLI events into a single non-streaming
/// `MessagesResponse`. Used by the bridge when the client did not request
/// streaming.
///
/// The terminal `result` event is required: if the iterator ends without one,
/// we return [`ClaudeCliTranslationError::UnexpectedEnd`].
pub fn collect_to_messages_response<I>(
model: &str,
events: I,
) -> Result<MessagesResponse, ClaudeCliTranslationError>
where
I: IntoIterator<Item = ClaudeCliEvent>,
{
let mut content_blocks: Vec<MessagesContentBlock> = Vec::new();
// Accumulate per-index text deltas + tool-use input deltas as the CLI
// emits content_block_start -> content_block_delta(s) -> content_block_stop.
let mut text_accum: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
let mut tool_accum: std::collections::HashMap<u32, (String, String, String)> =
std::collections::HashMap::new();
let mut block_order: Vec<(u32, BlockKind)> = Vec::new();
let mut stop_reason = MessagesStopReason::EndTurn;
let mut stop_sequence: Option<String> = None;
let mut usage = MessagesUsage {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
};
let mut id = String::new();
let mut model_out = model.to_string();
let mut last_assistant_message: Option<ClaudeCliAssistantMessage> = None;
let mut saw_result = false;
let mut error_message: Option<String> = None;
for ev in events {
match ev {
ClaudeCliEvent::StreamEvent { event } => match event {
MessagesStreamEvent::MessageStart { message } => {
if id.is_empty() {
id.clone_from(&message.id);
}
if !message.model.is_empty() {
model_out.clone_from(&message.model);
}
usage = message.usage.clone();
}
MessagesStreamEvent::ContentBlockStart {
index,
content_block,
} => match content_block {
MessagesContentBlock::Text { text, .. } => {
text_accum.insert(index, text);
block_order.push((index, BlockKind::Text));
}
MessagesContentBlock::ToolUse {
id: tool_id, name, ..
} => {
// Anthropic streaming always starts a tool_use block
// with an empty `input` placeholder (`{}` or `null`);
// the real arguments arrive via `input_json_delta`s.
// Always start with an empty buffer so deltas
// assemble into valid JSON.
tool_accum.insert(index, (tool_id, name, String::new()));
block_order.push((index, BlockKind::ToolUse));
}
other => {
// Unknown block kind — preserve verbatim by pushing it
// immediately. We do not expect deltas for this index.
content_blocks.push(other);
}
},
MessagesStreamEvent::ContentBlockDelta { index, delta } => match delta {
MessagesContentDelta::TextDelta { text } => {
text_accum.entry(index).or_default().push_str(&text);
}
MessagesContentDelta::InputJsonDelta { partial_json } => {
if let Some((_, _, buf)) = tool_accum.get_mut(&index) {
buf.push_str(&partial_json);
}
}
// Thinking/signature deltas are surfaced to streaming
// clients but dropped from the non-streaming aggregate.
_ => {}
},
MessagesStreamEvent::MessageDelta {
delta,
usage: msg_usage,
} => {
let MessagesMessageDelta {
stop_reason: sr,
stop_sequence: ss,
} = delta;
stop_reason = sr;
stop_sequence = ss;
// The MessageDelta usage carries final output_tokens.
usage.output_tokens = msg_usage.output_tokens;
}
MessagesStreamEvent::ContentBlockStop { .. }
| MessagesStreamEvent::MessageStop
| MessagesStreamEvent::Ping => {}
},
ClaudeCliEvent::Assistant { message } => {
last_assistant_message = Some(message);
}
ClaudeCliEvent::Result {
is_error,
result,
usage: result_usage,
..
} => {
saw_result = true;
if is_error {
error_message = Some(result.unwrap_or_else(|| "Claude CLI failed".to_string()));
}
if let Some(u) = result_usage {
let merged: MessagesUsage = u.into();
if merged.input_tokens > 0 {
usage.input_tokens = merged.input_tokens;
}
if merged.output_tokens > 0 {
usage.output_tokens = merged.output_tokens;
}
if merged.cache_creation_input_tokens.is_some() {
usage.cache_creation_input_tokens = merged.cache_creation_input_tokens;
}
if merged.cache_read_input_tokens.is_some() {
usage.cache_read_input_tokens = merged.cache_read_input_tokens;
}
}
}
ClaudeCliEvent::System { .. }
| ClaudeCliEvent::User { .. }
| ClaudeCliEvent::Unknown => {}
}
}
if let Some(msg) = error_message {
return Err(ClaudeCliTranslationError::CliError { message: msg });
}
if !saw_result {
return Err(ClaudeCliTranslationError::UnexpectedEnd);
}
// Materialize accumulated blocks in the order they were started.
let mut sorted_indices = block_order.clone();
sorted_indices.sort_by_key(|(idx, _)| *idx);
for (idx, kind) in sorted_indices {
match kind {
BlockKind::Text => {
if let Some(text) = text_accum.remove(&idx) {
content_blocks.push(MessagesContentBlock::Text {
text,
cache_control: None,
});
}
}
BlockKind::ToolUse => {
if let Some((tool_id, name, raw_input)) = tool_accum.remove(&idx) {
let input_value = if raw_input.is_empty() {
Value::Object(Map::default())
} else {
serde_json::from_str(&raw_input)
.unwrap_or_else(|_| Value::String(raw_input))
};
content_blocks.push(MessagesContentBlock::ToolUse {
id: tool_id,
name,
input: input_value,
cache_control: None,
});
}
}
}
}
// If the streaming events did not include any content but the CLI sent a
// final `assistant` message (common for short responses), use that as the
// body of the response.
if content_blocks.is_empty() {
if let Some(msg) = last_assistant_message {
for block in msg.content {
if let ClaudeCliContentBlock::Anthropic(b) = block {
content_blocks.push(b);
}
}
if id.is_empty() {
if let Some(msg_id) = msg.id {
id = msg_id;
}
}
if let Some(m) = msg.model {
if !m.is_empty() {
model_out = m;
}
}
if let Some(u) = msg.usage {
let merged: MessagesUsage = u.into();
if usage.input_tokens == 0 {
usage.input_tokens = merged.input_tokens;
}
if usage.output_tokens == 0 {
usage.output_tokens = merged.output_tokens;
}
if usage.cache_creation_input_tokens.is_none() {
usage.cache_creation_input_tokens = merged.cache_creation_input_tokens;
}
if usage.cache_read_input_tokens.is_none() {
usage.cache_read_input_tokens = merged.cache_read_input_tokens;
}
}
}
}
if id.is_empty() {
id = format!("msg_cli_{}", Uuid::new_v4().simple());
}
Ok(MessagesResponse {
id,
obj_type: "message".to_string(),
role: MessagesRole::Assistant,
content: content_blocks,
model: model_out,
stop_reason,
stop_sequence,
usage,
container: None,
})
}
/// Which per-index accumulator a streamed content block was recorded in by
/// `collect_to_messages_response`.
#[derive(Clone, Copy)]
enum BlockKind {
    /// Text block; its content is assembled from `text_delta`s.
    Text,
    /// Tool-use block; its input JSON is assembled from `input_json_delta`s.
    ToolUse,
}
/// Wrap a CLI-level failure message in an Anthropic-style error envelope.
/// The brightstaff bridge serializes this and returns it with a 502/500
/// status so the existing `llm_gateway` error handling sees a familiar shape.
pub fn cli_error_to_anthropic_error_body(message: &str) -> Value {
    let detail = json!({
        "type": "claude_cli_error",
        "message": message
    });
    json!({
        "type": "error",
        "error": detail
    })
}
/// Fabricate a `message_start` event for streaming clients when the CLI did
/// not emit one (it usually does, but very small turns can skip straight to
/// `assistant`/`result`).
///
/// The message id is `msg_cli_<session_id>` when a session id is given,
/// otherwise `msg_cli_<random v4 uuid>`. Content is empty and all usage
/// counters start at zero.
pub fn synthetic_message_start(model: &str, session_id: Option<&str>) -> MessagesStreamEvent {
    let id = match session_id {
        Some(s) => format!("msg_cli_{s}"),
        None => format!("msg_cli_{}", Uuid::new_v4().simple()),
    };
    let empty_usage = MessagesUsage {
        input_tokens: 0,
        output_tokens: 0,
        cache_creation_input_tokens: None,
        cache_read_input_tokens: None,
    };
    let message = MessagesStreamMessage {
        id,
        obj_type: "message".to_string(),
        role: MessagesRole::Assistant,
        content: Vec::new(),
        model: model.to_string(),
        stop_reason: None,
        stop_sequence: None,
        usage: empty_usage,
    };
    MessagesStreamEvent::MessageStart { message }
}
/// Parse a single NDJSON line into a `ClaudeCliEvent`.
///
/// Leading and trailing whitespace is stripped first. A line that is empty
/// after trimming yields `None` so callers can skip it; anything else yields
/// `Some` carrying the `serde_json` decode result (success or error).
pub fn parse_ndjson_line(line: &str) -> Option<Result<ClaudeCliEvent, serde_json::Error>> {
    match line.trim() {
        "" => None,
        payload => Some(serde_json::from_str(payload)),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::apis::anthropic::{MessagesMessage, MessagesMessageContent};
fn user_request(text: &str) -> MessagesRequest {
MessagesRequest {
model: "claude-cli/sonnet".to_string(),
messages: vec![MessagesMessage {
role: MessagesRole::User,
content: MessagesMessageContent::Single(text.to_string()),
}],
max_tokens: 1024,
container: None,
mcp_servers: None,
system: None,
metadata: None,
service_tier: None,
thinking: None,
temperature: None,
top_p: None,
top_k: None,
stream: Some(true),
stop_sequences: None,
tools: None,
tool_choice: None,
}
}
#[test]
fn parses_system_init_event() {
let line = r#"{"type":"system","subtype":"init","session_id":"s1","model":"sonnet","cwd":"/tmp","tools":[]}"#;
let parsed = parse_ndjson_line(line).expect("non-empty").expect("ok");
match parsed {
ClaudeCliEvent::System {
subtype,
session_id,
model,
..
} => {
assert_eq!(subtype.as_deref(), Some("init"));
assert_eq!(session_id.as_deref(), Some("s1"));
assert_eq!(model.as_deref(), Some("sonnet"));
}
other => panic!("expected System, got {other:?}"),
}
}
#[test]
fn parses_text_stream_event() {
let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hi"}}}"#;
let parsed = parse_ndjson_line(line).unwrap().unwrap();
let translated = cli_event_to_messages_stream_event(&parsed)
.expect("text_delta should translate to MessagesStreamEvent");
match translated {
MessagesStreamEvent::ContentBlockDelta { index, delta } => {
assert_eq!(index, 0);
match delta {
MessagesContentDelta::TextDelta { text } => assert_eq!(text, "hi"),
other => panic!("expected TextDelta, got {other:?}"),
}
}
other => panic!("expected ContentBlockDelta, got {other:?}"),
}
}
#[test]
fn parses_result_success_event() {
let line = r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":12,"num_turns":1,"result":"hi","total_cost_usd":0.001,"usage":{"input_tokens":4,"output_tokens":2},"session_id":"s1"}"#;
let parsed = parse_ndjson_line(line).unwrap().unwrap();
match parsed {
ClaudeCliEvent::Result {
is_error,
result,
usage,
..
} => {
assert!(!is_error);
assert_eq!(result.as_deref(), Some("hi"));
assert_eq!(usage.unwrap().output_tokens, Some(2));
}
other => panic!("expected Result, got {other:?}"),
}
}
#[test]
fn unknown_event_type_does_not_break_parser() {
let line = r#"{"type":"future_event_kind","data":{"foo":"bar"},"another":42}"#;
let parsed = parse_ndjson_line(line).unwrap().unwrap();
assert!(matches!(parsed, ClaudeCliEvent::Unknown));
}
#[test]
fn stdin_payload_skips_assistant_turns() {
let mut req = user_request("hello");
req.messages.push(MessagesMessage {
role: MessagesRole::Assistant,
content: MessagesMessageContent::Single("hi back".to_string()),
});
req.messages.push(MessagesMessage {
role: MessagesRole::User,
content: MessagesMessageContent::Single("how are you?".to_string()),
});
let payload = messages_request_to_stdin_payload(&req, Some("s1")).unwrap();
assert_eq!(payload.len(), 2);
for ev in &payload {
match ev {
ClaudeCliInputEvent::User {
message,
session_id,
} => {
assert_eq!(message.role, "user");
assert_eq!(session_id.as_deref(), Some("s1"));
}
}
}
}
#[test]
fn collect_to_messages_response_aggregates_text() {
let events = vec![
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageStart {
message: MessagesStreamMessage {
id: "msg_1".to_string(),
obj_type: "message".to_string(),
role: MessagesRole::Assistant,
content: vec![],
model: "claude-sonnet-4-6".to_string(),
stop_reason: None,
stop_sequence: None,
usage: MessagesUsage {
input_tokens: 7,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockStart {
index: 0,
content_block: MessagesContentBlock::Text {
text: String::new(),
cache_control: None,
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockDelta {
index: 0,
delta: MessagesContentDelta::TextDelta {
text: "Hello ".to_string(),
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockDelta {
index: 0,
delta: MessagesContentDelta::TextDelta {
text: "world".to_string(),
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockStop { index: 0 },
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: MessagesStopReason::EndTurn,
stop_sequence: None,
},
usage: MessagesUsage {
input_tokens: 0,
output_tokens: 12,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageStop,
},
ClaudeCliEvent::Result {
subtype: Some("success".to_string()),
is_error: false,
duration_ms: Some(123),
num_turns: Some(1),
result: Some("Hello world".to_string()),
total_cost_usd: Some(0.001),
usage: Some(ClaudeCliUsage {
input_tokens: Some(7),
output_tokens: Some(12),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
}),
session_id: Some("s1".to_string()),
},
];
let resp = collect_to_messages_response("claude-cli/sonnet", events).unwrap();
assert_eq!(resp.id, "msg_1");
assert_eq!(resp.model, "claude-sonnet-4-6");
assert_eq!(resp.usage.input_tokens, 7);
assert_eq!(resp.usage.output_tokens, 12);
assert!(matches!(resp.stop_reason, MessagesStopReason::EndTurn));
match &resp.content[..] {
[MessagesContentBlock::Text { text, .. }] => assert_eq!(text, "Hello world"),
other => panic!("expected single Text block, got {other:?}"),
}
}
#[test]
fn collect_to_messages_response_aggregates_tool_use() {
let events = vec![
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageStart {
message: MessagesStreamMessage {
id: "msg_2".to_string(),
obj_type: "message".to_string(),
role: MessagesRole::Assistant,
content: vec![],
model: "sonnet".to_string(),
stop_reason: None,
stop_sequence: None,
usage: MessagesUsage {
input_tokens: 1,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockStart {
index: 0,
content_block: MessagesContentBlock::ToolUse {
id: "toolu_1".to_string(),
name: "get_weather".to_string(),
input: Value::Null,
cache_control: None,
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockDelta {
index: 0,
delta: MessagesContentDelta::InputJsonDelta {
partial_json: "{\"loc\":\"".to_string(),
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockDelta {
index: 0,
delta: MessagesContentDelta::InputJsonDelta {
partial_json: "SF\"}".to_string(),
},
},
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::ContentBlockStop { index: 0 },
},
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: MessagesStopReason::ToolUse,
stop_sequence: None,
},
usage: MessagesUsage {
input_tokens: 0,
output_tokens: 5,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
},
},
ClaudeCliEvent::Result {
subtype: Some("success".to_string()),
is_error: false,
duration_ms: None,
num_turns: Some(1),
result: None,
total_cost_usd: None,
usage: None,
session_id: None,
},
];
let resp = collect_to_messages_response("sonnet", events).unwrap();
assert!(matches!(resp.stop_reason, MessagesStopReason::ToolUse));
match &resp.content[..] {
[MessagesContentBlock::ToolUse {
id, name, input, ..
}] => {
assert_eq!(id, "toolu_1");
assert_eq!(name, "get_weather");
assert_eq!(input["loc"], "SF");
}
other => panic!("expected ToolUse block, got {other:?}"),
}
}
#[test]
fn collect_to_messages_response_propagates_cli_error() {
let events = vec![ClaudeCliEvent::Result {
subtype: Some("error".to_string()),
is_error: true,
duration_ms: Some(5),
num_turns: Some(0),
result: Some("auth failed".to_string()),
total_cost_usd: None,
usage: None,
session_id: None,
}];
let err = collect_to_messages_response("sonnet", events).unwrap_err();
match err {
ClaudeCliTranslationError::CliError { message } => {
assert!(message.contains("auth failed"));
}
other => panic!("expected CliError, got {other:?}"),
}
}
#[test]
fn collect_to_messages_response_unexpected_end() {
let events: Vec<ClaudeCliEvent> = vec![ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::Ping,
}];
let err = collect_to_messages_response("sonnet", events).unwrap_err();
assert!(matches!(err, ClaudeCliTranslationError::UnexpectedEnd));
}
#[test]
fn collect_to_messages_response_uses_assistant_when_no_deltas() {
let assistant_msg = ClaudeCliAssistantMessage {
id: Some("msg_3".to_string()),
model: Some("sonnet".to_string()),
role: Some("assistant".to_string()),
content: vec![ClaudeCliContentBlock::Anthropic(
MessagesContentBlock::Text {
text: "ok".to_string(),
cache_control: None,
},
)],
stop_reason: Some("end_turn".to_string()),
stop_sequence: None,
usage: Some(ClaudeCliUsage {
input_tokens: Some(2),
output_tokens: Some(1),
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
}),
};
let events = vec![
ClaudeCliEvent::Assistant {
message: assistant_msg,
},
ClaudeCliEvent::Result {
subtype: Some("success".to_string()),
is_error: false,
duration_ms: None,
num_turns: Some(1),
result: None,
total_cost_usd: None,
usage: None,
session_id: None,
},
];
let resp = collect_to_messages_response("sonnet", events).unwrap();
assert_eq!(resp.id, "msg_3");
assert_eq!(resp.usage.input_tokens, 2);
assert_eq!(resp.usage.output_tokens, 1);
match &resp.content[..] {
[MessagesContentBlock::Text { text, .. }] => assert_eq!(text, "ok"),
other => panic!("expected Text, got {other:?}"),
}
}
#[test]
fn extract_system_prompt_blocks_join_text() {
let req = MessagesRequest {
system: Some(MessagesSystemPrompt::Blocks(vec![
MessagesContentBlock::Text {
text: "line 1".to_string(),
cache_control: None,
},
MessagesContentBlock::Text {
text: "line 2".to_string(),
cache_control: None,
},
])),
..user_request("ignored")
};
assert_eq!(
extract_system_prompt(&req).as_deref(),
Some("line 1\nline 2")
);
}
#[test]
fn tool_result_content_round_trips_through_translation() {
// Sanity-check that ToolResultContent (used by future tool_result
// translation) stays linkable as the surface evolves.
use crate::apis::anthropic::ToolResultContent;
let _ = ToolResultContent::Text("noop".to_string());
}
}

View file

@ -1,5 +1,6 @@
pub mod amazon_bedrock;
pub mod anthropic;
pub mod claude_cli;
pub mod openai;
pub mod openai_responses;
pub mod streaming_shapes;

View file

@ -92,6 +92,19 @@ providers:
- mistralai/mistral-embed
- mistralai/codestral-embed
- mistralai/codestral-embed-2505
claude-cli:
# Family aliases (always resolve to the latest model in the family).
- claude-cli/sonnet
- claude-cli/opus
- claude-cli/haiku
# Dated full ids (sourced from the Claude Code model configuration article;
# refresh by re-fetching that doc whenever Anthropic ships new models).
- claude-cli/claude-opus-4-7
- claude-cli/claude-sonnet-4-6
- claude-cli/claude-opus-4-6
- claude-cli/claude-opus-4-5-20251101
- claude-cli/claude-haiku-4-5-20251001
- claude-cli/claude-sonnet-4-5-20250929
anthropic:
- anthropic/claude-sonnet-4-6
- anthropic/claude-opus-4-6

View file

@ -175,7 +175,10 @@ impl SupportedAPIsFromClient {
match self {
SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages) => {
match provider_id {
ProviderId::Anthropic | ProviderId::Vercel => {
// ClaudeCli speaks Anthropic Messages on the wire (the
// brightstaff bridge only accepts `POST /v1/messages`),
// so keep the path as-is just like the real Anthropic.
ProviderId::Anthropic | ProviderId::Vercel | ProviderId::ClaudeCli => {
build_endpoint("/v1", "/messages")
}
ProviderId::AmazonBedrock => {
@ -198,11 +201,18 @@ impl SupportedAPIsFromClient {
| ProviderId::XAI
| ProviderId::ChatGPT
| ProviderId::Vercel => route_by_provider("/responses"),
// ClaudeCli: bridge only accepts Anthropic Messages.
ProviderId::ClaudeCli => build_endpoint("/v1", "/messages"),
// All other providers: translate to /chat/completions
_ => route_by_provider("/chat/completions"),
}
}
SupportedAPIsFromClient::OpenAIChatCompletions(_) => {
// ClaudeCli: bridge only accepts Anthropic Messages, regardless
// of how the client framed the request.
if matches!(provider_id, ProviderId::ClaudeCli) {
return build_endpoint("/v1", "/messages");
}
// For Chat Completions API, use the standard chat/completions path
route_by_provider("/chat/completions")
}
@ -633,6 +643,35 @@ mod tests {
);
}
/// The brightstaff `claude-cli` bridge only accepts `POST /v1/messages`.
/// Make sure that no matter how a client framed the request, the upstream
/// path stays `/v1/messages`.
#[test]
fn test_claude_cli_endpoint_always_v1_messages() {
for client_api in [
SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages),
SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions),
SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses),
] {
for request_path in ["/v1/messages", "/v1/chat/completions", "/v1/responses"] {
assert_eq!(
client_api.target_endpoint_for_provider(
&ProviderId::ClaudeCli,
request_path,
"claude-cli/sonnet",
false,
None,
false
),
"/v1/messages",
"client_api={:?} request_path={} should map to /v1/messages",
client_api,
request_path,
);
}
}
}
#[test]
fn test_non_v1_request_paths() {
let api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions);

View file

@ -48,6 +48,11 @@ pub enum ProviderId {
DigitalOcean,
Vercel,
OpenRouter,
/// Claude Code CLI invoked as a local subprocess by brightstaff. On the
/// wire it speaks the Anthropic Messages API exactly like
/// [`ProviderId::Anthropic`]; the difference is that no Anthropic API key
/// or network call is involved — the local `claude` binary is.
ClaudeCli,
}
impl TryFrom<&str> for ProviderId {
@ -81,6 +86,7 @@ impl TryFrom<&str> for ProviderId {
"do_ai" => Ok(ProviderId::DigitalOcean), // alias
"vercel" => Ok(ProviderId::Vercel),
"openrouter" => Ok(ProviderId::OpenRouter),
"claude-cli" | "claude_cli" => Ok(ProviderId::ClaudeCli),
_ => Err(format!("Unknown provider: {}", value)),
}
}
@ -107,6 +113,7 @@ impl ProviderId {
ProviderId::Qwen => "qwen",
ProviderId::ChatGPT => "chatgpt",
ProviderId::DigitalOcean => "digitalocean",
ProviderId::ClaudeCli => "claude-cli",
_ => return Vec::new(),
};
@ -144,6 +151,14 @@ impl ProviderId {
SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions)
}
// ClaudeCli speaks the same wire protocol as Anthropic — the
// brightstaff bridge always presents itself as an Anthropic
// Messages API endpoint, so client requests in any shape get
// converted to AnthropicMessagesAPI on the way out.
(ProviderId::ClaudeCli, _) => {
SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages)
}
// Vercel AI Gateway natively supports all three API types
(ProviderId::Vercel, SupportedAPIsFromClient::AnthropicMessagesAPI(_)) => {
SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages)
@ -267,6 +282,7 @@ impl Display for ProviderId {
ProviderId::DigitalOcean => write!(f, "digitalocean"),
ProviderId::Vercel => write!(f, "vercel"),
ProviderId::OpenRouter => write!(f, "openrouter"),
ProviderId::ClaudeCli => write!(f, "claude-cli"),
}
}
}

View file

@ -0,0 +1,113 @@
//! End-to-end fixture tests for `apis::claude_cli`. Each NDJSON file under
//! `tests/fixtures/claude_cli/` represents one canned subprocess output. We
//! parse it line-by-line and feed it through the same translation entry points
//! the brightstaff bridge uses at runtime.
use std::fs;
use std::path::PathBuf;
use hermesllm::apis::anthropic::{
MessagesContentBlock, MessagesContentDelta, MessagesStopReason, MessagesStreamEvent,
};
use hermesllm::apis::claude_cli::{
cli_event_to_messages_stream_event, collect_to_messages_response, parse_ndjson_line,
ClaudeCliEvent, ClaudeCliTranslationError,
};
/// Resolve `name` to `<crate root>/tests/fixtures/claude_cli/<name>`, where
/// the crate root is taken from `CARGO_MANIFEST_DIR` at compile time.
fn fixture_path(name: &str) -> PathBuf {
    let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    // `extend` pushes each component in turn, exactly like chained `join`s.
    path.extend(["tests", "fixtures", "claude_cli", name]);
    path
}
/// Read the NDJSON fixture `name` and decode it into CLI events.
///
/// Lines that are empty after trimming are skipped (they make
/// `parse_ndjson_line` return `None`).
///
/// # Panics
///
/// Panics if the fixture cannot be read, or if any non-blank line fails to
/// decode. The decode panic names the fixture and the 1-based line number so
/// a broken fixture can be located without re-running under a debugger.
fn load_events(name: &str) -> Vec<ClaudeCliEvent> {
    let body = fs::read_to_string(fixture_path(name))
        .unwrap_or_else(|e| panic!("read fixture {name}: {e}"));
    body.lines()
        .enumerate()
        .filter_map(|(idx, line)| {
            parse_ndjson_line(line).map(|parsed| {
                parsed.unwrap_or_else(|e| panic!("parse fixture {name} line {}: {e}", idx + 1))
            })
        })
        .collect()
}
/// The text fixture must aggregate into one text block with the expected
/// id/model/usage, and its streaming projection must start with
/// `MessageStart`, end with `MessageStop`, and spell out the same text.
#[test]
fn text_response_aggregates_into_messages_response() {
    let events = load_events("text_response.ndjson");

    // Non-streaming view: everything folded into a single response.
    let resp = collect_to_messages_response("claude-cli/sonnet", events.clone()).unwrap();
    assert_eq!(resp.id, "msg_01ABC");
    assert_eq!(resp.model, "claude-sonnet-4-6");
    assert_eq!(resp.usage.input_tokens, 12);
    assert_eq!(resp.usage.output_tokens, 4);
    assert!(matches!(resp.stop_reason, MessagesStopReason::EndTurn));
    match resp.content.as_slice() {
        [MessagesContentBlock::Text { text, .. }] => assert_eq!(text, "Hello, world!"),
        other => panic!("expected single Text, got {other:?}"),
    }

    // Verify the streaming projection emits exactly the events the Anthropic
    // SSE wire protocol expects, in order.
    let stream: Vec<MessagesStreamEvent> = events
        .iter()
        .filter_map(|ev| cli_event_to_messages_stream_event(ev))
        .collect();
    assert!(matches!(
        stream[0],
        MessagesStreamEvent::MessageStart { .. }
    ));
    let final_event = stream.last().unwrap();
    assert!(matches!(final_event, MessagesStreamEvent::MessageStop));

    // Concatenate every text delta in arrival order.
    let mut text_deltas = String::new();
    for ev in &stream {
        if let MessagesStreamEvent::ContentBlockDelta {
            delta: MessagesContentDelta::TextDelta { text },
            ..
        } = ev
        {
            text_deltas.push_str(text);
        }
    }
    assert_eq!(text_deltas, "Hello, world!");
}
/// The tool-use fixture's split `input_json_delta` fragments must be stitched
/// into a single `ToolUse` block with parsed input.
#[test]
fn tool_use_response_assembles_partial_json() {
    let outcome = collect_to_messages_response("sonnet", load_events("tool_use_response.ndjson"));
    let resp = outcome.unwrap();
    assert!(matches!(resp.stop_reason, MessagesStopReason::ToolUse));
    match resp.content.as_slice() {
        [MessagesContentBlock::ToolUse {
            id, name, input, ..
        }] => {
            assert_eq!(id, "toolu_W");
            assert_eq!(name, "get_weather");
            assert_eq!(input["city"], "Seattle");
        }
        other => panic!("expected single ToolUse block, got {other:?}"),
    }
}
/// The error fixture must surface as `CliError`, and the CLI's message
/// (which mentions the 529 status) must be carried through.
#[test]
fn error_response_returns_cli_error() {
    let outcome = collect_to_messages_response("sonnet", load_events("error_response.ndjson"));
    match outcome.unwrap_err() {
        ClaudeCliTranslationError::CliError { message } => {
            assert!(
                message.contains("529"),
                "expected 529 in error message, got: {message}"
            );
        }
        other => panic!("expected CliError, got {other:?}"),
    }
}
/// Retry and rate-limit `system` events preceding a successful turn must not
/// affect the outcome: the response is a normal `EndTurn` with text "ok".
#[test]
fn retry_then_success_is_treated_as_success() {
    let outcome = collect_to_messages_response("sonnet", load_events("retry_then_success.ndjson"));
    let resp = outcome.unwrap();
    assert!(matches!(resp.stop_reason, MessagesStopReason::EndTurn));
    match resp.content.as_slice() {
        [MessagesContentBlock::Text { text, .. }] => assert_eq!(text, "ok"),
        other => panic!("expected Text block, got {other:?}"),
    }
}

View file

@ -0,0 +1,3 @@
{"type":"system","subtype":"init","session_id":"err-1","model":"sonnet","cwd":"/tmp","tools":[]}
{"type":"system","subtype":"api_retry","attempt":1,"reason":"529 overloaded"}
{"type":"result","subtype":"error","is_error":true,"duration_ms":1200,"num_turns":0,"result":"Anthropic API returned 529 after 3 retries","total_cost_usd":0,"session_id":"err-1"}

View file

@ -0,0 +1,10 @@
{"type":"system","subtype":"init","session_id":"retry-1","model":"sonnet","cwd":"/tmp","tools":[]}
{"type":"system","subtype":"api_retry","attempt":1,"reason":"529 overloaded"}
{"type":"system","subtype":"rate_limit_event","reset_at":"2026-05-04T18:30:00Z"}
{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_retry","type":"message","role":"assistant","content":[],"model":"sonnet","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":3,"output_tokens":0}}}}
{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"ok"}}}
{"type":"stream_event","event":{"type":"content_block_stop","index":0}}
{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":0,"output_tokens":1}}}
{"type":"stream_event","event":{"type":"message_stop"}}
{"type":"result","subtype":"success","is_error":false,"duration_ms":2100,"num_turns":1,"result":"ok","total_cost_usd":0.00009,"usage":{"input_tokens":3,"output_tokens":1},"session_id":"retry-1"}

View file

@ -0,0 +1,10 @@
{"type":"system","subtype":"init","session_id":"a1b2c3","model":"claude-sonnet-4-6","cwd":"/tmp","tools":["Bash","Read"]}
{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_01ABC","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":12,"output_tokens":0}}}}
{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", world!"}}}
{"type":"stream_event","event":{"type":"content_block_stop","index":0}}
{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":0,"output_tokens":4}}}
{"type":"stream_event","event":{"type":"message_stop"}}
{"type":"assistant","message":{"id":"msg_01ABC","type":"message","role":"assistant","model":"claude-sonnet-4-6","content":[{"type":"text","text":"Hello, world!"}],"stop_reason":"end_turn","stop_sequence":null,"usage":{"input_tokens":12,"output_tokens":4}}}
{"type":"result","subtype":"success","is_error":false,"duration_ms":521,"num_turns":1,"result":"Hello, world!","total_cost_usd":0.00012,"usage":{"input_tokens":12,"output_tokens":4},"session_id":"a1b2c3"}

View file

@ -0,0 +1,9 @@
{"type":"system","subtype":"init","session_id":"tool-1","model":"sonnet","cwd":"/tmp","tools":[]}
{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_tool","type":"message","role":"assistant","content":[],"model":"sonnet","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":20,"output_tokens":0}}}}
{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"toolu_W","name":"get_weather","input":{}}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"city\":\""}}}
{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"Seattle\"}"}}}
{"type":"stream_event","event":{"type":"content_block_stop","index":0}}
{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"tool_use","stop_sequence":null},"usage":{"input_tokens":0,"output_tokens":7}}}
{"type":"stream_event","event":{"type":"message_stop"}}
{"type":"result","subtype":"success","is_error":false,"duration_ms":701,"num_turns":1,"result":null,"total_cost_usd":0.00021,"usage":{"input_tokens":20,"output_tokens":7},"session_id":"tool-1"}

View file

@ -0,0 +1,49 @@
# Claude Code CLI as a Plano provider
This demo wires the locally installed `claude` binary as a Plano
`model_provider`. The single entry under `model_providers:`
```yaml
model_providers:
- model: claude-cli/*
default: true
```
is enough to:
1. Auto-fill `provider_interface: claude-cli`, `base_url: http://127.0.0.1:14001`
and a placeholder `access_key` (the CLI uses its own login keychain).
2. Start a localhost bridge inside `brightstaff` that spawns `claude -p
--output-format stream-json --input-format stream-json` for each
conversation.
3. Expose every Claude Code model — `claude-cli/sonnet`, `claude-cli/opus`,
`claude-cli/haiku`, plus dated full ids — at `GET /v1/models`.
## Running
```bash
# Make sure the CLI is logged in. You can use API key billing or a paid Claude subscription.
claude auth login
# Start Plano in native mode.
planoai up demos/integrations/claude_cli/config.yaml
```
Then point any OpenAI- or Anthropic-style client at `http://localhost:12000`
and pick any `claude-cli/...` model. Plano routes the request through Envoy
to the brightstaff bridge, which asks the local `claude` binary to handle
it.
## Optional overrides
Set these env vars before `planoai up` if you need to tweak the bridge:
| Env var | Default | Meaning |
| ----------------------------- | ------------------- | -------------------------------------- |
| `CLAUDE_CLI_BIN` | `claude` | Path to the CLI binary. |
| `CLAUDE_CLI_PERMISSION_MODE` | `bypassPermissions` | `--permission-mode` flag value. |
| `CLAUDE_CLI_LISTEN_ADDR` | `127.0.0.1:14001` | Bridge listen address. |
| `CLAUDE_CLI_SESSION_TTL_SECS` | `600` | Idle TTL before a child is killed. |
| `CLAUDE_CLI_WATCHDOG_SECS` | `120` | Per-line watchdog inside one CLI turn. |
| `CLAUDE_CLI_MAX_SESSIONS` | `64` | Hard cap on concurrent CLI children. |

View file

@ -0,0 +1,29 @@
version: v0.4.0
# Claude Code CLI as a Plano model_provider.
#
# The single line below is everything you need: Plano detects the
# `claude-cli/*` namespace, auto-fills the provider_interface, base_url and
# placeholder access_key, and starts a localhost bridge inside brightstaff
# that shells out to the `claude` binary on your $PATH for each request.
#
# Requirements:
# - `claude --version` must work in the same shell as `planoai up`.
# - Auth happens via the CLI's own `claude auth login` (no API key needed
# in Plano).
#
# Optional overrides via env (set before `planoai up`):
# CLAUDE_CLI_BIN=/custom/path/to/claude
# CLAUDE_CLI_PERMISSION_MODE=default
# CLAUDE_CLI_LISTEN_ADDR=127.0.0.1:14001
listeners:
- type: model
name: model_listener
port: 12000
model_providers:
- model: claude-cli/*
tracing:
random_sampling: 100

View file

@ -63,4 +63,5 @@ Built by contributors to the widely adopted `Envoy Proxy <https://www.envoyproxy
resources/deployment
resources/configuration_reference
resources/cli_reference
resources/local_agent_providers
resources/llms_txt

View file

@ -0,0 +1,184 @@
.. _local-agent-providers:
Local-Agent Providers
=====================
Plano draws a hard line between two very different kinds of "providers"
that can sit behind a ``model_providers`` entry:
1. **Network LLM providers** — ``openai``, ``anthropic``, ``gemini``,
``vercel``, ``openrouter``, ``mistral``, ``groq``, ``digitalocean``,
``together_ai``, etc. These are stateless HTTPS APIs. The trust
boundary is the network call: Plano forwards the request to the
provider's server, the provider does whatever it does, and the
response comes back. The host never executes provider code.
2. **Local-agent providers** — currently ``claude-cli`` (and, by design,
any future ``codex-cli`` / ``chatgpt-cli`` / ``opencode`` /
``hermes`` integration). These are not LLMs; they are *agent
integrations*. Plano implements them as a localhost bridge inside
``brightstaff`` that **spawns a local CLI binary as a subprocess**
for every request and pipes the conversation through it.
These two classes of provider have fundamentally different security
properties, and conflating them in production is the kind of mistake
that turns into a postmortem. This page exists so the boundary is
explicit.
Why ``planoai up`` warns about them
-----------------------------------
When ``planoai up`` loads a config that contains a local-agent provider
(matched on ``provider_interface`` or on a ``<interface>/...`` prefix in
``model:``/``name:``), it prints a single warning panel listing the
triggering entries and refuses to proceed silently until the operator
acknowledges it. This is intentional. The warning fires exactly once per
``planoai up`` run, regardless of how many local-agent entries the
config has.
Trust model
-----------
Spawning a local CLI binary as the operator's user is a very different
thing from making an HTTPS call. The subprocess inherits everything the
operator can do:
.. list-table::
:header-rows: 1
:widths: 30 35 35
* - Capability
- Network LLM provider
- Local-agent provider
* - Filesystem read
- No
- **Yes** — anything ``$USER`` can read
* - Filesystem write
- No
- **Yes** — anything ``$USER`` can write
* - Shell command execution
- No
- **Yes** — full shell as ``$USER``
* - Auth / credentials
- Per-provider API key
- **Host login keychain** (no per-tenant isolation)
* - Outbound network
- To the provider only
- **Anywhere the host can reach**
* - Reproducibility
- Deterministic given inputs
- Depends on local FS, env, CWD, installed tools
* - Suitable for production
- Yes
- **No — local development only**
Concretely, when a request hits a ``claude-cli/*`` model, brightstaff
runs (roughly):
.. code-block:: bash
claude -p --output-format stream-json --input-format stream-json \
--permission-mode bypassPermissions ...
Whatever Claude Code decides to do, it can reach all of it: the working
directory, the shell, ``rm``, ``git``, your SSH keys, your
``~/.aws/credentials``, your production database connection strings. This
is the *correct* trust model for a single-developer workstation; it is
the *wrong* trust model for anything multi-tenant.
Local-agent providers are in the same category as standalone agent
runtimes like `OpenClaw`_, `OpenCode`_, and `Hermes`_: they are agent
integrations that happen to expose an LLM-shaped HTTP API, not
LLM providers that happen to run locally.
.. _OpenClaw: https://github.com/openclaw/openclaw
.. _OpenCode: https://github.com/sst/opencode
.. _Hermes: https://github.com/HermesAI/hermes
Recommended setup
-----------------
If you are using a local-agent provider, treat it like any other
developer-machine agent runtime:
- **Bind to loopback only.** Do not expose the bridge or the Plano
listener to a network interface. ``127.0.0.1`` only.
- **Single-developer use.** One operator, one host. Do not put a
load balancer in front of it. Do not share the deployment.
- **Opt-in.** Don't add a local-agent provider to a config that other
people deploy. Keep it in a config file that's clearly scoped to one
workstation.
- **Don't run as root** and don't run inside a container that mounts
more of the host filesystem than necessary. The subprocess inherits
the launching process's capabilities verbatim.
- **Audit the spawned binary** the same way you would audit anything
with ``sudo`` access. If the operator's ``claude`` (or future
``codex``) binary is compromised, so is the host.
Dismissing the warning
----------------------
The warning is dismissable per-host. The recommended path is the CLI
flag:
.. code-block:: bash
planoai up --ack-local-agents
That writes an ack file at ``~/.plano/state/local_agent_ack.json``
containing every triggering provider interface and the timestamp. On
subsequent ``planoai up`` runs, the warning is suppressed silently as
long as the ack covers every local-agent interface in the config.
If you prefer an environment variable (e.g. inside a personal
``direnv`` setup), set ``PLANO_ACK_LOCAL_AGENTS=1`` instead. Truthy
values are ``1``, ``true``, ``yes``, ``on`` (case-insensitive). Setting
the env var has the same effect as passing the flag — it writes the
ack file.
If a *new* local-agent interface appears later (e.g. you add a
hypothetical ``codex-cli/*`` after acknowledging ``claude-cli/*``), the
warning re-fires for the un-acked interface only.
Undoing the dismissal
~~~~~~~~~~~~~~~~~~~~~
To undo the dismissal — for example, when handing the host to another
developer or running through a security review — simply remove the
file:
.. code-block:: bash
rm ~/.plano/state/local_agent_ack.json
The next ``planoai up`` run will print the full warning panel again.
Adding a new local-agent provider type
--------------------------------------
The set of local-agent provider interfaces lives in
``cli/planoai/local_agent_warning.py`` as
``LOCAL_AGENT_PROVIDER_INTERFACES``. Adding a new entry — say, a future
``codex-cli`` bridge that spawns the OpenAI Codex CLI — is a one-line
change:
.. code-block:: python
LOCAL_AGENT_PROVIDER_INTERFACES = ("claude-cli", "codex-cli")
Detection automatically covers ``provider_interface: codex-cli`` as
well as ``model: codex-cli/...`` and ``name: codex-cli/...``, so users
who rely on the Python-side autofill for short-form configs are still
warned.
.. note::
At the time of writing, the only network ``provider_interface`` that
shares any naming with a local agent runtime is ``chatgpt`` — but
that is a stateless HTTPS provider against
``https://chatgpt.com/backend-api/codex``, **not** a local CLI
bridge. It is correctly excluded from
``LOCAL_AGENT_PROVIDER_INTERFACES``. The ``codex`` value accepted by
``planoai cli_agent codex`` is a *client* helper that points the
Codex CLI at a running Plano listener; it does not introduce a
provider into the config.