Use POST method SSE streaming in agents

This change eliminates an extra API endpoint from agents service. It
also removes dependency on redis
This commit is contained in:
Ramnique Singh 2025-04-09 14:50:14 +05:30
parent ceaef81a5b
commit 8731dc716c
6 changed files with 125 additions and 252 deletions

View file

@ -1,11 +1,18 @@
import { redisClient } from "@/app/lib/redis";
export async function GET(request: Request, { params }: { params: { streamId: string } }) {
// Replace with your actual upstream SSE endpoint.
const upstreamUrl = `${process.env.AGENTS_API_URL}/chat_stream/${params.streamId}`;
console.log('upstreamUrl', upstreamUrl);
// get the payload from redis
const payload = await redisClient.get(`chat-stream-${params.streamId}`);
if (!payload) {
return new Response("Stream not found", { status: 404 });
}
// Fetch the upstream SSE stream.
const upstreamResponse = await fetch(upstreamUrl, {
const upstreamResponse = await fetch(`${process.env.AGENTS_API_URL}/chat_stream`, {
method: 'POST',
body: payload,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.AGENTS_API_KEY || 'test'}`,
},
cache: 'no-store',

View file

@ -3,6 +3,7 @@ import { z } from "zod";
import { generateObject } from "ai";
import { ApiMessage } from "./types/types";
import { openai } from "@ai-sdk/openai";
import { redisClient } from "./redis";
export async function getAgenticApiResponse(
request: z.infer<typeof AgenticAPIChatRequest>,
@ -38,24 +39,20 @@ export async function getAgenticApiResponse(
export async function getAgenticResponseStreamId(
request: z.infer<typeof AgenticAPIChatRequest>,
): Promise<z.infer<typeof AgenticAPIInitStreamResponse>> {
// call agentic api
console.log(`sending agentic api init stream request`, JSON.stringify(request));
const response = await fetch(process.env.AGENTS_API_URL + '/chat_stream_init', {
method: 'POST',
body: JSON.stringify(request),
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.AGENTS_API_KEY || 'test'}`,
},
// serialize the request
const payload = JSON.stringify(request);
// create a uuid for the stream
const streamId = crypto.randomUUID();
// store payload in redis
await redisClient.set(`chat-stream-${streamId}`, payload, {
EX: 60 * 10, // expire in 10 minutes
});
if (!response.ok) {
console.error('Failed to call agentic init stream api', response);
throw new Error(`Failed to call agentic init stream api: ${response.statusText}`);
}
const responseJson = await response.json();
console.log(`received agentic api init stream response`, JSON.stringify(responseJson));
const result: z.infer<typeof AgenticAPIInitStreamResponse> = responseJson;
return result;
return {
streamId,
};
}
// create a PrefixLogger class that wraps console.log with a prefix

File diff suppressed because it is too large Load diff

View file

@ -83,7 +83,6 @@ pytz = "^2024.2"
qdrant-client = "*"
Quart = "^0.20.0"
RapidFuzz = "^3.11.0"
redis = "^5.2.1"
requests = "^2.32.3"
requests-toolbelt = "^1.0.0"
setuptools = "^75.8.0"

View file

@ -1,7 +1,11 @@
aiohttp==3.9.3
aiofiles==24.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.11.14
aiosignal==1.3.2
annotated-types==0.7.0
anyio==4.8.0
asgiref
asgiref==3.8.1
attrs==25.3.0
beautifulsoup4==4.12.3
blinker==1.9.0
build==1.2.2.post1
@ -11,22 +15,32 @@ cffi==1.17.1
charset-normalizer==3.4.1
cleo==2.1.0
click==8.1.8
colorama==0.4.6
crashtest==0.4.1
distlib==0.3.9
distro==1.9.0
dnspython==2.7.0
dulwich==0.22.7
dulwich==0.22.8
et_xmlfile==2.0.0
eval_type_backport==0.2.2
fastjsonschema==2.21.1
filelock==3.17.0
filelock==3.18.0
findpython==0.6.3
firecrawl==1.9.0
Flask==3.1.0
frozenlist==1.5.0
griffe==1.6.2
grpcio==1.71.0
grpcio-tools==1.71.0
gunicorn==23.0.0
h11==0.14.0
h2==4.2.0
hpack==4.1.0
httpcore==1.0.7
httpx==0.27.2
hypercorn
httpx-sse==0.4.0
Hypercorn==0.17.3
hyperframe==6.1.0
idna==3.10
installer==0.7.0
itsdangerous==2.2.0
@ -40,24 +54,31 @@ keyring==25.6.0
lxml==5.3.0
markdownify==0.13.1
MarkupSafe==3.0.2
mcp
mcp==1.5.0
more-itertools==10.6.0
motor
motor==3.7.0
msgpack==1.1.0
multidict==6.2.0
mypy-extensions==1.0.0
nest-asyncio==1.6.0
numpy==2.2.1
openai
openai-agents
openai==1.68.0
openai-agents==0.0.4
openpyxl==3.1.5
packaging==24.2
pandas==2.2.3
pkginfo==1.12.0
platformdirs==4.3.6
poetry==2.0.1
poetry-core==2.0.1
pbs-installer==2025.3.17
pkginfo==1.12.1.2
platformdirs==4.3.7
poetry==2.1.1
poetry-core==2.1.1
portalocker==2.10.1
priority==2.0.0
propcache==0.3.0
protobuf==5.29.4
pycparser==2.22
pydantic==2.10.5
pydantic-settings==2.8.1
pydantic_core==2.27.2
PyJWT==2.10.1
pymongo==4.10.1
@ -66,28 +87,35 @@ python-dateutil==2.9.0.post0
python-docx==1.1.2
python-dotenv==1.0.1
pytz==2024.2
qdrant-client
qdrant-client==1.13.3
Quart==0.20.0
RapidFuzz==3.11.0
redis==5.2.1
RapidFuzz==3.12.2
requests==2.32.3
requests-toolbelt==1.0.0
setuptools==75.8.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
sounddevice==0.5.1
soupsieve==2.6
sse-starlette==2.2.1
starlette==0.46.1
tabulate==0.9.0
tomlkit==0.13.2
tqdm==4.67.1
trove-classifiers==2025.1.15.22
trove-classifiers==2025.3.19.19
types-requests==2.32.0.20250306
typing-inspect==0.9.0
typing_extensions==4.12.2
tzdata==2024.2
urllib3==2.3.0
virtualenv==20.29.1
uvicorn==0.34.0
virtualenv==20.29.3
waitress==2.1.2
websockets==13.1
Werkzeug==3.1.3
wheel==0.44.0
xattr==1.1.4
wsproto==1.2.0
xattr==1.1.4
yarl==1.18.3
zstandard==0.23.0

View file

@ -3,8 +3,6 @@ from quart import Quart, request, jsonify, Response
from datetime import datetime
from functools import wraps
import os
import redis
import uuid
import json
from hypercorn.config import Config
from hypercorn.asyncio import serve
@ -17,7 +15,6 @@ from src.utils.common import common_logger, read_json_from_file
from pprint import pprint
logger = common_logger
redis_client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
app = Quart(__name__)
config = read_json_from_file("./configs/default_config.json")
@ -122,31 +119,17 @@ async def chat():
logger.error(f"Error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/chat_stream_init", methods=["POST"])
@require_api_key
async def chat_stream_init():
# create a uuid for the stream
stream_id = str(uuid.uuid4())
# store the request data in redis with 10 minute TTL
data = await request.get_json()
redis_client.setex(f"stream_request_{stream_id}", 600, json.dumps(data))
return jsonify({"streamId": stream_id})
def format_sse(data: dict, event: str = None) -> str:
msg = f"data: {json.dumps(data)}\n\n"
if event is not None:
msg = f"event: {event}\n{msg}"
return msg
@app.route("/chat_stream/<stream_id>", methods=["GET"])
@app.route("/chat_stream", methods=["POST"])
@require_api_key
async def chat_stream(stream_id):
# get the request data from redis
request_data = redis_client.get(f"stream_request_{stream_id}")
if not request_data:
return jsonify({"error": "Stream not found"}), 404
async def chat_stream():
# get the request data from the request
request_data = await request.get_data()
print("Request:", request_data.decode('utf-8'))
request_data = json.loads(request_data)