orchestration guide changes

This commit is contained in:
Salman Paracha 2025-12-23 16:07:17 -08:00
parent e3f7ae1cbe
commit 8cfefa38a2
5 changed files with 1063 additions and 378 deletions

View file

@ -1 +1 @@
docs.archgw.com
docs.planoai.dev

View file

@ -6,7 +6,7 @@ Orchestration
Building multi-agent systems allow you to route requests across multiple specialized agents, each designed to handle specific types of tasks.
Plano makes it easy to build and scale these systems by managing the orchestration layer—deciding which agent(s) should handle each request—while you focus on implementing individual agent logic.
This guide shows you how to configure and implement multi-agent orchestration in Plano using a real-world example: a **Travel Booking Assistant** that routes queries to specialized agents for weather, flights, and currency exchange.
This guide shows you how to configure and implement multi-agent orchestration in Plano using a real-world example: a **Travel Booking Assistant** that routes queries to specialized agents for weather and flights.
How It Works
------------
@ -14,61 +14,28 @@ How It Works
Plano's orchestration layer analyzes incoming prompts and routes them to the most appropriate agent based on user intent and conversation context. The workflow is:
1. **User submits a prompt**: The request arrives at Plano's agent listener.
2. **Agent selection**: Plano analyzes the prompt to determine user intent and complexity, and routes the request to the most suitable agent configured in your system—such as a weather agent, flight agent, or currency agent.
3. **Agent handles request**: The selected agent processes the prompt using its specialized logic and tools.
2. **Agent selection**: Plano uses an LLM to analyze the prompt and determine user intent and complexity. By default, this uses `Plano-Orchestrator-30B-A3B <https://huggingface.co/collections/katanemo/plano-orchestrator>`_, which offers performance of foundation models at 1/10th the cost. The LLM routes the request to the most suitable agent configured in your system—such as a weather agent or flight agent.
3. **Agent handles request**: Once the selected agent receives the request object from Plano, it manages its own :ref:`inner loop <agents>` until the task is complete. This means the agent autonomously calls models, invokes tools, processes data, and reasons about next steps—all within its specialized domain—before returning the final response.
4. **Seamless handoffs**: For multi-turn conversations, Plano repeats the intent analysis for each follow-up query, enabling smooth handoffs between agents as the conversation evolves.
Example: Travel Booking Assistant
----------------------------------
Let's walk through a complete multi-agent system: a Travel Booking Assistant that helps users plan trips by providing weather forecasts, flight information, and currency exchange rates. This system uses three specialized agents:
Let's walk through a complete multi-agent system: a Travel Booking Assistant that helps users plan trips by providing weather forecasts and flight information. This system uses two specialized agents:
* **Weather Agent**: Provides real-time weather conditions and forecasts
* **Flight Agent**: Searches for flights between airports
* **Currency Agent**: Provides currency exchange rates and conversions
* **Weather Agent**: Provides real-time weather conditions and multi-day forecasts
* **Flight Agent**: Searches for flights between airports with real-time tracking
Configuration
-------------
Configure your agents in the ``listeners`` section of your ``plano_config.yaml``:
.. code-block:: yaml
.. literalinclude:: ../resources/includes/agents/agents_config.yaml
:language: yaml
:linenos:
:caption: Travel Booking Multi-Agent Configuration
version: v0.3.0
agents:
- id: weather_agent
url: http://host.docker.internal:10510
- id: flight_agent
url: http://host.docker.internal:10520
- id: currency_agent
url: http://host.docker.internal:10530
model_providers:
- model: openai/gpt-4o
access_key: $OPENAI_API_KEY
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY
listeners:
- type: agent
name: travel_booking_service
port: 8001
router: plano_orchestrator_v1
agents:
- id: weather_agent
description: Get real-time weather conditions and multi-day forecasts for any city worldwide using Open-Meteo API (free, no API key needed). Provides current temperature, multi-day forecasts, weather conditions, sunrise/sunset times, and detailed weather information. Understands conversation context to resolve location references from previous messages.
- id: flight_agent
description: Get live flight information between airports using FlightAware AeroAPI. Shows real-time flight status, scheduled/estimated/actual departure and arrival times, gate and terminal information, delays, aircraft type, and flight status. Automatically resolves city names to airport codes (IATA/ICAO). Understands conversation context to infer origin/destination from follow-up questions. Supports queries like "What flights go from London to Seattle?" or "Do they fly out from Seattle?" (using context from previous messages).
- id: currency_agent
description: Get real-time currency exchange rates and perform currency conversions using Frankfurter API (free, no API key needed). Provides latest exchange rates, currency conversions with amount calculations, and supports any currency pair. Automatically extracts currency codes from country names and conversation context. Understands pronouns like "their currency" when referring to previously mentioned countries. Uses standard 3-letter ISO currency codes (e.g., USD, EUR, GBP, JPY, PKR).
tracing:
random_sampling: 100
**Key Configuration Elements:**
* **agent listener**: A listener of ``type: agent`` tells Plano to perform intent analysis and routing for incoming requests.
@ -78,12 +45,40 @@ Configure your agents in the ``listeners`` section of your ``plano_config.yaml``
**Writing Effective Agent Descriptions**
Agent descriptions are critical—they're used by Plano-Orchestrator to make routing decisions. Be specific about:
Agent descriptions are critical—they're used by Plano-Orchestrator to make routing decisions. Effective descriptions should include:
* What the agent does (e.g., "Get real-time weather conditions")
* What APIs or tools it uses (e.g., "using Open-Meteo API")
* What information it provides (e.g., "current temperature, multi-day forecasts")
* How it handles context (e.g., "Understands conversation context to resolve location references")
* **Clear introduction**: A concise statement explaining what the agent is and its primary purpose
* **Capabilities section**: A bulleted list of specific capabilities, including:
* What APIs or data sources it uses (e.g., "Open-Meteo API", "FlightAware AeroAPI")
* What information it provides (e.g., "current temperature", "multi-day forecasts", "gate information")
* How it handles context (e.g., "Understands conversation context to resolve location references")
* What question patterns it handles (e.g., "What's the weather in [city]?")
* How it handles multi-part queries (e.g., "When queries include both weather and flights, this agent answers ONLY the weather part")
Here's an example of a well-structured agent description:
.. code-block:: yaml
- id: weather_agent
description: |
WeatherAgent is a specialized AI assistant for real-time weather information
and forecasts. It provides accurate weather data for any city worldwide using
the Open-Meteo API, helping travelers plan their trips with up-to-date weather
conditions.
Capabilities:
* Get real-time weather conditions and multi-day forecasts for any city worldwide
* Provides current temperature, weather conditions, sunrise/sunset times
* Provides detailed weather information including multi-day forecasts
* Understands conversation context to resolve location references from previous messages
* Handles weather-related questions including "What's the weather in [city]?"
* When queries include both weather and other travel questions (e.g., flights),
this agent answers ONLY the weather part
.. note::
We will soon support "Agents as Tools" via Model Context Protocol (MCP), enabling agents to dynamically discover and invoke other agents as tools. Track progress on `GitHub Issue #646 <https://github.com/katanemo/archgw/issues/646>`_.
Implementation
--------------
@ -95,41 +90,17 @@ Agent Structure
Let's examine the Weather Agent implementation:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/weather.py
:language: python
:linenos:
:lines: 262-283
:emphasize-lines: 1-2, 13-21
:caption: Weather Agent - Core Structure
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import os
import httpx
from .api import ChatCompletionRequest, ChatCompletionStreamResponse
# Configuration
PLANO_ENDPOINT = os.getenv("PLANO_ENDPOINT", "http://localhost:12000/v1")
WEATHER_MODEL = "openai/gpt-4o"
LOCATION_MODEL = "openai/gpt-4o-mini"
# Initialize OpenAI client for Plano
plano = AsyncOpenAI(
base_url=PLANO_ENDPOINT,
api_key="EMPTY",
)
app = FastAPI(title="Weather Forecast Agent")
@app.post("/v1/chat/completions")
async def chat_completion_http(request: Request, request_body: ChatCompletionRequest):
"""HTTP endpoint for chat completions with streaming support."""
return StreamingResponse(
stream_chat_completions(request_body),
media_type="text/event-stream",
)
**Key Points:**
* Agents expose a ``/v1/chat/completions`` endpoint that matches OpenAI's API format
* They use Plano's LLM gateway (via ``PLANO_ENDPOINT``) for all LLM calls
* They use Plano's LLM gateway (via ``LLM_GATEWAY_ENDPOINT``) for all LLM calls
* They receive the full conversation history in ``request_body.messages``
Information Extraction with LLMs
@ -139,95 +110,22 @@ Agents use LLMs to extract structured information from natural language queries.
The Weather Agent extracts location information:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/weather.py
:language: python
:linenos:
:lines: 72-119
:emphasize-lines: 108-128
:caption: Weather Agent - Location Extraction
LOCATION_EXTRACTION_PROMPT = """You are a location extraction assistant. Your ONLY job is to extract the geographic location (city, state, country, etc.) from user messages.
CRITICAL RULES:
1. Extract ONLY the location name - nothing else
2. Return just the location name in plain text (e.g., "London", "New York", "Paris, France")
3. If the user mentions multiple locations, extract the PRIMARY location they're asking about
4. Ignore error messages, HTML tags, and assistant responses
5. If no clear location is found, return exactly: "NOT_FOUND"
"""
async def extract_location_from_messages(messages):
"""Extract location from user messages using LLM."""
user_messages = [msg for msg in messages if msg.role == "user"]
if not user_messages:
return "New York" # Default fallback
# Get the most recent user message
user_content = user_messages[-1].content.strip()
response = await plano.chat.completions.create(
model=LOCATION_MODEL,
messages=[
{"role": "system", "content": LOCATION_EXTRACTION_PROMPT},
{"role": "user", "content": user_content},
],
temperature=0.1,
max_tokens=50,
)
location = response.choices[0].message.content.strip()
return location if location != "NOT_FOUND" else "New York"
The Flight Agent extracts more complex information—origin, destination, and dates:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/flights.py
:language: python
:linenos:
:lines: 66-120
:emphasize-lines: 1-14, 28-45
:caption: Flight Agent - Flight Information Extraction
FLIGHT_EXTRACTION_PROMPT = """You are a flight information extraction assistant. Extract flight-related information from user messages and convert it to structured data.
CRITICAL RULES:
1. Extract origin city/airport and destination city/airport from the message AND conversation context
2. Extract any mentioned dates or time references
3. PAY ATTENTION TO CONVERSATION CONTEXT - THIS IS CRITICAL:
- If previous messages mention cities/countries, use that context to resolve pronouns and incomplete queries
- Example: Previous: "What's the weather in Istanbul?" → Current: "Do they fly out from Seattle?"
→ This likely means: origin=Istanbul, destination=Seattle
4. Return your response as a JSON object:
{
"origin": "London" or null,
"destination": "Seattle" or null,
"date": "2025-12-20" or null,
"origin_airport_code": "LHR" or null,
"destination_airport_code": "SEA" or null
}
"""
async def extract_flight_info_from_messages(messages):
"""Extract flight information from user messages using LLM, considering conversation context."""
# Build conversation context from all messages
conversation_context = []
for msg in messages:
content = msg.content.strip()
# Skip error messages and HTML tags
if not any(pattern in content.lower() for pattern in ["<", ">", "error:"]):
conversation_context.append({"role": msg.role, "content": content})
# Use last 10 messages for context
context_messages = conversation_context[-10:] if len(conversation_context) > 10 else conversation_context
llm_messages = [
{"role": "system", "content": FLIGHT_EXTRACTION_PROMPT}
] + context_messages
response = await plano.chat.completions.create(
model=FLIGHT_EXTRACTION_MODEL,
messages=llm_messages,
temperature=0.1,
max_tokens=300,
)
extracted_text = response.choices[0].message.content.strip()
# Parse JSON from response
flight_info = json.loads(extracted_text)
return flight_info
**Key Points:**
* Use smaller, faster models (like ``gpt-4o-mini``) for extraction tasks
@ -240,78 +138,22 @@ Calling External APIs
After extracting information, agents call external APIs to fetch real-time data:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/weather.py
:language: python
:linenos:
:lines: 136-197
:emphasize-lines: 3-5, 18-30, 35-45
:caption: Weather Agent - External API Call
async def get_weather_data(location: str, days: int = 1):
"""Get live weather data for a location using Open-Meteo API."""
# Step 1: Geocode location to get coordinates
geocode_result = await geocode_city(location)
if not geocode_result:
return {"error": "Could not find location"}
latitude = geocode_result["latitude"]
longitude = geocode_result["longitude"]
# Step 2: Fetch weather data
url = (
f"https://api.open-meteo.com/v1/forecast?"
f"latitude={latitude}&"
f"longitude={longitude}&"
f"daily=temperature_2m_max,temperature_2m_min,weather_code&"
f"forecast_days={days}&"
f"timezone=auto"
)
response = await http_client.get(url)
weather_data = response.json()
# Step 3: Transform API response into structured format
forecast = []
for i in range(days):
forecast.append({
"date": daily_data["time"][i],
"temperature_max_c": daily_data["temperature_2m_max"][i],
"temperature_min_c": daily_data["temperature_2m_min"][i],
"condition": weather_code_to_condition(daily_data["weather_code"][i]),
})
return {"location": location, "forecast": forecast}
The Flight Agent calls FlightAware's AeroAPI:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/flights.py
:language: python
:linenos:
:lines: 154-260
:emphasize-lines: 1-4, 18-28, 45-55
:caption: Flight Agent - External API Call
async def get_flights_between_airports(
origin_code: str, dest_code: str, start_date: str = None
):
"""Get flights between two airports using FlightAware AeroAPI."""
url = f"{AEROAPI_BASE_URL}/airports/{origin_code}/flights/to/{dest_code}"
headers = {"x-apikey": AEROAPI_KEY}
params = {"connection": "nonstop", "max_pages": 1}
if start_date:
params["start"] = start_date
response = await http_client.get(url, headers=headers, params=params)
data = response.json()
# Transform API response
flights = []
for flight_group in data.get("flights", []):
segment = flight_group.get("segments", [])[0]
flights.append({
"ident": segment.get("ident"),
"operator": segment.get("operator"),
"origin": segment.get("origin", {}).get("city"),
"destination": segment.get("destination", {}).get("city"),
"scheduled_out": segment.get("scheduled_out"),
"status": segment.get("status"),
})
return {"flights": flights}
**Key Points:**
* Use async HTTP clients (like ``httpx.AsyncClient``) for non-blocking API calls
@ -324,78 +166,13 @@ Preparing Context and Generating Responses
Agents combine extracted information, API data, and conversation history to generate responses:
.. code-block:: python
.. literalinclude:: ../resources/includes/agents/weather.py
:language: python
:linenos:
:lines: 290-370
:emphasize-lines: 28-36, 47-57, 62-72
:caption: Weather Agent - Context Preparation and Response Generation
SYSTEM_PROMPT = """You are a professional weather information assistant. Your role is to provide accurate, clear, and helpful weather information based on the structured weather data provided to you.
CRITICAL INSTRUCTIONS:
1. You will receive weather data as JSON in a system message
2. Present temperatures in both Celsius and Fahrenheit when available
3. Use natural, conversational language
4. Never invent or guess weather data - only use what's provided
"""
async def prepare_weather_messages(request_body: ChatCompletionRequest):
"""Prepare messages with weather data."""
# Step 1: Extract location from conversation
location = await extract_location_from_messages(request_body.messages)
# Step 2: Determine if user wants forecast (multi-day)
last_user_msg = ""
for msg in reversed(request_body.messages):
if msg.role == "user":
last_user_msg = msg.content.lower()
break
days = 5 if "forecast" in last_user_msg or "week" in last_user_msg else 1
# Step 3: Fetch weather data
weather_data = await get_weather_data(location, days)
# Step 4: Build context message with structured data
weather_context = f"""
Current weather data for {weather_data['location']}:
{json.dumps(weather_data, indent=2)}
Use this data to answer the user's weather query.
"""
# Step 5: Combine system prompt, context, and conversation history
response_messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "system", "content": weather_context},
]
# Add conversation history
for msg in request_body.messages:
response_messages.append({"role": msg.role, "content": msg.content})
return response_messages
async def stream_chat_completions(request_body: ChatCompletionRequest):
"""Generate streaming chat completions."""
# Prepare messages with weather data
response_messages = await prepare_weather_messages(request_body)
# Call Plano's LLM gateway
response_stream = await plano.chat.completions.create(
model=WEATHER_MODEL,
messages=response_messages,
temperature=request_body.temperature or 0.7,
max_tokens=request_body.max_tokens or 1000,
stream=True,
)
# Stream response chunks
async for chunk in response_stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
yield f"data: {chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
**Key Points:**
* Use system messages to provide structured data to the LLM
@ -408,38 +185,12 @@ Handling Conversation Context
One of the most powerful features of multi-agent systems is handling follow-up questions that reference previous context. The Flight Agent demonstrates this:
.. code-block:: python
:caption: Flight Agent - Context-Aware Follow-up Questions
# User: "What's the weather in Istanbul?"
# Plano routes to weather_agent → Response: "Istanbul is sunny, 25°C"
# User: "Do they fly out from Seattle?"
# Plano routes to flight_agent → Agent needs to infer:
# - "they" refers to Istanbul (from previous message)
# - User is asking: "Do flights go from Istanbul to Seattle?"
async def extract_flight_info_from_messages(messages):
"""Extract flight information using conversation context."""
# Build full conversation context
conversation_context = []
for msg in messages:
conversation_context.append({"role": msg.role, "content": msg.content})
# Use context-aware extraction prompt
llm_messages = [
{"role": "system", "content": FLIGHT_EXTRACTION_PROMPT}
] + conversation_context[-10:] # Last 10 messages for context
response = await plano.chat.completions.create(
model=FLIGHT_EXTRACTION_MODEL,
messages=llm_messages,
temperature=0.1,
)
# LLM extracts: {"origin": "Istanbul", "destination": "Seattle"}
flight_info = json.loads(response.choices[0].message.content)
return flight_info
.. literalinclude:: ../resources/includes/agents/flights.py
:language: python
:linenos:
:lines: 66-120
:emphasize-lines: 10-20, 32-45
:caption: Flight Agent - Context-Aware Follow-up Questions with Extraction
**Key Points:**
@ -448,37 +199,6 @@ One of the most powerful features of multi-agent systems is handling follow-up q
* Limit context window to recent messages (e.g., last 10) for efficiency
* Handle cases where context is insufficient gracefully
How Requests Flow
-----------------
Here's how a complete request flows through the Travel Booking system:
1. **User sends a prompt**: "What's the weather in London?"
2. **Plano analyzes intent**: Plano-Orchestrator analyzes the prompt and determines it should route to ``weather_agent`` based on the agent's description.
3. **Plano forwards request**: Plano sends the request to the correct agent with ``/v1/chat/completions``, along with the full conversation history.
4. **Weather Agent processes**:
a. Extracts location "London" from the message
b. Calls Open-Meteo API to get weather data
c. Prepares context with weather data
d. Calls Plano's LLM gateway to generate response
e. Streams response back to Plano
5. **Plano forwards response**: Plano streams the agent's response back to the user.
6. **Follow-up question**: User asks "Do they fly out from Seattle?"
7. **Plano routes again**: Plano-Orchestrator analyzes the new prompt and routes to ``flight_agent``.
8. **Flight Agent processes**:
a. Extracts flight info using conversation context (infers "they" = London)
b. Resolves airport codes (London → LHR, Seattle → SEA)
c. Calls FlightAware API to get flights
d. Generates response with flight information
9. **Seamless handoff**: The user experiences a smooth transition between agents without needing to repeat context.
Best Practices
--------------
@ -517,12 +237,12 @@ Always route LLM calls through Plano's :ref:`Model Proxy <llm_providers>` for co
.. code-block:: python
plano = AsyncOpenAI(
base_url=PLANO_ENDPOINT, # Plano's LLM gateway
openai_client_via_plano = AsyncOpenAI(
base_url=LLM_GATEWAY_ENDPOINT, # Plano's LLM gateway
api_key="EMPTY",
)
response = await plano.chat.completions.create(
response = await openai_client_via_plano.chat.completions.create(
model="openai/gpt-4o",
messages=messages,
stream=True,
@ -534,13 +254,16 @@ Provide fallback values and clear error messages:
.. code-block:: python
async def extract_location_from_messages(messages):
async def get_weather_data(request: Request, messages: list, days: int = 1):
try:
# ... extraction logic ...
return location
# ... extraction and API logic ...
location = response.choices[0].message.content.strip().strip("\"'`.,!?")
if not location or location.upper() == "NOT_FOUND":
location = "New York" # Fallback to default
return weather_data
except Exception as e:
logger.error(f"Error extracting location: {e}")
return "New York" # Fallback to default
logger.error(f"Error getting weather data: {e}")
return {"location": "New York", "weather": {"error": "Could not retrieve weather data"}}
**Use Appropriate Models for Tasks**
@ -560,17 +283,24 @@ Stream responses for better user experience:
.. code-block:: python
async def stream_chat_completions(request_body):
response_stream = await plano.chat.completions.create(
async def invoke_weather_agent(request: Request, request_body: dict, traceparent_header: str = None):
# ... prepare messages with weather data ...
stream = await openai_client_via_plano.chat.completions.create(
model=WEATHER_MODEL,
messages=messages,
messages=response_messages,
temperature=request_body.get("temperature", 0.7),
max_tokens=request_body.get("max_tokens", 1000),
stream=True,
extra_headers=extra_headers,
)
async for chunk in response_stream:
if chunk.choices and chunk.choices[0].delta.content:
async for chunk in stream:
if chunk.choices:
yield f"data: {chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
Common Use Cases
----------------
@ -578,7 +308,7 @@ Multi-agent orchestration is particularly powerful for:
**Travel and Booking Systems**
Route queries to specialized agents for weather, flights, hotels, and currency:
Route queries to specialized agents for weather and flights:
.. code-block:: yaml
@ -587,10 +317,6 @@ Route queries to specialized agents for weather, flights, hotels, and currency:
description: Get real-time weather conditions and forecasts
- id: flight_agent
description: Search for flights and provide flight status
- id: hotel_agent
description: Find hotels and check availability
- id: currency_agent
description: Provide currency exchange rates
**Customer Support**

View file

@ -0,0 +1,57 @@
version: v0.3.0
agents:
- id: weather_agent
url: http://host.docker.internal:10510
- id: flight_agent
url: http://host.docker.internal:10520
model_providers:
- model: openai/gpt-4o
access_key: $OPENAI_API_KEY
default: true
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY # smaller, faster, cheaper model for extracting entities like location
listeners:
- type: agent
name: travel_booking_service
port: 8001
router: plano_orchestrator_v1
agents:
- id: weather_agent
description: |
WeatherAgent is a specialized AI assistant for real-time weather information and forecasts. It provides accurate weather data for any city worldwide using the Open-Meteo API, helping travelers plan their trips with up-to-date weather conditions.
Capabilities:
* Get real-time weather conditions and multi-day forecasts for any city worldwide using Open-Meteo API (free, no API key needed)
* Provides current temperature
* Provides multi-day forecasts
* Provides weather conditions
* Provides sunrise/sunset times
* Provides detailed weather information
* Understands conversation context to resolve location references from previous messages
* Handles weather-related questions including "What's the weather in [city]?", "What's the forecast for [city]?", "How's the weather in [city]?"
* When queries include both weather and other travel questions (e.g., flights, currency), this agent answers ONLY the weather part
- id: flight_agent
description: |
FlightAgent is an AI-powered tool specialized in providing live flight information between airports. It leverages the FlightAware AeroAPI to deliver real-time flight status, gate information, and delay updates.
Capabilities:
* Get live flight information between airports using FlightAware AeroAPI
* Shows real-time flight status
* Shows scheduled/estimated/actual departure and arrival times
* Shows gate and terminal information
* Shows delays
* Shows aircraft type
* Shows flight status
* Automatically resolves city names to airport codes (IATA/ICAO)
* Understands conversation context to infer origin/destination from follow-up questions
* Handles flight-related questions including "What flights go from [city] to [city]?", "Do flights go to [city]?", "Are there direct flights from [city]?"
* When queries include both flight and other travel questions (e.g., weather, currency), this agent answers ONLY the flight part
tracing:
random_sampling: 100

View file

@ -0,0 +1,475 @@
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import os
import logging
import time
import uuid
import uvicorn
from datetime import datetime, timedelta
import httpx
from typing import Optional
from opentelemetry.propagate import extract, inject
# Set up logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - [FLIGHT_AGENT] - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Configuration
LLM_GATEWAY_ENDPOINT = os.getenv(
"LLM_GATEWAY_ENDPOINT", "http://host.docker.internal:12000/v1"
)
FLIGHT_MODEL = "openai/gpt-4o"
EXTRACTION_MODEL = "openai/gpt-4o-mini"
# FlightAware AeroAPI configuration
AEROAPI_BASE_URL = "https://aeroapi.flightaware.com/aeroapi"
AEROAPI_KEY = os.getenv("AEROAPI_KEY", "ESVFX7TJLxB7OTuayUv0zTQBryA3tOPr")
# HTTP client for API calls
http_client = httpx.AsyncClient(timeout=30.0)
# Initialize OpenAI client
openai_client_via_plano = AsyncOpenAI(
base_url=LLM_GATEWAY_ENDPOINT,
api_key="EMPTY",
)
# System prompt for flight agent
SYSTEM_PROMPT = """You are a travel planning assistant specializing in flight information in a multi-agent system. You will receive flight data in JSON format with these fields:
- "airline": Full airline name (e.g., "Delta Air Lines")
- "flight_number": Flight identifier (e.g., "DL123")
- "departure_time": ISO 8601 timestamp for scheduled departure (e.g., "2025-12-24T23:00:00Z")
- "arrival_time": ISO 8601 timestamp for scheduled arrival (e.g., "2025-12-25T04:40:00Z")
- "origin": Origin airport IATA code (e.g., "ATL")
- "destination": Destination airport IATA code (e.g., "SEA")
- "aircraft_type": Aircraft model code (e.g., "A21N", "B739")
- "status": Flight status (e.g., "Scheduled", "Delayed")
- "terminal_origin": Departure terminal (may be null)
- "gate_origin": Departure gate (may be null)
Your task:
1. Read the JSON flight data carefully
2. Present each flight clearly with: airline, flight number, departure/arrival times (convert from ISO format to readable time), airports, and aircraft type
3. Organize flights chronologically by departure time
4. Convert ISO timestamps to readable format (e.g., "11:00 PM" or "23:00")
5. Include terminal/gate info when available
6. Use natural, conversational language
Important: If the conversation includes information from other agents (like weather details), acknowledge and build upon that context naturally. Your primary focus is flights, but maintain awareness of the full conversation.
Remember: All the data you need is in the JSON. Use it directly."""
async def extract_flight_route(messages: list, request: Request) -> dict:
"""Extract origin, destination, and date from conversation using LLM."""
extraction_prompt = """Extract flight origin, destination cities, and travel date from the conversation.
Rules:
1. Look for patterns: "flight from X to Y", "flights to Y", "fly from X"
2. Extract dates like "tomorrow", "next week", "December 25", "12/25", "on Monday"
3. Use conversation context to fill in missing details
4. Return JSON: {"origin": "City" or null, "destination": "City" or null, "date": "YYYY-MM-DD" or null}
Examples:
- "Flight from Seattle to Atlanta tomorrow" -> {"origin": "Seattle", "destination": "Atlanta", "date": "2025-12-24"}
- "What flights go to New York?" -> {"origin": null, "destination": "New York", "date": null}
- "Flights to Miami on Christmas" -> {"origin": null, "destination": "Miami", "date": "2025-12-25"}
- "Show me flights from LA to NYC next Monday" -> {"origin": "LA", "destination": "NYC", "date": "2025-12-30"}
Today is December 23, 2025. Extract flight route and date:"""
try:
ctx = extract(request.headers)
extra_headers = {}
inject(extra_headers, context=ctx)
response = await openai_client_via_plano.chat.completions.create(
model=EXTRACTION_MODEL,
messages=[
{"role": "system", "content": extraction_prompt},
*[
{"role": msg.get("role"), "content": msg.get("content")}
for msg in messages[-5:]
],
],
temperature=0.1,
max_tokens=100,
extra_headers=extra_headers if extra_headers else None,
)
result = response.choices[0].message.content.strip()
if "```json" in result:
result = result.split("```json")[1].split("```")[0].strip()
elif "```" in result:
result = result.split("```")[1].split("```")[0].strip()
route = json.loads(result)
return {
"origin": route.get("origin"),
"destination": route.get("destination"),
"date": route.get("date"),
}
except Exception as e:
logger.error(f"Error extracting flight route: {e}")
return {"origin": None, "destination": None, "date": None}
async def resolve_airport_code(city_name: str, request: Request) -> Optional[str]:
"""Convert city name to airport code using LLM."""
if not city_name:
return None
try:
ctx = extract(request.headers)
extra_headers = {}
inject(extra_headers, context=ctx)
response = await openai_client_via_plano.chat.completions.create(
model=EXTRACTION_MODEL,
messages=[
{
"role": "system",
"content": "Convert city names to primary airport IATA codes. Return only the 3-letter code. Examples: Seattle→SEA, Atlanta→ATL, New York→JFK, London→LHR",
},
{"role": "user", "content": city_name},
],
temperature=0.1,
max_tokens=10,
extra_headers=extra_headers if extra_headers else None,
)
code = response.choices[0].message.content.strip().upper()
code = code.strip("\"'`.,!? \n\t")
return code if len(code) == 3 else None
except Exception as e:
logger.error(f"Error resolving airport code for {city_name}: {e}")
return None
async def get_flights(
origin_code: str, dest_code: str, travel_date: Optional[str] = None
) -> Optional[dict]:
"""Get flights between two airports using FlightAware API.
Args:
origin_code: Origin airport IATA code
dest_code: Destination airport IATA code
travel_date: Travel date in YYYY-MM-DD format, defaults to today
Note: FlightAware API limits searches to 2 days in the future.
"""
try:
# Use provided date or default to today
if travel_date:
search_date = travel_date
else:
search_date = datetime.now().strftime("%Y-%m-%d")
# Validate date is not too far in the future (FlightAware limit: 2 days)
search_date_obj = datetime.strptime(search_date, "%Y-%m-%d")
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
days_ahead = (search_date_obj - today).days
if days_ahead > 2:
logger.warning(
f"Requested date {search_date} is {days_ahead} days ahead, exceeds FlightAware 2-day limit"
)
return {
"origin_code": origin_code,
"destination_code": dest_code,
"flights": [],
"count": 0,
"error": f"FlightAware API only provides flight data up to 2 days in the future. The requested date ({search_date}) is {days_ahead} days ahead. Please search for today, tomorrow, or the day after.",
}
url = f"{AEROAPI_BASE_URL}/airports/{origin_code}/flights/to/{dest_code}"
headers = {"x-apikey": AEROAPI_KEY}
params = {
"start": f"{search_date}T00:00:00Z",
"end": f"{search_date}T23:59:59Z",
"connection": "nonstop",
"max_pages": 1,
}
response = await http_client.get(url, headers=headers, params=params)
if response.status_code != 200:
logger.error(
f"FlightAware API error {response.status_code}: {response.text}"
)
return None
data = response.json()
flights = []
# Log raw API response for debugging
logger.info(f"FlightAware API returned {len(data.get('flights', []))} flights")
for idx, flight_group in enumerate(
data.get("flights", [])[:5]
): # Limit to 5 flights
# FlightAware API nests data in segments array
segments = flight_group.get("segments", [])
if not segments:
continue
flight = segments[0] # Get first segment (direct flights only have one)
# Extract airport codes from nested objects
flight_origin = None
flight_dest = None
if isinstance(flight.get("origin"), dict):
flight_origin = flight["origin"].get("code_iata")
if isinstance(flight.get("destination"), dict):
flight_dest = flight["destination"].get("code_iata")
# Build flight object
flights.append(
{
"airline": flight.get("operator"),
"flight_number": flight.get("ident_iata") or flight.get("ident"),
"departure_time": flight.get("scheduled_out"),
"arrival_time": flight.get("scheduled_in"),
"origin": flight_origin,
"destination": flight_dest,
"aircraft_type": flight.get("aircraft_type"),
"status": flight.get("status"),
"terminal_origin": flight.get("terminal_origin"),
"gate_origin": flight.get("gate_origin"),
}
)
return {
"origin_code": origin_code,
"destination_code": dest_code,
"flights": flights,
"count": len(flights),
}
except Exception as e:
logger.error(f"Error fetching flights: {e}")
return None
app = FastAPI(title="Flight Information Agent", version="1.0.0")
@app.post("/v1/chat/completions")
async def handle_request(request: Request):
"""HTTP endpoint for chat completions with streaming support."""
request_body = await request.json()
messages = request_body.get("messages", [])
return StreamingResponse(
invoke_flight_agent(request, request_body),
media_type="text/plain",
headers={"content-type": "text/event-stream"},
)
async def invoke_flight_agent(request: Request, request_body: dict):
"""Generate streaming chat completions."""
messages = request_body.get("messages", [])
# Step 1: Extract origin, destination, and date
route = await extract_flight_route(messages, request)
origin = route.get("origin")
destination = route.get("destination")
travel_date = route.get("date")
# Step 2: Short circuit if missing origin or destination
if not origin or not destination:
missing = []
if not origin:
missing.append("origin city")
if not destination:
missing.append("destination city")
error_message = f"I need both origin and destination cities to search for flights. Please provide the {' and '.join(missing)}. For example: 'Flights from Seattle to Atlanta'"
error_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_body.get("model", FLIGHT_MODEL),
"choices": [
{
"index": 0,
"delta": {"content": error_message},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
return
# Step 3: Resolve airport codes
origin_code = await resolve_airport_code(origin, request)
dest_code = await resolve_airport_code(destination, request)
if not origin_code or not dest_code:
error_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_body.get("model", FLIGHT_MODEL),
"choices": [
{
"index": 0,
"delta": {
"content": f"I couldn't find airport codes for {origin if not origin_code else destination}. Please check the city name."
},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
return
# Step 4: Get live flight data
flight_data = await get_flights(origin_code, dest_code, travel_date)
# Determine date display for messages
date_display = travel_date if travel_date else "today"
if not flight_data or not flight_data.get("flights"):
# Check if there's a specific error message (e.g., date too far in future)
error_detail = flight_data.get("error") if flight_data else None
if error_detail:
no_flights_message = error_detail
else:
no_flights_message = f"No direct flights found from {origin} ({origin_code}) to {destination} ({dest_code}) for {date_display}."
error_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_body.get("model", FLIGHT_MODEL),
"choices": [
{
"index": 0,
"delta": {"content": no_flights_message},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
return
# Step 5: Prepare context for LLM - append flight data to last user message
flight_context = f"""
Flight search results from {origin} ({origin_code}) to {destination} ({dest_code}):
Flight data in JSON format:
{json.dumps(flight_data, indent=2)}
Present these {len(flight_data.get('flights', []))} flight(s) to the user in a clear, readable format."""
# Build message history with flight data appended to the last user message
response_messages = [{"role": "system", "content": SYSTEM_PROMPT}]
for i, msg in enumerate(messages):
# Append flight data to the last user message
if i == len(messages) - 1 and msg.get("role") == "user":
response_messages.append(
{"role": "user", "content": msg.get("content") + flight_context}
)
else:
response_messages.append(
{"role": msg.get("role"), "content": msg.get("content")}
)
# Log what we're sending to the LLM for debugging
logger.info(f"Sending messages to LLM: {json.dumps(response_messages, indent=2)}")
# Step 6: Stream response
try:
ctx = extract(request.headers)
extra_headers = {"x-envoy-max-retries": "3"}
inject(extra_headers, context=ctx)
stream = await openai_client_via_plano.chat.completions.create(
model=FLIGHT_MODEL,
messages=response_messages,
temperature=request_body.get("temperature", 0.7),
max_tokens=request_body.get("max_tokens", 1000),
stream=True,
extra_headers=extra_headers,
)
async for chunk in stream:
if chunk.choices:
yield f"data: {chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Error generating flight response: {e}")
error_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_body.get("model", FLIGHT_MODEL),
"choices": [
{
"index": 0,
"delta": {
"content": "I apologize, but I'm having trouble retrieving flight information right now. Please try again."
},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "agent": "flight_information"}
def start_server(host: str = "localhost", port: int = 10520):
"""Start the REST server."""
uvicorn.run(
app,
host=host,
port=port,
log_config={
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - [FLIGHT_AGENT] - %(levelname)s - %(message)s",
},
},
"handlers": {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
"root": {
"level": "INFO",
"handlers": ["default"],
},
},
)
if __name__ == "__main__":
start_server(host="0.0.0.0", port=10520)

View file

@ -0,0 +1,427 @@
import json
import re
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import os
import logging
import time
import uuid
import uvicorn
from datetime import datetime, timedelta
import httpx
from typing import Optional
from urllib.parse import quote
from opentelemetry.propagate import extract, inject
# Set up logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - [WEATHER_AGENT] - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Configuration for plano LLM gateway
LLM_GATEWAY_ENDPOINT = os.getenv(
"LLM_GATEWAY_ENDPOINT", "http://host.docker.internal:12001/v1"
)
WEATHER_MODEL = "openai/gpt-4o"
LOCATION_MODEL = "openai/gpt-4o-mini"
# Initialize OpenAI client for plano
openai_client_via_plano = AsyncOpenAI(
base_url=LLM_GATEWAY_ENDPOINT,
api_key="EMPTY",
)
# FastAPI app for REST server
app = FastAPI(title="Weather Forecast Agent", version="1.0.0")
# HTTP client for API calls
http_client = httpx.AsyncClient(timeout=10.0)
# Utility functions
def celsius_to_fahrenheit(temp_c: Optional[float]) -> Optional[float]:
"""Convert Celsius to Fahrenheit."""
return round(temp_c * 9 / 5 + 32, 1) if temp_c is not None else None
def get_user_messages(messages: list) -> list:
"""Extract user messages from message list."""
return [msg for msg in messages if msg.get("role") == "user"]
def get_last_user_content(messages: list) -> str:
"""Get the content of the most recent user message."""
for msg in reversed(messages):
if msg.get("role") == "user":
return msg.get("content", "").lower()
return ""
async def get_weather_data(request: Request, messages: list, days: int = 1):
"""Extract location from user's conversation and fetch weather data from Open-Meteo API.
This function does two things:
1. Uses an LLM to extract the location from the user's message
2. Fetches weather data for that location from Open-Meteo
Currently returns only current day weather. Want to add multi-day forecasts?
Check out the TODO comments below - it's a great learning exercise! 🚀
"""
# Step 1: Extract location from conversation using LLM
instructions = """Extract the location for WEATHER queries. Return just the city name.
Rules:
1. For multi-part queries, extract ONLY the location mentioned with weather keywords ("weather in [location]")
2. If user says "there" or "that city", it typically refers to the DESTINATION city in travel contexts (not the origin)
3. For flight queries with weather, "there" means the destination city where they're traveling TO
4. Return plain text (e.g., "London", "New York", "Paris, France")
5. If no weather location found, return "NOT_FOUND"
Examples:
- "What's the weather in London?" -> "London"
- "Flights from Seattle to Atlanta, and show me the weather there" -> "Atlanta"
- "Can you get me flights from Seattle to Atlanta tomorrow, and also please show me the weather there" -> "Atlanta"
- "What's the weather in Seattle, and what is one flight that goes direct to Atlanta?" -> "Seattle"
- User asked about flights to Atlanta, then "what's the weather like there?" -> "Atlanta"
- "I'm going to Seattle" -> "Seattle"
- "What's happening?" -> "NOT_FOUND"
Extract location:"""
try:
user_messages = [
msg.get("content") for msg in messages if msg.get("role") == "user"
]
if not user_messages:
location = "New York"
else:
ctx = extract(request.headers)
extra_headers = {}
inject(extra_headers, context=ctx)
# For location extraction, pass full conversation for context (e.g., "there" = previous destination)
response = await openai_client_via_plano.chat.completions.create(
model=LOCATION_MODEL,
messages=[
{"role": "system", "content": instructions},
*[
{"role": msg.get("role"), "content": msg.get("content")}
for msg in messages
],
],
temperature=0.1,
max_tokens=50,
extra_headers=extra_headers if extra_headers else None,
)
location = response.choices[0].message.content.strip().strip("\"'`.,!?")
logger.info(f"Location extraction result: '{location}'")
if not location or location.upper() == "NOT_FOUND":
location = "New York"
logger.info(f"Location not found, defaulting to: {location}")
except Exception as e:
logger.error(f"Error extracting location: {e}")
location = "New York"
logger.info(f"Fetching weather for location: '{location}' (days: {days})")
# Step 2: Fetch weather data for the extracted location
try:
# Geocode city to get coordinates
geocode_url = f"https://geocoding-api.open-meteo.com/v1/search?name={quote(location)}&count=1&language=en&format=json"
geocode_response = await http_client.get(geocode_url)
if geocode_response.status_code != 200 or not geocode_response.json().get(
"results"
):
logger.warning(f"Could not geocode {location}, using New York")
location = "New York"
geocode_url = f"https://geocoding-api.open-meteo.com/v1/search?name={quote(location)}&count=1&language=en&format=json"
geocode_response = await http_client.get(geocode_url)
geocode_data = geocode_response.json()
if not geocode_data.get("results"):
return {
"location": location,
"weather": {
"date": datetime.now().strftime("%Y-%m-%d"),
"day_name": datetime.now().strftime("%A"),
"temperature_c": None,
"temperature_f": None,
"weather_code": None,
"error": "Could not retrieve weather data",
},
}
result = geocode_data["results"][0]
location_name = result.get("name", location)
latitude = result["latitude"]
longitude = result["longitude"]
logger.info(
f"Geocoded '{location}' to {location_name} ({latitude}, {longitude})"
)
# Get weather forecast
weather_url = (
f"https://api.open-meteo.com/v1/forecast?"
f"latitude={latitude}&longitude={longitude}&"
f"current=temperature_2m&"
f"daily=sunrise,sunset,temperature_2m_max,temperature_2m_min,weather_code&"
f"forecast_days={days}&timezone=auto"
)
weather_response = await http_client.get(weather_url)
if weather_response.status_code != 200:
return {
"location": location_name,
"weather": {
"date": datetime.now().strftime("%Y-%m-%d"),
"day_name": datetime.now().strftime("%A"),
"temperature_c": None,
"temperature_f": None,
"weather_code": None,
"error": "Could not retrieve weather data",
},
}
weather_data = weather_response.json()
current_temp = weather_data.get("current", {}).get("temperature_2m")
daily = weather_data.get("daily", {})
# Build forecast for requested number of days
forecast = []
for i in range(days):
date_str = daily["time"][i]
date_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
temp_max = (
daily.get("temperature_2m_max", [])[i]
if daily.get("temperature_2m_max")
else None
)
temp_min = (
daily.get("temperature_2m_min", [])[i]
if daily.get("temperature_2m_min")
else None
)
weather_code = (
daily.get("weather_code", [0])[i] if daily.get("weather_code") else 0
)
sunrise = daily.get("sunrise", [])[i] if daily.get("sunrise") else None
sunset = daily.get("sunset", [])[i] if daily.get("sunset") else None
# Use current temp for today, otherwise use max temp
temp_c = (
temp_max
if temp_max is not None
else (current_temp if i == 0 and current_temp else temp_min)
)
forecast.append(
{
"date": date_str.split("T")[0],
"day_name": date_obj.strftime("%A"),
"temperature_c": round(temp_c, 1) if temp_c is not None else None,
"temperature_f": celsius_to_fahrenheit(temp_c),
"temperature_max_c": round(temp_max, 1)
if temp_max is not None
else None,
"temperature_min_c": round(temp_min, 1)
if temp_min is not None
else None,
"weather_code": weather_code,
"sunrise": sunrise.split("T")[1] if sunrise else None,
"sunset": sunset.split("T")[1] if sunset else None,
}
)
return {"location": location_name, "forecast": forecast}
except Exception as e:
logger.error(f"Error getting weather data: {e}")
return {
"location": location,
"weather": {
"date": datetime.now().strftime("%Y-%m-%d"),
"day_name": datetime.now().strftime("%A"),
"temperature_c": None,
"temperature_f": None,
"weather_code": None,
"error": "Could not retrieve weather data",
},
}
@app.post("/v1/chat/completions")
async def handle_request(request: Request):
"""HTTP endpoint for chat completions with streaming support."""
request_body = await request.json()
messages = request_body.get("messages", [])
logger.info(
"messages detail json dumps: %s",
json.dumps(messages, indent=2),
)
traceparent_header = request.headers.get("traceparent")
return StreamingResponse(
invoke_weather_agent(request, request_body, traceparent_header),
media_type="text/plain",
headers={
"content-type": "text/event-stream",
},
)
async def invoke_weather_agent(
request: Request, request_body: dict, traceparent_header: str = None
):
"""Generate streaming chat completions."""
messages = request_body.get("messages", [])
# Detect if user wants multi-day forecast
last_user_msg = get_last_user_content(messages)
days = 1
if "forecast" in last_user_msg or "week" in last_user_msg:
days = 7
elif "tomorrow" in last_user_msg:
days = 2
# Extract specific number of days if mentioned (e.g., "5 day forecast")
import re
day_match = re.search(r"(\d{1,2})\s+day", last_user_msg)
if day_match:
requested_days = int(day_match.group(1))
days = min(requested_days, 16) # API supports max 16 days
# Get live weather data (location extraction happens inside this function)
weather_data = await get_weather_data(request, messages, days)
# Create weather context to append to user message
forecast_type = "forecast" if days > 1 else "current weather"
weather_context = f"""
Weather data for {weather_data['location']} ({forecast_type}):
{json.dumps(weather_data, indent=2)}"""
# System prompt for weather agent
instructions = """You are a weather assistant in a multi-agent system. You will receive weather data in JSON format with these fields:
- "location": City name
- "forecast": Array of weather objects, each with date, day_name, temperature_c, temperature_f, temperature_max_c, temperature_min_c, weather_code, sunrise, sunset
- weather_code: WMO code (0=clear, 1-3=partly cloudy, 45-48=fog, 51-67=rain, 71-86=snow, 95-99=thunderstorm)
Your task:
1. Present the weather/forecast clearly for the location
2. For single day: show current conditions
3. For multi-day: show each day with date and conditions
4. Include temperature in both Celsius and Fahrenheit
5. Describe conditions naturally based on weather_code
6. Use conversational language
Important: If the conversation includes information from other agents (like flight details), acknowledge and build upon that context naturally. Your primary focus is weather, but maintain awareness of the full conversation.
Remember: Only use the provided data. If fields are null, mention data is unavailable."""
# Build message history with weather data appended to the last user message
response_messages = [{"role": "system", "content": instructions}]
for i, msg in enumerate(messages):
# Append weather data to the last user message
if i == len(messages) - 1 and msg.get("role") == "user":
response_messages.append(
{"role": "user", "content": msg.get("content") + weather_context}
)
else:
response_messages.append(
{"role": msg.get("role"), "content": msg.get("content")}
)
try:
ctx = extract(request.headers)
extra_headers = {"x-envoy-max-retries": "3"}
inject(extra_headers, context=ctx)
stream = await openai_client_via_plano.chat.completions.create(
model=WEATHER_MODEL,
messages=response_messages,
temperature=request_body.get("temperature", 0.7),
max_tokens=request_body.get("max_tokens", 1000),
stream=True,
extra_headers=extra_headers,
)
async for chunk in stream:
if chunk.choices:
yield f"data: {chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Error generating weather response: {e}")
error_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_body.get("model", WEATHER_MODEL),
"choices": [
{
"index": 0,
"delta": {
"content": "I apologize, but I'm having trouble retrieving weather information right now. Please try again."
},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "agent": "weather_forecast"}
def start_server(host: str = "localhost", port: int = 10510):
"""Start the REST server."""
uvicorn.run(
app,
host=host,
port=port,
log_config={
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - [WEATHER_AGENT] - %(levelname)s - %(message)s",
},
},
"handlers": {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
"root": {
"level": "INFO",
"handlers": ["default"],
},
},
)
if __name__ == "__main__":
start_server(host="0.0.0.0", port=10510)