diff --git a/apps/agents/src/app/main.py b/apps/agents/src/app/main.py index 2f93dfbf..a694fe90 100644 --- a/apps/agents/src/app/main.py +++ b/apps/agents/src/app/main.py @@ -1,7 +1,11 @@ -from flask import Flask, request, jsonify +from flask import Flask, request, jsonify, Response from datetime import datetime from functools import wraps import os +import redis +import uuid +import json +import time from src.graph.core import run_turn from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL @@ -9,6 +13,7 @@ from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL from src.utils.common import common_logger, read_json_from_file logger = common_logger +redis_client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379')) app = Flask(__name__) @app.route("/health", methods=["GET"]) @@ -83,6 +88,56 @@ def chat(): logger.error(f"Error: {e}") return jsonify({"error": str(e)}), 500 +@app.route("/chat_stream_init", methods=["POST"]) +@require_api_key +def chat_stream_init(): + # create a uuid for the stream + stream_id = str(uuid.uuid4()) + + # store the the request data in redis with 10 minute TTL + # using the key name `stream_request_` + # set ttl to 10 minutes + redis_client.setex(f"stream_request_{stream_id}", 600, json.dumps(request.get_json())) + + return jsonify({"stream_id": stream_id}) + +@app.route("/chat_stream/", methods=["GET"]) +@require_api_key +def chat_stream(stream_id): + # get the request data from redis + request_data = redis_client.get(f"stream_request_{stream_id}") + if not request_data: + return jsonify({"error": "Stream not found"}), 404 + + # invoke run_streamed() from agents-sdk + + def generate(): + # example of HTTP SSE event stream: + # https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events + # -------------------------------- + # id: + # event: + # data: {... event data ...} + # + # event: + # data: {... event data ...} + try: + yield "event: message\n" + yield "data: {\"role\": \"assistant\", \"content\": \"This is the first message!\"}\n\n" # double \n indicates end of message + + time.sleep(2) + + yield "event: message\n" + yield "data: {\"role\": \"assistant\", \"content\": \"This is the second message!\"}\n\n" + + yield "event: done\n" + yield "data: {... state data ...}\n\n" + except Exception as e: + yield "event: error\n" + yield "data: {... error data ...}\n\n" + + return Response(generate(), mimetype='text/event-stream') + if __name__ == "__main__": print("Starting Flask server...") from waitress import serve diff --git a/apps/rowboat/app/actions/actions.ts b/apps/rowboat/app/actions/actions.ts index b3385cd0..ed8ced54 100644 --- a/apps/rowboat/app/actions/actions.ts +++ b/apps/rowboat/app/actions/actions.ts @@ -66,17 +66,14 @@ export async function scrapeWebpage(url: string): Promise, -): Promise<{ +export async function getAssistantResponse(request: z.infer): Promise<{ messages: z.infer[], state: unknown, rawRequest: unknown, rawResponse: unknown, }> { - await projectAuthCheck(projectId); - if (!await check_query_limit(projectId)) { + await projectAuthCheck(request.projectId); + if (!await check_query_limit(request.projectId)) { throw new QueryLimitError(); } diff --git a/apps/rowboat/app/api/v1/[projectId]/chat/route.ts b/apps/rowboat/app/api/v1/[projectId]/chat/route.ts index 4e542f2a..9671ab1c 100644 --- a/apps/rowboat/app/api/v1/[projectId]/chat/route.ts +++ b/apps/rowboat/app/api/v1/[projectId]/chat/route.ts @@ -88,6 +88,7 @@ export async function POST( // get assistant response const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow); const request: z.infer = { + projectId, messages: convertFromApiToAgenticApiMessages(reqMessages), state: currentState, agents, diff --git a/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts b/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts index d40b95ff..8d136e58 100644 --- a/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts +++ b/apps/rowboat/app/api/widget/v1/chats/[chatId]/turn/route.ts @@ -95,6 +95,7 @@ export async function POST( let state: unknown = chat.agenticState ?? { last_agent_name: startAgent }; const request: z.infer = { + projectId: session.projectId, messages: convertToAgenticAPIChatMessages([systemMessage, ...messages, ...unsavedMessages]), state, agents, diff --git a/apps/rowboat/app/lib/types/agents_api_types.ts b/apps/rowboat/app/lib/types/agents_api_types.ts index 1e6b132a..42c652da 100644 --- a/apps/rowboat/app/lib/types/agents_api_types.ts +++ b/apps/rowboat/app/lib/types/agents_api_types.ts @@ -47,6 +47,7 @@ export const AgenticAPITool = WorkflowTool }) export const AgenticAPIChatRequest = z.object({ + projectId: z.string(), messages: z.array(AgenticAPIChatMessage), state: z.unknown(), agents: z.array(AgenticAPIAgent), diff --git a/apps/rowboat/app/projects/[projectId]/playground/chat.tsx b/apps/rowboat/app/projects/[projectId]/playground/chat.tsx index f7f51345..57bbd63c 100644 --- a/apps/rowboat/app/projects/[projectId]/playground/chat.tsx +++ b/apps/rowboat/app/projects/[projectId]/playground/chat.tsx @@ -96,6 +96,7 @@ export function Chat({ setFetchResponseError(null); const { agents, tools, prompts, startAgent } = convertWorkflowToAgenticAPI(workflow); const request: z.infer = { + projectId, messages: convertToAgenticAPIChatMessages([{ role: 'system', content: systemMessage || '', @@ -116,7 +117,7 @@ export function Chat({ setLastAgenticResponse(null); try { - const response = await getAssistantResponse(projectId, request); + const response = await getAssistantResponse(request); if (ignore) { return; }