plano/demos/weather_forecast/main.py
Aayush 885acc899f
322 add support for pydantic logfire for llm agent tracing (#329)
* set up otel-collector and implement sending to logfire

* moved rest of the files for the demo into the folder

* update docker-compose.yaml and run_demo.sh to properly check for LOGFIRE_API_KEY

* refactor weather_forecast demo to only be one demo

* add a default docker-compose for e2e tests

* update based on requested changes

* fix replace comma with colon in readme

* remove weather_forecast_service folder, and make logfire demo fail instantly if no key is set

* remove the unused weather forecast service folder

* Changed stop_demo to only stop one file at a time

* update readme with new demo stopping setup

* Revert changes to end behavior

* fix silly formatting mistake
2024-12-06 13:44:22 -08:00

97 lines
2.7 KiB
Python

import json
import os
import random
from fastapi import FastAPI, Response
from datetime import datetime, date, timedelta, timezone
import logging
from pydantic import BaseModel
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
# --- Module-level wiring: tracing, logging, FastAPI app (side effects) ---

# Resource metadata stamped on every span this service emits.
resource = Resource.create(
    {
        "service.name": "weather-forecast-service",
    }
)

# Initialize the tracer provider
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

# Reuse uvicorn's error logger so our messages appear in the server output.
logger = logging.getLogger("uvicorn.error")
logger.setLevel(logging.INFO)

app = FastAPI()
# Auto-instrument the app so every incoming HTTP request gets a server span.
FastAPIInstrumentor().instrument_app(app)

# Configure the OTLP exporter (Jaeger, Zipkin, etc.)
# NOTE(review): the env var is spelled "OLTP_HOST", not "OTLP_HOST" — the
# demo's docker-compose presumably matches this spelling; confirm before
# renaming, since changing it would break existing deployments.
otlp_exporter = OTLPSpanExporter(
    endpoint=os.getenv("OLTP_HOST", "http://localhost:4317")
)
# Batch spans and ship them to the collector configured above.
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))
@app.get("/healthz")
async def healthz():
    """Liveness probe: always reports the service as healthy."""
    return dict(status="ok")
class WeatherRequest(BaseModel):
    """Request body for POST /weather."""

    # City the forecast is requested for (echoed back verbatim).
    city: str
    # Number of forecast days to generate.
    days: int = 7
    # Temperature units; /weather lower-cases this and converts to Celsius
    # only for "celsius"/"c", otherwise values stay Fahrenheit.
    # Fix: default was misspelled "Farenheit".
    units: str = "Fahrenheit"
@app.post("/weather")
async def weather(req: WeatherRequest, res: Response):
    """Return a mock forecast of random daily temperatures for req.city.

    Temperatures are generated in Fahrenheit and converted to Celsius when
    the requested units are "celsius" or "c" (case-insensitive).
    """
    wants_celsius = req.units.lower() in ("celsius", "c")
    forecast = {
        "city": req.city,
        "temperature": [],
        "units": req.units,
    }
    for day_offset in range(req.days):
        low = random.randrange(50, 90)
        high = random.randrange(low + 5, low + 20)
        if wants_celsius:
            low = (low - 32) * 5.0 / 9.0
            high = (high - 32) * 5.0 / 9.0
        entry = {
            "date": str(date.today() + timedelta(days=day_offset)),
            "temperature": {"min": low, "max": high},
            "units": req.units,
            "query_time": str(datetime.now(timezone.utc)),
        }
        forecast["temperature"].append(entry)
    return forecast
class DefaultTargetRequest(BaseModel):
    """Request body for POST /default_target."""

    # Chat-style message list; items are presumably role/content dicts in an
    # OpenAI-like schema — TODO confirm against the upstream caller.
    messages: list
@app.post("/default_target")
async def default_target(req: DefaultTargetRequest, res: Response):
    """Fallback endpoint: log the incoming messages and reply with a canned
    chat-completion-shaped response from a fixed assistant message."""
    logger.info(f"Received messages: {req.messages}")
    assistant_message = {
        "role": "assistant",
        "content": "I can help you with weather forecast",
    }
    resp = {
        "choices": [
            {"message": assistant_message, "finish_reason": "completed", "index": 0}
        ],
        "model": "api_server",
        "usage": {"completion_tokens": 0},
    }
    logger.info(f"sending response: {json.dumps(resp)}")
    return resp