mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-05-03 04:12:38 +02:00
agents: add sse scaffolding
This commit is contained in:
parent
e725fc6276
commit
b131c1768e
6 changed files with 64 additions and 8 deletions
|
|
@ -1,7 +1,11 @@
|
|||
from flask import Flask, request, jsonify
|
||||
from flask import Flask, request, jsonify, Response
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
import os
|
||||
import redis
|
||||
import uuid
|
||||
import json
|
||||
import time
|
||||
|
||||
from src.graph.core import run_turn
|
||||
from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL
|
||||
|
|
@ -9,6 +13,7 @@ from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL
|
|||
from src.utils.common import common_logger, read_json_from_file
|
||||
logger = common_logger
|
||||
|
||||
redis_client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route("/health", methods=["GET"])
|
||||
|
|
@ -83,6 +88,56 @@ def chat():
|
|||
logger.error(f"Error: {e}")
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route("/chat_stream_init", methods=["POST"])
|
||||
@require_api_key
|
||||
def chat_stream_init():
|
||||
# create a uuid for the stream
|
||||
stream_id = str(uuid.uuid4())
|
||||
|
||||
# store the the request data in redis with 10 minute TTL
|
||||
# using the key name `stream_request_<stream_id>`
|
||||
# set ttl to 10 minutes
|
||||
redis_client.setex(f"stream_request_{stream_id}", 600, json.dumps(request.get_json()))
|
||||
|
||||
return jsonify({"stream_id": stream_id})
|
||||
|
||||
@app.route("/chat_stream/<stream_id>", methods=["GET"])
|
||||
@require_api_key
|
||||
def chat_stream(stream_id):
|
||||
# get the request data from redis
|
||||
request_data = redis_client.get(f"stream_request_{stream_id}")
|
||||
if not request_data:
|
||||
return jsonify({"error": "Stream not found"}), 404
|
||||
|
||||
# invoke run_streamed() from agents-sdk
|
||||
|
||||
def generate():
|
||||
# example of HTTP SSE event stream:
|
||||
# https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
|
||||
# --------------------------------
|
||||
# id: <optional event id>
|
||||
# event: <event name>
|
||||
# data: {... event data ...}
|
||||
#
|
||||
# event: <event name>
|
||||
# data: {... event data ...}
|
||||
try:
|
||||
yield "event: message\n"
|
||||
yield "data: {\"role\": \"assistant\", \"content\": \"This is the first message!\"}\n\n" # double \n indicates end of message
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
yield "event: message\n"
|
||||
yield "data: {\"role\": \"assistant\", \"content\": \"This is the second message!\"}\n\n"
|
||||
|
||||
yield "event: done\n"
|
||||
yield "data: {... state data ...}\n\n"
|
||||
except Exception as e:
|
||||
yield "event: error\n"
|
||||
yield "data: {... error data ...}\n\n"
|
||||
|
||||
return Response(generate(), mimetype='text/event-stream')
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Starting Flask server...")
|
||||
from waitress import serve
|
||||
|
|
|
|||
|
|
@ -66,17 +66,14 @@ export async function scrapeWebpage(url: string): Promise<z.infer<typeof Webpage
|
|||
};
|
||||
}
|
||||
|
||||
export async function getAssistantResponse(
|
||||
projectId: string,
|
||||
request: z.infer<typeof AgenticAPIChatRequest>,
|
||||
): Promise<{
|
||||
export async function getAssistantResponse(request: z.infer<typeof AgenticAPIChatRequest>): Promise<{
|
||||
messages: z.infer<typeof apiV1.ChatMessage>[],
|
||||
state: unknown,
|
||||
rawRequest: unknown,
|
||||
rawResponse: unknown,
|
||||
}> {
|
||||
await projectAuthCheck(projectId);
|
||||
if (!await check_query_limit(projectId)) {
|
||||
await projectAuthCheck(request.projectId);
|
||||
if (!await check_query_limit(request.projectId)) {
|
||||
throw new QueryLimitError();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -88,6 +88,7 @@ export async function POST(
|
|||
// get assistant response
|
||||
const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow);
|
||||
const request: z.infer<typeof AgenticAPIChatRequest> = {
|
||||
projectId,
|
||||
messages: convertFromApiToAgenticApiMessages(reqMessages),
|
||||
state: currentState,
|
||||
agents,
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ export async function POST(
|
|||
let state: unknown = chat.agenticState ?? { last_agent_name: startAgent };
|
||||
|
||||
const request: z.infer<typeof AgenticAPIChatRequest> = {
|
||||
projectId: session.projectId,
|
||||
messages: convertToAgenticAPIChatMessages([systemMessage, ...messages, ...unsavedMessages]),
|
||||
state,
|
||||
agents,
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ export const AgenticAPITool = WorkflowTool
|
|||
})
|
||||
|
||||
export const AgenticAPIChatRequest = z.object({
|
||||
projectId: z.string(),
|
||||
messages: z.array(AgenticAPIChatMessage),
|
||||
state: z.unknown(),
|
||||
agents: z.array(AgenticAPIAgent),
|
||||
|
|
|
|||
|
|
@ -96,6 +96,7 @@ export function Chat({
|
|||
setFetchResponseError(null);
|
||||
const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow);
|
||||
const request: z.infer<typeof AgenticAPIChatRequest> = {
|
||||
projectId,
|
||||
messages: convertToAgenticAPIChatMessages([{
|
||||
role: 'system',
|
||||
content: systemMessage || '',
|
||||
|
|
@ -116,7 +117,7 @@ export function Chat({
|
|||
setLastAgenticResponse(null);
|
||||
|
||||
try {
|
||||
const response = await getAssistantResponse(projectId, request);
|
||||
const response = await getAssistantResponse(request);
|
||||
if (ignore) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue