Mirror of https://github.com/katanemo/plano.git (synced 2026-04-28 02:23:56 +02:00)

Update docs to Plano (#639)

parent 15fbb6c3af
commit e224cba3e3

139 changed files with 4407 additions and 24735 deletions
@@ -3,11 +3,11 @@
 Configuration Reference
 =======================
 
-The following is a complete reference of the ``arch_config.yml`` that controls the behavior of a single instance of
+The following is a complete reference of the ``plano_config.yml`` that controls the behavior of a single instance of
 the Arch gateway. This is where you enable capabilities like routing to upstream LLM providers, defining prompt_targets
 where prompts get routed to, applying guardrails, and enabling critical agent observability features.
 
 .. literalinclude:: includes/arch_config_full_reference.yaml
    :language: yaml
    :linenos:
-   :caption: :download:`Arch Configuration - Full Reference <includes/arch_config_full_reference.yaml>`
+   :caption: :download:`Plano Configuration - Full Reference <includes/arch_config_full_reference.yaml>`

docs/source/resources/db_setup/README.md (new file, 109 lines)

@@ -0,0 +1,109 @@
# Database Setup for Conversation State Storage

This directory contains the SQL scripts needed to set up database tables for storing conversation state when using the OpenAI Responses API.

## Prerequisites

- PostgreSQL database (Supabase or self-hosted)
- Database connection credentials
- `psql` CLI tool or database admin access

## Setup Instructions

### Option 1: Using psql

```bash
psql $DATABASE_URL -f docs/source/resources/db_setup/conversation_states.sql
```

### Option 2: Using the Supabase Dashboard

1. Log in to your Supabase project dashboard
2. Navigate to the SQL Editor
3. Copy and paste the contents of `conversation_states.sql`
4. Run the query

### Option 3: Direct Database Connection

Connect to your PostgreSQL database using your preferred client and execute the SQL from `conversation_states.sql`.

## Verification

After running the setup, verify that the table was created:

```sql
SELECT tablename FROM pg_tables WHERE tablename = 'conversation_states';
```

You should see `conversation_states` in the results.

## Configuration

After setting up the database table, configure your application to use Supabase storage by setting the appropriate environment variable or configuration parameter with your database connection string.

### Supabase Connection String

**Important:** Supabase requires different connection strings depending on your network:

- **IPv4 networks (most common)**: use the **Session Pooler** connection string (port 5432):

  ```
  postgresql://postgres.[PROJECT-REF]:[PASSWORD]@aws-0-[REGION].pooler.supabase.com:5432/postgres
  ```

- **IPv6 networks**: use the direct connection (port 5432):

  ```
  postgresql://postgres:[PASSWORD]@db.[PROJECT-REF].supabase.co:5432/postgres
  ```

**How to get your connection string:**

1. Go to your Supabase project dashboard
2. Settings → Database → Connection Pooling
3. Copy the **Session mode** connection string
4. Replace `[YOUR-PASSWORD]` with your actual database password
5. URL-encode any special characters in the password (e.g., `#` becomes `%23`)

**Example:**

```bash
# If your password is "MyPass#123", encode it as "MyPass%23123"
export DATABASE_URL="postgresql://postgres.myproject:MyPass%23123@aws-0-us-west-2.pooler.supabase.com:5432/postgres"
```
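Step 5 above (URL-encoding special characters in the password) can also be done programmatically; a minimal sketch using only the Python standard library (the `encode_db_password` helper name is illustrative, not part of the project):

```python
from urllib.parse import quote

def encode_db_password(password: str) -> str:
    """Percent-encode a database password for safe use inside a connection URL."""
    # safe="" forces encoding of every reserved character, including '#', '@', and '/'
    return quote(password, safe="")

print(encode_db_password("MyPass#123"))  # MyPass%23123
```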

### Testing the Connection

To test that your connection string works:

```bash
export TEST_DATABASE_URL="your-connection-string-here"
cd crates/brightstaff
cargo test supabase -- --nocapture
```

## Table Schema

The `conversation_states` table stores:

- `response_id` (TEXT, PRIMARY KEY): Unique identifier for each conversation
- `input_items` (JSONB): Array of conversation messages and context
- `created_at` (BIGINT): Unix timestamp when the conversation started
- `model` (TEXT): Model name used for the conversation
- `provider` (TEXT): LLM provider name
- `updated_at` (TIMESTAMP): Last update time (auto-managed)
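A sketch of writing a row into this schema from application code. The `build_upsert` helper and the psycopg-style `%s` placeholders are illustrative assumptions (executing the statement requires a real PostgreSQL driver and connection, which are not shown):

```python
import json
import time

def build_upsert(response_id: str, input_items: list, model: str, provider: str):
    """Build an upsert statement and parameters matching the conversation_states schema."""
    sql = (
        "INSERT INTO conversation_states "
        "(response_id, input_items, created_at, model, provider) "
        "VALUES (%s, %s::jsonb, %s, %s, %s) "
        "ON CONFLICT (response_id) "
        "DO UPDATE SET input_items = EXCLUDED.input_items, updated_at = CURRENT_TIMESTAMP"
    )
    # input_items is serialized to JSON text and cast to JSONB server-side
    params = (response_id, json.dumps(input_items), int(time.time()), model, provider)
    return sql, params

sql, params = build_upsert(
    "resp_123",
    [{"role": "user", "content": "tell me a joke"}],
    "gpt-4o",
    "openai",
)
print(params[0])  # resp_123
```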

## Maintenance

### Cleaning Up Old Conversations

To prevent unbounded growth, consider periodically cleaning up old conversation states:

```sql
-- Delete conversations older than 7 days
DELETE FROM conversation_states
WHERE updated_at < NOW() - INTERVAL '7 days';
```

You can automate this with a cron job or database trigger.
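As one illustrative option (the schedule and the assumption that `DATABASE_URL` is available in cron's environment are yours to adjust), a crontab entry running the cleanup nightly:

```bash
# Run the retention cleanup every night at 03:15. Note that cron does not
# read your shell profile, so DATABASE_URL must be set in the crontab itself
# or in a wrapper script.
15 3 * * * psql "$DATABASE_URL" -c "DELETE FROM conversation_states WHERE updated_at < NOW() - INTERVAL '7 days';"
```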

## Troubleshooting

If you encounter errors on first use:

- **"Table 'conversation_states' does not exist"**: Run the setup SQL
- **Connection errors**: Verify your `DATABASE_URL` is correct
- **Permission errors**: Ensure your database user has CREATE TABLE privileges
docs/source/resources/db_setup/conversation_states.sql (new file, 31 lines)

@@ -0,0 +1,31 @@
-- Conversation State Storage Table
-- This table stores conversational context for the OpenAI Responses API
-- Run this SQL against your PostgreSQL/Supabase database before enabling conversation state storage

CREATE TABLE IF NOT EXISTS conversation_states (
    response_id TEXT PRIMARY KEY,
    input_items JSONB NOT NULL,
    created_at BIGINT NOT NULL,
    model TEXT NOT NULL,
    provider TEXT NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for common query patterns
CREATE INDEX IF NOT EXISTS idx_conversation_states_created_at
    ON conversation_states(created_at);

CREATE INDEX IF NOT EXISTS idx_conversation_states_provider
    ON conversation_states(provider);

-- Optional: index to support automatic cleanup of old conversations by updated_at
-- Uncomment and adjust the retention period as needed
-- CREATE INDEX IF NOT EXISTS idx_conversation_states_updated_at
--     ON conversation_states(updated_at);

COMMENT ON TABLE conversation_states IS 'Stores conversation history for OpenAI Responses API continuity';
COMMENT ON COLUMN conversation_states.response_id IS 'Unique identifier for the conversation state';
COMMENT ON COLUMN conversation_states.input_items IS 'JSONB array of conversation messages and context';
COMMENT ON COLUMN conversation_states.created_at IS 'Unix timestamp (seconds) when the conversation started';
COMMENT ON COLUMN conversation_states.model IS 'Model name used for this conversation';
COMMENT ON COLUMN conversation_states.provider IS 'LLM provider (e.g., openai, anthropic, bedrock)';
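Note that `created_at` holds Unix seconds in a BIGINT while `updated_at` is a native TIMESTAMP; a quick sketch of converting the integer form for display or comparison in Python:

```python
from datetime import datetime, timezone
import time

# created_at as written by the application: Unix seconds
created_at = int(time.time())

# Convert to an aware datetime for display or comparison
dt = datetime.fromtimestamp(created_at, tz=timezone.utc)
print(dt.isoformat())

# And back again, losslessly at second precision
assert int(dt.timestamp()) == created_at
```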

@@ -3,17 +3,17 @@
 Deployment
 ==========
 
-This guide shows how to deploy Arch directly using Docker without the archgw CLI, including basic runtime checks for routing and health monitoring.
+This guide shows how to deploy Plano directly using Docker without the ``plano`` CLI, including basic runtime checks for routing and health monitoring.
 
 Docker Deployment
 -----------------
 
-Below is a minimal, production-ready example showing how to deploy the Arch Docker image directly and run basic runtime checks. Adjust image names, tags, and the ``arch_config.yaml`` path to match your environment.
+Below is a minimal, production-ready example showing how to deploy the Plano Docker image directly and run basic runtime checks. Adjust image names, tags, and the ``plano_config.yaml`` path to match your environment.
 
 .. note::
-   You will need to pass all required environment variables that are referenced in your ``arch_config.yaml`` file.
+   You will need to pass all required environment variables that are referenced in your ``plano_config.yaml`` file.
 
-For ``arch_config.yaml``, you can use any sample configuration defined earlier in the documentation. For example, you can try the :ref:`LLM Routing <llm_router>` sample config.
+For ``plano_config.yaml``, you can use any sample configuration defined earlier in the documentation. For example, you can try the :ref:`LLM Routing <llm_router>` sample config.
 
 Docker Compose Setup
 ~~~~~~~~~~~~~~~~~~~~
@@ -24,14 +24,14 @@ Create a ``docker-compose.yml`` file with the following configuration:
 
    # docker-compose.yml
    services:
-     archgw:
-       image: katanemo/archgw:0.3.22
-       container_name: archgw
+     plano:
+       image: katanemo/plano:0.4.0
+       container_name: plano
        ports:
-         - "10000:10000"  # ingress (client -> arch)
-         - "12000:12000"  # egress (arch -> upstream/llm proxy)
+         - "10000:10000"  # ingress (client -> plano)
+         - "12000:12000"  # egress (plano -> upstream/llm proxy)
        volumes:
-         - ./arch_config.yaml:/app/arch_config.yaml:ro
+         - ./plano_config.yaml:/app/plano_config.yaml:ro
        environment:
         - OPENAI_API_KEY=${OPENAI_API_KEY:?error}
         - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?error}
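The ``${OPENAI_API_KEY:?error}`` interpolation makes Compose fail fast when a key is unset. The same POSIX parameter expansion can be tried directly in a shell (the ``DEMO_KEY`` variable here is purely illustrative):

```shell
# ${VAR:?message} aborts the command with an error when VAR is unset or empty;
# this is how the compose file refuses to start without the API keys.
export DEMO_KEY="sk-test"
echo "key is ${DEMO_KEY:?DEMO_KEY must be set}"

# In a subshell with the variable unset, the expansion fails and the
# fallback branch runs instead:
( unset DEMO_KEY; echo "${DEMO_KEY:?DEMO_KEY must be set}" ) 2>/dev/null || echo "startup refused"
```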
@@ -39,7 +39,7 @@ Create a ``docker-compose.yml`` file with the following configuration:
 Starting the Stack
 ~~~~~~~~~~~~~~~~~~
 
-Start the services from the directory containing ``docker-compose.yml`` and ``arch_config.yaml``:
+Start the services from the directory containing ``docker-compose.yml`` and ``plano_config.yaml``:
 
 .. code-block:: bash
 
@@ -51,7 +51,7 @@ Check container health and logs:
 .. code-block:: bash
 
    docker compose ps
-   docker compose logs -f archgw
+   docker compose logs -f plano
 
 Runtime Tests
 -------------
@@ -65,7 +65,7 @@ Test the chat completion endpoint with automatic routing:
 .. code-block:: bash
 
-   # Request handled by the gateway. 'model: "none"' lets Arch decide routing
+   # Request handled by the gateway. 'model: "none"' lets Plano decide routing
    curl --header 'Content-Type: application/json' \
      --data '{"messages":[{"role":"user","content":"tell me a joke"}], "model":"none"}' \
      http://localhost:12000/v1/chat/completions | jq .model
@@ -74,7 +74,7 @@ Expected output:
 .. code-block:: json
 
-   "gpt-4o-2024-08-06"
+   "gpt-5.2"
 
 Model-Based Routing
 ~~~~~~~~~~~~~~~~~~~
@@ -84,14 +84,14 @@ Test explicit provider and model routing:
 .. code-block:: bash
 
    curl -s -H "Content-Type: application/json" \
-     -d '{"messages":[{"role":"user","content":"Explain quantum computing"}], "model":"anthropic/claude-3-5-sonnet-20241022"}' \
+     -d '{"messages":[{"role":"user","content":"Explain quantum computing"}], "model":"anthropic/claude-sonnet-4-5"}' \
      http://localhost:12000/v1/chat/completions | jq .model
 
 Expected output:
 
 .. code-block:: json
 
-   "claude-3-5-sonnet-20241022"
+   "claude-sonnet-4-5"
 
 Troubleshooting
 ---------------
@@ -100,19 +100,19 @@ Common Issues and Solutions
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 **Environment Variables**
-Ensure all environment variables (``OPENAI_API_KEY``, ``ANTHROPIC_API_KEY``, etc.) used by ``arch_config.yaml`` are set before starting services.
+Ensure all environment variables (``OPENAI_API_KEY``, ``ANTHROPIC_API_KEY``, etc.) used by ``plano_config.yaml`` are set before starting services.
 
 **TLS/Connection Errors**
 If you encounter TLS or connection errors to upstream providers:
 
 - Check DNS resolution
 - Verify proxy settings
-- Confirm correct protocol and port in your ``arch_config`` endpoints
+- Confirm correct protocol and port in your ``plano_config`` endpoints
 
 **Verbose Logging**
 To enable more detailed logs for debugging:
 
-- Run archgw with a higher component log level
+- Run plano with a higher component log level
 - See the :ref:`Observability <observability>` guide for logging and monitoring details
 - Rebuild the image if required with updated log configuration

docs/source/resources/includes/agents/agents_config.yaml (new file, 57 lines)

@@ -0,0 +1,57 @@
version: v0.3.0

agents:
  - id: weather_agent
    url: http://host.docker.internal:10510
  - id: flight_agent
    url: http://host.docker.internal:10520

model_providers:
  - model: openai/gpt-4o
    access_key: $OPENAI_API_KEY
    default: true
  - model: openai/gpt-4o-mini
    access_key: $OPENAI_API_KEY  # smaller, faster, cheaper model for extracting entities like location

listeners:
  - type: agent
    name: travel_booking_service
    port: 8001
    router: plano_orchestrator_v1
    agents:
      - id: weather_agent
        description: |
          WeatherAgent is a specialized AI assistant for real-time weather information and forecasts. It provides accurate weather data for any city worldwide using the Open-Meteo API, helping travelers plan their trips with up-to-date weather conditions.

          Capabilities:
          * Get real-time weather conditions and multi-day forecasts for any city worldwide using the Open-Meteo API (free, no API key needed)
          * Provides current temperature
          * Provides multi-day forecasts
          * Provides weather conditions
          * Provides sunrise/sunset times
          * Provides detailed weather information
          * Understands conversation context to resolve location references from previous messages
          * Handles weather-related questions including "What's the weather in [city]?", "What's the forecast for [city]?", "How's the weather in [city]?"
          * When queries include both weather and other travel questions (e.g., flights, currency), this agent answers ONLY the weather part

      - id: flight_agent
        description: |
          FlightAgent is an AI-powered tool specialized in providing live flight information between airports. It leverages the FlightAware AeroAPI to deliver real-time flight status, gate information, and delay updates.

          Capabilities:
          * Get live flight information between airports using the FlightAware AeroAPI
          * Shows real-time flight status
          * Shows scheduled/estimated/actual departure and arrival times
          * Shows gate and terminal information
          * Shows delays
          * Shows aircraft type
          * Shows flight status
          * Automatically resolves city names to airport codes (IATA/ICAO)
          * Understands conversation context to infer origin/destination from follow-up questions
          * Handles flight-related questions including "What flights go from [city] to [city]?", "Do flights go to [city]?", "Are there direct flights from [city]?"
          * When queries include both flight and other travel questions (e.g., weather, currency), this agent answers ONLY the flight part

tracing:
  random_sampling: 100
475
docs/source/resources/includes/agents/flights.py
Normal file
475
docs/source/resources/includes/agents/flights.py
Normal file
|
|
@ -0,0 +1,475 @@
|
|||
import json
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from openai import AsyncOpenAI
|
||||
import os
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
import uvicorn
|
||||
from datetime import datetime, timedelta
|
||||
import httpx
|
||||
from typing import Optional
|
||||
from opentelemetry.propagate import extract, inject
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - [FLIGHT_AGENT] - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Configuration
|
||||
LLM_GATEWAY_ENDPOINT = os.getenv(
|
||||
"LLM_GATEWAY_ENDPOINT", "http://host.docker.internal:12000/v1"
|
||||
)
|
||||
FLIGHT_MODEL = "openai/gpt-4o"
|
||||
EXTRACTION_MODEL = "openai/gpt-4o-mini"
|
||||
|
||||
# FlightAware AeroAPI configuration
|
||||
AEROAPI_BASE_URL = "https://aeroapi.flightaware.com/aeroapi"
|
||||
AEROAPI_KEY = os.getenv("AEROAPI_KEY", "ESVFX7TJLxB7OTuayUv0zTQBryA3tOPr")
|
||||
|
||||
# HTTP client for API calls
|
||||
http_client = httpx.AsyncClient(timeout=30.0)
|
||||
|
||||
# Initialize OpenAI client
|
||||
openai_client_via_plano = AsyncOpenAI(
|
||||
base_url=LLM_GATEWAY_ENDPOINT,
|
||||
api_key="EMPTY",
|
||||
)
|
||||
|
||||
# System prompt for flight agent
|
||||
SYSTEM_PROMPT = """You are a travel planning assistant specializing in flight information in a multi-agent system. You will receive flight data in JSON format with these fields:
|
||||
|
||||
- "airline": Full airline name (e.g., "Delta Air Lines")
|
||||
- "flight_number": Flight identifier (e.g., "DL123")
|
||||
- "departure_time": ISO 8601 timestamp for scheduled departure (e.g., "2025-12-24T23:00:00Z")
|
||||
- "arrival_time": ISO 8601 timestamp for scheduled arrival (e.g., "2025-12-25T04:40:00Z")
|
||||
- "origin": Origin airport IATA code (e.g., "ATL")
|
||||
- "destination": Destination airport IATA code (e.g., "SEA")
|
||||
- "aircraft_type": Aircraft model code (e.g., "A21N", "B739")
|
||||
- "status": Flight status (e.g., "Scheduled", "Delayed")
|
||||
- "terminal_origin": Departure terminal (may be null)
|
||||
- "gate_origin": Departure gate (may be null)
|
||||
|
||||
Your task:
|
||||
1. Read the JSON flight data carefully
|
||||
2. Present each flight clearly with: airline, flight number, departure/arrival times (convert from ISO format to readable time), airports, and aircraft type
|
||||
3. Organize flights chronologically by departure time
|
||||
4. Convert ISO timestamps to readable format (e.g., "11:00 PM" or "23:00")
|
||||
5. Include terminal/gate info when available
|
||||
6. Use natural, conversational language
|
||||
|
||||
Important: If the conversation includes information from other agents (like weather details), acknowledge and build upon that context naturally. Your primary focus is flights, but maintain awareness of the full conversation.
|
||||
|
||||
Remember: All the data you need is in the JSON. Use it directly."""
|
||||
|
||||
|
||||
async def extract_flight_route(messages: list, request: Request) -> dict:
|
||||
"""Extract origin, destination, and date from conversation using LLM."""
|
||||
|
||||
extraction_prompt = """Extract flight origin, destination cities, and travel date from the conversation.
|
||||
|
||||
Rules:
|
||||
1. Look for patterns: "flight from X to Y", "flights to Y", "fly from X"
|
||||
2. Extract dates like "tomorrow", "next week", "December 25", "12/25", "on Monday"
|
||||
3. Use conversation context to fill in missing details
|
||||
4. Return JSON: {"origin": "City" or null, "destination": "City" or null, "date": "YYYY-MM-DD" or null}
|
||||
|
||||
Examples:
|
||||
- "Flight from Seattle to Atlanta tomorrow" -> {"origin": "Seattle", "destination": "Atlanta", "date": "2025-12-24"}
|
||||
- "What flights go to New York?" -> {"origin": null, "destination": "New York", "date": null}
|
||||
- "Flights to Miami on Christmas" -> {"origin": null, "destination": "Miami", "date": "2025-12-25"}
|
||||
- "Show me flights from LA to NYC next Monday" -> {"origin": "LA", "destination": "NYC", "date": "2025-12-30"}
|
||||
|
||||
Today is December 23, 2025. Extract flight route and date:"""
|
||||
|
||||
try:
|
||||
ctx = extract(request.headers)
|
||||
extra_headers = {}
|
||||
inject(extra_headers, context=ctx)
|
||||
|
||||
response = await openai_client_via_plano.chat.completions.create(
|
||||
model=EXTRACTION_MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": extraction_prompt},
|
||||
*[
|
||||
{"role": msg.get("role"), "content": msg.get("content")}
|
||||
for msg in messages[-5:]
|
||||
],
|
||||
],
|
||||
temperature=0.1,
|
||||
max_tokens=100,
|
||||
extra_headers=extra_headers if extra_headers else None,
|
||||
)
|
||||
|
||||
result = response.choices[0].message.content.strip()
|
||||
if "```json" in result:
|
||||
result = result.split("```json")[1].split("```")[0].strip()
|
||||
elif "```" in result:
|
||||
result = result.split("```")[1].split("```")[0].strip()
|
||||
|
||||
route = json.loads(result)
|
||||
return {
|
||||
"origin": route.get("origin"),
|
||||
"destination": route.get("destination"),
|
||||
"date": route.get("date"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting flight route: {e}")
|
||||
return {"origin": None, "destination": None, "date": None}
|
||||
|
||||
|
||||
async def resolve_airport_code(city_name: str, request: Request) -> Optional[str]:
|
||||
"""Convert city name to airport code using LLM."""
|
||||
if not city_name:
|
||||
return None
|
||||
|
||||
try:
|
||||
ctx = extract(request.headers)
|
||||
extra_headers = {}
|
||||
inject(extra_headers, context=ctx)
|
||||
|
||||
response = await openai_client_via_plano.chat.completions.create(
|
||||
model=EXTRACTION_MODEL,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Convert city names to primary airport IATA codes. Return only the 3-letter code. Examples: Seattle→SEA, Atlanta→ATL, New York→JFK, London→LHR",
|
||||
},
|
||||
{"role": "user", "content": city_name},
|
||||
],
|
||||
temperature=0.1,
|
||||
max_tokens=10,
|
||||
extra_headers=extra_headers if extra_headers else None,
|
||||
)
|
||||
|
||||
code = response.choices[0].message.content.strip().upper()
|
||||
code = code.strip("\"'`.,!? \n\t")
|
||||
return code if len(code) == 3 else None
|
||||
except Exception as e:
|
||||
logger.error(f"Error resolving airport code for {city_name}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def get_flights(
|
||||
origin_code: str, dest_code: str, travel_date: Optional[str] = None
|
||||
) -> Optional[dict]:
|
||||
"""Get flights between two airports using FlightAware API.
|
||||
|
||||
Args:
|
||||
origin_code: Origin airport IATA code
|
||||
dest_code: Destination airport IATA code
|
||||
travel_date: Travel date in YYYY-MM-DD format, defaults to today
|
||||
|
||||
Note: FlightAware API limits searches to 2 days in the future.
|
||||
"""
|
||||
try:
|
||||
# Use provided date or default to today
|
||||
if travel_date:
|
||||
search_date = travel_date
|
||||
else:
|
||||
search_date = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
# Validate date is not too far in the future (FlightAware limit: 2 days)
|
||||
search_date_obj = datetime.strptime(search_date, "%Y-%m-%d")
|
||||
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
days_ahead = (search_date_obj - today).days
|
||||
|
||||
if days_ahead > 2:
|
||||
logger.warning(
|
||||
f"Requested date {search_date} is {days_ahead} days ahead, exceeds FlightAware 2-day limit"
|
||||
)
|
||||
return {
|
||||
"origin_code": origin_code,
|
||||
"destination_code": dest_code,
|
||||
"flights": [],
|
||||
"count": 0,
|
||||
"error": f"FlightAware API only provides flight data up to 2 days in the future. The requested date ({search_date}) is {days_ahead} days ahead. Please search for today, tomorrow, or the day after.",
|
||||
}
|
||||
|
||||
url = f"{AEROAPI_BASE_URL}/airports/{origin_code}/flights/to/{dest_code}"
|
||||
headers = {"x-apikey": AEROAPI_KEY}
|
||||
params = {
|
||||
"start": f"{search_date}T00:00:00Z",
|
||||
"end": f"{search_date}T23:59:59Z",
|
||||
"connection": "nonstop",
|
||||
"max_pages": 1,
|
||||
}
|
||||
|
||||
response = await http_client.get(url, headers=headers, params=params)
|
||||
|
||||
if response.status_code != 200:
|
||||
logger.error(
|
||||
f"FlightAware API error {response.status_code}: {response.text}"
|
||||
)
|
||||
return None
|
||||
|
||||
data = response.json()
|
||||
flights = []
|
||||
|
||||
# Log raw API response for debugging
|
||||
logger.info(f"FlightAware API returned {len(data.get('flights', []))} flights")
|
||||
|
||||
for idx, flight_group in enumerate(
|
||||
data.get("flights", [])[:5]
|
||||
): # Limit to 5 flights
|
||||
# FlightAware API nests data in segments array
|
||||
segments = flight_group.get("segments", [])
|
||||
if not segments:
|
||||
continue
|
||||
|
||||
flight = segments[0] # Get first segment (direct flights only have one)
|
||||
|
||||
# Extract airport codes from nested objects
|
||||
flight_origin = None
|
||||
flight_dest = None
|
||||
|
||||
if isinstance(flight.get("origin"), dict):
|
||||
flight_origin = flight["origin"].get("code_iata")
|
||||
|
||||
if isinstance(flight.get("destination"), dict):
|
||||
flight_dest = flight["destination"].get("code_iata")
|
||||
|
||||
# Build flight object
|
||||
flights.append(
|
||||
{
|
||||
"airline": flight.get("operator"),
|
||||
"flight_number": flight.get("ident_iata") or flight.get("ident"),
|
||||
"departure_time": flight.get("scheduled_out"),
|
||||
"arrival_time": flight.get("scheduled_in"),
|
||||
"origin": flight_origin,
|
||||
"destination": flight_dest,
|
||||
"aircraft_type": flight.get("aircraft_type"),
|
||||
"status": flight.get("status"),
|
||||
"terminal_origin": flight.get("terminal_origin"),
|
||||
"gate_origin": flight.get("gate_origin"),
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"origin_code": origin_code,
|
||||
"destination_code": dest_code,
|
||||
"flights": flights,
|
||||
"count": len(flights),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching flights: {e}")
|
||||
return None
|
||||
|
||||
|
||||
app = FastAPI(title="Flight Information Agent", version="1.0.0")
|
||||
|
||||
|
||||
@app.post("/v1/chat/completions")
|
||||
async def handle_request(request: Request):
|
||||
"""HTTP endpoint for chat completions with streaming support."""
|
||||
request_body = await request.json()
|
||||
messages = request_body.get("messages", [])
|
||||
|
||||
return StreamingResponse(
|
||||
invoke_flight_agent(request, request_body),
|
||||
media_type="text/plain",
|
||||
headers={"content-type": "text/event-stream"},
|
||||
)
|
||||
|
||||
|
||||
async def invoke_flight_agent(request: Request, request_body: dict):
|
||||
"""Generate streaming chat completions."""
|
||||
messages = request_body.get("messages", [])
|
||||
|
||||
# Step 1: Extract origin, destination, and date
|
||||
route = await extract_flight_route(messages, request)
|
||||
origin = route.get("origin")
|
||||
destination = route.get("destination")
|
||||
travel_date = route.get("date")
|
||||
|
||||
# Step 2: Short circuit if missing origin or destination
|
||||
if not origin or not destination:
|
||||
missing = []
|
||||
if not origin:
|
||||
missing.append("origin city")
|
||||
if not destination:
|
||||
missing.append("destination city")
|
||||
|
||||
error_message = f"I need both origin and destination cities to search for flights. Please provide the {' and '.join(missing)}. For example: 'Flights from Seattle to Atlanta'"
|
||||
|
||||
error_chunk = {
|
||||
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": int(time.time()),
|
||||
"model": request_body.get("model", FLIGHT_MODEL),
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {"content": error_message},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
}
|
||||
yield f"data: {json.dumps(error_chunk)}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
# Step 3: Resolve airport codes
|
||||
origin_code = await resolve_airport_code(origin, request)
|
||||
dest_code = await resolve_airport_code(destination, request)
|
||||
|
||||
if not origin_code or not dest_code:
|
||||
error_chunk = {
|
||||
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": int(time.time()),
|
||||
"model": request_body.get("model", FLIGHT_MODEL),
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {
|
||||
"content": f"I couldn't find airport codes for {origin if not origin_code else destination}. Please check the city name."
|
||||
},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
}
|
||||
yield f"data: {json.dumps(error_chunk)}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
# Step 4: Get live flight data
|
||||
flight_data = await get_flights(origin_code, dest_code, travel_date)
|
||||
|
||||
# Determine date display for messages
|
||||
date_display = travel_date if travel_date else "today"
|
||||
|
||||
if not flight_data or not flight_data.get("flights"):
|
||||
# Check if there's a specific error message (e.g., date too far in future)
|
||||
error_detail = flight_data.get("error") if flight_data else None
|
||||
if error_detail:
|
||||
no_flights_message = error_detail
|
||||
else:
|
||||
no_flights_message = f"No direct flights found from {origin} ({origin_code}) to {destination} ({dest_code}) for {date_display}."
|
||||
|
||||
error_chunk = {
|
||||
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": int(time.time()),
|
||||
"model": request_body.get("model", FLIGHT_MODEL),
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {"content": no_flights_message},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"
        return

    # Step 5: Prepare context for LLM - append flight data to last user message
    flight_context = f"""

Flight search results from {origin} ({origin_code}) to {destination} ({dest_code}):

Flight data in JSON format:
{json.dumps(flight_data, indent=2)}

Present these {len(flight_data.get('flights', []))} flight(s) to the user in a clear, readable format."""

    # Build message history with flight data appended to the last user message
    response_messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    for i, msg in enumerate(messages):
        # Append flight data to the last user message
        if i == len(messages) - 1 and msg.get("role") == "user":
            response_messages.append(
                {"role": "user", "content": msg.get("content") + flight_context}
            )
        else:
            response_messages.append(
                {"role": msg.get("role"), "content": msg.get("content")}
            )

    # Log what we're sending to the LLM for debugging
    logger.info(f"Sending messages to LLM: {json.dumps(response_messages, indent=2)}")

    # Step 6: Stream response
    try:
        ctx = extract(request.headers)
        extra_headers = {"x-envoy-max-retries": "3"}
        inject(extra_headers, context=ctx)

        stream = await openai_client_via_plano.chat.completions.create(
            model=FLIGHT_MODEL,
            messages=response_messages,
            temperature=request_body.get("temperature", 0.7),
            max_tokens=request_body.get("max_tokens", 1000),
            stream=True,
            extra_headers=extra_headers,
        )

        async for chunk in stream:
            if chunk.choices:
                yield f"data: {chunk.model_dump_json()}\n\n"

        yield "data: [DONE]\n\n"

    except Exception as e:
        logger.error(f"Error generating flight response: {e}")
        error_chunk = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": request_body.get("model", FLIGHT_MODEL),
            "choices": [
                {
                    "index": 0,
                    "delta": {
                        "content": "I apologize, but I'm having trouble retrieving flight information right now. Please try again."
                    },
                    "finish_reason": "stop",
                }
            ],
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "agent": "flight_information"}


def start_server(host: str = "localhost", port: int = 10520):
    """Start the REST server."""
    uvicorn.run(
        app,
        host=host,
        port=port,
        log_config={
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(asctime)s - [FLIGHT_AGENT] - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "default": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                },
            },
            "root": {
                "level": "INFO",
                "handlers": ["default"],
            },
        },
    )


if __name__ == "__main__":
    start_server(host="0.0.0.0", port=10520)
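Both agents in this example use the same message-rewriting pattern: the freshly retrieved context (flight or weather data) is appended to the last user message before the history is forwarded to the LLM. A minimal standalone sketch of that pattern; the `build_messages` helper name is ours, not part of the agent code:

```python
def build_messages(system_prompt: str, messages: list, context: str) -> list:
    """Prepend a system prompt and append retrieved context to the last user turn."""
    out = [{"role": "system", "content": system_prompt}]
    for i, msg in enumerate(messages):
        if i == len(messages) - 1 and msg.get("role") == "user":
            # Only the final user turn carries the freshly retrieved context
            out.append({"role": "user", "content": msg["content"] + context})
        else:
            out.append({"role": msg["role"], "content": msg["content"]})
    return out


history = [
    {"role": "user", "content": "Flights to Atlanta?"},
    {"role": "assistant", "content": "Here are some options..."},
    {"role": "user", "content": "What's the weather there?"},
]
result = build_messages(
    "You are a weather assistant.", history, "\n\nWeather data: ..."
)
```

Earlier turns pass through unchanged, so the model keeps the full conversation while the injected data stays attached to the question it answers.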
426 docs/source/resources/includes/agents/weather.py Normal file
@@ -0,0 +1,426 @@
import json
import re
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import os
import logging
import time
import uuid
import uvicorn
from datetime import datetime, timedelta
import httpx
from typing import Optional
from urllib.parse import quote
from opentelemetry.propagate import extract, inject

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - [WEATHER_AGENT] - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# Configuration for plano LLM gateway
LLM_GATEWAY_ENDPOINT = os.getenv(
    "LLM_GATEWAY_ENDPOINT", "http://host.docker.internal:12001/v1"
)
WEATHER_MODEL = "openai/gpt-4o"
LOCATION_MODEL = "openai/gpt-4o-mini"

# Initialize OpenAI client for plano
openai_client_via_plano = AsyncOpenAI(
    base_url=LLM_GATEWAY_ENDPOINT,
    api_key="EMPTY",
)

# FastAPI app for REST server
app = FastAPI(title="Weather Forecast Agent", version="1.0.0")

# HTTP client for API calls
http_client = httpx.AsyncClient(timeout=10.0)


# Utility functions
def celsius_to_fahrenheit(temp_c: Optional[float]) -> Optional[float]:
    """Convert Celsius to Fahrenheit."""
    return round(temp_c * 9 / 5 + 32, 1) if temp_c is not None else None


def get_user_messages(messages: list) -> list:
    """Extract user messages from message list."""
    return [msg for msg in messages if msg.get("role") == "user"]


def get_last_user_content(messages: list) -> str:
    """Get the content of the most recent user message."""
    for msg in reversed(messages):
        if msg.get("role") == "user":
            return msg.get("content", "").lower()
    return ""


async def get_weather_data(request: Request, messages: list, days: int = 1):
    """Extract location from user's conversation and fetch weather data from Open-Meteo API.

    This function does two things:
    1. Uses an LLM to extract the location from the user's message
    2. Fetches weather data for that location from Open-Meteo

    Currently returns only current day weather. Want to add multi-day forecasts?
    """

    instructions = """Extract the location for WEATHER queries. Return just the city name.

Rules:
1. For multi-part queries, extract ONLY the location mentioned with weather keywords ("weather in [location]")
2. If user says "there" or "that city", it typically refers to the DESTINATION city in travel contexts (not the origin)
3. For flight queries with weather, "there" means the destination city where they're traveling TO
4. Return plain text (e.g., "London", "New York", "Paris, France")
5. If no weather location found, return "NOT_FOUND"

Examples:
- "What's the weather in London?" -> "London"
- "Flights from Seattle to Atlanta, and show me the weather there" -> "Atlanta"
- "Can you get me flights from Seattle to Atlanta tomorrow, and also please show me the weather there" -> "Atlanta"
- "What's the weather in Seattle, and what is one flight that goes direct to Atlanta?" -> "Seattle"
- User asked about flights to Atlanta, then "what's the weather like there?" -> "Atlanta"
- "I'm going to Seattle" -> "Seattle"
- "What's happening?" -> "NOT_FOUND"

Extract location:"""

    try:
        user_messages = [
            msg.get("content") for msg in messages if msg.get("role") == "user"
        ]

        if not user_messages:
            location = "New York"
        else:
            ctx = extract(request.headers)
            extra_headers = {}
            inject(extra_headers, context=ctx)

            # For location extraction, pass full conversation for context (e.g., "there" = previous destination)
            response = await openai_client_via_plano.chat.completions.create(
                model=LOCATION_MODEL,
                messages=[
                    {"role": "system", "content": instructions},
                    *[
                        {"role": msg.get("role"), "content": msg.get("content")}
                        for msg in messages
                    ],
                ],
                temperature=0.1,
                max_tokens=50,
                extra_headers=extra_headers if extra_headers else None,
            )

            location = response.choices[0].message.content.strip().strip("\"'`.,!?")
            logger.info(f"Location extraction result: '{location}'")

            if not location or location.upper() == "NOT_FOUND":
                location = "New York"
                logger.info(f"Location not found, defaulting to: {location}")

    except Exception as e:
        logger.error(f"Error extracting location: {e}")
        location = "New York"

    logger.info(f"Fetching weather for location: '{location}' (days: {days})")

    # Step 2: Fetch weather data for the extracted location
    try:
        # Geocode city to get coordinates
        geocode_url = f"https://geocoding-api.open-meteo.com/v1/search?name={quote(location)}&count=1&language=en&format=json"
        geocode_response = await http_client.get(geocode_url)

        if geocode_response.status_code != 200 or not geocode_response.json().get(
            "results"
        ):
            logger.warning(f"Could not geocode {location}, using New York")
            location = "New York"
            geocode_url = f"https://geocoding-api.open-meteo.com/v1/search?name={quote(location)}&count=1&language=en&format=json"
            geocode_response = await http_client.get(geocode_url)

        geocode_data = geocode_response.json()
        if not geocode_data.get("results"):
            return {
                "location": location,
                "weather": {
                    "date": datetime.now().strftime("%Y-%m-%d"),
                    "day_name": datetime.now().strftime("%A"),
                    "temperature_c": None,
                    "temperature_f": None,
                    "weather_code": None,
                    "error": "Could not retrieve weather data",
                },
            }

        result = geocode_data["results"][0]
        location_name = result.get("name", location)
        latitude = result["latitude"]
        longitude = result["longitude"]

        logger.info(
            f"Geocoded '{location}' to {location_name} ({latitude}, {longitude})"
        )

        # Get weather forecast
        weather_url = (
            f"https://api.open-meteo.com/v1/forecast?"
            f"latitude={latitude}&longitude={longitude}&"
            f"current=temperature_2m&"
            f"daily=sunrise,sunset,temperature_2m_max,temperature_2m_min,weather_code&"
            f"forecast_days={days}&timezone=auto"
        )

        weather_response = await http_client.get(weather_url)
        if weather_response.status_code != 200:
            return {
                "location": location_name,
                "weather": {
                    "date": datetime.now().strftime("%Y-%m-%d"),
                    "day_name": datetime.now().strftime("%A"),
                    "temperature_c": None,
                    "temperature_f": None,
                    "weather_code": None,
                    "error": "Could not retrieve weather data",
                },
            }

        weather_data = weather_response.json()
        current_temp = weather_data.get("current", {}).get("temperature_2m")
        daily = weather_data.get("daily", {})

        # Build forecast for requested number of days
        forecast = []
        for i in range(days):
            date_str = daily["time"][i]
            date_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00"))

            temp_max = (
                daily.get("temperature_2m_max", [])[i]
                if daily.get("temperature_2m_max")
                else None
            )
            temp_min = (
                daily.get("temperature_2m_min", [])[i]
                if daily.get("temperature_2m_min")
                else None
            )
            weather_code = (
                daily.get("weather_code", [0])[i] if daily.get("weather_code") else 0
            )
            sunrise = daily.get("sunrise", [])[i] if daily.get("sunrise") else None
            sunset = daily.get("sunset", [])[i] if daily.get("sunset") else None

            # Use current temp for today, otherwise use max temp
            temp_c = (
                temp_max
                if temp_max is not None
                else (current_temp if i == 0 and current_temp else temp_min)
            )

            forecast.append(
                {
                    "date": date_str.split("T")[0],
                    "day_name": date_obj.strftime("%A"),
                    "temperature_c": round(temp_c, 1) if temp_c is not None else None,
                    "temperature_f": celsius_to_fahrenheit(temp_c),
                    "temperature_max_c": round(temp_max, 1)
                    if temp_max is not None
                    else None,
                    "temperature_min_c": round(temp_min, 1)
                    if temp_min is not None
                    else None,
                    "weather_code": weather_code,
                    "sunrise": sunrise.split("T")[1] if sunrise else None,
                    "sunset": sunset.split("T")[1] if sunset else None,
                }
            )

        return {"location": location_name, "forecast": forecast}

    except Exception as e:
        logger.error(f"Error getting weather data: {e}")
        return {
            "location": location,
            "weather": {
                "date": datetime.now().strftime("%Y-%m-%d"),
                "day_name": datetime.now().strftime("%A"),
                "temperature_c": None,
                "temperature_f": None,
                "weather_code": None,
                "error": "Could not retrieve weather data",
            },
        }


@app.post("/v1/chat/completions")
async def handle_request(request: Request):
    """HTTP endpoint for chat completions with streaming support."""

    request_body = await request.json()
    messages = request_body.get("messages", [])
    logger.info(
        "messages detail json dumps: %s",
        json.dumps(messages, indent=2),
    )

    traceparent_header = request.headers.get("traceparent")
    return StreamingResponse(
        invoke_weather_agent(request, request_body, traceparent_header),
        media_type="text/plain",
        headers={
            "content-type": "text/event-stream",
        },
    )


async def invoke_weather_agent(
    request: Request, request_body: dict, traceparent_header: str = None
):
    """Generate streaming chat completions."""
    messages = request_body.get("messages", [])

    # Detect if user wants multi-day forecast
    last_user_msg = get_last_user_content(messages)
    days = 1

    if "forecast" in last_user_msg or "week" in last_user_msg:
        days = 7
    elif "tomorrow" in last_user_msg:
        days = 2

    # Extract specific number of days if mentioned (e.g., "5 day forecast")
    day_match = re.search(r"(\d{1,2})\s+day", last_user_msg)
    if day_match:
        requested_days = int(day_match.group(1))
        days = min(requested_days, 16)  # API supports max 16 days

    # Get live weather data (location extraction happens inside this function)
    weather_data = await get_weather_data(request, messages, days)

    # Create weather context to append to user message
    forecast_type = "forecast" if days > 1 else "current weather"
    weather_context = f"""

Weather data for {weather_data['location']} ({forecast_type}):
{json.dumps(weather_data, indent=2)}"""

    # System prompt for weather agent
    instructions = """You are a weather assistant in a multi-agent system. You will receive weather data in JSON format with these fields:

- "location": City name
- "forecast": Array of weather objects, each with date, day_name, temperature_c, temperature_f, temperature_max_c, temperature_min_c, weather_code, sunrise, sunset
- weather_code: WMO code (0=clear, 1-3=partly cloudy, 45-48=fog, 51-67=rain, 71-86=snow, 95-99=thunderstorm)

Your task:
1. Present the weather/forecast clearly for the location
2. For single day: show current conditions
3. For multi-day: show each day with date and conditions
4. Include temperature in both Celsius and Fahrenheit
5. Describe conditions naturally based on weather_code
6. Use conversational language

Important: If the conversation includes information from other agents (like flight details), acknowledge and build upon that context naturally. Your primary focus is weather, but maintain awareness of the full conversation.

Remember: Only use the provided data. If fields are null, mention data is unavailable."""

    # Build message history with weather data appended to the last user message
    response_messages = [{"role": "system", "content": instructions}]

    for i, msg in enumerate(messages):
        # Append weather data to the last user message
        if i == len(messages) - 1 and msg.get("role") == "user":
            response_messages.append(
                {"role": "user", "content": msg.get("content") + weather_context}
            )
        else:
            response_messages.append(
                {"role": msg.get("role"), "content": msg.get("content")}
            )

    try:
        ctx = extract(request.headers)
        extra_headers = {"x-envoy-max-retries": "3"}
        inject(extra_headers, context=ctx)

        stream = await openai_client_via_plano.chat.completions.create(
            model=WEATHER_MODEL,
            messages=response_messages,
            temperature=request_body.get("temperature", 0.7),
            max_tokens=request_body.get("max_tokens", 1000),
            stream=True,
            extra_headers=extra_headers,
        )

        async for chunk in stream:
            if chunk.choices:
                yield f"data: {chunk.model_dump_json()}\n\n"

        yield "data: [DONE]\n\n"

    except Exception as e:
        logger.error(f"Error generating weather response: {e}")
        error_chunk = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": request_body.get("model", WEATHER_MODEL),
            "choices": [
                {
                    "index": 0,
                    "delta": {
                        "content": "I apologize, but I'm having trouble retrieving weather information right now. Please try again."
                    },
                    "finish_reason": "stop",
                }
            ],
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "agent": "weather_forecast"}


def start_server(host: str = "localhost", port: int = 10510):
    """Start the REST server."""
    uvicorn.run(
        app,
        host=host,
        port=port,
        log_config={
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(asctime)s - [WEATHER_AGENT] - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "default": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                },
            },
            "root": {
                "level": "INFO",
                "handlers": ["default"],
            },
        },
    )


if __name__ == "__main__":
    start_server(host="0.0.0.0", port=10510)
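The forecast-window heuristic in `invoke_weather_agent` (keyword checks plus a regex for an explicit day count, capped at Open-Meteo's 16-day limit) can be factored into a small, independently testable function. A sketch; `parse_forecast_days` is our name for the extracted helper, not part of the agent file:

```python
import re


def parse_forecast_days(user_message: str, max_days: int = 16) -> int:
    """Infer how many forecast days the user wants from their message."""
    msg = user_message.lower()
    days = 1
    if "forecast" in msg or "week" in msg:
        days = 7
    elif "tomorrow" in msg:
        days = 2
    # An explicit "N day" request overrides the keyword heuristics
    day_match = re.search(r"(\d{1,2})\s+day", msg)
    if day_match:
        days = min(int(day_match.group(1)), max_days)  # API supports max 16 days
    return days
```

Note the ordering: the regex check runs last, so "give me a 5 day forecast" yields 5 rather than the generic 7 that the "forecast" keyword alone would produce.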
@@ -1,100 +1,110 @@
version: v0.1

# Arch Gateway configuration version
version: v0.3.0

# External HTTP agents - API type is controlled by request path (/v1/responses, /v1/messages, /v1/chat/completions)
agents:
  - id: weather_agent # Example agent for weather
    url: http://host.docker.internal:10510

  - id: flight_agent # Example agent for flights
    url: http://host.docker.internal:10520

# MCP filters applied to requests/responses (e.g., input validation, query rewriting)
filters:
  - id: input_guards # Example filter for input validation
    url: http://host.docker.internal:10500
    # type: mcp (default)
    # transport: streamable-http (default)
    # tool: input_guards (default - same as filter id)

# LLM provider configurations with API keys and model routing
model_providers:
  - model: openai/gpt-4o
    access_key: $OPENAI_API_KEY
    default: true

  - model: openai/gpt-4o-mini
    access_key: $OPENAI_API_KEY

  - model: anthropic/claude-sonnet-4-0
    access_key: $ANTHROPIC_API_KEY

  - model: mistral/ministral-3b-latest
    access_key: $MISTRAL_API_KEY

# Model aliases - use friendly names instead of full provider model names
model_aliases:
  fast-llm:
    target: gpt-4o-mini

  smart-llm:
    target: gpt-4o

# HTTP listeners - entry points for agent routing, prompt targets, and direct LLM access
listeners:
  ingress_traffic:
    # Agent listener for routing requests to multiple agents
    - type: agent
      name: travel_booking_service
      port: 8001
      router: plano_orchestrator_v1
      address: 0.0.0.0
      port: 10000
      message_format: openai
      timeout: 5s
  egress_traffic:
      agents:
        - id: rag_agent
          description: virtual assistant for retrieval augmented generation tasks
          filter_chain:
            - input_guards

    # Model listener for direct LLM access
    - type: model
      name: model_1
      address: 0.0.0.0
      port: 12000
      message_format: openai
      timeout: 5s

    # Arch creates a round-robin load balancing between different endpoints, managed via the cluster subsystem.
    # Prompt listener for function calling (for prompt_targets)
    - type: prompt
      name: prompt_function_listener
      address: 0.0.0.0
      port: 10000
      # This listener is used for prompt_targets and function calling

# Reusable service endpoints
endpoints:
  app_server:
    # value could be an IP address or a hostname with port
    # this could also be a list of endpoints for load balancing
    # for example endpoint: [ ip1:port, ip2:port ]
    endpoint: 127.0.0.1:80
    # max time to wait for a connection to be established
    connect_timeout: 0.005s

  mistral_local:
    endpoint: 127.0.0.1:8001

  error_target:
    endpoint: error_target_1

# Centralized way to manage LLMs, keys, retry logic, failover, and limits in one place
llm_providers:
  - name: openai/gpt-4o
    access_key: $OPENAI_API_KEY
    model: openai/gpt-4o
    default: true

  - access_key: $MISTRAL_API_KEY
    model: mistral/mistral-8x7b

  - model: mistral/mistral-7b-instruct
    base_url: http://mistral_local

# Model aliases - friendly names that map to actual provider names
model_aliases:
  # Alias for summarization tasks -> fast/cheap model
  arch.summarize.v1:
    target: gpt-4o

  # Alias for general purpose tasks -> latest model
  arch.v1:
    target: mistral-8x7b

# provides a way to override default settings for the arch system
overrides:
  # By default Arch uses an NLI + embedding approach to match an incoming prompt to a prompt target.
  # The intent matching threshold is kept at 0.80; you can override this behavior if you would like
  prompt_target_intent_matching_threshold: 0.60

# default system prompt used by all prompt targets
system_prompt: You are a network assistant that just offers facts; not advice on manufacturers or purchasing decisions.

prompt_guards:
  input_guards:
    jailbreak:
      on_exception:
        message: Looks like you're curious about my abilities, but I can only provide assistance within my programmed parameters.

# Prompt targets for function calling and API orchestration
prompt_targets:
  - name: information_extraction
    default: true
    description: handle all scenarios that are question and answer in nature, like summarization, information extraction, etc.
    endpoint:
      name: app_server
      path: /agent/summary
      http_method: POST
    # Arch uses the default LLM and treats the response from the endpoint as the prompt to send to the LLM
    auto_llm_dispatch_on_response: true
    # override system prompt for this prompt target
    system_prompt: You are a helpful information extraction assistant. Use the information that is provided to you.

  - name: reboot_network_device
    description: Reboot a specific network device
    endpoint:
      name: app_server
      path: /agent/action
  - name: get_current_weather
    description: Get current weather at a location.
    parameters:
      - name: device_id
        type: str
        description: Identifier of the network device to reboot.
      - name: location
        description: The location to get the weather for
        required: true
      - name: confirmation
        type: bool
        description: Confirmation flag to proceed with reboot.
        default: false
        enum: [true, false]
        type: string
        format: City, State
      - name: days
        description: the number of days for the request
        required: true
        type: int
    endpoint:
      name: app_server
      path: /weather
      http_method: POST

# OpenTelemetry tracing configuration
tracing:
  # sampling rate. Note by default Arch works on OpenTelemetry compatible tracing.
  sampling_rate: 0.1
  # Random sampling percentage (1-100)
  random_sampling: 100
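The `model_aliases` block above maps friendly names (like `fast-llm`) to provider model names; the gateway performs this resolution, but the mapping itself is a simple lookup. A conceptual sketch of the resolution, not the gateway's actual implementation:

```python
# Mirrors the model_aliases block in the config above
model_aliases = {
    "fast-llm": "gpt-4o-mini",
    "smart-llm": "gpt-4o",
}


def resolve_model(requested: str) -> str:
    """Return the provider model for an alias, or the name unchanged if not an alias."""
    return model_aliases.get(requested, requested)
```

Clients can then request `fast-llm` in their payload and swap the underlying model later by editing only the config, not the callers.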
@@ -1,15 +1,50 @@
agents:
- id: weather_agent
  url: http://host.docker.internal:10510
- id: flight_agent
  url: http://host.docker.internal:10520
endpoints:
  app_server:
    connect_timeout: 0.005s
    endpoint: 127.0.0.1
    port: 80
  error_target:
    endpoint: error_target_1
    port: 80
  flight_agent:
    endpoint: host.docker.internal
    port: 10520
    protocol: http
  input_guards:
    endpoint: host.docker.internal
    port: 10500
    protocol: http
  mistral_local:
    endpoint: 127.0.0.1
    port: 8001
  weather_agent:
    endpoint: host.docker.internal
    port: 10510
    protocol: http
filters:
- id: input_guards
  url: http://host.docker.internal:10500
listeners:
- address: 0.0.0.0
  agents:
  - description: virtual assistant for retrieval augmented generation tasks
    filter_chain:
    - input_guards
    id: rag_agent
  name: travel_booking_service
  port: 8001
  router: plano_orchestrator_v1
  type: agent
- address: 0.0.0.0
  name: model_1
  port: 12000
  type: model
- address: 0.0.0.0
  name: prompt_function_listener
  port: 10000
  type: prompt
- address: 0.0.0.0
model_providers:
- access_key: $OPENAI_API_KEY
@@ -17,49 +52,44 @@ listeners:
  model: gpt-4o
  name: openai/gpt-4o
  provider_interface: openai
- access_key: $OPENAI_API_KEY
  model: gpt-4o-mini
  name: openai/gpt-4o-mini
  provider_interface: openai
- access_key: $ANTHROPIC_API_KEY
  model: claude-sonnet-4-0
  name: anthropic/claude-sonnet-4-0
  provider_interface: anthropic
- access_key: $MISTRAL_API_KEY
  model: mistral-8x7b
  name: mistral/mistral-8x7b
  provider_interface: mistral
- base_url: http://mistral_local
  cluster_name: mistral_mistral_local
  endpoint: mistral_local
  model: mistral-7b-instruct
  name: mistral/mistral-7b-instruct
  port: 80
  protocol: http
  model: ministral-3b-latest
  name: mistral/ministral-3b-latest
  provider_interface: mistral
  name: egress_traffic
  port: 12000
  timeout: 5s
  timeout: 30s
  type: model_listener
- address: 0.0.0.0
  name: ingress_traffic
  port: 10000
  timeout: 5s
  type: prompt_listener
model_aliases:
  arch.summarize.v1:
  fast-llm:
    target: gpt-4o-mini
  smart-llm:
    target: gpt-4o
  arch.v1:
    target: mistral-8x7b
model_providers:
- access_key: $OPENAI_API_KEY
  default: true
  model: gpt-4o
  name: openai/gpt-4o
  provider_interface: openai
- access_key: $OPENAI_API_KEY
  model: gpt-4o-mini
  name: openai/gpt-4o-mini
  provider_interface: openai
- access_key: $ANTHROPIC_API_KEY
  model: claude-sonnet-4-0
  name: anthropic/claude-sonnet-4-0
  provider_interface: anthropic
- access_key: $MISTRAL_API_KEY
  model: mistral-8x7b
  name: mistral/mistral-8x7b
  provider_interface: mistral
- base_url: http://mistral_local
  cluster_name: mistral_mistral_local
  endpoint: mistral_local
  model: mistral-7b-instruct
  name: mistral/mistral-7b-instruct
  port: 80
  protocol: http
  model: ministral-3b-latest
  name: mistral/ministral-3b-latest
  provider_interface: mistral
- model: Arch-Function
  name: arch-function
@@ -67,45 +97,23 @@ model_providers:
- model: Plano-Orchestrator
  name: plano-orchestrator
  provider_interface: arch
overrides:
  prompt_target_intent_matching_threshold: 0.6
prompt_guards:
  input_guards:
    jailbreak:
      on_exception:
        message: Looks like you're curious about my abilities, but I can only provide
          assistance within my programmed parameters.
prompt_targets:
- auto_llm_dispatch_on_response: true
  default: true
  description: handle all scenarios that are question and answer in nature, like summarization,
    information extraction, etc.
- description: Get current weather at a location.
  endpoint:
    http_method: POST
    name: app_server
    path: /agent/summary
  name: information_extraction
  system_prompt: You are a helpful information extraction assistant. Use the information
    that is provided to you.
- description: Reboot a specific network device
  endpoint:
    name: app_server
    path: /agent/action
  name: reboot_network_device
    path: /weather
  name: get_current_weather
  parameters:
  - description: Identifier of the network device to reboot.
    name: device_id
  - description: The location to get the weather for
    format: City, State
    name: location
    required: true
    type: str
  - default: false
    description: Confirmation flag to proceed with reboot.
    enum:
    - true
    - false
    name: confirmation
    type: bool
  system_prompt: You are a network assistant that just offers facts; not advice on manufacturers
    or purchasing decisions.
    type: string
  - description: the number of days for the request
    name: days
    required: true
    type: int
tracing:
  sampling_rate: 0.1
version: v0.1
  random_sampling: 100
version: v0.3.0
@@ -1,14 +1,12 @@
version: v0.1

listeners:
  egress_traffic:
    - type: model
      name: model_proxy_listener
      address: 0.0.0.0
      port: 12000
      message_format: openai
      timeout: 30s

llm_providers:

model_providers:
  # OpenAI Models
  - model: openai/gpt-5-mini-2025-08-07
    access_key: $OPENAI_API_KEY
@ -0,0 +1,41 @@
|
|||
version: v0.3.0
|
||||
|
||||
agents:
|
||||
- id: rag_agent
|
||||
url: http://host.docker.internal:10505
|
||||
|
||||
filters:
|
||||
- id: query_rewriter
|
||||
url: http://host.docker.internal:10501
|
||||
# type: mcp # default is mcp
|
||||
# transport: streamable-http # default is streamable-http
|
||||
# tool: query_rewriter # default name is the filter id
|
||||
- id: context_builder
|
||||
url: http://host.docker.internal:10502
|
||||
|
||||
model_providers:
|
||||
- model: openai/gpt-4o-mini
|
||||
access_key: $OPENAI_API_KEY
|
||||
default: true
|
||||
- model: openai/gpt-4o
|
||||
access_key: $OPENAI_API_KEY
|
||||
|
||||
model_aliases:
|
||||
fast-llm:
|
||||
target: gpt-4o-mini
|
||||
smart-llm:
|
||||
target: gpt-4o
|
||||
|
||||
listeners:
|
||||
- type: agent
|
||||
name: agent_1
|
||||
port: 8001
|
||||
router: arch_agent_router
|
||||
agents:
|
||||
- id: rag_agent
|
||||
description: virtual assistant for retrieval augmented generation tasks
|
||||
filter_chain:
|
||||
- query_rewriter
|
||||
- context_builder
|
||||
tracing:
|
||||
random_sampling: 100
|
||||
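In the config above, `filter_chain` lists filters in the order they run: `query_rewriter` first, then `context_builder`, each transforming the prompt before the agent sees it. A conceptual sketch of that pipeline semantics, with hypothetical filter bodies (not Plano internals):

```python
def query_rewriter(prompt: str) -> str:
    # Hypothetical filter: normalize the user's query
    return prompt.strip().rstrip("?") + "?"


def context_builder(prompt: str) -> str:
    # Hypothetical filter: attach retrieved context before the agent sees the prompt
    return prompt + "\n\n[retrieved context would be appended here]"


def apply_filter_chain(prompt: str, chain) -> str:
    """Filters run in the order they are listed in filter_chain."""
    for f in chain:
        prompt = f(prompt)
    return prompt


result = apply_filter_chain("what is plano ", [query_rewriter, context_builder])
```

Because each filter receives the previous filter's output, reordering the chain changes the final prompt; the rewriter should run before the context builder so retrieval works on the cleaned query.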
6 docs/source/resources/llms_txt.rst Normal file
@@ -0,0 +1,6 @@
llms.txt
========

This project generates a single plaintext file containing the compiled text of all documentation pages, useful for large context models to reference Plano documentation.

Open it here: `llms.txt <../includes/llms.txt>`_
docs/source/resources/tech_overview/model_serving.rst
Normal file
21
docs/source/resources/tech_overview/model_serving.rst
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
.. _bright_staff:

Bright Staff
============

Bright Staff is Plano's memory-efficient, lightweight controller for agentic traffic. It sits inside the Plano
data plane and makes real-time decisions about how prompts are handled, forwarded, and processed.

Rather than running a separate "model server" subsystem, Plano relies on Envoy's HTTP connection management
and cluster subsystem to talk to different models and backends over HTTP(S). Bright Staff uses these primitives to:

* Inspect prompts, conversation state, and metadata.
* Decide which upstream model(s), tool backends, or APIs to call, and in what order.
* Coordinate retries, fallbacks, and traffic splitting across providers and models.

Plano is designed to run alongside your application servers in your cloud VPC, on-premises, or in local
development. It does not require a GPU itself; GPUs live where your models are hosted (third-party APIs or your
own deployments), and Plano reaches them via HTTP.

.. image:: /_static/img/plano-system-architecture.png
   :align: center
   :width: 40%
145 docs/source/resources/tech_overview/request_lifecycle.rst Normal file

@@ -0,0 +1,145 @@
.. _lifecycle_of_a_request:

Request Lifecycle
=================

Below we describe the events in the lifecycle of a request passing through a Plano instance. We first
describe how Plano fits into the request path, and then the internal events that take place once a request
arrives at Plano from downstream clients. We follow the request through to the corresponding upstream
dispatch and the response path.

.. image:: /_static/img/network-topology-ingress-egress.png
   :width: 100%
   :align: center
Network topology
----------------

How a request flows through the components in a network (including Plano) depends on the network's topology.
Plano can be used in a wide variety of networking topologies. We focus on the inner operations of Plano below,
but briefly address how Plano relates to the rest of the network in this section.

- **Downstream (Ingress)** listeners accept requests from downstream clients, such as a web UI, that forward
  prompts to your local application; responses from the application flow back through Plano to the downstream client.

- **Upstream (Egress)** listeners take requests from the application and forward them to LLMs.
High level architecture
-----------------------

Plano is a set of **two** self-contained processes that are designed to run alongside your application servers
(or on a separate server connected to your application servers via a network).

The first process manages HTTP-level networking and connection management concerns (protocol management, request
id generation, header sanitization, etc.), and the other process is a **controller**, which helps Plano make
intelligent decisions about incoming prompts. The controller hosts the purpose-built LLMs that manage several
critical, but undifferentiated, prompt-related tasks on behalf of developers.

The request processing path in Plano has two main parts:

* :ref:`Listener subsystem <plano_overview_listeners>`, which handles **downstream** and **upstream** request
  processing. It is responsible for managing the inbound (ingress) and outbound (egress) request lifecycle. The
  downstream and upstream HTTP/2 codecs live here. This also includes the lifecycle of any **upstream** connection
  to an LLM provider or tool backend: the listener subsystem manages connection pools, load balancing, retries,
  and failover.

* :ref:`Bright Staff controller subsystem <bright_staff>`, which is Plano's memory-efficient, lightweight
  controller for agentic traffic. It sits inside the Plano data plane and makes real-time decisions about how
  prompts are handled, forwarded, and processed.

These two subsystems are bridged by the HTTP router filter and the cluster manager subsystem of Envoy.

Plano also utilizes `Envoy's event-based threading model
<https://blog.envoyproxy.io/envoy-threading-model-a8d44b922310>`_. A main thread is responsible for the server
lifecycle, configuration processing, stats, etc., and some number of :ref:`worker threads <arch_overview_threading>`
process requests. All threads operate around an event loop (`libevent <https://libevent.org/>`_), and any given
downstream TCP connection is handled by exactly one worker thread for its lifetime. Each worker thread maintains
its own pool of TCP connections to upstream endpoints.

Worker threads rarely share state and operate in a trivially parallel fashion. This threading model
enables scaling to very high core count CPUs.
Request Flow (Ingress)
----------------------

A brief outline of the lifecycle of a request and response using the example configuration above:

1. **TCP Connection Establishment**:
   A TCP connection from downstream is accepted by a Plano listener running on a worker thread.
   The listener filter chain provides SNI and other pre-TLS information. The transport socket, typically TLS,
   decrypts incoming data for processing.

2. **Routing Decision (Agent vs Prompt Target)**:
   The decrypted data stream is de-framed by the HTTP/2 codec in Plano's HTTP connection manager. Plano performs
   intent matching (via the Bright Staff controller and prompt-handling logic) using the configured agents and
   :ref:`prompt targets <prompt_target>`, determining whether this request should be handled by an agent workflow
   (with optional :ref:`Filter Chains <filter_chain>`) or by a deterministic prompt target.

3a. **Agent Path: Orchestration and Filter Chains**

   If the request is routed to an **agent**, Plano executes any attached :ref:`Filter Chains <filter_chain>`
   first. These filters can apply guardrails, rewrite prompts, or enrich context (for example, RAG retrieval)
   before the agent runs. Once filters complete, the Bright Staff controller orchestrates which downstream tools,
   APIs, or LLMs the agent should call and in what sequence.

   * Plano may call one or more backend APIs or tools on behalf of the agent.
   * If an endpoint cluster is identified, load balancing is performed, circuit breakers are checked, and the
     request is proxied to the appropriate upstream endpoint.
   * If no specific endpoint is required, the prompt is sent to an upstream LLM using Plano's model proxy for
     completion or summarization.

   For more on agent workflows and orchestration, see :ref:`Prompt Targets and Agents <prompt_target>` and
   :ref:`Agent Filter Chains <filter_chain>`.

3b. **Prompt Target Path: Deterministic Tool/API Calls**

   If the request is routed to a **prompt target**, Plano treats it as a deterministic, task-specific call.
   Plano engages its function-calling and parameter-gathering capabilities to extract the necessary details
   from the incoming prompt(s) and produce the structured inputs your backend expects.

   * **Parameter Gathering**: Plano extracts and validates parameters defined on the prompt target (for example,
     currency symbols, dates, or entity identifiers) so your backend does not need to parse natural language.
   * **API Call Execution**: Plano then routes the call to the configured backend endpoint. If an endpoint
     cluster is identified, load balancing and circuit-breaker checks are applied before proxying the request
     upstream.

   For more on how to design and configure prompt targets, see :ref:`Prompt Target <prompt_target>`.

4. **Error Handling and Forwarding**:
   Errors encountered during processing, such as failed function calls or guardrail detections, are forwarded to
   designated error targets. Error details are communicated through specific headers to the application:

   - ``X-Function-Error-Code``: Code indicating the type of function call error.
   - ``X-Prompt-Guard-Error-Code``: Code specifying violations detected by prompt guardrails.
   - Additional headers carry messages and timestamps to aid in debugging and logging.

5. **Response Handling**:
   The upstream endpoint's TLS transport socket encrypts the response, which is then proxied back downstream.
   Responses pass through HTTP filters in reverse order, ensuring any necessary processing or modification before
   final delivery.
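The agent-versus-prompt-target decision described above is configured on the listener. A minimal sketch, taken from the full configuration reference earlier in this commit (field names are illustrative of that reference, not exhaustive):

```yaml
listeners:
  - type: agent
    name: agent_1
    port: 8001
    agents:
      - id: rag_agent
        description: virtual assistant for retrieval augmented generation tasks
        filter_chain:          # filters run before the agent handles the prompt
          - query_rewriter     # e.g. guardrails / prompt rewriting
          - context_builder    # e.g. RAG context enrichment
```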
Request Flow (Egress)
---------------------

A brief outline of the lifecycle of a request and response in the context of egress traffic from an application
to Large Language Models (LLMs) via Plano:

1. **HTTP Connection Establishment to LLM**:
   Plano initiates an HTTP connection to the upstream LLM service. This connection is handled by Plano's egress
   listener running on a worker thread. The connection typically uses a secure transport protocol such as HTTPS,
   ensuring the prompt data is encrypted before being sent to the LLM service.

2. **Rate Limiting**:
   Before sending the request to the LLM, Plano applies rate-limiting policies to ensure that the upstream LLM
   service is not overwhelmed by excessive traffic. Rate limits are enforced per client or service, ensuring fair
   usage and preventing accidental or malicious overload. If the rate limit is exceeded, Plano may return an
   appropriate HTTP error (e.g., 429 Too Many Requests) without sending the prompt to the LLM.

3. **Seamless Request Transformation and Smart Routing**:
   After rate limiting, Plano normalizes the outgoing request into a provider-agnostic shape and applies smart
   routing decisions using the configured :ref:`LLM Providers <llm_providers>`. This includes translating
   client-specific conventions into a unified OpenAI-style contract, enriching or overriding parameters (for
   example, temperature or max tokens) based on policy, and choosing the best target model or provider using
   :ref:`model-based, alias-based, or preference-aligned routing <llm_providers>`.

4. **Load Balancing to (hosted) LLM Endpoints**:
   After smart routing selects the target provider/model, Plano routes the prompt to the appropriate LLM
   endpoint. If multiple LLM provider instances are available, load balancing is performed to distribute traffic
   evenly across the instances. Plano checks the health of the LLM endpoints using circuit breakers and health
   checks, ensuring that the prompt is only routed to a healthy, responsive instance.

5. **Response Reception and Forwarding**:
   Once the LLM processes the prompt, Plano receives the response from the LLM service. The response is typically
   generated text, a completion, or a summarization. Upon reception, Plano decrypts (if necessary) and handles
   the response, passing it through any egress processing pipeline defined by the application, such as logging or
   additional response filtering.
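Alias-based smart routing (step 3) decouples the model names clients send from the concrete models Plano dispatches to. A sketch drawn from the configuration reference earlier in this commit (consult the full reference for exact syntax):

```yaml
model_aliases:
  fast-llm:               # clients request "fast-llm" ...
    target: gpt-4o-mini   # ... Plano resolves it to this model
  smart-llm:
    target: gpt-4o
```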
Post-request processing
^^^^^^^^^^^^^^^^^^^^^^^

Once a request completes, the stream is destroyed. The following also takes place:

* The post-request :ref:`monitoring <monitoring>` statistics are updated (e.g. timing, active requests, upgrades,
  health checks). Some statistics are updated earlier, however, during request processing. Stats are batched and
  written by the main thread periodically.
* :ref:`Access logs <arch_access_logging>` are written to the access log.
* :ref:`Trace <arch_overview_tracing>` spans are finalized. If our example request was traced, a trace span
  describing the duration and details of the request would be created by the HCM when processing request headers
  and then finalized by the HCM during post-request processing.
Configuration
-------------

Today, Plano supports only a static bootstrap configuration file, for simplicity:

.. literalinclude:: ../../concepts/includes/plano_config.yaml
   :language: yaml
11 docs/source/resources/tech_overview/tech_overview.rst Normal file

@@ -0,0 +1,11 @@
.. _tech_overview:

Tech Overview
=============

.. toctree::
   :maxdepth: 2

   request_lifecycle
   model_serving
   threading_model
21 docs/source/resources/tech_overview/threading_model.rst Normal file

@@ -0,0 +1,21 @@
.. _arch_overview_threading:

Threading Model
===============

Plano builds on top of Envoy's single-process, multi-threaded architecture.

A single *primary* thread handles various sporadic coordination tasks, while some number of *worker*
threads perform filtering and forwarding.

Once a connection is accepted, it spends the rest of its lifetime bound to a single worker thread. All of the
functionality around prompt handling from a downstream client is handled on a worker thread, separate from the
main thread. This allows the majority of Plano to be largely single threaded (embarrassingly parallel) with a
small amount of more complex code handling coordination between the worker threads.

Generally, Plano is written to be 100% non-blocking.

.. tip::

   For most workloads we recommend configuring the number of worker threads to be equal to the number of
   hardware threads on the machine.