diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 9d4a2dfb..2c3a0124 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use common::configuration::{FilterPipeline, ModelAlias}; -use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; +use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, SESSION_ID_HEADER}; use common::llm_providers::LlmProviders; use hermesllm::apis::openai::Message; use hermesllm::apis::openai_responses::InputParam; @@ -92,6 +92,21 @@ async fn llm_chat_inner( let traceparent = extract_or_generate_traceparent(&request_headers); + // Session pinning: extract session ID and check cache before routing + let session_id: Option<String> = request_headers + .get(SESSION_ID_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()); + let pinned_model: Option<String> = if let Some(ref sid) = session_id { + state + .router_service + .get_cached_route(sid) + .await + .map(|c| c.model_name) + } else { + None + }; + let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path); // --- Phase 1: Parse and validate the incoming request --- @@ -242,46 +257,65 @@ async fn llm_chat_inner( } }; - // --- Phase 3: Route the request --- - let routing_span = info_span!( - "routing", - component = "routing", - http.method = "POST", - http.target = %request_path, - model.requested = %model_from_request, - model.alias_resolved = %alias_resolved_model, - route.selected_model = tracing::field::Empty, - routing.determination_ms = tracing::field::Empty, - ); - let routing_result = match async { - set_service_name(operation_component::ROUTING); - router_chat_get_upstream_model( - Arc::clone(&state.router_service), - client_request, - &traceparent, - &request_path, - &request_id, - inline_routing_policy, - ) - .await - } - .instrument(routing_span) - .await - { - Ok(result) => 
result, - Err(err) => { - let mut internal_error = Response::new(full(err.message)); - *internal_error.status_mut() = err.status_code; - return Ok(internal_error); - } - }; - - // Determine final model (router returns "none" when it doesn't select a specific model) - let router_selected_model = routing_result.model_name; - let resolved_model = if router_selected_model != "none" { - router_selected_model + // --- Phase 3: Route the request (or use pinned model from session cache) --- + let resolved_model = if let Some(cached_model) = pinned_model { + info!( + session_id = %session_id.as_deref().unwrap_or(""), + model = %cached_model, + "using pinned routing decision from cache" + ); + cached_model } else { - alias_resolved_model.clone() + let routing_span = info_span!( + "routing", + component = "routing", + http.method = "POST", + http.target = %request_path, + model.requested = %model_from_request, + model.alias_resolved = %alias_resolved_model, + route.selected_model = tracing::field::Empty, + routing.determination_ms = tracing::field::Empty, + ); + let routing_result = match async { + set_service_name(operation_component::ROUTING); + router_chat_get_upstream_model( + Arc::clone(&state.router_service), + client_request, + &traceparent, + &request_path, + &request_id, + inline_routing_policy, + ) + .await + } + .instrument(routing_span) + .await + { + Ok(result) => result, + Err(err) => { + let mut internal_error = Response::new(full(err.message)); + *internal_error.status_mut() = err.status_code; + return Ok(internal_error); + } + }; + + let (router_selected_model, route_name) = + (routing_result.model_name, routing_result.route_name); + let model = if router_selected_model != "none" { + router_selected_model + } else { + alias_resolved_model.clone() + }; + + // Cache the routing decision so subsequent requests with the same session ID are pinned + if let Some(ref sid) = session_id { + state + .router_service + .cache_route(sid.clone(), model.clone(), route_name) 
+ .await; + } + + model }; tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str()); diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index e075f7b0..bdf54ab6 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -162,15 +162,9 @@ async fn init_app_state( .map(|p| p.name.clone()) .unwrap_or_else(|| DEFAULT_ROUTING_LLM_PROVIDER.to_string()); - let session_ttl_seconds = config - .routing - .as_ref() - .and_then(|r| r.session_ttl_seconds); + let session_ttl_seconds = config.routing.as_ref().and_then(|r| r.session_ttl_seconds); - let session_max_entries = config - .routing - .as_ref() - .and_then(|r| r.session_max_entries); + let session_max_entries = config.routing.as_ref().and_then(|r| r.session_max_entries); let router_service = Arc::new(RouterService::new( config.model_providers.clone(), diff --git a/demos/llm_routing/session_pinning/README.md b/demos/llm_routing/session_pinning/README.md index 500d8ef6..a84d440e 100644 --- a/demos/llm_routing/session_pinning/README.md +++ b/demos/llm_routing/session_pinning/README.md @@ -4,18 +4,18 @@ ## Why Session Pinning? -When an agent runs in a loop — research → plan → implement → review → refine — each iteration hits Plano's router independently. Since the prompts vary in intent, the router may select **different models** for each step, breaking consistency mid-workflow. +When an agent runs in a loop — research → analyse → implement → evaluate → summarise — each step hits Plano's router independently. Because prompts vary in intent, the router may select **different models** for each step, fragmenting context mid-session. -**Session pinning** solves this: send an `X-Session-Id` header and the first request runs routing as usual, caching the decision. Every subsequent request with the same session ID returns the **same model** instantly (`"pinned": true`), without re-running the router. 
+**Session pinning** solves this: send an `X-Session-Id` header and the first request runs routing as usual, caching the decision. Every subsequent request with the same session ID returns the **same model**, without re-running the router. ``` Without pinning With pinning (X-Session-Id) -───────────────── ─────────────────────────── -Step 1 → Claude (code_generation) Step 1 → Claude (code_generation) ← routed -Step 2 → GPT-4o (complex_reasoning) Step 2 → Claude (pinned ✓) -Step 3 → Claude (code_generation) Step 3 → Claude (pinned ✓) -Step 4 → GPT-4o (complex_reasoning) Step 4 → Claude (pinned ✓) -Step 5 → Claude (code_generation) Step 5 → Claude (pinned ✓) +───────────────── ────────────────────────── +Step 1 → claude-sonnet (code_gen) Step 1 → claude-sonnet ← routed +Step 2 → gpt-4o (reasoning) Step 2 → claude-sonnet ← pinned ✓ +Step 3 → claude-sonnet (code_gen) Step 3 → claude-sonnet ← pinned ✓ +Step 4 → gpt-4o (reasoning) Step 4 → claude-sonnet ← pinned ✓ +Step 5 → claude-sonnet (code_gen) Step 5 → claude-sonnet ← pinned ✓ ↑ model switches every step ↑ one model, start to finish ``` @@ -32,53 +32,106 @@ export ANTHROPIC_API_KEY= cd demos/llm_routing/session_pinning planoai up config.yaml -# 3. Run the demo -./demo.sh # or: python3 demo.py +# 3. Run the demo (uv manages dependencies automatically) +./demo.sh # or: uv run demo.py ``` --- ## What the Demo Does -The script simulates an agent building a task management app in **5 iterative steps**, deliberately mixing intents: +A **Database Research Agent** investigates whether to use PostgreSQL or MongoDB +for an e-commerce platform. It runs 5 steps, each building on prior findings via +accumulated message history. Steps alternate between `code_generation` and +`complex_reasoning` intents so Plano routes to different models without pinning. 
-| Step | Prompt | Intent | -|:----:|--------|--------| -| 1 | Design a REST API schema for a task management app… | code generation | -| 2 | Analyze SQL vs NoSQL trade-offs for this system… | complex reasoning | -| 3 | Write the SQLAlchemy database models… | code generation | -| 4 | Review the API design for security vulnerabilities… | complex reasoning | -| 5 | Implement JWT authentication middleware… | code generation | +| Step | Task | Intent | +|:----:|------|--------| +| 1 | List technical requirements | code_generation → claude-sonnet | +| 2 | Compare PostgreSQL vs MongoDB | complex_reasoning → gpt-4o | +| 3 | Write schema (CREATE TABLE) | code_generation → claude-sonnet | +| 4 | Assess scalability trade-offs | complex_reasoning → gpt-4o | +| 5 | Write final recommendation report | code_generation → claude-sonnet | -It runs this loop **twice** against the `/routing/v1/chat/completions` endpoint (routing decisions only — no actual LLM calls): +The demo runs the loop **twice** against `/v1/chat/completions` using the +[OpenAI SDK](https://github.com/openai/openai-python): -1. **Without pinning** — no `X-Session-Id` header; models switch between steps -2. **With pinning** — `X-Session-Id` header included; the model selected in step 1 is reused for all 5 steps +1. **Without pinning** — no `X-Session-Id`; models alternate per step +2. **With pinning** — `X-Session-Id` header included; model is pinned from step 1 + +Each step makes real LLM calls. Step 5's report explicitly references findings +from earlier steps, demonstrating why coherent context requires a consistent model. 
### Expected Output ``` -══════════════════════════════════════════════════════════════════ Run 1: WITHOUT Session Pinning -────────────────────────────────────────────────────────────────── - Step 1: Design a REST API schema… → anthropic/claude-sonnet-4-20250514 - Step 2: Analyze SQL vs NoSQL… → openai/gpt-4o - Step 3: Write SQLAlchemy models… → anthropic/claude-sonnet-4-20250514 - Step 4: Review API for security… → openai/gpt-4o - Step 5: Implement JWT auth… → anthropic/claude-sonnet-4-20250514 + ───────────────────────────────────────────────────────────────────── + step 1 [claude-sonnet-4-20250514] List requirements + "Critical requirements: 1. ACID transactions for order integrity…" - ✗ Models varied: anthropic/claude-sonnet-4-20250514, openai/gpt-4o + step 2 [gpt-4o ] Compare databases ← switched + "PostgreSQL excels at joins and ACID guarantees…" -══════════════════════════════════════════════════════════════════ - Run 2: WITH Session Pinning (X-Session-Id: a1b2c3d4-…) -────────────────────────────────────────────────────────────────── - Step 1: Design a REST API schema… → anthropic/claude-sonnet-4-20250514 (pinned=false) - Step 2: Analyze SQL vs NoSQL… → anthropic/claude-sonnet-4-20250514 (pinned=true) - Step 3: Write SQLAlchemy models… → anthropic/claude-sonnet-4-20250514 (pinned=true) - Step 4: Review API for security… → anthropic/claude-sonnet-4-20250514 (pinned=true) - Step 5: Implement JWT auth… → anthropic/claude-sonnet-4-20250514 (pinned=true) + step 3 [claude-sonnet-4-20250514] Write schema ← switched + "CREATE TABLE orders (\n id SERIAL PRIMARY KEY…" - ✓ All 5 steps routed to anthropic/claude-sonnet-4-20250514 + step 4 [gpt-4o ] Assess scalability ← switched + "At high write volume, PostgreSQL row-level locking…" + + step 5 [claude-sonnet-4-20250514] Write report ← switched + "RECOMMENDATION: PostgreSQL is the right choice…" + + ✗ Without pinning: model switched 4 time(s) — gpt-4o, claude-sonnet-4-20250514 + + + Run 2: WITH Session Pinning 
(X-Session-Id: a1b2c3d4…) + ───────────────────────────────────────────────────────────────────── + step 1 [claude-sonnet-4-20250514] List requirements + "Critical requirements: 1. ACID transactions for order integrity…" + + step 2 [claude-sonnet-4-20250514] Compare databases + "Building on the requirements I just outlined: PostgreSQL…" + + step 3 [claude-sonnet-4-20250514] Write schema + "Following the comparison above, here is the PostgreSQL schema…" + + step 4 [claude-sonnet-4-20250514] Assess scalability + "Given the schema I designed, PostgreSQL's row-level locking…" + + step 5 [claude-sonnet-4-20250514] Write report + "RECOMMENDATION: Based on my analysis of requirements, comparison…" + + ✓ With pinning: claude-sonnet-4-20250514 held for all 5 steps + + ══ Final Report (pinned session) ═════════════════════════════════════ + RECOMMENDATION: Based on my analysis of requirements, the head-to-head + comparison, the schema I designed, and the scalability trade-offs… + ══════════════════════════════════════════════════════════════════════ +``` + +### How It Works + +Session pinning is implemented in brightstaff. When `X-Session-Id` is present: + +1. **First request** — routing runs normally, result is cached keyed by session ID +2. **Subsequent requests** — cache hit skips routing and returns the cached model instantly + +The `X-Session-Id` header is forwarded transparently; no changes to your OpenAI +SDK calls beyond adding the header. 
+ +```python +from openai import OpenAI + +client = OpenAI(base_url="http://localhost:12000/v1", api_key="EMPTY") + +session_id = str(uuid.uuid4()) + +response = client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": prompt}], + extra_headers={"X-Session-Id": session_id}, # pin the session +) ``` --- @@ -93,10 +146,11 @@ routing: session_max_entries: 10000 # Max cached sessions before LRU eviction ``` -Without the `X-Session-Id` header, routing runs fresh every time — no breaking change to existing clients. +Without the `X-Session-Id` header, routing runs fresh every time — no breaking +change to existing clients. --- ## See Also -- [Model Routing Service Demo](../model_routing_service/) — curl-based examples of the routing endpoint and session pinning +- [Model Routing Service Demo](../model_routing_service/) — curl-based examples of the routing endpoint diff --git a/demos/llm_routing/session_pinning/agent.py b/demos/llm_routing/session_pinning/agent.py new file mode 100644 index 00000000..ffb553d3 --- /dev/null +++ b/demos/llm_routing/session_pinning/agent.py @@ -0,0 +1,429 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.12" +# dependencies = ["fastapi>=0.115", "uvicorn>=0.30", "openai>=1.0.0"] +# /// +""" +Research Agent — FastAPI service exposing /v1/chat/completions. + +For each incoming request the agent runs 3 independent research tasks, +each with its own tool-calling loop. The tasks deliberately alternate between +code_generation and complex_reasoning intents so Plano's preference-based +router selects different models for each task. + +If the client sends X-Session-Id, the agent forwards it on every outbound +call to Plano. The first task pins the model; all subsequent tasks skip the +router and reuse it — keeping the whole session on one consistent model. 
+ +Run standalone: + uv run agent.py + PLANO_URL=http://myhost:12000 AGENT_PORT=8000 uv run agent.py +""" + +import json +import logging +import os +import uuid + +import uvicorn +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from openai import AsyncOpenAI +from openai.types.chat import ChatCompletionMessageParam + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [AGENT] %(levelname)s %(message)s", +) +log = logging.getLogger(__name__) + +PLANO_URL = os.environ.get("PLANO_URL", "http://localhost:12000") +PORT = int(os.environ.get("AGENT_PORT", "8000")) + +# --------------------------------------------------------------------------- +# Tasks — each has its own conversation so Plano routes each independently. +# Intent alternates: code_generation → complex_reasoning → code_generation. +# --------------------------------------------------------------------------- + +TASKS = [ + { + "name": "generate_comparison", + # Triggers code_generation routing preference (write/generate output) + "prompt": ( + "Use the tools to fetch benchmark data for PostgreSQL and MongoDB " + "under a mixed workload. Then generate a compact Markdown comparison " + "table with columns: metric, PostgreSQL, MongoDB. Cover read QPS, " + "write QPS, p99 latency ms, ACID support, and horizontal scaling." + ), + }, + { + "name": "analyse_tradeoffs", + # Triggers complex_reasoning routing preference (analyse/reason/evaluate) + "prompt": ( + "Context from prior research:\n{context}\n\n" + "Perform a deep analysis: for a high-traffic e-commerce platform that " + "requires ACID guarantees for order processing but flexible schemas for " + "product attributes, carefully reason through and evaluate the long-term " + "architectural trade-offs of each database. Consider consistency " + "guarantees, operational complexity, and scalability risks." 
+ ), + }, + { + "name": "write_schema", + # Triggers code_generation routing preference (write SQL / generate code) + "prompt": ( + "Context from prior research:\n{context}\n\n" + "Write the CREATE TABLE SQL schema for the database you would recommend " + "from the analysis above. Include: orders, order_items, products, and " + "users tables with appropriate primary keys, foreign keys, and indexes." + ), + }, +] + +SYSTEM_PROMPT = ( + "You are a database selection analyst for an e-commerce platform. " + "Use the available tools when you need data. " + "Be concise — each response should be a compact table, code block, " + "or 3–5 clear sentences." +) + +# --------------------------------------------------------------------------- +# Tool definitions +# --------------------------------------------------------------------------- + +TOOLS = [ + { + "type": "function", + "function": { + "name": "get_db_benchmarks", + "description": ( + "Fetch performance benchmark data for a database. " + "Returns read/write throughput, latency, and scaling characteristics." + ), + "parameters": { + "type": "object", + "properties": { + "database": { + "type": "string", + "enum": ["postgresql", "mongodb"], + }, + "workload": { + "type": "string", + "enum": ["read_heavy", "write_heavy", "mixed"], + }, + }, + "required": ["database", "workload"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_case_studies", + "description": "Retrieve e-commerce case studies for a database.", + "parameters": { + "type": "object", + "properties": { + "database": {"type": "string", "enum": ["postgresql", "mongodb"]}, + }, + "required": ["database"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "check_feature_support", + "description": ( + "Check whether a database supports a specific feature " + "(e.g. ACID transactions, horizontal sharding, JSON documents)." 
+ ), + "parameters": { + "type": "object", + "properties": { + "database": {"type": "string", "enum": ["postgresql", "mongodb"]}, + "feature": {"type": "string"}, + }, + "required": ["database", "feature"], + }, + }, + }, +] + +# --------------------------------------------------------------------------- +# Tool implementations (simulated — no external calls) +# --------------------------------------------------------------------------- + +_BENCHMARKS = { + ("postgresql", "read_heavy"): { + "read_qps": 55_000, + "write_qps": 18_000, + "p99_ms": 4, + "notes": "Excellent for complex joins; connection pooling via pgBouncer recommended", + }, + ("postgresql", "write_heavy"): { + "read_qps": 30_000, + "write_qps": 24_000, + "p99_ms": 8, + "notes": "WAL overhead increases at very high write volume; partitioning helps", + }, + ("postgresql", "mixed"): { + "read_qps": 42_000, + "write_qps": 21_000, + "p99_ms": 6, + "notes": "Solid all-round; MVCC keeps reads non-blocking", + }, + ("mongodb", "read_heavy"): { + "read_qps": 85_000, + "write_qps": 30_000, + "p99_ms": 2, + "notes": "Atlas Search built-in; sharding distributes read load well", + }, + ("mongodb", "write_heavy"): { + "read_qps": 40_000, + "write_qps": 65_000, + "p99_ms": 3, + "notes": "WiredTiger compression reduces I/O; journal writes are async-safe", + }, + ("mongodb", "mixed"): { + "read_qps": 60_000, + "write_qps": 50_000, + "p99_ms": 3, + "notes": "Flexible schema accelerates feature iteration", + }, +} + +_CASE_STUDIES = { + "postgresql": [ + { + "company": "Shopify", + "scale": "100 B+ req/day", + "notes": "Moved critical order tables back to Postgres for ACID guarantees", + }, + { + "company": "Zalando", + "scale": "50 M customers", + "notes": "Uses Postgres + Citus for sharded order processing", + }, + { + "company": "Instacart", + "scale": "10 M orders/mo", + "notes": "Postgres for inventory; strict consistency required for stock levels", + }, + ], + "mongodb": [ + { + "company": "eBay", + "scale": "1.5 
B listings", + "notes": "Product catalogue in MongoDB for flexible attribute schemas", + }, + { + "company": "Alibaba", + "scale": "billions of docs", + "notes": "Session and cart data in MongoDB; high write throughput", + }, + { + "company": "Foursquare", + "scale": "10 B+ check-ins", + "notes": "Geospatial queries and flexible location schemas", + }, + ], +} + +_FEATURES = { + ("postgresql", "acid transactions"): { + "supported": True, + "notes": "Full ACID with serialisable isolation", + }, + ("postgresql", "horizontal sharding"): { + "supported": True, + "notes": "Via Citus extension or manual partitioning; not native", + }, + ("postgresql", "json documents"): { + "supported": True, + "notes": "JSONB with indexing; flexible but slower than native doc store", + }, + ("postgresql", "full-text search"): { + "supported": True, + "notes": "Built-in tsvector/tsquery; Elasticsearch for advanced use cases", + }, + ("postgresql", "multi-document transactions"): { + "supported": True, + "notes": "Native cross-table ACID", + }, + ("mongodb", "acid transactions"): { + "supported": True, + "notes": "Multi-document ACID since v4.0; single-doc always atomic", + }, + ("mongodb", "horizontal sharding"): { + "supported": True, + "notes": "Native sharding; auto-balancing across shards", + }, + ("mongodb", "json documents"): { + "supported": True, + "notes": "Native BSON document model; schema-free by default", + }, + ("mongodb", "full-text search"): { + "supported": True, + "notes": "Atlas Search (Lucene-based) for advanced full-text", + }, + ("mongodb", "multi-document transactions"): { + "supported": True, + "notes": "Available but adds latency; best avoided on hot paths", + }, +} + + +def _dispatch(name: str, args: dict) -> str: + if name == "get_db_benchmarks": + key = (args["database"].lower(), args["workload"].lower()) + return json.dumps(_BENCHMARKS.get(key, {"error": f"no data for {key}"})) + + if name == "get_case_studies": + db = args["database"].lower() + return 
json.dumps(_CASE_STUDIES.get(db, {"error": f"unknown db '{db}'"})) + + if name == "check_feature_support": + key = (args["database"].lower(), args["feature"].lower()) + for k, v in _FEATURES.items(): + if k[0] == key[0] and k[1] in key[1]: + return json.dumps(v) + return json.dumps({"error": f"feature '{args['feature']}' not in dataset"}) + + return json.dumps({"error": f"unknown tool '{name}'"}) + + +# --------------------------------------------------------------------------- +# Task runner — one independent conversation per task +# --------------------------------------------------------------------------- + + +async def run_task( + client: AsyncOpenAI, + task_name: str, + prompt: str, + session_id: str | None, +) -> tuple[str, str]: + """ + Run a single research task with its own tool-calling loop. + + Each task is an independent conversation so the router sees only + this task's intent — not the accumulated context of previous tasks. + Session pinning via X-Session-Id pins the model from the first task + onward, so all tasks stay on the same model. + + Returns (answer, first_model_used). 
+ """ + headers = {"X-Session-Id": session_id} if session_id else {} + messages: list[ChatCompletionMessageParam] = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": prompt}, + ] + first_model: str | None = None + + while True: + resp = await client.chat.completions.create( + model="gpt-4o-mini", # Plano's router overrides this via routing_preferences + messages=messages, + tools=TOOLS, + tool_choice="auto", + max_completion_tokens=600, + extra_headers=headers or None, + ) + if first_model is None: + first_model = resp.model + + log.info( + "task=%s model=%s finish=%s", + task_name, + resp.model, + resp.choices[0].finish_reason, + ) + + choice = resp.choices[0] + if choice.finish_reason == "tool_calls" and choice.message.tool_calls: + messages.append(choice.message) + for tc in choice.message.tool_calls: + args = json.loads(tc.function.arguments or "{}") + result = _dispatch(tc.function.name, args) + log.info(" tool %s(%s)", tc.function.name, args) + messages.append( + {"role": "tool", "content": result, "tool_call_id": tc.id} + ) + else: + return (choice.message.content or "").strip(), first_model or "unknown" + + +# --------------------------------------------------------------------------- +# Research loop — runs all tasks, threading context forward +# --------------------------------------------------------------------------- + + +async def run_research_loop( + client: AsyncOpenAI, + session_id: str | None, +) -> tuple[str, list[dict]]: + """ + Run all 3 research tasks in sequence, passing each task's output as + context to the next. Returns (final_answer, routing_trace). 
+ """ + context = "" + trace: list[dict] = [] + final_answer = "" + + for task in TASKS: + prompt = task["prompt"].format(context=context) + answer, model = await run_task(client, task["name"], prompt, session_id) + trace.append({"task": task["name"], "model": model}) + context += f"\n### {task['name']}\n{answer}\n" + final_answer = answer + + return final_answer, trace + + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- + +app = FastAPI(title="Research Agent", version="1.0.0") + + +@app.post("/v1/chat/completions") +async def chat(request: Request) -> JSONResponse: + body = await request.json() + session_id: str | None = request.headers.get("x-session-id") + + log.info("request session_id=%s", session_id or "none") + + client = AsyncOpenAI(base_url=f"{PLANO_URL}/v1", api_key="EMPTY") + answer, trace = await run_research_loop(client, session_id) + + return JSONResponse( + { + "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", + "object": "chat.completion", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": answer}, + "finish_reason": "stop", + } + ], + "routing_trace": trace, + "session_id": session_id, + } + ) + + +@app.get("/health") +async def health() -> dict: + return {"status": "ok", "plano_url": PLANO_URL} + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + log.info("starting on port %d plano=%s", PORT, PLANO_URL) + uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="warning") diff --git a/demos/llm_routing/session_pinning/demo.py b/demos/llm_routing/session_pinning/demo.py index 7bfb74a0..fdf7634b 100644 --- a/demos/llm_routing/session_pinning/demo.py +++ b/demos/llm_routing/session_pinning/demo.py @@ -1,150 +1,174 @@ -#!/usr/bin/env python3 +#!/usr/bin/env 
-S uv run --script +# /// script +# requires-python = ">=3.12" +# dependencies = ["httpx>=0.27"] +# /// """ -Session Pinning Demo — Iterative Research Agent +Session Pinning Demo — Research Agent client -Demonstrates how session pinning ensures consistent model selection -across multiple iterations of an agentic loop. Runs the same 5-step -research workflow twice: +Sends the same query to the Research Agent twice — once without a session ID +and once with one — and compares the routing trace to show how session pinning +keeps the model consistent across the LLM's tool-calling loop. - 1) Without session pinning — models may switch between iterations - 2) With session pinning — first iteration pins the model for all subsequent ones +Requires the agent to already be running (start it with ./start_agents.sh). -Uses the /routing/v1/chat/completions endpoint (routing decisions only, no LLM calls). +Usage: + uv run demo.py + AGENT_URL=http://localhost:8000 uv run demo.py """ -import json +import asyncio import os -import urllib.request import uuid -PLANO_URL = os.environ.get("PLANO_URL", "http://localhost:12000") +import httpx -# Simulates an iterative research agent building a task management app. -# Prompts deliberately alternate between code_generation and complex_reasoning -# intents so that without pinning, different models get selected per step. 
-RESEARCH_STEPS = [ - "Design a REST API schema for a task management app with users, projects, and tasks", - "Analyze the trade-offs between SQL and NoSQL databases for this task management system", - "Write the database models and ORM setup in Python using SQLAlchemy", - "Review the API design for security vulnerabilities and suggest improvements", - "Implement the authentication middleware with JWT tokens", -] +AGENT_URL = os.environ.get("AGENT_URL", "http://localhost:8000") + +QUERY = ( + "Should we use PostgreSQL or MongoDB for a high-traffic e-commerce backend " + "that needs strong consistency for orders but flexible schemas for products?" +) -STEP_LABELS = [ - "Design REST API schema", - "Analyze SQL vs NoSQL trade-offs", - "Write SQLAlchemy database models", - "Review API security vulnerabilities", - "Implement JWT auth middleware", -] +# --------------------------------------------------------------------------- +# Client helpers +# --------------------------------------------------------------------------- -def run_research_loop(session_id=None): - """Run the research agent loop, optionally with session pinning.""" - results = [] +async def wait_for_agent(timeout: int = 30) -> bool: + async with httpx.AsyncClient() as client: + for _ in range(timeout * 2): + try: + r = await client.get(f"{AGENT_URL}/health", timeout=1.0) + if r.status_code == 200: + return True + except Exception: + pass + await asyncio.sleep(0.5) + return False - for i, prompt in enumerate(RESEARCH_STEPS, 1): - headers = {"Content-Type": "application/json"} - if session_id: - headers["X-Session-Id"] = session_id - payload = { - "model": "gpt-4o-mini", - "messages": [{"role": "user", "content": prompt}], - } +async def ask_agent(query: str, session_id: str | None = None) -> dict: + headers: dict[str, str] = {} + if session_id: + headers["X-Session-Id"] = session_id - resp = urllib.request.urlopen( - urllib.request.Request( - f"{PLANO_URL}/routing/v1/chat/completions", - 
data=json.dumps(payload).encode(), - headers=headers, - ), - timeout=10, + async with httpx.AsyncClient(timeout=120.0) as client: + r = await client.post( + f"{AGENT_URL}/v1/chat/completions", + headers=headers, + json={"messages": [{"role": "user", "content": query}]}, ) - data = json.loads(resp.read()) - - model = data.get("model", "unknown") - route = data.get("route") or "none" - pinned = data.get("pinned") - - results.append({"step": i, "model": model, "route": route, "pinned": pinned}) - - return results + r.raise_for_status() + return r.json() -def print_results_table(results): - """Print results as a compact aligned table.""" - label_width = max(len(l) for l in STEP_LABELS) - for r in results: - step = r["step"] - label = STEP_LABELS[step - 1] - model = r["model"] - pinned = r["pinned"] - - # Shorten model names for readability - short_model = model.replace("anthropic/", "").replace("openai/", "") - - pin_indicator = "" - if pinned is True: - pin_indicator = " ◀ pinned" - elif pinned is False: - pin_indicator = " ◀ routed" - - print(f" {step}. 
{label:<{label_width}} → {short_model}{pin_indicator}") +# --------------------------------------------------------------------------- +# Display helpers +# --------------------------------------------------------------------------- -def print_summary(label, results): - """Print a one-line summary of model consistency.""" - models = [r["model"] for r in results] - unique = sorted(set(models)) +def _short(model: str) -> str: + return model.split("/")[-1] if "/" in model else model + + +def _print_trace(result: dict) -> None: + trace = result.get("routing_trace", []) + if not trace: + print(" (no trace)") + return + + prev: str | None = None + for t in trace: + short = _short(t["model"]) + switch = " ← switched" if (prev and t["model"] != prev) else "" + prev = t["model"] + print(f" {t['task']:<26} [{short}]{switch}") + + +def _print_summary(label: str, result: dict) -> None: + models = [t["model"] for t in result.get("routing_trace", [])] + if not models: + print(f" ? {label}: no routing data") + return + unique = set(models) if len(unique) == 1: - short = models[0].replace("anthropic/", "").replace("openai/", "") - print(f" ✓ {label}: All 5 steps → {short}") + print(f" ✓ {label}: {_short(next(iter(unique)))} for all {len(models)} turns") else: - short = [m.replace("anthropic/", "").replace("openai/", "") for m in unique] - print(f" ✗ {label}: Models varied → {', '.join(short)}") + switched = sum(1 for a, b in zip(models, models[1:]) if a != b) + names = ", ".join(sorted(_short(m) for m in unique)) + print(f" ✗ {label}: model switched {switched} time(s) — {names}") -def main(): +# --------------------------------------------------------------------------- +# Demo +# --------------------------------------------------------------------------- + + +async def main() -> None: print() print(" ╔══════════════════════════════════════════════════════════════╗") - print(" ║ Session Pinning Demo — Iterative Research Agent ║") + print(" ║ Session Pinning Demo — Research Agent 
║") print(" ╚══════════════════════════════════════════════════════════════╝") print() - print(" An agent builds a task management app in 5 steps.") - print(" Each step asks Plano's router which model to use.") + print(f" Agent : {AGENT_URL}") + print(f" Query : \"{QUERY[:72]}…\"") + print() + print(" The agent uses a tool-calling loop (get_db_benchmarks,") + print(" get_case_studies, check_feature_support) to research the") + print(" question. Each LLM turn hits Plano's preference-based router.") print() - # --- Run 1: Without session pinning --- - print(" ┌──────────────────────────────────────────────────────────────┐") - print(" │ Run 1: WITHOUT Session Pinning │") - print(" └──────────────────────────────────────────────────────────────┘") - print() - results_no_pin = run_research_loop(session_id=None) - print_results_table(results_no_pin) + print(f" Waiting for agent at {AGENT_URL}…", end=" ", flush=True) + if not await wait_for_agent(): + print("FAILED — agent did not respond within 30 s") + return + print("ready.") print() - # --- Run 2: With session pinning --- - session_id = str(uuid.uuid4()) - short_sid = session_id[:8] - print(f" ┌──────────────────────────────────────────────────────────────┐") - print(f" │ Run 2: WITH Session Pinning (session: {short_sid}…) │") - print(f" └──────────────────────────────────────────────────────────────┘") + sid = str(uuid.uuid4()) + print(" Sending queries (running concurrently)…") print() - results_pinned = run_research_loop(session_id=session_id) - print_results_table(results_pinned) + without, with_pin = await asyncio.gather( + ask_agent(QUERY, session_id=None), + ask_agent(QUERY, session_id=sid), + ) + + # ── Run 1 ──────────────────────────────────────────────────────────── + print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + print(" Run 1: WITHOUT Session Pinning") + print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + print() + print(" LLM turns inside the agent 
loop:") + print() + _print_trace(without) + print() + _print_summary("Without pinning", without) print() - # --- Summary --- - print(" ┌──────────────────────────────────────────────────────────────┐") - print(" │ Summary │") - print(" └──────────────────────────────────────────────────────────────┘") + # ── Run 2 ──────────────────────────────────────────────────────────── + print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + print(f" Run 2: WITH Session Pinning (X-Session-Id: {sid[:8]}…)") + print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") print() - print_summary("Without pinning", results_no_pin) - print_summary("With pinning ", results_pinned) + print(" LLM turns inside the agent loop:") + print() + _print_trace(with_pin) + print() + _print_summary("With pinning ", with_pin) + print() + + # ── Final answer ───────────────────────────────────────────────────── + answer = with_pin["choices"][0]["message"]["content"] + print(" ══ Agent recommendation (pinned session) ═════════════════════") + print() + for line in answer.splitlines(): + print(f" {line}") + print() + print(" ══════════════════════════════════════════════════════════════") print() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/demos/llm_routing/session_pinning/demo.sh b/demos/llm_routing/session_pinning/demo.sh index 882144b0..210fd136 100755 --- a/demos/llm_routing/session_pinning/demo.sh +++ b/demos/llm_routing/session_pinning/demo.sh @@ -3,9 +3,17 @@ set -e SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" export PLANO_URL="${PLANO_URL:-http://localhost:12000}" +export AGENT_PORT="${AGENT_PORT:-8000}" +export AGENT_URL="http://localhost:$AGENT_PORT" -echo "Running session pinning demo..." 
-echo "PLANO_URL=$PLANO_URL" -echo "" +cleanup() { + [ -n "$AGENT_PID" ] && kill "$AGENT_PID" 2>/dev/null +} +trap cleanup EXIT INT TERM -python3 "$SCRIPT_DIR/demo.py" +# Start the agent in the background +"$SCRIPT_DIR/start_agents.sh" & +AGENT_PID=$! + +# Run the demo client +uv run "$SCRIPT_DIR/demo.py" diff --git a/demos/llm_routing/session_pinning/start_agents.sh b/demos/llm_routing/session_pinning/start_agents.sh new file mode 100755 index 00000000..5baaa378 --- /dev/null +++ b/demos/llm_routing/session_pinning/start_agents.sh @@ -0,0 +1,28 @@ +#!/bin/bash +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PIDS=() + +log() { echo "$(date '+%F %T') - $*"; } + +cleanup() { + log "Stopping agents..." + for PID in "${PIDS[@]}"; do + kill "$PID" 2>/dev/null && log "Stopped process $PID" + done + exit 0 +} + +trap cleanup EXIT INT TERM + +export PLANO_URL="${PLANO_URL:-http://localhost:12000}" +export AGENT_PORT="${AGENT_PORT:-8000}" + +log "Starting research_agent on port $AGENT_PORT..." +uv run "$SCRIPT_DIR/agent.py" & +PIDS+=($!) + +for PID in "${PIDS[@]}"; do + wait "$PID" +done