From 7b87c50de4408a797304f8e7aa983e074febd820 Mon Sep 17 00:00:00 2001 From: Spherrrical Date: Tue, 14 Apr 2026 12:32:07 -0700 Subject: [PATCH 1/7] Add ChatGPT token watchdog for seamless long-lived sessions --- cli/planoai/chatgpt_watchdog.py | 211 ++++++++++++++++++++++++++++++++ cli/planoai/consts.py | 2 + cli/planoai/native_runner.py | 41 ++++++- 3 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 cli/planoai/chatgpt_watchdog.py diff --git a/cli/planoai/chatgpt_watchdog.py b/cli/planoai/chatgpt_watchdog.py new file mode 100644 index 00000000..ada42da0 --- /dev/null +++ b/cli/planoai/chatgpt_watchdog.py @@ -0,0 +1,211 @@ +""" +Background daemon that monitors ChatGPT OAuth token expiry and restarts +Plano processes with a fresh token before the old one expires. + +The watchdog is spawned by start_native() when ChatGPT providers are present. +It runs as a fully daemonized process (double-fork) and exits after triggering +a restart (a fresh watchdog is spawned by the new start_native() call). +""" + +import json +import os +import time +from typing import Optional + +from planoai.consts import ( + NATIVE_PID_FILE, + PLANO_RUN_DIR, + PLANO_WATCHDOG_LOG_FILE, + PLANO_WATCHDOG_STATE_FILE, +) + +# Wake up this many seconds before token expiry to refresh +WATCHDOG_REFRESH_LEAD_SECONDS = 5 * 60 # 5 minutes + +# How often the watchdog polls for expiry (in seconds) +WATCHDOG_POLL_INTERVAL_SECONDS = 30 + +# Env var sentinel: if set, spawn_watchdog() is a no-op (prevents recursive spawning) +_NO_WATCHDOG_ENV_VAR = "PLANO_NO_WATCHDOG" + + +def _log(msg: str) -> None: + print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] watchdog: {msg}", flush=True) + + +def _seconds_until_expiry() -> Optional[float]: + """Return seconds until the ChatGPT access token expires, or None if unknown.""" + try: + from planoai.chatgpt_auth import load_auth + + auth = load_auth() + if not auth: + return None + expires_at = auth.get("expires_at") + if not expires_at: + return None + return float(expires_at) - time.time() + except Exception: + return None + + +def _stop_services(skip_pids: set) -> None: + """Stop envoy and brightstaff without killing the watchdog (self).""" + from planoai.native_runner import stop_native + + stop_native(skip_pids=skip_pids) + + +def _do_restart(plano_config_file: str) -> bool: + """ + Refresh the ChatGPT token, stop Envoy+brightstaff, restart with the new token. + Returns True on success, False if the refresh failed. + """ + # 1. Load saved state (env dict + metadata) + if not os.path.exists(PLANO_WATCHDOG_STATE_FILE): + _log("Watchdog state file missing — cannot restart services") + return False + with open(PLANO_WATCHDOG_STATE_FILE) as f: + state = json.load(f) + env = state["env"] + with_tracing = state.get("with_tracing", False) + + # 2. Refresh the token + try: + from planoai.chatgpt_auth import get_access_token + + access_token, account_id = get_access_token() + except Exception as exc: + _log( + f"Token refresh failed: {exc} — " + "run 'planoai chatgpt login' to re-authenticate" + ) + return False + + env["CHATGPT_ACCESS_TOKEN"] = access_token + if account_id: + env["CHATGPT_ACCOUNT_ID"] = account_id + + # 3. Stop envoy + brightstaff (skip self so we don't self-terminate) + _stop_services(skip_pids={os.getpid()}) + + # 4. Unset the sentinel so start_native() spawns a fresh watchdog + os.environ.pop(_NO_WATCHDOG_ENV_VAR, None) + + # 5. Restart with the fresh token; this also spawns the next watchdog + from planoai.native_runner import start_native + + start_native( + plano_config_file, + env, + with_tracing=with_tracing, + spawn_watchdog=True, + ) + return True + + +def _watchdog_main(plano_config_file: str) -> None: + """Main loop running inside the watchdog daemon process.""" + _log(f"Watchdog started (PID {os.getpid()})") + + while True: + time.sleep(WATCHDOG_POLL_INTERVAL_SECONDS) + + secs = _seconds_until_expiry() + if secs is None: + _log("Cannot read token expiry — will retry next cycle") + continue + + if secs > WATCHDOG_REFRESH_LEAD_SECONDS: + continue # Token still healthy + + _log(f"Token expires in {secs:.0f}s — refreshing and restarting services") + success = _do_restart(plano_config_file) + if not success: + _log( + "Restart failed — exiting watchdog. " + "Services will continue until the token expires, " + "then requests will fail. Run 'planoai chatgpt login' to fix." + ) + # Either _do_restart spawned a new watchdog, or it failed. + # Either way, this watchdog's job is done. + return + + +def spawn_watchdog(plano_config_file: str) -> int: + """ + Spawn a background watchdog daemon to monitor ChatGPT token expiry. + + Called from start_native() after services are healthy. Returns the watchdog + daemon PID, or 0 if no watchdog was spawned (no ChatGPT providers, or + recursive spawn was prevented by _NO_WATCHDOG_ENV_VAR). + """ + # Prevent recursive spawning (watchdog calls start_native which calls us) + if os.environ.get(_NO_WATCHDOG_ENV_VAR): + return 0 + + # Only spawn if the config has ChatGPT providers + try: + import yaml + + with open(plano_config_file) as f: + config = yaml.safe_load(f) + providers = config.get("model_providers") or config.get("llm_providers") or [] + has_chatgpt = any( + str(p.get("model", "")).startswith("chatgpt/") for p in providers + ) + if not has_chatgpt: + return 0 + except Exception: + return 0 + + os.makedirs(PLANO_RUN_DIR, exist_ok=True) + log_fd = os.open( + PLANO_WATCHDOG_LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644 + ) + + # Double-fork to daemonize (mirrors _daemon_exec in native_runner.py) + pid = os.fork() + if pid > 0: + # Parent: close log fd, wait for first child, read back grandchild PID + os.close(log_fd) + os.waitpid(pid, 0) + grandchild_pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{pid}") + deadline = time.time() + 5 + while time.time() < deadline: + if os.path.exists(grandchild_pid_path): + with open(grandchild_pid_path) as f: + grandchild_pid = int(f.read().strip()) + os.unlink(grandchild_pid_path) + return grandchild_pid + time.sleep(0.05) + os.close(log_fd) if False else None # already closed above + return 0 # Timed out — watchdog did not start + + # First child: create new session, fork again + os.setsid() + grandchild_pid = os.fork() + if grandchild_pid > 0: + # Intermediate child: write grandchild PID and exit + pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{os.getpid()}") + with open(pid_path, "w") as f: + f.write(str(grandchild_pid)) + os._exit(0) + + # Grandchild: the actual daemon + os.dup2(log_fd, 1) # stdout -> watchdog log + os.dup2(log_fd, 2) # stderr -> watchdog log + os.close(log_fd) + devnull = os.open(os.devnull, os.O_RDONLY) + os.dup2(devnull, 0) + os.close(devnull) + + # Set sentinel so any start_native() we call doesn't spawn another watchdog + os.environ[_NO_WATCHDOG_ENV_VAR] = "1" + + try: + _watchdog_main(plano_config_file) + except Exception as exc: + _log(f"Watchdog crashed: {exc}") + finally: + os._exit(0) diff --git a/cli/planoai/consts.py b/cli/planoai/consts.py index 5cafb817..97e08d7b 100644 --- a/cli/planoai/consts.py +++ b/cli/planoai/consts.py @@ -15,6 +15,8 @@ PLANO_BIN_DIR = os.path.join(PLANO_HOME, "bin") PLANO_PLUGINS_DIR = os.path.join(PLANO_HOME, "plugins") ENVOY_VERSION = "v1.37.0" # keep in sync with Dockerfile ARG ENVOY_VERSION NATIVE_PID_FILE = os.path.join(PLANO_RUN_DIR, "plano.pid") +PLANO_WATCHDOG_STATE_FILE = os.path.join(PLANO_RUN_DIR, "watchdog_state.json") +PLANO_WATCHDOG_LOG_FILE = os.path.join(PLANO_RUN_DIR, "watchdog.log") DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317" PLANO_GITHUB_REPO = "katanemo/archgw" diff --git a/cli/planoai/native_runner.py b/cli/planoai/native_runner.py index 1b58b36d..0a92c0d1 100644 --- a/cli/planoai/native_runner.py +++ b/cli/planoai/native_runner.py @@ -11,6 +11,8 @@ from collections.abc import Callable from planoai.consts import ( NATIVE_PID_FILE, PLANO_RUN_DIR, + PLANO_WATCHDOG_LOG_FILE, + PLANO_WATCHDOG_STATE_FILE, ) from planoai.docker_cli import health_check_endpoint from planoai.native_binaries import ( @@ -164,6 +166,7 @@ def start_native( foreground=False, with_tracing=False, progress_callback: Callable[[str], None] | None = None, + spawn_watchdog: bool = True, ): """Start Envoy and brightstaff natively.""" from planoai.core import _get_gateway_ports @@ -224,13 +227,14 @@ def start_native( if progress_callback: progress_callback(f"Started envoy (PID: {envoy_pid})...") - # Save PIDs + # Save PIDs (watchdog_pid filled in after health check) os.makedirs(PLANO_RUN_DIR, exist_ok=True) with open(NATIVE_PID_FILE, "w") as f: json.dump( { "envoy_pid": envoy_pid, "brightstaff_pid": brightstaff_pid, + "watchdog_pid": None, }, f, ) @@ -254,6 +258,39 @@ def start_native( for port in gateway_ports: log.info(f" http://localhost:{port}") + # Save watchdog state (env + metadata) for use during token refresh restarts + state_fd = os.open( + PLANO_WATCHDOG_STATE_FILE, + os.O_WRONLY | os.O_CREAT | os.O_TRUNC, + 0o600, + ) + with os.fdopen(state_fd, "w") as _f: + json.dump({"env": env, "with_tracing": with_tracing}, _f) + + # Spawn background token watchdog for ChatGPT providers + watchdog_pid = 0 + if spawn_watchdog: + from planoai.chatgpt_watchdog import ( + spawn_watchdog as _spawn_watchdog, + ) + + watchdog_pid = _spawn_watchdog(plano_config_file) + if watchdog_pid: + log.info(f"Started ChatGPT token watchdog (PID {watchdog_pid})") + if progress_callback: + progress_callback( + f"Started token watchdog (PID: {watchdog_pid})..." + ) + with open(NATIVE_PID_FILE, "w") as f: + json.dump( + { + "envoy_pid": envoy_pid, + "brightstaff_pid": brightstaff_pid, + "watchdog_pid": watchdog_pid, + }, + f, + ) + break # Check if processes are still alive @@ -387,11 +424,13 @@ def stop_native(skip_pids: set | None = None): envoy_pid = pids.get("envoy_pid") brightstaff_pid = pids.get("brightstaff_pid") + watchdog_pid = pids.get("watchdog_pid") had_running_process = False for name, pid in [ ("envoy", envoy_pid), ("brightstaff", brightstaff_pid), + ("watchdog", watchdog_pid), ]: if skip_pids and pid in skip_pids: continue From 0f19fe30c299a432d96d2ed57f954b3326e5aa40 Mon Sep 17 00:00:00 2001 From: Spherrrical Date: Tue, 21 Apr 2026 17:17:47 -0700 Subject: [PATCH 2/7] Replace ChatGPT watchdog/restart with passthrough_auth --- cli/planoai/chatgpt_watchdog.py | 211 -------------------------------- cli/planoai/consts.py | 2 - cli/planoai/native_runner.py | 41 +------ 3 files changed, 1 insertion(+), 253 deletions(-) delete mode 100644 cli/planoai/chatgpt_watchdog.py diff --git a/cli/planoai/chatgpt_watchdog.py b/cli/planoai/chatgpt_watchdog.py deleted file mode 100644 index ada42da0..00000000 --- a/cli/planoai/chatgpt_watchdog.py +++ /dev/null @@ -1,211 +0,0 @@ -""" -Background daemon that monitors ChatGPT OAuth token expiry and restarts -Plano processes with a fresh token before the old one expires. - -The watchdog is spawned by start_native() when ChatGPT providers are present. -It runs as a fully daemonized process (double-fork) and exits after triggering -a restart (a fresh watchdog is spawned by the new start_native() call). -""" - -import json -import os -import time -from typing import Optional - -from planoai.consts import ( - NATIVE_PID_FILE, - PLANO_RUN_DIR, - PLANO_WATCHDOG_LOG_FILE, - PLANO_WATCHDOG_STATE_FILE, -) - -# Wake up this many seconds before token expiry to refresh -WATCHDOG_REFRESH_LEAD_SECONDS = 5 * 60 # 5 minutes - -# How often the watchdog polls for expiry (in seconds) -WATCHDOG_POLL_INTERVAL_SECONDS = 30 - -# Env var sentinel: if set, spawn_watchdog() is a no-op (prevents recursive spawning) -_NO_WATCHDOG_ENV_VAR = "PLANO_NO_WATCHDOG" - - -def _log(msg: str) -> None: - print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] watchdog: {msg}", flush=True) - - -def _seconds_until_expiry() -> Optional[float]: - """Return seconds until the ChatGPT access token expires, or None if unknown.""" - try: - from planoai.chatgpt_auth import load_auth - - auth = load_auth() - if not auth: - return None - expires_at = auth.get("expires_at") - if not expires_at: - return None - return float(expires_at) - time.time() - except Exception: - return None - - -def _stop_services(skip_pids: set) -> None: - """Stop envoy and brightstaff without killing the watchdog (self).""" - from planoai.native_runner import stop_native - - stop_native(skip_pids=skip_pids) - - -def _do_restart(plano_config_file: str) -> bool: - """ - Refresh the ChatGPT token, stop Envoy+brightstaff, restart with the new token. - Returns True on success, False if the refresh failed. - """ - # 1. Load saved state (env dict + metadata) - if not os.path.exists(PLANO_WATCHDOG_STATE_FILE): - _log("Watchdog state file missing — cannot restart services") - return False - with open(PLANO_WATCHDOG_STATE_FILE) as f: - state = json.load(f) - env = state["env"] - with_tracing = state.get("with_tracing", False) - - # 2. Refresh the token - try: - from planoai.chatgpt_auth import get_access_token - - access_token, account_id = get_access_token() - except Exception as exc: - _log( - f"Token refresh failed: {exc} — " - "run 'planoai chatgpt login' to re-authenticate" - ) - return False - - env["CHATGPT_ACCESS_TOKEN"] = access_token - if account_id: - env["CHATGPT_ACCOUNT_ID"] = account_id - - # 3. Stop envoy + brightstaff (skip self so we don't self-terminate) - _stop_services(skip_pids={os.getpid()}) - - # 4. Unset the sentinel so start_native() spawns a fresh watchdog - os.environ.pop(_NO_WATCHDOG_ENV_VAR, None) - - # 5. Restart with the fresh token; this also spawns the next watchdog - from planoai.native_runner import start_native - - start_native( - plano_config_file, - env, - with_tracing=with_tracing, - spawn_watchdog=True, - ) - return True - - -def _watchdog_main(plano_config_file: str) -> None: - """Main loop running inside the watchdog daemon process.""" - _log(f"Watchdog started (PID {os.getpid()})") - - while True: - time.sleep(WATCHDOG_POLL_INTERVAL_SECONDS) - - secs = _seconds_until_expiry() - if secs is None: - _log("Cannot read token expiry — will retry next cycle") - continue - - if secs > WATCHDOG_REFRESH_LEAD_SECONDS: - continue # Token still healthy - - _log(f"Token expires in {secs:.0f}s — refreshing and restarting services") - success = _do_restart(plano_config_file) - if not success: - _log( - "Restart failed — exiting watchdog. " - "Services will continue until the token expires, " - "then requests will fail. Run 'planoai chatgpt login' to fix." - ) - # Either _do_restart spawned a new watchdog, or it failed. - # Either way, this watchdog's job is done. - return - - -def spawn_watchdog(plano_config_file: str) -> int: - """ - Spawn a background watchdog daemon to monitor ChatGPT token expiry. - - Called from start_native() after services are healthy. Returns the watchdog - daemon PID, or 0 if no watchdog was spawned (no ChatGPT providers, or - recursive spawn was prevented by _NO_WATCHDOG_ENV_VAR). - """ - # Prevent recursive spawning (watchdog calls start_native which calls us) - if os.environ.get(_NO_WATCHDOG_ENV_VAR): - return 0 - - # Only spawn if the config has ChatGPT providers - try: - import yaml - - with open(plano_config_file) as f: - config = yaml.safe_load(f) - providers = config.get("model_providers") or config.get("llm_providers") or [] - has_chatgpt = any( - str(p.get("model", "")).startswith("chatgpt/") for p in providers - ) - if not has_chatgpt: - return 0 - except Exception: - return 0 - - os.makedirs(PLANO_RUN_DIR, exist_ok=True) - log_fd = os.open( - PLANO_WATCHDOG_LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644 - ) - - # Double-fork to daemonize (mirrors _daemon_exec in native_runner.py) - pid = os.fork() - if pid > 0: - # Parent: close log fd, wait for first child, read back grandchild PID - os.close(log_fd) - os.waitpid(pid, 0) - grandchild_pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{pid}") - deadline = time.time() + 5 - while time.time() < deadline: - if os.path.exists(grandchild_pid_path): - with open(grandchild_pid_path) as f: - grandchild_pid = int(f.read().strip()) - os.unlink(grandchild_pid_path) - return grandchild_pid - time.sleep(0.05) - os.close(log_fd) if False else None # already closed above - return 0 # Timed out — watchdog did not start - - # First child: create new session, fork again - os.setsid() - grandchild_pid = os.fork() - if grandchild_pid > 0: - # Intermediate child: write grandchild PID and exit - pid_path = os.path.join(PLANO_RUN_DIR, f".daemon_pid_{os.getpid()}") - with open(pid_path, "w") as f: - f.write(str(grandchild_pid)) - os._exit(0) - - # Grandchild: the actual daemon - os.dup2(log_fd, 1) # stdout -> watchdog log - os.dup2(log_fd, 2) # stderr -> watchdog log - os.close(log_fd) - devnull = os.open(os.devnull, os.O_RDONLY) - os.dup2(devnull, 0) - os.close(devnull) - - # Set sentinel so any start_native() we call doesn't spawn another watchdog - os.environ[_NO_WATCHDOG_ENV_VAR] = "1" - - try: - _watchdog_main(plano_config_file) - except Exception as exc: - _log(f"Watchdog crashed: {exc}") - finally: - os._exit(0) diff --git a/cli/planoai/consts.py b/cli/planoai/consts.py index 97e08d7b..5cafb817 100644 --- a/cli/planoai/consts.py +++ b/cli/planoai/consts.py @@ -15,8 +15,6 @@ PLANO_BIN_DIR = os.path.join(PLANO_HOME, "bin") PLANO_PLUGINS_DIR = os.path.join(PLANO_HOME, "plugins") ENVOY_VERSION = "v1.37.0" # keep in sync with Dockerfile ARG ENVOY_VERSION NATIVE_PID_FILE = os.path.join(PLANO_RUN_DIR, "plano.pid") -PLANO_WATCHDOG_STATE_FILE = os.path.join(PLANO_RUN_DIR, "watchdog_state.json") -PLANO_WATCHDOG_LOG_FILE = os.path.join(PLANO_RUN_DIR, "watchdog.log") DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT = "http://localhost:4317" PLANO_GITHUB_REPO = "katanemo/archgw" diff --git a/cli/planoai/native_runner.py b/cli/planoai/native_runner.py index 0a92c0d1..1b58b36d 100644 --- a/cli/planoai/native_runner.py +++ b/cli/planoai/native_runner.py @@ -11,8 +11,6 @@ from collections.abc import Callable from planoai.consts import ( NATIVE_PID_FILE, PLANO_RUN_DIR, - PLANO_WATCHDOG_LOG_FILE, - PLANO_WATCHDOG_STATE_FILE, ) from planoai.docker_cli import health_check_endpoint from planoai.native_binaries import ( @@ -166,7 +164,6 @@ def start_native( foreground=False, with_tracing=False, progress_callback: Callable[[str], None] | None = None, - spawn_watchdog: bool = True, ): """Start Envoy and brightstaff natively.""" from planoai.core import _get_gateway_ports @@ -227,14 +224,13 @@ def start_native( if progress_callback: progress_callback(f"Started envoy (PID: {envoy_pid})...") - # Save PIDs (watchdog_pid filled in after health check) + # Save PIDs os.makedirs(PLANO_RUN_DIR, exist_ok=True) with open(NATIVE_PID_FILE, "w") as f: json.dump( { "envoy_pid": envoy_pid, "brightstaff_pid": brightstaff_pid, - "watchdog_pid": None, }, f, ) @@ -258,39 +254,6 @@ def start_native( for port in gateway_ports: log.info(f" http://localhost:{port}") - # Save watchdog state (env + metadata) for use during token refresh restarts - state_fd = os.open( - PLANO_WATCHDOG_STATE_FILE, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, - 0o600, - ) - with os.fdopen(state_fd, "w") as _f: - json.dump({"env": env, "with_tracing": with_tracing}, _f) - - # Spawn background token watchdog for ChatGPT providers - watchdog_pid = 0 - if spawn_watchdog: - from planoai.chatgpt_watchdog import ( - spawn_watchdog as _spawn_watchdog, - ) - - watchdog_pid = _spawn_watchdog(plano_config_file) - if watchdog_pid: - log.info(f"Started ChatGPT token watchdog (PID {watchdog_pid})") - if progress_callback: - progress_callback( - f"Started token watchdog (PID: {watchdog_pid})..." - ) - with open(NATIVE_PID_FILE, "w") as f: - json.dump( - { - "envoy_pid": envoy_pid, - "brightstaff_pid": brightstaff_pid, - "watchdog_pid": watchdog_pid, - }, - f, - ) - break # Check if processes are still alive @@ -424,13 +387,11 @@ def stop_native(skip_pids: set | None = None): envoy_pid = pids.get("envoy_pid") brightstaff_pid = pids.get("brightstaff_pid") - watchdog_pid = pids.get("watchdog_pid") had_running_process = False for name, pid in [ ("envoy", envoy_pid), ("brightstaff", brightstaff_pid), - ("watchdog", watchdog_pid), ]: if skip_pids and pid in skip_pids: continue From 843903c8bc13e9cce68eb79a15ec418520e49663 Mon Sep 17 00:00:00 2001 From: Tom Stoffer Date: Tue, 14 Apr 2026 23:56:18 +1200 Subject: [PATCH 3/7] Updating Contributing.md to include helpful build steps --- CONTRIBUTING.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9f9b0b9c..d6d9ad74 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -100,6 +100,16 @@ For library tests only: cargo test --lib ``` +**Build Rust artifacts with the correct targets:** + +```bash +cd crates +cargo build --release --target wasm32-wasip1 -p llm_gateway -p prompt_gateway +cargo build --release -p brightstaff -p hermesllm -p common +``` + +Do not run a blanket workspace-native build such as `cargo build --release` from `crates/`. The `llm_gateway` and `prompt_gateway` crates are Proxy-WASM `cdylib`s and must be built for `wasm32-wasip1`, while `brightstaff`, `hermesllm`, and `common` build natively. + **Run Python CLI tests:** ```bash From d8588521fa05f920811112413b93cb76ed14dd54 Mon Sep 17 00:00:00 2001 From: Tom Stoffer Date: Wed, 15 Apr 2026 20:32:17 +1200 Subject: [PATCH 4/7] Implemented request adaptor for ChatGPT codex subscription endpoints which do not match the standard openai ones. Made-with: Cursor --- crates/brightstaff/src/handlers/llm/mod.rs | 21 +- crates/hermesllm/src/apis/openai_responses.rs | 79 +++- crates/hermesllm/src/lib.rs | 1 + crates/hermesllm/src/providers/mod.rs | 2 + crates/hermesllm/src/providers/request.rs | 48 +++ .../src/providers/request_adapter.rs | 407 ++++++++++++++++++ .../src/transforms/request/from_openai.rs | 39 +- crates/llm_gateway/src/stream_context.rs | 7 +- 8 files changed, 574 insertions(+), 30 deletions(-) create mode 100644 crates/hermesllm/src/providers/request_adapter.rs diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 3336209f..505eb156 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -5,7 +5,7 @@ use common::llm_providers::LlmProviders; use hermesllm::apis::openai::Message; use hermesllm::apis::openai_responses::InputParam; use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; -use hermesllm::{ProviderRequest, ProviderRequestType}; +use hermesllm::{serialize_for_upstream, ProviderRequest, ProviderRequestType}; use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::header::{self}; @@ -283,16 +283,15 @@ async fn llm_chat_inner( }; // Serialize request for upstream BEFORE router consumes it - let client_request_bytes_for_upstream: Bytes = - match ProviderRequestType::to_bytes(&client_request) { - Ok(bytes) => bytes.into(), - Err(err) => { - warn!(error = %err, "failed to serialize request for upstream"); - let mut r = Response::new(full(format!("Failed to serialize request: {}", err))); - *r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Ok(r); - } - }; + let client_request_bytes_for_upstream: Bytes = match serialize_for_upstream(&client_request, provider_id) { + Ok(bytes) => bytes.into(), + Err(err) => { + warn!(error = %err, "failed to serialize request for upstream"); + let mut r = Response::new(full(format!("Failed to serialize request: {}", err))); + *r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + return Ok(r); + } + }; // --- Phase 3: Route the request (or use pinned model from session cache) --- let resolved_model = if let Some(cached_model) = pinned_model { diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 92d362b2..2fa0c505 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -280,16 +280,31 @@ pub struct ConversationParam { pub id: Option, } -/// Tool definitions +/// Tool definitions. +/// +/// Supports both the canonical OpenAI Responses flat tool shape: +/// { "type": "function", "name": "...", "description": "...", "parameters": {...} } +/// and the nested chat-completions-compatible shape: +/// { "type": "function", "function": { "name": "...", "description": "...", "parameters": {...} } } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Tool { - /// Function tool - flat structure in Responses API + /// Function tool — accepts both flat and nested `function` object shapes. Function { - name: String, + /// Top-level name (flat shape). + name: Option, + /// Top-level description (flat shape). description: Option, + /// Top-level parameters (flat shape). parameters: Option, + /// Top-level strict flag (flat shape). strict: Option, + /// Nested `function` object (nested/compat shape). + /// + /// When present, `name`/`description`/`parameters` from the outer level are + /// ignored in favour of the values inside this object. + #[serde(default, flatten)] + function: Option, }, /// File search tool FileSearch { @@ -321,6 +336,49 @@ pub enum Tool { }, } +impl Tool { + pub fn name(&self) -> Option<&str> { + match self { + Tool::Function { name, function, .. } => { + function + .as_ref() + .and_then(|f| f.name.as_ref()) + .map(|s| s.as_str()) + .or_else(|| name.as_ref().map(|s| s.as_str())) + } + Tool::Custom { name, .. } => name.as_deref(), + _ => None, + } + } + + pub fn description(&self) -> Option<&String> { + match self { + Tool::Function { + description, + function, + .. + } => description + .as_ref() + .or_else(|| function.as_ref().and_then(|f| f.description.as_ref())), + Tool::Custom { description, .. } => description.as_ref(), + _ => None, + } + } + + pub fn parameters(&self) -> Option<&serde_json::Value> { + match self { + Tool::Function { + parameters, + function, + .. + } => parameters + .as_ref() + .or_else(|| function.as_ref().and_then(|f| f.parameters.as_ref())), + _ => None, + } + } +} + /// Ranking options for file search #[skip_serializing_none] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -343,6 +401,16 @@ pub struct UserLocation { pub timezone: Option, } +/// Inner function definition — used inside the nested `function` object. +#[skip_serializing_none] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FunctionDef { + pub name: Option, + pub description: Option, + pub parameters: Option, + pub strict: Option, +} + /// Tool choice options #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] @@ -1158,7 +1226,10 @@ impl ProviderRequest for ResponsesAPIRequest { tools .iter() .filter_map(|tool| match tool { - Tool::Function { name, .. } => Some(name.clone()), + Tool::Function { name, function, .. } => function + .as_ref() + .and_then(|f| f.name.clone()) + .or_else(|| name.clone()), Tool::Custom { name: Some(name), .. } => Some(name.clone()), diff --git a/crates/hermesllm/src/lib.rs b/crates/hermesllm/src/lib.rs index 3b9611e0..645bb4e9 100644 --- a/crates/hermesllm/src/lib.rs +++ b/crates/hermesllm/src/lib.rs @@ -11,6 +11,7 @@ pub use apis::streaming_shapes::sse::{SseEvent, SseStreamIter}; pub use aws_smithy_eventstream::frame::DecodedFrame; pub use providers::id::ProviderId; pub use providers::request::{ProviderRequest, ProviderRequestError, ProviderRequestType}; +pub use providers::request_adapter::serialize_for_upstream; pub use providers::response::{ ProviderResponse, ProviderResponseError, ProviderResponseType, TokenUsage, }; diff --git a/crates/hermesllm/src/providers/mod.rs b/crates/hermesllm/src/providers/mod.rs index 4343022f..59d4605b 100644 --- a/crates/hermesllm/src/providers/mod.rs +++ b/crates/hermesllm/src/providers/mod.rs @@ -5,10 +5,12 @@ //! pub mod id; pub mod request; +pub mod request_adapter; pub mod response; pub mod streaming_response; pub use id::ProviderId; pub use request::{ProviderRequest, ProviderRequestError, ProviderRequestType}; +pub use request_adapter::serialize_for_upstream; pub use response::{ProviderResponse, ProviderResponseType, TokenUsage}; pub use streaming_response::{ProviderStreamResponse, ProviderStreamResponseType}; diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index aa100a17..f652972f 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -1015,6 +1015,54 @@ mod tests { } } + #[test] + fn test_normalize_for_upstream_chatgpt_sets_store_stream_and_wraps_input() { + use crate::apis::openai::OpenAIApi::Responses; + use crate::apis::openai_responses::InputParam; + + let responses_req = ResponsesAPIRequest { + model: "gpt-5.4".to_string(), + input: InputParam::Text("Hello, Codex!".to_string()), + temperature: None, + max_output_tokens: Some(8192), + stream: Some(false), + metadata: None, + tools: None, + tool_choice: None, + parallel_tool_calls: None, + instructions: None, + modalities: None, + user: None, + store: None, + reasoning_effort: None, + include: None, + audio: None, + text: None, + service_tier: None, + top_p: None, + top_logprobs: None, + stream_options: None, + truncation: None, + conversation: None, + previous_response_id: None, + max_tool_calls: None, + background: None, + }; + + let upstream_api = SupportedUpstreamAPIs::OpenAIResponsesAPI(Responses); + let mut request = ProviderRequestType::ResponsesAPIRequest(responses_req); + request.normalize_for_upstream(ProviderId::ChatGPT, &upstream_api); + + match request { + ProviderRequestType::ResponsesAPIRequest(req) => { + assert_eq!(req.max_output_tokens, Some(8192)); + assert_eq!(req.store, Some(false)); + assert_eq!(req.stream, Some(true)); + assert!(matches!(req.input, InputParam::Items(_))); + } + _ => panic!("Expected ResponsesAPIRequest variant"), + } + } #[test] fn test_chat_completions_to_responses_api_not_supported() { use crate::apis::openai::OpenAIApi::Responses; diff --git a/crates/hermesllm/src/providers/request_adapter.rs b/crates/hermesllm/src/providers/request_adapter.rs new file mode 100644 index 00000000..0ae70b93 --- /dev/null +++ b/crates/hermesllm/src/providers/request_adapter.rs @@ -0,0 +1,407 @@ +use crate::apis::openai_responses::ResponsesAPIRequest; +use crate::providers::id::ProviderId; +use crate::providers::request::{ProviderRequest, ProviderRequestError, ProviderRequestType}; + +/// Serialize a provider request for the upstream wire format. +/// +/// For most providers this is plain `to_bytes()`. ChatGPT's native /responses +/// backend has wire-format quirks that require post-serialization patching: +/// - `max_output_tokens` must be sent as `maxTokens` +/// - Structured content arrays (`input_text`/`output_text` typed parts) +/// must be flattened to plain text strings +pub fn serialize_for_upstream( + request: &ProviderRequestType, + provider_id: ProviderId, +) -> Result, ProviderRequestError> { + match (provider_id, request) { + (ProviderId::ChatGPT, ProviderRequestType::ResponsesAPIRequest(req)) => { + adapt_chatgpt_responses_request(req) + } + _ => request.to_bytes(), + } +} + +/// Apply ChatGPT-specific wire-format fixes to a ResponsesAPI request. +/// +/// Works at the JSON value level so we can rename keys and restructure +/// content without needing separate serde types for the ChatGPT variant. +fn adapt_chatgpt_responses_request( + req: &ResponsesAPIRequest, +) -> Result, ProviderRequestError> { + let mut value = serde_json::to_value(req).map_err(|e| ProviderRequestError { + message: format!("Failed to encode ChatGPT responses request as JSON value: {}", e), + source: Some(Box::new(e)), + })?; + + if let Some(obj) = value.as_object_mut() { + // ChatGPT rejects `max_output_tokens`; it expects `maxTokens` + if let Some(max_output_tokens) = obj.remove("max_output_tokens") { + if !max_output_tokens.is_null() { + obj.insert("maxTokens".to_string(), max_output_tokens); + } + } + + // ChatGPT rejects structured content arrays with typed parts + // (input_text, output_text); flatten them to plain text strings + flatten_input_content_parts(obj); + } + + serde_json::to_vec(&value).map_err(|e| ProviderRequestError { + message: format!("Failed to serialize ChatGPT responses request for upstream: {}", e), + source: Some(Box::new(e)), + }) +} + +/// Walk through `input[].content` and collapse typed content-part arrays +/// into plain text strings that ChatGPT accepts. +fn flatten_input_content_parts(obj: &mut serde_json::Map) { + let input = match obj.get_mut("input").and_then(|v| v.as_array_mut()) { + Some(arr) => arr, + None => return, + }; + + for item in input { + let content = match item + .as_object_mut() + .and_then(|m| m.get_mut("content")) + { + Some(c) => c, + None => continue, + }; + + let parts = match content.as_array() { + Some(p) => p, + None => continue, + }; + + let mut saw_text_part = false; + let text = parts + .iter() + .filter_map(|part| { + let part_obj = part.as_object()?; + match part_obj.get("type").and_then(|v| v.as_str()) { + Some("input_text") | Some("output_text") => { + saw_text_part = true; + Some( + part_obj + .get("text") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(), + ) + } + _ => None, + } + }) + .collect::>() + .join("\n"); + + // Even when all text parts are empty, we still need to collapse the array. + // Leaving typed parts in-place causes ChatGPT Codex endpoints to reject them. + if saw_text_part { + *content = serde_json::Value::String(text); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::apis::openai::OpenAIApi; + use crate::apis::openai_responses::{ + InputContent, InputItem, InputMessage, InputParam, MessageContent, MessageRole, + ResponsesAPIRequest, + }; + + fn make_responses_request(input: InputParam, max_output_tokens: Option) -> ResponsesAPIRequest { + ResponsesAPIRequest { + model: "gpt-5.4".to_string(), + input, + temperature: None, + max_output_tokens, + stream: Some(true), + metadata: None, + tools: None, + tool_choice: None, + parallel_tool_calls: None, + instructions: Some("You are Codex.".to_string()), + modalities: None, + user: None, + store: Some(false), + reasoning_effort: None, + include: None, + audio: None, + text: None, + service_tier: None, + top_p: None, + top_logprobs: None, + stream_options: None, + truncation: None, + conversation: None, + previous_response_id: None, + max_tool_calls: None, + background: None, + } + } + + // --------------------------------------------------------------- + // max_output_tokens → maxTokens rename + // --------------------------------------------------------------- + + #[test] + fn chatgpt_renames_max_output_tokens_to_max_tokens_on_wire() { + let req = make_responses_request( + InputParam::Text("Hello".to_string()), + Some(8192), + ); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(wire.get("max_output_tokens").is_none(), + "max_output_tokens should be absent from wire format"); + assert_eq!(wire.get("maxTokens").and_then(|v| v.as_i64()), Some(8192), + "maxTokens should be present with the original value"); + } + + #[test] + fn chatgpt_omits_max_tokens_when_max_output_tokens_is_none() { + let req = make_responses_request( + InputParam::Text("Hello".to_string()), + None, + ); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(wire.get("max_output_tokens").is_none()); + assert!(wire.get("maxTokens").is_none(), + "maxTokens should not appear when original was None"); + } + + #[test] + fn non_chatgpt_preserves_max_output_tokens_field_name() { + let req = make_responses_request( + InputParam::Text("Hello".to_string()), + Some(4096), + ); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::OpenAI).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert_eq!(wire.get("max_output_tokens").and_then(|v| v.as_i64()), Some(4096)); + assert!(wire.get("maxTokens").is_none()); + } + + // --------------------------------------------------------------- + // input_text / output_text content flattening + // --------------------------------------------------------------- + + #[test] + fn chatgpt_flattens_input_text_content_parts_to_plain_string() { + let input = InputParam::Items(vec![InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "first line".to_string() }, + InputContent::InputText { text: "second line".to_string() }, + ]), + })]); + + let req = make_responses_request(input, None); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let content = &wire["input"][0]["content"]; + assert!(content.is_string(), + "content should be flattened to a string, got: {}", content); + assert_eq!(content.as_str().unwrap(), "first line\nsecond line"); + } + + #[test] + fn chatgpt_flattens_output_text_content_parts() { + let input = InputParam::Items(vec![InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "assistant reply".to_string() }, + ]), + })]); + + let req = make_responses_request(input, None); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let content = &wire["input"][0]["content"]; + assert!(content.is_string()); + assert_eq!(content.as_str().unwrap(), "assistant reply"); + } + + #[test] + fn chatgpt_flattens_empty_input_text_content_parts() { + let input = InputParam::Items(vec![InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![InputContent::InputText { + text: "".to_string(), + }]), + })]); + + let req = make_responses_request(input, None); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let content = &wire["input"][0]["content"]; + assert!( + content.is_string(), + "content should be flattened to a string, got: {}", + content + ); + assert_eq!(content.as_str().unwrap(), ""); + } + + #[test] + fn chatgpt_preserves_plain_text_content_unchanged() { + let input = InputParam::Items(vec![InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Text("plain text message".to_string()), + })]); + + let req = make_responses_request(input, None); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let content = &wire["input"][0]["content"]; + assert_eq!(content.as_str().unwrap(), "plain text message"); + } + + #[test] + fn non_chatgpt_does_not_flatten_content_parts() { + let input = InputParam::Items(vec![InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "part one".to_string() }, + InputContent::InputText { text: "part two".to_string() }, + ]), + })]); + + let req = make_responses_request(input, None); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::OpenAI).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let content = &wire["input"][0]["content"]; + assert!(content.is_array(), + "OpenAI should preserve array content, got: {}", content); + } + + // --------------------------------------------------------------- + // Both fixes together (realistic ChatGPT payload) + // --------------------------------------------------------------- + + #[test] + fn chatgpt_applies_both_fixes_together() { + let input = InputParam::Items(vec![ + InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "Write a function".to_string() }, + ]), + }), + InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "def hello(): pass".to_string() }, + ]), + }), + InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![ + InputContent::InputText { text: "Add a docstring".to_string() }, + ]), + }), + ]); + + let req = make_responses_request(input, Some(16384)); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + // max_output_tokens renamed + assert!(wire.get("max_output_tokens").is_none()); + assert_eq!(wire.get("maxTokens").and_then(|v| v.as_i64()), Some(16384)); + + // All content arrays flattened + for (i, item) in wire["input"].as_array().unwrap().iter().enumerate() { + let content = &item["content"]; + assert!(content.is_string(), + "input[{}].content should be a string, got: {}", i, content); + } + } + + // --------------------------------------------------------------- + // Non-ResponsesAPI requests pass through unchanged + // --------------------------------------------------------------- + + #[test] + fn chatgpt_chat_completions_request_passes_through() { + use crate::apis::openai::{ChatCompletionsRequest, Message, MessageContent as MC, Role}; + + let chat_req = ChatCompletionsRequest { + model: "gpt-5.4".to_string(), + messages: vec![Message { + role: Role::User, + content: Some(MC::Text("Hello".to_string())), + name: None, + tool_calls: None, + tool_call_id: None, + }], + max_completion_tokens: Some(1024), + ..Default::default() + }; + let request = ProviderRequestType::ChatCompletionsRequest(chat_req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert_eq!(wire.get("max_completion_tokens").and_then(|v| v.as_i64()), Some(1024)); + } + + // --------------------------------------------------------------- + // Normalize + serialize round-trip (full pipeline test) + // --------------------------------------------------------------- + + #[test] + fn chatgpt_full_pipeline_normalize_then_serialize() { + let input = InputParam::Text("Hello, Codex!".to_string()); + let req = make_responses_request(input, Some(8192)); + + let upstream_api = crate::clients::endpoints::SupportedUpstreamAPIs::OpenAIResponsesAPI( + OpenAIApi::Responses, + ); + let mut request = ProviderRequestType::ResponsesAPIRequest(req); + + // normalize_for_upstream sets store=false, stream=true, wraps input in Items + request.normalize_for_upstream(ProviderId::ChatGPT, &upstream_api); + + // serialize_for_upstream then renames max_output_tokens and flattens content + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(wire.get("max_output_tokens").is_none()); + assert_eq!(wire.get("maxTokens").and_then(|v| v.as_i64()), Some(8192)); + assert_eq!(wire.get("store"), Some(&serde_json::Value::Bool(false))); + assert_eq!(wire.get("stream"), Some(&serde_json::Value::Bool(true))); + assert!(wire["input"].is_array(), "input should be an array after normalize"); + } +} diff --git a/crates/hermesllm/src/transforms/request/from_openai.rs b/crates/hermesllm/src/transforms/request/from_openai.rs index b673af38..4f6b4113 100644 --- a/crates/hermesllm/src/transforms/request/from_openai.rs +++ b/crates/hermesllm/src/transforms/request/from_openai.rs @@ -513,15 +513,27 @@ impl TryFrom for ChatCompletionsRequest { description, parameters, strict, - } => converted_chat_tools.push(Tool { - tool_type: "function".to_string(), - function: crate::apis::openai::Function { - name, - description, - parameters: normalize_function_parameters(parameters, None), - strict, - }, - }), + function, + } => { + let resolved_name = function + .as_ref() + .and_then(|f| f.name.clone()) + .or_else(|| name.clone()) + .unwrap_or_else(|| "".to_string()); + let resolved_description = function + .as_ref() + .and_then(|f| f.description.clone()) + .or_else(|| description.clone()); + converted_chat_tools.push(Tool { + tool_type: "function".to_string(), + function: crate::apis::openai::Function { + name: resolved_name, + description: resolved_description, + parameters: normalize_function_parameters(parameters, None), + strict, + }, + }) + } ResponsesTool::WebSearchPreview { search_context_size, user_location, @@ -803,10 +815,10 @@ impl TryFrom for ConverseRequest { .into_iter() .map(|tool| BedrockTool::ToolSpec { tool_spec: ToolSpecDefinition { - name: tool.function.name, - description: tool.function.description, + name: tool.function.name.clone(), + description: tool.function.description.clone(), input_schema: ToolInputSchema { - json: tool.function.parameters, + json: tool.function.parameters.clone(), }, }, }) @@ -1349,7 +1361,7 @@ mod tests { output: serde_json::json!({"status":"ok","stdout":"hello"}), }]), tools: Some(vec![ResponsesTool::Function { - name: "exec_command".to_string(), + name: Some("exec_command".to_string()), description: Some("Execute a shell command".to_string()), parameters: Some(serde_json::json!({ "type": "object", @@ -1358,6 +1370,7 @@ mod tests { }, "required": ["cmd"] })), + function: None, strict: Some(false), }]), include: None, diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index fa9964dd..0ce10e9e 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -27,7 +27,8 @@ use hermesllm::clients::endpoints::SupportedAPIsFromClient; use hermesllm::providers::response::ProviderResponse; use hermesllm::providers::streaming_response::ProviderStreamResponse; use hermesllm::{ - DecodedFrame, ProviderId, ProviderRequest, ProviderRequestType, ProviderResponseType, + serialize_for_upstream, DecodedFrame, ProviderId, ProviderRequest, ProviderRequestType, + ProviderResponseType, ProviderStreamResponseType, }; @@ -1105,7 +1106,9 @@ impl HttpContext for StreamContext { ); return Action::Pause; } - } + }; + + request_bytes } Err(e) => { warn!( From d4e9e4a2412475bde0ba04a2761a9e14131c1e3d Mon Sep 17 00:00:00 2001 From: Tom Stoffer Date: Tue, 5 May 2026 14:40:43 +1200 Subject: [PATCH 5/7] Add chatgpt5.5 --- crates/hermesllm/src/bin/provider_models.yaml | 1 + demos/llm_routing/chatgpt_subscription/README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/hermesllm/src/bin/provider_models.yaml b/crates/hermesllm/src/bin/provider_models.yaml index 2e9e0a9b..0a2bcf4c 100644 --- a/crates/hermesllm/src/bin/provider_models.yaml +++ b/crates/hermesllm/src/bin/provider_models.yaml @@ -237,6 +237,7 @@ providers: - openai/gpt-5.4-nano-2026-03-17 - openai/gpt-5.4-nano - openai/gpt-5.4-mini-2026-03-17 + - openai/gpt-5.5 - openai/gpt-3.5-turbo-instruct - openai/gpt-3.5-turbo-instruct-0914 - openai/gpt-3.5-turbo-1106 diff --git a/demos/llm_routing/chatgpt_subscription/README.md b/demos/llm_routing/chatgpt_subscription/README.md index d091155a..c2b83225 100644 --- a/demos/llm_routing/chatgpt_subscription/README.md +++ b/demos/llm_routing/chatgpt_subscription/README.md @@ -48,6 +48,7 @@ bash test_chatgpt.sh ## Available models ``` +chatgpt/gpt-5.5 chatgpt/gpt-5.4 chatgpt/gpt-5.3-codex chatgpt/gpt-5.2 From 18779d493ed2cf56840d2b2c41f14c6b4413a151 Mon Sep 17 00:00:00 2001 From: Tom Stoffer Date: Mon, 25 May 2026 18:20:32 +1200 Subject: [PATCH 6/7] fix(chatgpt): support subscription response streaming --- cli/planoai/main.py | 8 +- crates/brightstaff/src/handlers/llm/mod.rs | 19 +- crates/hermesllm/src/apis/openai_responses.rs | 32 +- .../responses_api_streaming_buffer.rs | 135 ++++--- .../src/apis/streaming_shapes/sse.rs | 11 + crates/hermesllm/src/bin/provider_models.yaml | 1 + crates/hermesllm/src/providers/id.rs | 9 + .../src/providers/request_adapter.rs | 330 +++++++++++++++--- .../src/transforms/request/from_openai.rs | 2 + crates/llm_gateway/src/stream_context.rs | 101 +++++- 10 files changed, 506 insertions(+), 142 deletions(-) diff --git a/cli/planoai/main.py b/cli/planoai/main.py index ea43a1a8..04a3d41b 100644 --- a/cli/planoai/main.py +++ b/cli/planoai/main.py @@ -472,10 +472,12 @@ def up( else: env_file_dict = load_env_file_to_dict(app_env_file) for access_key in access_keys: - if env_file_dict.get(access_key) is None: - missing_keys.append(access_key) - else: + if env_file_dict.get(access_key) is not None: env_stage[access_key] = env_file_dict[access_key] + elif env.get(access_key) is not None: + env_stage[access_key] = env.get(access_key) + else: + missing_keys.append(access_key) if missing_keys: _print_missing_keys(console, missing_keys) diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 505eb156..c14bea34 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -283,15 +283,16 @@ async fn llm_chat_inner( }; // Serialize request for upstream BEFORE router consumes it - let client_request_bytes_for_upstream: Bytes = match serialize_for_upstream(&client_request, provider_id) { - Ok(bytes) => bytes.into(), - Err(err) => { - warn!(error = %err, "failed to serialize request for upstream"); - let mut r = Response::new(full(format!("Failed to serialize request: {}", err))); - *r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Ok(r); - } - }; + let client_request_bytes_for_upstream: Bytes = + match serialize_for_upstream(&client_request, provider_id) { + Ok(bytes) => bytes.into(), + Err(err) => { + warn!(error = %err, "failed to serialize request for upstream"); + let mut r = Response::new(full(format!("Failed to serialize request: {}", err))); + *r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + return Ok(r); + } + }; // --- Phase 3: Route the request (or use pinned model from session cache) --- let resolved_model = if let Some(cached_model) = pinned_model { diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 2fa0c505..e43fb2d4 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -126,16 +126,12 @@ pub enum InputParam { pub enum InputItem { /// Input message (role + content) Message(InputMessage), - /// Item reference - ItemReference { - #[serde(rename = "type")] - item_type: String, - id: String, - }, /// Function call emitted by model in prior turn FunctionCall { #[serde(rename = "type")] item_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + id: Option, name: String, arguments: String, call_id: String, @@ -147,6 +143,18 @@ pub enum InputItem { call_id: String, output: serde_json::Value, }, + /// Item reference + /// + /// Keep this after concrete item variants. Some Responses items include an + /// `id` plus additional required fields (`function_call` has `call_id`, + /// `name`, and `arguments`). With serde's untagged enum matching, placing + /// this broad reference shape first silently drops those fields and sends an + /// invalid upstream item. + ItemReference { + #[serde(rename = "type")] + item_type: String, + id: String, + }, } /// Input message with role and content @@ -339,13 +347,11 @@ pub enum Tool { impl Tool { pub fn name(&self) -> Option<&str> { match self { - Tool::Function { name, function, .. } => { - function - .as_ref() - .and_then(|f| f.name.as_ref()) - .map(|s| s.as_str()) - .or_else(|| name.as_ref().map(|s| s.as_str())) - } + Tool::Function { name, function, .. } => function + .as_ref() + .and_then(|f| f.name.as_ref()) + .map(|s| s.as_str()) + .or_else(|| name.as_ref().map(|s| s.as_str())), Tool::Custom { name, .. } => name.as_deref(), _ => None, } diff --git a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs index 92589ccf..7ccaa84f 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs @@ -1,5 +1,5 @@ use crate::apis::openai_responses::{ - OutputItem, OutputItemStatus, Reasoning, ResponseStatus, ResponsesAPIResponse, + OutputContent, OutputItem, OutputItemStatus, Reasoning, ResponseStatus, ResponsesAPIResponse, ResponsesAPIStreamEvent, TextConfig, TextFormat, }; use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamBufferTrait}; @@ -14,6 +14,8 @@ fn event_to_sse(event: ResponsesAPIStreamEvent) -> SseEvent { ResponsesAPIStreamEvent::ResponseCompleted { .. } => "response.completed", ResponsesAPIStreamEvent::ResponseOutputItemAdded { .. } => "response.output_item.added", ResponsesAPIStreamEvent::ResponseOutputItemDone { .. } => "response.output_item.done", + ResponsesAPIStreamEvent::ResponseContentPartAdded { .. } => "response.content_part.added", + ResponsesAPIStreamEvent::ResponseContentPartDone { .. } => "response.content_part.done", ResponsesAPIStreamEvent::ResponseOutputTextDelta { .. } => "response.output_text.delta", ResponsesAPIStreamEvent::ResponseOutputTextDone { .. } => "response.output_text.done", ResponsesAPIStreamEvent::ResponseFunctionCallArgumentsDelta { .. } => { @@ -178,6 +180,22 @@ impl ResponsesAPIStreamBuffer { event_to_sse(event) } + /// Create content_part.added event for text + fn create_content_part_added_event(&mut self, output_index: i32, item_id: &str) -> SseEvent { + let event = ResponsesAPIStreamEvent::ResponseContentPartAdded { + item_id: item_id.to_string(), + output_index, + content_index: 0, + part: OutputContent::OutputText { + text: String::new(), + annotations: vec![], + logprobs: None, + }, + sequence_number: self.next_sequence_number(), + }; + event_to_sse(event) + } + /// Create output_item.added event for tool call fn create_tool_call_added_event( &mut self, @@ -291,44 +309,10 @@ impl ResponsesAPIStreamBuffer { // Emit done events for all accumulated content - // Text content done events - let text_items: Vec<_> = self - .text_content - .iter() - .map(|(id, content)| (id.clone(), content.clone())) - .collect(); - for (item_id, content) in text_items { - let output_index = self - .output_items_added - .iter() - .find(|(_, id)| **id == item_id) - .map(|(idx, _)| *idx) - .unwrap_or(0); - - let seq1 = self.next_sequence_number(); - let text_done_event = ResponsesAPIStreamEvent::ResponseOutputTextDone { - item_id: item_id.clone(), - output_index, - content_index: 0, - text: content.clone(), - logprobs: vec![], - sequence_number: seq1, - }; - events.push(event_to_sse(text_done_event)); - - let seq2 = self.next_sequence_number(); - let item_done_event = ResponsesAPIStreamEvent::ResponseOutputItemDone { - output_index, - item: OutputItem::Message { - id: item_id.clone(), - status: OutputItemStatus::Completed, - role: "assistant".to_string(), - content: vec![], - }, - sequence_number: seq2, - }; - events.push(event_to_sse(item_done_event)); - } + // Text completion is represented in response.completed.output below. + // Avoid emitting optional per-item done events here: some proxy-wasm + // streaming paths truncate size-changing final chunks, and strict + // Responses clients only require the terminal completed response. // Function call done events let func_items: Vec<_> = self @@ -541,6 +525,7 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer { self.output_items_added .insert(*output_index, item_id.clone()); events.push(self.create_output_item_added_event(*output_index, &item_id)); + events.push(self.create_content_part_added_event(*output_index, &item_id)); } // Accumulate text content @@ -622,6 +607,26 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer { } events.push(event_to_sse(delta_event)); } + ResponsesAPIStreamEvent::ResponseCompleted { response, .. } => { + // Some upstream Responses API streams emit response.completed with an + // empty response.output even after text deltas were streamed. Clients + // such as OpenClaw may read the final response.output rather than the + // deltas, so synthesize our completed event from accumulated stream + // state while preserving upstream metadata like usage/service tier. + self.upstream_response_metadata = Some(response.clone()); + self.finalize(); + return; + } + ResponsesAPIStreamEvent::ResponseOutputItemAdded { .. } + | ResponsesAPIStreamEvent::ResponseContentPartAdded { .. } + | ResponsesAPIStreamEvent::ResponseOutputTextDone { .. } + | ResponsesAPIStreamEvent::ResponseContentPartDone { .. } + | ResponsesAPIStreamEvent::ResponseOutputItemDone { .. } => { + // We generate a canonical Responses lifecycle from deltas/finalize. + // Passing upstream lifecycle events through can create duplicate or + // out-of-order item/content IDs that strict clients may ignore. + return; + } _ => { // For other event types, just pass through with sequence number let other_event = stream_event.as_ref().clone(); @@ -637,7 +642,7 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer { fn to_bytes(&mut self) -> Vec { // For Responses API, we need special handling: // - Most events are already in buffered_events from add_transformed_event - // - We should NOT finalize here - finalization happens when we detect [DONE] or end of stream + // - Finalization happens when we detect [DONE], response.completed, or transport end // - Just flush the accumulated events and clear the buffer // Convert all accumulated events to bytes and clear buffer @@ -715,14 +720,6 @@ mod tests { output.contains("response.output_text.delta"), "Should have text deltas" ); - assert!( - output.contains("response.output_text.done"), - "Should have text.done" - ); - assert!( - output.contains("response.output_item.done"), - "Should have output_item.done" - ); assert!( output.contains("response.completed"), "Should have response.completed" @@ -731,8 +728,8 @@ mod tests { println!("\nVALIDATION SUMMARY:"); println!("{}", "-".repeat(80)); println!("✓ Lifecycle events: response.created, response.in_progress, response.completed"); - println!("✓ Output item lifecycle: output_item.added, output_item.done"); - println!("✓ Text streaming: output_text.delta (2 deltas), output_text.done"); + println!("✓ Output item lifecycle: output_item.added, response.completed output"); + println!("✓ Text streaming: output_text.delta (2 deltas), response.completed output"); println!("✓ Complete transformation with finalization ([DONE] processed)\n"); } @@ -853,4 +850,42 @@ data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890 "response.completed should be emitted exactly once" ); } + + #[test] + fn test_responses_transport_end_finalizes_captured_gpt55_shape() { + // Captured from the OpenClaw GPT-5.5 repro: upstream emitted Responses + // text deltas, then the transport ended without an explicit [DONE] or + // response.completed event. The gateway must synthesize the terminal + // lifecycle so strict Responses clients can produce a visible result. + let raw_input = r#"event: response.output_item.added +data: {"type":"response.output_item.added","output_index":1,"item":{"type":"message","id":"msg_upstream","status":"in_progress","role":"assistant","content":[]},"sequence_number":2} + +event: response.content_part.added +data: {"type":"response.content_part.added","item_id":"msg_upstream","output_index":1,"content_index":0,"part":{"type":"output_text","text":"","annotations":[],"logprobs":null},"sequence_number":3} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_upstream","output_index":1,"content_index":0,"delta":"PAR","logprobs":[],"sequence_number":4} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_upstream","output_index":1,"content_index":0,"delta":"IS","logprobs":[],"sequence_number":5}"#; + + let client_api = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + let upstream_api = SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses); + let stream_iter = SseStreamIter::try_from(raw_input.as_bytes()).unwrap(); + let mut buffer = ResponsesAPIStreamBuffer::new(); + + for raw_event in stream_iter { + let transformed_event = + SseEvent::try_from((raw_event, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(transformed_event); + } + let partial_output = String::from_utf8_lossy(&buffer.to_bytes()).to_string(); + assert!(partial_output.contains("response.output_text.delta")); + assert!(!partial_output.contains("response.completed")); + + buffer.finalize(); + let terminal_output = String::from_utf8_lossy(&buffer.to_bytes()).to_string(); + assert!(terminal_output.contains("event: response.completed")); + assert!(terminal_output.contains("PARIS")); + } } diff --git a/crates/hermesllm/src/apis/streaming_shapes/sse.rs b/crates/hermesllm/src/apis/streaming_shapes/sse.rs index f0d31e0a..fac4040b 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/sse.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/sse.rs @@ -38,6 +38,11 @@ pub trait SseStreamBufferTrait: Send + Sync { /// # Returns /// Bytes ready for wire transmission (may be empty if no events were accumulated) fn to_bytes(&mut self) -> Vec; + + /// Finalize a stream when the transport ends without an explicit provider + /// terminal event. Most buffers do not need this; Responses API buffering + /// uses it to synthesize *.done and response.completed lifecycle events. + fn finalize_stream(&mut self) {} } /// Unified SSE Stream Buffer enum that provides a zero-cost abstraction @@ -66,6 +71,12 @@ impl SseStreamBufferTrait for SseStreamBuffer { Self::OpenAIResponses(buffer) => buffer.to_bytes(), } } + + fn finalize_stream(&mut self) { + if let Self::OpenAIResponses(buffer) = self { + buffer.finalize(); + } + } } // ============================================================================ diff --git a/crates/hermesllm/src/bin/provider_models.yaml b/crates/hermesllm/src/bin/provider_models.yaml index 0a2bcf4c..75df280f 100644 --- a/crates/hermesllm/src/bin/provider_models.yaml +++ b/crates/hermesllm/src/bin/provider_models.yaml @@ -331,6 +331,7 @@ providers: - xiaomi/mimo-v2-omni - xiaomi/mimo-v2-pro chatgpt: + - chatgpt/gpt-5.5 - chatgpt/gpt-5.4 - chatgpt/gpt-5.3-codex - chatgpt/gpt-5.2 diff --git a/crates/hermesllm/src/providers/id.rs b/crates/hermesllm/src/providers/id.rs index 4fa7d19d..482a1848 100644 --- a/crates/hermesllm/src/providers/id.rs +++ b/crates/hermesllm/src/providers/id.rs @@ -331,6 +331,15 @@ mod tests { ); } + #[test] + fn test_chatgpt_models_include_gpt_5_5() { + let chatgpt_models = ProviderId::ChatGPT.models(); + assert!( + chatgpt_models.iter().any(|model| model == "gpt-5.5"), + "ChatGPT models should include gpt-5.5" + ); + } + #[test] fn test_unsupported_providers_return_empty() { // Providers without models should return empty vec diff --git a/crates/hermesllm/src/providers/request_adapter.rs b/crates/hermesllm/src/providers/request_adapter.rs index 0ae70b93..0e610e71 100644 --- a/crates/hermesllm/src/providers/request_adapter.rs +++ b/crates/hermesllm/src/providers/request_adapter.rs @@ -6,7 +6,9 @@ use crate::providers::request::{ProviderRequest, ProviderRequestError, ProviderR /// /// For most providers this is plain `to_bytes()`. ChatGPT's native /responses /// backend has wire-format quirks that require post-serialization patching: -/// - `max_output_tokens` must be sent as `maxTokens` +/// - `max_output_tokens` must be sent as `maxTokens` for GPT-5.4-era models, +/// but omitted for GPT-5.5, which rejects `maxTokens` +/// - `truncation` must be omitted; ChatGPT Codex rejects it /// - Structured content arrays (`input_text`/`output_text` typed parts) /// must be flattened to plain text strings pub fn serialize_for_upstream( @@ -29,25 +31,51 @@ fn adapt_chatgpt_responses_request( req: &ResponsesAPIRequest, ) -> Result, ProviderRequestError> { let mut value = serde_json::to_value(req).map_err(|e| ProviderRequestError { - message: format!("Failed to encode ChatGPT responses request as JSON value: {}", e), + message: format!( + "Failed to encode ChatGPT responses request as JSON value: {}", + e + ), source: Some(Box::new(e)), })?; if let Some(obj) = value.as_object_mut() { - // ChatGPT rejects `max_output_tokens`; it expects `maxTokens` + let is_gpt_55 = obj + .get("model") + .and_then(|v| v.as_str()) + .map(|model| model == "gpt-5.5" || model.starts_with("gpt-5.5-")) + .unwrap_or(false); + + // ChatGPT rejects `max_output_tokens`. GPT-5.4-era Codex expects + // `maxTokens`, but GPT-5.5 rejects `maxTokens` too, so omit it there. if let Some(max_output_tokens) = obj.remove("max_output_tokens") { - if !max_output_tokens.is_null() { + if !is_gpt_55 && !max_output_tokens.is_null() { obj.insert("maxTokens".to_string(), max_output_tokens); } } + // ChatGPT Codex rejects this OpenAI Responses field. + obj.remove("truncation"); + // ChatGPT rejects structured content arrays with typed parts // (input_text, output_text); flatten them to plain text strings flatten_input_content_parts(obj); + + // ChatGPT does not persist output item references when store=false. + // OpenClaw uses store=false, so replayed hidden reasoning references + // must be dropped instead of sent back as `type=reasoning,id=rs_*`. + // The visible assistant/user transcript remains in the request. + remove_unstored_reasoning_input_refs(obj); + + // ChatGPT requires remaining reasoning input items to carry a summary + // array. This covers stored conversations where reasoning refs are valid. + ensure_reasoning_input_summaries(obj); } serde_json::to_vec(&value).map_err(|e| ProviderRequestError { - message: format!("Failed to serialize ChatGPT responses request for upstream: {}", e), + message: format!( + "Failed to serialize ChatGPT responses request for upstream: {}", + e + ), source: Some(Box::new(e)), }) } @@ -61,10 +89,7 @@ fn flatten_input_content_parts(obj: &mut serde_json::Map c, None => continue, }; @@ -104,6 +129,46 @@ fn flatten_input_content_parts(obj: &mut serde_json::Map) { + if obj.get("store").and_then(|v| v.as_bool()) != Some(false) { + return; + } + + let input = match obj.get_mut("input").and_then(|v| v.as_array_mut()) { + Some(arr) => arr, + None => return, + }; + + input.retain(|item| { + let Some(item) = item.as_object() else { + return true; + }; + + !(item.get("type").and_then(|v| v.as_str()) == Some("reasoning") + && item.get("id").and_then(|v| v.as_str()).is_some()) + }); +} + +fn ensure_reasoning_input_summaries(obj: &mut serde_json::Map) { + let input = match obj.get_mut("input").and_then(|v| v.as_array_mut()) { + Some(arr) => arr, + None => return, + }; + + for item in input { + let item = match item.as_object_mut() { + Some(item) => item, + None => continue, + }; + + if item.get("type").and_then(|v| v.as_str()) == Some("reasoning") + && !item.contains_key("summary") + { + item.insert("summary".to_string(), serde_json::Value::Array(Vec::new())); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -113,7 +178,10 @@ mod tests { ResponsesAPIRequest, }; - fn make_responses_request(input: InputParam, max_output_tokens: Option) -> ResponsesAPIRequest { + fn make_responses_request( + input: InputParam, + max_output_tokens: Option, + ) -> ResponsesAPIRequest { ResponsesAPIRequest { model: "gpt-5.4".to_string(), input, @@ -150,49 +218,81 @@ mod tests { #[test] fn chatgpt_renames_max_output_tokens_to_max_tokens_on_wire() { - let req = make_responses_request( - InputParam::Text("Hello".to_string()), - Some(8192), - ); + let req = make_responses_request(InputParam::Text("Hello".to_string()), Some(8192)); let request = ProviderRequestType::ResponsesAPIRequest(req); let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - assert!(wire.get("max_output_tokens").is_none(), - "max_output_tokens should be absent from wire format"); - assert_eq!(wire.get("maxTokens").and_then(|v| v.as_i64()), Some(8192), - "maxTokens should be present with the original value"); + assert!( + wire.get("max_output_tokens").is_none(), + "max_output_tokens should be absent from wire format" + ); + assert_eq!( + wire.get("maxTokens").and_then(|v| v.as_i64()), + Some(8192), + "maxTokens should be present with the original value" + ); } #[test] fn chatgpt_omits_max_tokens_when_max_output_tokens_is_none() { - let req = make_responses_request( - InputParam::Text("Hello".to_string()), - None, - ); + let req = make_responses_request(InputParam::Text("Hello".to_string()), None); let request = ProviderRequestType::ResponsesAPIRequest(req); let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); assert!(wire.get("max_output_tokens").is_none()); - assert!(wire.get("maxTokens").is_none(), - "maxTokens should not appear when original was None"); + assert!( + wire.get("maxTokens").is_none(), + "maxTokens should not appear when original was None" + ); + } + + #[test] + fn chatgpt_gpt55_omits_max_tokens_on_wire() { + let mut req = make_responses_request(InputParam::Text("Hello".to_string()), Some(8192)); + req.model = "gpt-5.5".to_string(); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(wire.get("max_output_tokens").is_none()); + assert!( + wire.get("maxTokens").is_none(), + "GPT-5.5 ChatGPT Codex rejects maxTokens, so it must be omitted" + ); + } + + #[test] + fn chatgpt_omits_truncation_on_wire() { + let mut req = make_responses_request(InputParam::Text("Hello".to_string()), None); + req.truncation = Some("disabled".to_string()); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!( + wire.get("truncation").is_none(), + "ChatGPT Codex rejects truncation, so it must be omitted" + ); } #[test] fn non_chatgpt_preserves_max_output_tokens_field_name() { - let req = make_responses_request( - InputParam::Text("Hello".to_string()), - Some(4096), - ); + let req = make_responses_request(InputParam::Text("Hello".to_string()), Some(4096)); let request = ProviderRequestType::ResponsesAPIRequest(req); let bytes = serialize_for_upstream(&request, ProviderId::OpenAI).unwrap(); let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - assert_eq!(wire.get("max_output_tokens").and_then(|v| v.as_i64()), Some(4096)); + assert_eq!( + wire.get("max_output_tokens").and_then(|v| v.as_i64()), + Some(4096) + ); assert!(wire.get("maxTokens").is_none()); } @@ -205,8 +305,12 @@ mod tests { let input = InputParam::Items(vec![InputItem::Message(InputMessage { role: MessageRole::User, content: MessageContent::Items(vec![ - InputContent::InputText { text: "first line".to_string() }, - InputContent::InputText { text: "second line".to_string() }, + InputContent::InputText { + text: "first line".to_string(), + }, + InputContent::InputText { + text: "second line".to_string(), + }, ]), })]); @@ -217,8 +321,11 @@ mod tests { let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); let content = &wire["input"][0]["content"]; - assert!(content.is_string(), - "content should be flattened to a string, got: {}", content); + assert!( + content.is_string(), + "content should be flattened to a string, got: {}", + content + ); assert_eq!(content.as_str().unwrap(), "first line\nsecond line"); } @@ -226,9 +333,9 @@ mod tests { fn chatgpt_flattens_output_text_content_parts() { let input = InputParam::Items(vec![InputItem::Message(InputMessage { role: MessageRole::Assistant, - content: MessageContent::Items(vec![ - InputContent::InputText { text: "assistant reply".to_string() }, - ]), + content: MessageContent::Items(vec![InputContent::InputText { + text: "assistant reply".to_string(), + }]), })]); let req = make_responses_request(input, None); @@ -288,8 +395,12 @@ mod tests { let input = InputParam::Items(vec![InputItem::Message(InputMessage { role: MessageRole::User, content: MessageContent::Items(vec![ - InputContent::InputText { text: "part one".to_string() }, - InputContent::InputText { text: "part two".to_string() }, + InputContent::InputText { + text: "part one".to_string(), + }, + InputContent::InputText { + text: "part two".to_string(), + }, ]), })]); @@ -300,8 +411,111 @@ mod tests { let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); let content = &wire["input"][0]["content"]; - assert!(content.is_array(), - "OpenAI should preserve array content, got: {}", content); + assert!( + content.is_array(), + "OpenAI should preserve array content, got: {}", + content + ); + } + + // --------------------------------------------------------------- + // Reasoning item compatibility + // --------------------------------------------------------------- + + #[test] + fn chatgpt_adds_empty_summary_to_stored_reasoning_input_items() { + let req: ResponsesAPIRequest = serde_json::from_value(serde_json::json!({ + "model": "gpt-5.5", + "input": [ + {"type": "reasoning", "id": "rs_123"}, + {"role": "user", "content": "Are you there?"} + ], + "store": true, + "stream": true + })) + .unwrap(); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert_eq!( + wire["input"][0]["summary"], + serde_json::Value::Array(Vec::new()), + "GPT-5.5 ChatGPT rejects reasoning input items without summary" + ); + } + + #[test] + fn chatgpt_drops_reasoning_item_references_when_store_false() { + let req: ResponsesAPIRequest = serde_json::from_value(serde_json::json!({ + "model": "gpt-5.5", + "input": [ + {"type": "reasoning", "id": "rs_123"}, + {"role": "user", "content": "Are you there?"} + ], + "store": false, + "stream": true + })) + .unwrap(); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + let input = wire["input"].as_array().unwrap(); + assert_eq!(input.len(), 1); + assert_eq!(input[0]["content"], "Are you there?"); + } + + #[test] + fn chatgpt_preserves_function_call_fields_with_id() { + let req: ResponsesAPIRequest = serde_json::from_value(serde_json::json!({ + "model": "gpt-5.5", + "input": [ + { + "type": "function_call", + "id": "fc_123", + "call_id": "call_123", + "name": "exec", + "arguments": "{}" + }, + {"type": "function_call_output", "call_id": "call_123", "output": "ok"}, + {"role": "user", "content": "continue"} + ], + "store": false, + "stream": true + })) + .unwrap(); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert_eq!(wire["input"][0]["type"], "function_call"); + assert_eq!(wire["input"][0]["id"], "fc_123"); + assert_eq!(wire["input"][0]["call_id"], "call_123"); + assert_eq!(wire["input"][0]["name"], "exec"); + assert_eq!(wire["input"][0]["arguments"], "{}"); + } + + #[test] + fn non_chatgpt_preserves_reasoning_item_reference_without_summary() { + let req: ResponsesAPIRequest = serde_json::from_value(serde_json::json!({ + "model": "gpt-5.5", + "input": [ + {"type": "reasoning", "id": "rs_123"}, + {"role": "user", "content": "Are you there?"} + ], + "stream": true + })) + .unwrap(); + let request = ProviderRequestType::ResponsesAPIRequest(req); + + let bytes = serialize_for_upstream(&request, ProviderId::OpenAI).unwrap(); + let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + + assert!(wire["input"][0].get("summary").is_none()); } // --------------------------------------------------------------- @@ -313,21 +527,21 @@ mod tests { let input = InputParam::Items(vec![ InputItem::Message(InputMessage { role: MessageRole::User, - content: MessageContent::Items(vec![ - InputContent::InputText { text: "Write a function".to_string() }, - ]), + content: MessageContent::Items(vec![InputContent::InputText { + text: "Write a function".to_string(), + }]), }), InputItem::Message(InputMessage { role: MessageRole::Assistant, - content: MessageContent::Items(vec![ - InputContent::InputText { text: "def hello(): pass".to_string() }, - ]), + content: MessageContent::Items(vec![InputContent::InputText { + text: "def hello(): pass".to_string(), + }]), }), InputItem::Message(InputMessage { role: MessageRole::User, - content: MessageContent::Items(vec![ - InputContent::InputText { text: "Add a docstring".to_string() }, - ]), + content: MessageContent::Items(vec![InputContent::InputText { + text: "Add a docstring".to_string(), + }]), }), ]); @@ -344,8 +558,12 @@ mod tests { // All content arrays flattened for (i, item) in wire["input"].as_array().unwrap().iter().enumerate() { let content = &item["content"]; - assert!(content.is_string(), - "input[{}].content should be a string, got: {}", i, content); + assert!( + content.is_string(), + "input[{}].content should be a string, got: {}", + i, + content + ); } } @@ -374,7 +592,10 @@ mod tests { let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap(); let wire: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); - assert_eq!(wire.get("max_completion_tokens").and_then(|v| v.as_i64()), Some(1024)); + assert_eq!( + wire.get("max_completion_tokens").and_then(|v| v.as_i64()), + Some(1024) + ); } // --------------------------------------------------------------- @@ -402,6 +623,9 @@ mod tests { assert_eq!(wire.get("maxTokens").and_then(|v| v.as_i64()), Some(8192)); assert_eq!(wire.get("store"), Some(&serde_json::Value::Bool(false))); assert_eq!(wire.get("stream"), Some(&serde_json::Value::Bool(true))); - assert!(wire["input"].is_array(), "input should be an array after normalize"); + assert!( + wire["input"].is_array(), + "input should be an array after normalize" + ); } } diff --git a/crates/hermesllm/src/transforms/request/from_openai.rs b/crates/hermesllm/src/transforms/request/from_openai.rs index 4f6b4113..bc851f1a 100644 --- a/crates/hermesllm/src/transforms/request/from_openai.rs +++ b/crates/hermesllm/src/transforms/request/from_openai.rs @@ -196,6 +196,7 @@ impl TryFrom for Vec { } InputItem::FunctionCall { item_type: _, + id: _, name, arguments, call_id, @@ -1423,6 +1424,7 @@ mod tests { }), InputItem::FunctionCall { item_type: "function_call".to_string(), + id: Some("fc_abc123".to_string()), name: "exec_command".to_string(), arguments: "{\"cmd\":\"pwd\"}".to_string(), call_id: "toolu_abc123".to_string(), diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 0ce10e9e..07f2682a 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -28,8 +28,7 @@ use hermesllm::providers::response::ProviderResponse; use hermesllm::providers::streaming_response::ProviderStreamResponse; use hermesllm::{ serialize_for_upstream, DecodedFrame, ProviderId, ProviderRequest, ProviderRequestType, - ProviderResponseType, - ProviderStreamResponseType, + ProviderResponseType, ProviderStreamResponseType, }; pub struct StreamContext { @@ -57,6 +56,7 @@ pub struct StreamContext { http_protocol: Option, sse_buffer: Option, sse_chunk_processor: Option, + deferred_responses_stream_body: Vec, } impl StreamContext { @@ -88,6 +88,7 @@ impl StreamContext { http_protocol: None, sse_buffer: None, sse_chunk_processor: None, + deferred_responses_stream_body: Vec::new(), } } @@ -261,6 +262,18 @@ impl StreamContext { self.set_http_request_header("content-length", None); } + fn force_identity_response_encoding_for_streams(&mut self) { + // The WASM gateway parses upstream SSE bytes directly. If the client's + // Accept-Encoding (for example Python requests' default gzip/deflate/br) + // is forwarded upstream, ChatGPT Codex can return compressed SSE chunks; + // those are not parseable as SSE here and can surface as a clean HTTP 200 + // with only lifecycle events reaching the client. Ask upstream for plain + // bytes whenever this filter is responsible for streaming transforms. + if self.streaming_response { + self.set_http_request_header("accept-encoding", Some("identity")); + } + } + fn save_ratelimit_header(&mut self) { self.ratelimit_selector = self .get_http_request_header(RATELIMIT_SELECTOR_HEADER_KEY) @@ -903,6 +916,7 @@ impl HttpContext for StreamContext { } self.delete_content_length_header(); + self.force_identity_response_encoding_for_streams(); self.save_ratelimit_header(); self.request_id = self.get_http_request_header(REQUEST_ID_HEADER); @@ -1083,14 +1097,15 @@ impl HttpContext for StreamContext { ); return Action::Pause; } - debug!( - "request_id={}: upstream request payload: {}", - self.request_identifier(), - String::from_utf8_lossy(&request.to_bytes().unwrap_or_default()) - ); - - match request.to_bytes() { - Ok(bytes) => bytes, + match serialize_for_upstream(&request, self.get_provider_id()) { + Ok(bytes) => { + debug!( + "request_id={}: upstream request payload: {}", + self.request_identifier(), + String::from_utf8_lossy(&bytes) + ); + bytes + } Err(e) => { warn!( "request_id={}: failed to serialize request body: {}", @@ -1106,9 +1121,7 @@ impl HttpContext for StreamContext { ); return Action::Pause; } - }; - - request_bytes + } } Err(e) => { warn!( @@ -1182,6 +1195,25 @@ impl HttpContext for StreamContext { self.request_identifier(), body_size ); + if self.streaming_response { + if let Some(buffer) = self.sse_buffer.as_mut() { + buffer.finalize_stream(); + let bytes = buffer.to_bytes(); + if matches!( + self.client_api, + Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_)) + ) { + self.deferred_responses_stream_body + .extend_from_slice(&bytes); + let full_body = std::mem::take(&mut self.deferred_responses_stream_body); + if !full_body.is_empty() { + set_response_body_allow_growth(self, body_size, &full_body); + } + } else if !bytes.is_empty() { + self.set_http_response_body(0, body_size, &bytes); + } + } + } self.handle_end_of_request_metrics_and_traces(current_time); return Action::Continue; } @@ -1246,7 +1278,37 @@ impl HttpContext for StreamContext { if self.streaming_response { match self.handle_streaming_response(&body, provider_id) { Ok(serialized_body) => { - self.set_http_response_body(0, body_size, &serialized_body); + let terminal_bytes = if end_of_stream { + if let Some(buffer) = self.sse_buffer.as_mut() { + buffer.finalize_stream(); + buffer.to_bytes() + } else { + Vec::new() + } + } else { + Vec::new() + }; + if matches!( + self.client_api, + Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_)) + ) { + self.deferred_responses_stream_body + .extend_from_slice(&serialized_body); + self.deferred_responses_stream_body + .extend_from_slice(&terminal_bytes); + if end_of_stream { + let full_body = + std::mem::take(&mut self.deferred_responses_stream_body); + set_response_body_allow_growth(self, body_size, &full_body); + } else { + self.set_http_response_body(0, body_size, &[]); + } + } else { + set_response_body_allow_growth(self, body_size, &serialized_body); + if !terminal_bytes.is_empty() { + self.set_http_response_body(serialized_body.len(), 0, &terminal_bytes); + } + } } Err(action) => return action, } @@ -1263,6 +1325,17 @@ impl HttpContext for StreamContext { } } +fn set_response_body_allow_growth(ctx: &dyn HttpContext, original_size: usize, body: &[u8]) { + if body.len() <= original_size { + ctx.set_http_response_body(0, original_size, body); + return; + } + + let (replacement, extra) = body.split_at(original_size); + ctx.set_http_response_body(0, original_size, replacement); + ctx.set_http_response_body(original_size, 0, extra); +} + fn current_time_ns() -> u128 { SystemTime::now() .duration_since(UNIX_EPOCH) From 2656af89bfdb5dd0ee0b5fa85fb03200f40a31c0 Mon Sep 17 00:00:00 2001 From: Tom Stoffer Date: Mon, 25 May 2026 21:28:06 +1200 Subject: [PATCH 7/7] fix(request_adapter): ensure normalization of ChatGPT responses request --- crates/hermesllm/src/providers/request.rs | 6 ++++-- crates/hermesllm/src/providers/request_adapter.rs | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index f652972f..c4cfd283 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -1025,7 +1025,7 @@ mod tests { input: InputParam::Text("Hello, Codex!".to_string()), temperature: None, max_output_tokens: Some(8192), - stream: Some(false), + stream: None, metadata: None, tools: None, tool_choice: None, @@ -1051,7 +1051,9 @@ mod tests { let upstream_api = SupportedUpstreamAPIs::OpenAIResponsesAPI(Responses); let mut request = ProviderRequestType::ResponsesAPIRequest(responses_req); - request.normalize_for_upstream(ProviderId::ChatGPT, &upstream_api); + request + .normalize_for_upstream(ProviderId::ChatGPT, &upstream_api) + .expect("ChatGPT responses request should normalize"); match request { ProviderRequestType::ResponsesAPIRequest(req) => { diff --git a/crates/hermesllm/src/providers/request_adapter.rs b/crates/hermesllm/src/providers/request_adapter.rs index 0e610e71..97cfb6a5 100644 --- a/crates/hermesllm/src/providers/request_adapter.rs +++ b/crates/hermesllm/src/providers/request_adapter.rs @@ -613,7 +613,9 @@ mod tests { let mut request = ProviderRequestType::ResponsesAPIRequest(req); // normalize_for_upstream sets store=false, stream=true, wraps input in Items - request.normalize_for_upstream(ProviderId::ChatGPT, &upstream_api); + request + .normalize_for_upstream(ProviderId::ChatGPT, &upstream_api) + .expect("ChatGPT responses request should normalize"); // serialize_for_upstream then renames max_output_tokens and flattens content let bytes = serialize_for_upstream(&request, ProviderId::ChatGPT).unwrap();