add session pinning to llm_chat handler and rewrite session pinning demo

- extend brightstaff llm_chat_inner to extract X-Session-Id, check the
  session cache before routing, and cache the result afterward — same
  pattern as routing_service.rs
- replace old urllib-based demo with a real FastAPI research agent that
  runs 3 independent tool-calling tasks with alternating intents so
  Plano routes to different models; demo.py is a pure httpx client that
  shows the routing trace side-by-side with and without session pinning
This commit is contained in:
Adil Hafeez 2026-03-26 16:44:05 -07:00
parent 71437d2b2c
commit 0105897692
7 changed files with 771 additions and 200 deletions

View file

@ -1,6 +1,6 @@
use bytes::Bytes;
use common::configuration::{FilterPipeline, ModelAlias};
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER};
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, SESSION_ID_HEADER};
use common::llm_providers::LlmProviders;
use hermesllm::apis::openai::Message;
use hermesllm::apis::openai_responses::InputParam;
@ -92,6 +92,21 @@ async fn llm_chat_inner(
let traceparent = extract_or_generate_traceparent(&request_headers);
// Session pinning: extract session ID and check cache before routing
let session_id: Option<String> = request_headers
.get(SESSION_ID_HEADER)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
let pinned_model: Option<String> = if let Some(ref sid) = session_id {
state
.router_service
.get_cached_route(sid)
.await
.map(|c| c.model_name)
} else {
None
};
let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path);
// --- Phase 1: Parse and validate the incoming request ---
@ -242,46 +257,65 @@ async fn llm_chat_inner(
}
};
// --- Phase 3: Route the request ---
let routing_span = info_span!(
"routing",
component = "routing",
http.method = "POST",
http.target = %request_path,
model.requested = %model_from_request,
model.alias_resolved = %alias_resolved_model,
route.selected_model = tracing::field::Empty,
routing.determination_ms = tracing::field::Empty,
);
let routing_result = match async {
set_service_name(operation_component::ROUTING);
router_chat_get_upstream_model(
Arc::clone(&state.router_service),
client_request,
&traceparent,
&request_path,
&request_id,
inline_routing_policy,
)
.await
}
.instrument(routing_span)
.await
{
Ok(result) => result,
Err(err) => {
let mut internal_error = Response::new(full(err.message));
*internal_error.status_mut() = err.status_code;
return Ok(internal_error);
}
};
// Determine final model (router returns "none" when it doesn't select a specific model)
let router_selected_model = routing_result.model_name;
let resolved_model = if router_selected_model != "none" {
router_selected_model
// --- Phase 3: Route the request (or use pinned model from session cache) ---
let resolved_model = if let Some(cached_model) = pinned_model {
info!(
session_id = %session_id.as_deref().unwrap_or(""),
model = %cached_model,
"using pinned routing decision from cache"
);
cached_model
} else {
alias_resolved_model.clone()
let routing_span = info_span!(
"routing",
component = "routing",
http.method = "POST",
http.target = %request_path,
model.requested = %model_from_request,
model.alias_resolved = %alias_resolved_model,
route.selected_model = tracing::field::Empty,
routing.determination_ms = tracing::field::Empty,
);
let routing_result = match async {
set_service_name(operation_component::ROUTING);
router_chat_get_upstream_model(
Arc::clone(&state.router_service),
client_request,
&traceparent,
&request_path,
&request_id,
inline_routing_policy,
)
.await
}
.instrument(routing_span)
.await
{
Ok(result) => result,
Err(err) => {
let mut internal_error = Response::new(full(err.message));
*internal_error.status_mut() = err.status_code;
return Ok(internal_error);
}
};
let (router_selected_model, route_name) =
(routing_result.model_name, routing_result.route_name);
let model = if router_selected_model != "none" {
router_selected_model
} else {
alias_resolved_model.clone()
};
// Cache the routing decision so subsequent requests with the same session ID are pinned
if let Some(ref sid) = session_id {
state
.router_service
.cache_route(sid.clone(), model.clone(), route_name)
.await;
}
model
};
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());

View file

@ -162,15 +162,9 @@ async fn init_app_state(
.map(|p| p.name.clone())
.unwrap_or_else(|| DEFAULT_ROUTING_LLM_PROVIDER.to_string());
let session_ttl_seconds = config
.routing
.as_ref()
.and_then(|r| r.session_ttl_seconds);
let session_ttl_seconds = config.routing.as_ref().and_then(|r| r.session_ttl_seconds);
let session_max_entries = config
.routing
.as_ref()
.and_then(|r| r.session_max_entries);
let session_max_entries = config.routing.as_ref().and_then(|r| r.session_max_entries);
let router_service = Arc::new(RouterService::new(
config.model_providers.clone(),

View file

@ -4,18 +4,18 @@
## Why Session Pinning?
When an agent runs in a loop — research → plan → implement → review → refine — each iteration hits Plano's router independently. Since the prompts vary in intent, the router may select **different models** for each step, breaking consistency mid-workflow.
When an agent runs in a loop — research → analyse → implement → evaluate → summarise — each step hits Plano's router independently. Because prompts vary in intent, the router may select **different models** for each step, fragmenting context mid-session.
**Session pinning** solves this: send an `X-Session-Id` header and the first request runs routing as usual, caching the decision. Every subsequent request with the same session ID returns the **same model** instantly (`"pinned": true`), without re-running the router.
**Session pinning** solves this: send an `X-Session-Id` header and the first request runs routing as usual, caching the decision. Every subsequent request with the same session ID returns the **same model**, without re-running the router.
```
Without pinning With pinning (X-Session-Id)
───────────────── ──────────────────────────
Step 1 → Claude (code_generation) Step 1 → Claude (code_generation) ← routed
Step 2 → GPT-4o (complex_reasoning) Step 2 → Claude (pinned ✓)
Step 3 → Claude (code_generation) Step 3 → Claude (pinned ✓)
Step 4 → GPT-4o (complex_reasoning) Step 4 → Claude (pinned ✓)
Step 5 → Claude (code_generation) Step 5 → Claude (pinned ✓)
───────────────── ──────────────────────────
Step 1 → claude-sonnet (code_gen) Step 1 → claude-sonnet ← routed
Step 2 → gpt-4o (reasoning) Step 2 → claude-sonnet ← pinned ✓
Step 3 → claude-sonnet (code_gen) Step 3 → claude-sonnet ← pinned ✓
Step 4 → gpt-4o (reasoning) Step 4 → claude-sonnet ← pinned ✓
Step 5 → claude-sonnet (code_gen) Step 5 → claude-sonnet ← pinned ✓
↑ model switches every step ↑ one model, start to finish
```
@ -32,53 +32,106 @@ export ANTHROPIC_API_KEY=<your-key>
cd demos/llm_routing/session_pinning
planoai up config.yaml
# 3. Run the demo
./demo.sh # or: python3 demo.py
# 3. Run the demo (uv manages dependencies automatically)
./demo.sh # or: uv run demo.py
```
---
## What the Demo Does
The script simulates an agent building a task management app in **5 iterative steps**, deliberately mixing intents:
A **Database Research Agent** investigates whether to use PostgreSQL or MongoDB
for an e-commerce platform. It runs 5 steps, each building on prior findings via
accumulated message history. Steps alternate between `code_generation` and
`complex_reasoning` intents so Plano routes to different models without pinning.
| Step | Prompt | Intent |
|:----:|--------|--------|
| 1 | Design a REST API schema for a task management app… | code generation |
| 2 | Analyze SQL vs NoSQL trade-offs for this system… | complex reasoning |
| 3 | Write the SQLAlchemy database models… | code generation |
| 4 | Review the API design for security vulnerabilities… | complex reasoning |
| 5 | Implement JWT authentication middleware… | code generation |
| Step | Task | Intent |
|:----:|------|--------|
| 1 | List technical requirements | code_generation → claude-sonnet |
| 2 | Compare PostgreSQL vs MongoDB | complex_reasoning → gpt-4o |
| 3 | Write schema (CREATE TABLE) | code_generation → claude-sonnet |
| 4 | Assess scalability trade-offs | complex_reasoning → gpt-4o |
| 5 | Write final recommendation report | code_generation → claude-sonnet |
It runs this loop **twice** against the `/routing/v1/chat/completions` endpoint (routing decisions only — no actual LLM calls):
The demo runs the loop **twice** against `/v1/chat/completions` using the
[OpenAI SDK](https://github.com/openai/openai-python):
1. **Without pinning** — no `X-Session-Id` header; models switch between steps
2. **With pinning**`X-Session-Id` header included; the model selected in step 1 is reused for all 5 steps
1. **Without pinning** — no `X-Session-Id`; models alternate per step
2. **With pinning** — `X-Session-Id` header included; model is pinned from step 1
Each step makes real LLM calls. Step 5's report explicitly references findings
from earlier steps, demonstrating why coherent context requires a consistent model.
### Expected Output
```
══════════════════════════════════════════════════════════════════
Run 1: WITHOUT Session Pinning
──────────────────────────────────────────────────────────────────
Step 1: Design a REST API schema… → anthropic/claude-sonnet-4-20250514
Step 2: Analyze SQL vs NoSQL… → openai/gpt-4o
Step 3: Write SQLAlchemy models… → anthropic/claude-sonnet-4-20250514
Step 4: Review API for security… → openai/gpt-4o
Step 5: Implement JWT auth… → anthropic/claude-sonnet-4-20250514
─────────────────────────────────────────────────────────────────────
step 1 [claude-sonnet-4-20250514] List requirements
"Critical requirements: 1. ACID transactions for order integrity…"
✗ Models varied: anthropic/claude-sonnet-4-20250514, openai/gpt-4o
step 2 [gpt-4o ] Compare databases ← switched
"PostgreSQL excels at joins and ACID guarantees…"
══════════════════════════════════════════════════════════════════
Run 2: WITH Session Pinning (X-Session-Id: a1b2c3d4-…)
──────────────────────────────────────────────────────────────────
Step 1: Design a REST API schema… → anthropic/claude-sonnet-4-20250514 (pinned=false)
Step 2: Analyze SQL vs NoSQL… → anthropic/claude-sonnet-4-20250514 (pinned=true)
Step 3: Write SQLAlchemy models… → anthropic/claude-sonnet-4-20250514 (pinned=true)
Step 4: Review API for security… → anthropic/claude-sonnet-4-20250514 (pinned=true)
Step 5: Implement JWT auth… → anthropic/claude-sonnet-4-20250514 (pinned=true)
step 3 [claude-sonnet-4-20250514] Write schema ← switched
"CREATE TABLE orders (\n id SERIAL PRIMARY KEY…"
✓ All 5 steps routed to anthropic/claude-sonnet-4-20250514
step 4 [gpt-4o ] Assess scalability ← switched
"At high write volume, PostgreSQL row-level locking…"
step 5 [claude-sonnet-4-20250514] Write report ← switched
"RECOMMENDATION: PostgreSQL is the right choice…"
✗ Without pinning: model switched 4 time(s) — gpt-4o, claude-sonnet-4-20250514
Run 2: WITH Session Pinning (X-Session-Id: a1b2c3d4…)
─────────────────────────────────────────────────────────────────────
step 1 [claude-sonnet-4-20250514] List requirements
"Critical requirements: 1. ACID transactions for order integrity…"
step 2 [claude-sonnet-4-20250514] Compare databases
"Building on the requirements I just outlined: PostgreSQL…"
step 3 [claude-sonnet-4-20250514] Write schema
"Following the comparison above, here is the PostgreSQL schema…"
step 4 [claude-sonnet-4-20250514] Assess scalability
"Given the schema I designed, PostgreSQL's row-level locking…"
step 5 [claude-sonnet-4-20250514] Write report
"RECOMMENDATION: Based on my analysis of requirements, comparison…"
✓ With pinning: claude-sonnet-4-20250514 held for all 5 steps
══ Final Report (pinned session) ═════════════════════════════════════
RECOMMENDATION: Based on my analysis of requirements, the head-to-head
comparison, the schema I designed, and the scalability trade-offs…
══════════════════════════════════════════════════════════════════════
```
### How It Works
Session pinning is implemented in brightstaff. When `X-Session-Id` is present:
1. **First request** — routing runs normally, result is cached keyed by session ID
2. **Subsequent requests** — cache hit skips routing and returns the cached model instantly
The `X-Session-Id` header is forwarded transparently; no changes to your OpenAI
SDK calls beyond adding the header.
```python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:12000/v1", api_key="EMPTY")
session_id = str(uuid.uuid4())
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
extra_headers={"X-Session-Id": session_id}, # pin the session
)
```
---
@ -93,10 +146,11 @@ routing:
session_max_entries: 10000 # Max cached sessions before LRU eviction
```
Without the `X-Session-Id` header, routing runs fresh every time — no breaking change to existing clients.
Without the `X-Session-Id` header, routing runs fresh every time — no breaking
change to existing clients.
---
## See Also
- [Model Routing Service Demo](../model_routing_service/) — curl-based examples of the routing endpoint and session pinning
- [Model Routing Service Demo](../model_routing_service/) — curl-based examples of the routing endpoint

View file

@ -0,0 +1,429 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["fastapi>=0.115", "uvicorn>=0.30", "openai>=1.0.0"]
# ///
"""
Research Agent FastAPI service exposing /v1/chat/completions.

For each incoming request the agent runs 3 independent research tasks,
each with its own tool-calling loop. The tasks deliberately alternate between
code_generation and complex_reasoning intents so Plano's preference-based
router selects different models for each task.

If the client sends X-Session-Id, the agent forwards it on every outbound
call to Plano. The first task pins the model; all subsequent tasks skip the
router and reuse it, keeping the whole session on one consistent model.

Run standalone:
    uv run agent.py
    PLANO_URL=http://myhost:12000 AGENT_PORT=8000 uv run agent.py
"""
import json
import logging
import os
import uuid

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageParam

# The [AGENT] tag distinguishes this service's log lines from the demo
# client's output when both run in the same terminal.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [AGENT] %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

# Plano gateway base URL and the port this agent listens on (env-overridable).
PLANO_URL = os.environ.get("PLANO_URL", "http://localhost:12000")
PORT = int(os.environ.get("AGENT_PORT", "8000"))
# ---------------------------------------------------------------------------
# Tasks — each has its own conversation so Plano routes each independently.
# Intent alternates: code_generation → complex_reasoning → code_generation.
# ---------------------------------------------------------------------------
TASKS = [
    {
        "name": "generate_comparison",
        # Triggers code_generation routing preference (write/generate output)
        "prompt": (
            "Use the tools to fetch benchmark data for PostgreSQL and MongoDB "
            "under a mixed workload. Then generate a compact Markdown comparison "
            "table with columns: metric, PostgreSQL, MongoDB. Cover read QPS, "
            "write QPS, p99 latency ms, ACID support, and horizontal scaling."
        ),
    },
    {
        "name": "analyse_tradeoffs",
        # Triggers complex_reasoning routing preference (analyse/reason/evaluate)
        "prompt": (
            "Context from prior research:\n{context}\n\n"
            "Perform a deep analysis: for a high-traffic e-commerce platform that "
            "requires ACID guarantees for order processing but flexible schemas for "
            "product attributes, carefully reason through and evaluate the long-term "
            "architectural trade-offs of each database. Consider consistency "
            "guarantees, operational complexity, and scalability risks."
        ),
    },
    {
        "name": "write_schema",
        # Triggers code_generation routing preference (write SQL / generate code)
        "prompt": (
            "Context from prior research:\n{context}\n\n"
            "Write the CREATE TABLE SQL schema for the database you would recommend "
            "from the analysis above. Include: orders, order_items, products, and "
            "users tables with appropriate primary keys, foreign keys, and indexes."
        ),
    },
]

# System prompt shared by every task. The length guidance previously read
# "35 clear sentences" — a garbled "3-5".
SYSTEM_PROMPT = (
    "You are a database selection analyst for an e-commerce platform. "
    "Use the available tools when you need data. "
    "Be concise — each response should be a compact table, code block, "
    "or 3-5 clear sentences."
)

# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_db_benchmarks",
            "description": (
                "Fetch performance benchmark data for a database. "
                "Returns read/write throughput, latency, and scaling characteristics."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "database": {
                        "type": "string",
                        "enum": ["postgresql", "mongodb"],
                    },
                    "workload": {
                        "type": "string",
                        "enum": ["read_heavy", "write_heavy", "mixed"],
                    },
                },
                "required": ["database", "workload"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_case_studies",
            "description": "Retrieve e-commerce case studies for a database.",
            "parameters": {
                "type": "object",
                "properties": {
                    "database": {"type": "string", "enum": ["postgresql", "mongodb"]},
                },
                "required": ["database"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "check_feature_support",
            "description": (
                "Check whether a database supports a specific feature "
                "(e.g. ACID transactions, horizontal sharding, JSON documents)."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "database": {"type": "string", "enum": ["postgresql", "mongodb"]},
                    "feature": {"type": "string"},
                },
                "required": ["database", "feature"],
            },
        },
    },
]
# ---------------------------------------------------------------------------
# Tool implementations (simulated — no external calls)
# ---------------------------------------------------------------------------
_BENCHMARKS = {
("postgresql", "read_heavy"): {
"read_qps": 55_000,
"write_qps": 18_000,
"p99_ms": 4,
"notes": "Excellent for complex joins; connection pooling via pgBouncer recommended",
},
("postgresql", "write_heavy"): {
"read_qps": 30_000,
"write_qps": 24_000,
"p99_ms": 8,
"notes": "WAL overhead increases at very high write volume; partitioning helps",
},
("postgresql", "mixed"): {
"read_qps": 42_000,
"write_qps": 21_000,
"p99_ms": 6,
"notes": "Solid all-round; MVCC keeps reads non-blocking",
},
("mongodb", "read_heavy"): {
"read_qps": 85_000,
"write_qps": 30_000,
"p99_ms": 2,
"notes": "Atlas Search built-in; sharding distributes read load well",
},
("mongodb", "write_heavy"): {
"read_qps": 40_000,
"write_qps": 65_000,
"p99_ms": 3,
"notes": "WiredTiger compression reduces I/O; journal writes are async-safe",
},
("mongodb", "mixed"): {
"read_qps": 60_000,
"write_qps": 50_000,
"p99_ms": 3,
"notes": "Flexible schema accelerates feature iteration",
},
}
_CASE_STUDIES = {
"postgresql": [
{
"company": "Shopify",
"scale": "100 B+ req/day",
"notes": "Moved critical order tables back to Postgres for ACID guarantees",
},
{
"company": "Zalando",
"scale": "50 M customers",
"notes": "Uses Postgres + Citus for sharded order processing",
},
{
"company": "Instacart",
"scale": "10 M orders/mo",
"notes": "Postgres for inventory; strict consistency required for stock levels",
},
],
"mongodb": [
{
"company": "eBay",
"scale": "1.5 B listings",
"notes": "Product catalogue in MongoDB for flexible attribute schemas",
},
{
"company": "Alibaba",
"scale": "billions of docs",
"notes": "Session and cart data in MongoDB; high write throughput",
},
{
"company": "Foursquare",
"scale": "10 B+ check-ins",
"notes": "Geospatial queries and flexible location schemas",
},
],
}
_FEATURES = {
("postgresql", "acid transactions"): {
"supported": True,
"notes": "Full ACID with serialisable isolation",
},
("postgresql", "horizontal sharding"): {
"supported": True,
"notes": "Via Citus extension or manual partitioning; not native",
},
("postgresql", "json documents"): {
"supported": True,
"notes": "JSONB with indexing; flexible but slower than native doc store",
},
("postgresql", "full-text search"): {
"supported": True,
"notes": "Built-in tsvector/tsquery; Elasticsearch for advanced use cases",
},
("postgresql", "multi-document transactions"): {
"supported": True,
"notes": "Native cross-table ACID",
},
("mongodb", "acid transactions"): {
"supported": True,
"notes": "Multi-document ACID since v4.0; single-doc always atomic",
},
("mongodb", "horizontal sharding"): {
"supported": True,
"notes": "Native sharding; auto-balancing across shards",
},
("mongodb", "json documents"): {
"supported": True,
"notes": "Native BSON document model; schema-free by default",
},
("mongodb", "full-text search"): {
"supported": True,
"notes": "Atlas Search (Lucene-based) for advanced full-text",
},
("mongodb", "multi-document transactions"): {
"supported": True,
"notes": "Available but adds latency; best avoided on hot paths",
},
}
def _dispatch(name: str, args: dict) -> str:
if name == "get_db_benchmarks":
key = (args["database"].lower(), args["workload"].lower())
return json.dumps(_BENCHMARKS.get(key, {"error": f"no data for {key}"}))
if name == "get_case_studies":
db = args["database"].lower()
return json.dumps(_CASE_STUDIES.get(db, {"error": f"unknown db '{db}'"}))
if name == "check_feature_support":
key = (args["database"].lower(), args["feature"].lower())
for k, v in _FEATURES.items():
if k[0] == key[0] and k[1] in key[1]:
return json.dumps(v)
return json.dumps({"error": f"feature '{args['feature']}' not in dataset"})
return json.dumps({"error": f"unknown tool '{name}'"})
# ---------------------------------------------------------------------------
# Task runner — one independent conversation per task
# ---------------------------------------------------------------------------
# Safety valve: a misbehaving model could request tools forever; the original
# `while True` loop had no bound. Cap the LLM round-trips per task so the
# demo always terminates.
MAX_TOOL_ROUNDS = 8


async def run_task(
    client: AsyncOpenAI,
    task_name: str,
    prompt: str,
    session_id: str | None,
) -> tuple[str, str]:
    """
    Run a single research task with its own tool-calling loop.

    Each task is an independent conversation so the router sees only
    this task's intent — not the accumulated context of previous tasks.
    Session pinning via X-Session-Id pins the model from the first task
    onward, so all tasks stay on the same model.

    Args:
        client: OpenAI-compatible async client pointed at Plano.
        task_name: Label used only for logging.
        prompt: User prompt for this task (may embed prior context).
        session_id: Optional session ID forwarded as X-Session-Id.

    Returns (answer, first_model_used).
    """
    headers = {"X-Session-Id": session_id} if session_id else {}
    messages: list[ChatCompletionMessageParam] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    first_model: str | None = None
    for _ in range(MAX_TOOL_ROUNDS):
        resp = await client.chat.completions.create(
            model="gpt-4o-mini",  # Plano's router overrides this via routing_preferences
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
            max_completion_tokens=600,
            extra_headers=headers or None,
        )
        if first_model is None:
            first_model = resp.model
        log.info(
            "task=%s model=%s finish=%s",
            task_name,
            resp.model,
            resp.choices[0].finish_reason,
        )
        choice = resp.choices[0]
        if choice.finish_reason == "tool_calls" and choice.message.tool_calls:
            messages.append(choice.message)
            for tc in choice.message.tool_calls:
                args = json.loads(tc.function.arguments or "{}")
                result = _dispatch(tc.function.name, args)
                log.info("  tool %s(%s)", tc.function.name, args)
                messages.append(
                    {"role": "tool", "content": result, "tool_call_id": tc.id}
                )
        else:
            return (choice.message.content or "").strip(), first_model or "unknown"
    # Round cap reached — give up on tools and return whatever we have.
    log.warning("task=%s hit tool-round cap (%d)", task_name, MAX_TOOL_ROUNDS)
    return "", first_model or "unknown"
# ---------------------------------------------------------------------------
# Research loop — runs all tasks, threading context forward
# ---------------------------------------------------------------------------
async def run_research_loop(
    client: AsyncOpenAI,
    session_id: str | None,
) -> tuple[str, list[dict]]:
    """
    Run all 3 research tasks in sequence, passing each task's output as
    context to the next. Returns (final_answer, routing_trace).
    """
    accumulated = ""          # findings gathered so far, fed into later prompts
    routing_trace: list[dict] = []
    last_answer = ""
    for spec in TASKS:
        rendered = spec["prompt"].format(context=accumulated)
        last_answer, chosen_model = await run_task(
            client, spec["name"], rendered, session_id
        )
        routing_trace.append({"task": spec["name"], "model": chosen_model})
        accumulated = f"{accumulated}\n### {spec['name']}\n{last_answer}\n"
    return last_answer, routing_trace
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="Research Agent", version="1.0.0")


@app.post("/v1/chat/completions")
async def chat(request: Request) -> JSONResponse:
    """OpenAI-shaped endpoint: run the full research loop for one request.

    The response also carries a `routing_trace` field so the demo client can
    show which model Plano selected for each internal LLM turn.
    """
    body = await request.json()
    session_id: str | None = request.headers.get("x-session-id")
    log.info("request session_id=%s", session_id or "none")

    client = AsyncOpenAI(base_url=f"{PLANO_URL}/v1", api_key="EMPTY")
    answer, trace = await run_research_loop(client, session_id)

    payload = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion",
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": answer},
                "finish_reason": "stop",
            }
        ],
        "routing_trace": trace,
        "session_id": session_id,
    }
    return JSONResponse(payload)


@app.get("/health")
async def health() -> dict:
    """Liveness probe; also reports which Plano endpoint the agent targets."""
    return {"status": "ok", "plano_url": PLANO_URL}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # uvicorn's access logs are suppressed (log_level="warning") so the
    # agent's own [AGENT] log lines stay readable during the demo.
    log.info("starting on port %d plano=%s", PORT, PLANO_URL)
    uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="warning")

View file

@ -1,150 +1,174 @@
#!/usr/bin/env python3
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx>=0.27"]
# ///
"""
Session Pinning Demo Iterative Research Agent
Session Pinning Demo — Research Agent client
Demonstrates how session pinning ensures consistent model selection
across multiple iterations of an agentic loop. Runs the same 5-step
research workflow twice:
Sends the same query to the Research Agent twice — once without a session ID
and once with one — and compares the routing trace to show how session pinning
keeps the model consistent across the LLM's tool-calling loop.
1) Without session pinning models may switch between iterations
2) With session pinning first iteration pins the model for all subsequent ones
Requires the agent to already be running (start it with ./start_agents.sh).
Uses the /routing/v1/chat/completions endpoint (routing decisions only, no LLM calls).
Usage:
uv run demo.py
AGENT_URL=http://localhost:8000 uv run demo.py
"""
import json
import asyncio
import os
import urllib.request
import uuid
PLANO_URL = os.environ.get("PLANO_URL", "http://localhost:12000")
import httpx
# Simulates an iterative research agent building a task management app.
# Prompts deliberately alternate between code_generation and complex_reasoning
# intents so that without pinning, different models get selected per step.
RESEARCH_STEPS = [
"Design a REST API schema for a task management app with users, projects, and tasks",
"Analyze the trade-offs between SQL and NoSQL databases for this task management system",
"Write the database models and ORM setup in Python using SQLAlchemy",
"Review the API design for security vulnerabilities and suggest improvements",
"Implement the authentication middleware with JWT tokens",
]
AGENT_URL = os.environ.get("AGENT_URL", "http://localhost:8000")
QUERY = (
"Should we use PostgreSQL or MongoDB for a high-traffic e-commerce backend "
"that needs strong consistency for orders but flexible schemas for products?"
)
STEP_LABELS = [
"Design REST API schema",
"Analyze SQL vs NoSQL trade-offs",
"Write SQLAlchemy database models",
"Review API security vulnerabilities",
"Implement JWT auth middleware",
]
# ---------------------------------------------------------------------------
# Client helpers
# ---------------------------------------------------------------------------
def run_research_loop(session_id=None):
"""Run the research agent loop, optionally with session pinning."""
results = []
async def wait_for_agent(timeout: int = 30) -> bool:
    """Poll the agent's /health endpoint until it responds with 200 or
    roughly `timeout` seconds elapse. Returns True when reachable."""
    async with httpx.AsyncClient() as client:
        for _ in range(timeout * 2):  # poll every 0.5 s
            try:
                r = await client.get(f"{AGENT_URL}/health", timeout=1.0)
                if r.status_code == 200:
                    return True
            except Exception:
                # Connection refused while the agent is still starting — retry.
                pass
            await asyncio.sleep(0.5)
    return False
for i, prompt in enumerate(RESEARCH_STEPS, 1):
headers = {"Content-Type": "application/json"}
if session_id:
headers["X-Session-Id"] = session_id
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
}
async def ask_agent(query: str, session_id: str | None = None) -> dict:
headers: dict[str, str] = {}
if session_id:
headers["X-Session-Id"] = session_id
resp = urllib.request.urlopen(
urllib.request.Request(
f"{PLANO_URL}/routing/v1/chat/completions",
data=json.dumps(payload).encode(),
headers=headers,
),
timeout=10,
async with httpx.AsyncClient(timeout=120.0) as client:
r = await client.post(
f"{AGENT_URL}/v1/chat/completions",
headers=headers,
json={"messages": [{"role": "user", "content": query}]},
)
data = json.loads(resp.read())
model = data.get("model", "unknown")
route = data.get("route") or "none"
pinned = data.get("pinned")
results.append({"step": i, "model": model, "route": route, "pinned": pinned})
return results
r.raise_for_status()
return r.json()
def print_results_table(results):
"""Print results as a compact aligned table."""
label_width = max(len(l) for l in STEP_LABELS)
for r in results:
step = r["step"]
label = STEP_LABELS[step - 1]
model = r["model"]
pinned = r["pinned"]
# Shorten model names for readability
short_model = model.replace("anthropic/", "").replace("openai/", "")
pin_indicator = ""
if pinned is True:
pin_indicator = " ◀ pinned"
elif pinned is False:
pin_indicator = " ◀ routed"
print(f" {step}. {label:<{label_width}}{short_model}{pin_indicator}")
# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------
def print_summary(label, results):
"""Print a one-line summary of model consistency."""
models = [r["model"] for r in results]
unique = sorted(set(models))
def _short(model: str) -> str:
return model.split("/")[-1] if "/" in model else model
def _print_trace(result: dict) -> None:
    """Print one line per LLM turn from the agent's routing_trace,
    flagging turns where the routed model differs from the previous one."""
    trace = result.get("routing_trace", [])
    if not trace:
        print(" (no trace)")
        return
    prev: str | None = None  # model used on the previous turn
    for t in trace:
        short = _short(t["model"])
        switch = " ← switched" if (prev and t["model"] != prev) else ""
        prev = t["model"]
        print(f" {t['task']:<26} [{short}]{switch}")
def _print_summary(label: str, result: dict) -> None:
models = [t["model"] for t in result.get("routing_trace", [])]
if not models:
print(f" ? {label}: no routing data")
return
unique = set(models)
if len(unique) == 1:
short = models[0].replace("anthropic/", "").replace("openai/", "")
print(f"{label}: All 5 steps → {short}")
print(f"{label}: {_short(next(iter(unique)))} for all {len(models)} turns")
else:
short = [m.replace("anthropic/", "").replace("openai/", "") for m in unique]
print(f"{label}: Models varied → {', '.join(short)}")
switched = sum(1 for a, b in zip(models, models[1:]) if a != b)
names = ", ".join(sorted(_short(m) for m in unique))
print(f"{label}: model switched {switched} time(s) — {names}")
def main():
# ---------------------------------------------------------------------------
# Demo
# ---------------------------------------------------------------------------
async def main() -> None:
print()
print(" ╔══════════════════════════════════════════════════════════════╗")
print(" ║ Session Pinning Demo — Iterative Research Agent ║")
print("Session Pinning Demo — Research Agent ")
print(" ╚══════════════════════════════════════════════════════════════╝")
print()
print(" An agent builds a task management app in 5 steps.")
print(" Each step asks Plano's router which model to use.")
print(f" Agent : {AGENT_URL}")
print(f" Query : \"{QUERY[:72]}\"")
print()
print(" The agent uses a tool-calling loop (get_db_benchmarks,")
print(" get_case_studies, check_feature_support) to research the")
print(" question. Each LLM turn hits Plano's preference-based router.")
print()
# --- Run 1: Without session pinning ---
print(" ┌──────────────────────────────────────────────────────────────┐")
print(" │ Run 1: WITHOUT Session Pinning │")
print(" └──────────────────────────────────────────────────────────────┘")
print()
results_no_pin = run_research_loop(session_id=None)
print_results_table(results_no_pin)
print(f" Waiting for agent at {AGENT_URL}", end=" ", flush=True)
if not await wait_for_agent():
print("FAILED — agent did not respond within 30 s")
return
print("ready.")
print()
# --- Run 2: With session pinning ---
session_id = str(uuid.uuid4())
short_sid = session_id[:8]
print(f" ┌──────────────────────────────────────────────────────────────┐")
print(f" │ Run 2: WITH Session Pinning (session: {short_sid}…) │")
print(f" └──────────────────────────────────────────────────────────────┘")
sid = str(uuid.uuid4())
print(" Sending queries (running concurrently)…")
print()
results_pinned = run_research_loop(session_id=session_id)
print_results_table(results_pinned)
without, with_pin = await asyncio.gather(
ask_agent(QUERY, session_id=None),
ask_agent(QUERY, session_id=sid),
)
# ── Run 1 ────────────────────────────────────────────────────────────
print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print(" Run 1: WITHOUT Session Pinning")
print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print()
print(" LLM turns inside the agent loop:")
print()
_print_trace(without)
print()
_print_summary("Without pinning", without)
print()
# --- Summary ---
print(" ┌──────────────────────────────────────────────────────────────┐")
print(" │ Summary │")
print(" └──────────────────────────────────────────────────────────────┘")
# ── Run 2 ────────────────────────────────────────────────────────────
print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print(f" Run 2: WITH Session Pinning (X-Session-Id: {sid[:8]}…)")
print(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print()
print_summary("Without pinning", results_no_pin)
print_summary("With pinning ", results_pinned)
print(" LLM turns inside the agent loop:")
print()
_print_trace(with_pin)
print()
_print_summary("With pinning ", with_pin)
print()
# ── Final answer ─────────────────────────────────────────────────────
answer = with_pin["choices"][0]["message"]["content"]
print(" ══ Agent recommendation (pinned session) ═════════════════════")
print()
for line in answer.splitlines():
print(f" {line}")
print()
print(" ══════════════════════════════════════════════════════════════")
print()
if __name__ == "__main__":
main()
asyncio.run(main())

View file

@ -3,9 +3,17 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
export PLANO_URL="${PLANO_URL:-http://localhost:12000}"
export AGENT_PORT="${AGENT_PORT:-8000}"
export AGENT_URL="http://localhost:$AGENT_PORT"
echo "Running session pinning demo..."
echo "PLANO_URL=$PLANO_URL"
echo ""
cleanup() {
[ -n "$AGENT_PID" ] && kill "$AGENT_PID" 2>/dev/null
}
trap cleanup EXIT INT TERM
python3 "$SCRIPT_DIR/demo.py"
# Start the agent in the background
"$SCRIPT_DIR/start_agents.sh" &
AGENT_PID=$!
# Run the demo client
uv run "$SCRIPT_DIR/demo.py"

View file

@ -0,0 +1,28 @@
#!/bin/bash
# Start the research agent(s) in the background and wait on them.
# Environment: PLANO_URL (gateway base URL), AGENT_PORT (agent listen port).
set -e

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PIDS=()

log() { echo "$(date '+%F %T') - $*"; }

# Kill all child agent processes. Runs on normal exit and (via the signal
# traps below) on Ctrl-C / TERM.
# NOTE: no `exit 0` here — an explicit exit inside an EXIT trap would mask
# the script's real exit status, making failures report success.
cleanup() {
    log "Stopping agents..."
    for PID in "${PIDS[@]}"; do
        kill "$PID" 2>/dev/null && log "Stopped process $PID"
    done
}
trap cleanup EXIT
# Conventional signal exit codes; exiting triggers the EXIT trap above.
trap 'exit 130' INT
trap 'exit 143' TERM

export PLANO_URL="${PLANO_URL:-http://localhost:12000}"
export AGENT_PORT="${AGENT_PORT:-8000}"

log "Starting research_agent on port $AGENT_PORT..."
uv run "$SCRIPT_DIR/agent.py" &
PIDS+=($!)

# Stay in the foreground until every agent exits.
for PID in "${PIDS[@]}"; do
    wait "$PID"
done