@@ -292,7 +257,6 @@ export function Messages({
messages,
toolCallResults,
loadingAssistantResponse,
- loadingUserResponse,
workflow,
testProfile = null,
systemMessage,
@@ -302,7 +266,6 @@ export function Messages({
messages: z.infer
[];
toolCallResults: Record>;
loadingAssistantResponse: boolean;
- loadingUserResponse: boolean;
workflow: z.infer;
testProfile: z.infer | null;
systemMessage: string | undefined;
@@ -314,7 +277,7 @@ export function Messages({
// scroll to bottom on new messages
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" })
- }, [messages, loadingAssistantResponse, loadingUserResponse]);
+ }, [messages, loadingAssistantResponse]);
return
@@ -368,7 +331,6 @@ export function Messages({
return <>>;
})}
{loadingAssistantResponse &&
}
- {loadingUserResponse &&
}
;
diff --git a/apps/rowboat/app/projects/[projectId]/workflow/app.tsx b/apps/rowboat/app/projects/[projectId]/workflow/app.tsx
index 331e20b7..20b94273 100644
--- a/apps/rowboat/app/projects/[projectId]/workflow/app.tsx
+++ b/apps/rowboat/app/projects/[projectId]/workflow/app.tsx
@@ -9,17 +9,15 @@ import { WorkflowSelector } from "./workflow_selector";
import { Spinner } from "@heroui/react";
import { cloneWorkflow, createWorkflow, fetchPublishedWorkflowId, fetchWorkflow } from "../../../actions/workflow_actions";
import { listDataSources } from "../../../actions/datasource_actions";
+import { listMcpServers } from "@/app/actions/mcp_actions";
+import { getProjectConfig } from "@/app/actions/project_actions";
export function App({
projectId,
useRag,
- mcpServerUrls,
- toolWebhookUrl,
}: {
projectId: string;
useRag: boolean;
- mcpServerUrls: Array>;
- toolWebhookUrl: string;
}) {
const [selectorKey, setSelectorKey] = useState(0);
const [workflow, setWorkflow] = useState> | null>(null);
@@ -27,17 +25,23 @@ export function App({
const [dataSources, setDataSources] = useState>[] | null>(null);
const [loading, setLoading] = useState(false);
const [autoSelectIfOnlyOneWorkflow, setAutoSelectIfOnlyOneWorkflow] = useState(true);
+ const [mcpServerUrls, setMcpServerUrls] = useState>>([]);
+ const [toolWebhookUrl, setToolWebhookUrl] = useState('');
const handleSelect = useCallback(async (workflowId: string) => {
setLoading(true);
const workflow = await fetchWorkflow(projectId, workflowId);
const publishedWorkflowId = await fetchPublishedWorkflowId(projectId);
const dataSources = await listDataSources(projectId);
+ const mcpServers = await listMcpServers(projectId);
+ const projectConfig = await getProjectConfig(projectId);
// Store the selected workflow ID in local storage
localStorage.setItem(`lastWorkflowId_${projectId}`, workflowId);
setWorkflow(workflow);
setPublishedWorkflowId(publishedWorkflowId);
setDataSources(dataSources);
+ setMcpServerUrls(mcpServers);
+ setToolWebhookUrl(projectConfig.webhookUrl ?? '');
setLoading(false);
}, [projectId]);
diff --git a/apps/rowboat/app/projects/[projectId]/workflow/page.tsx b/apps/rowboat/app/projects/[projectId]/workflow/page.tsx
index be6d588c..09ec0573 100644
--- a/apps/rowboat/app/projects/[projectId]/workflow/page.tsx
+++ b/apps/rowboat/app/projects/[projectId]/workflow/page.tsx
@@ -13,18 +13,16 @@ export default async function Page({
}: {
params: { projectId: string };
}) {
+ console.log('->>> workflow page being rendered');
const project = await projectsCollection.findOne({
_id: params.projectId,
});
if (!project) {
notFound();
}
- const toolWebhookUrl = project.webhookUrl ?? '';
return ;
}
diff --git a/apps/rowboat_agents/pyproject.toml b/apps/rowboat_agents/pyproject.toml
index 42334612..6bdc8bbe 100644
--- a/apps/rowboat_agents/pyproject.toml
+++ b/apps/rowboat_agents/pyproject.toml
@@ -80,6 +80,7 @@ python-docx = "^1.1.2"
python-dotenv = "^1.0.1"
pytz = "^2024.2"
qdrant-client = "*"
+Quart = "^0.20.0"
RapidFuzz = "^3.11.0"
redis = "^5.2.1"
requests = "^2.32.3"
diff --git a/apps/rowboat_agents/requirements.txt b/apps/rowboat_agents/requirements.txt
index a7035d18..3d2be2cc 100644
--- a/apps/rowboat_agents/requirements.txt
+++ b/apps/rowboat_agents/requirements.txt
@@ -66,6 +66,7 @@ python-docx==1.1.2
python-dotenv==1.0.1
pytz==2024.2
qdrant-client
+Quart==0.20.0
RapidFuzz==3.11.0
redis==5.2.1
requests==2.32.3
diff --git a/apps/rowboat_agents/src/app/main.py b/apps/rowboat_agents/src/app/main.py
index ccc65d45..6526c9e3 100644
--- a/apps/rowboat_agents/src/app/main.py
+++ b/apps/rowboat_agents/src/app/main.py
@@ -1,13 +1,13 @@
-from flask import Flask, request, jsonify, Response
+from quart import Quart, request, jsonify, Response
from datetime import datetime
from functools import wraps
import os
import redis
import uuid
import json
-import asyncio
from hypercorn.config import Config
from hypercorn.asyncio import serve
+import asyncio
from src.graph.core import run_turn, run_turn_streamed
from src.graph.tools import RAG_TOOL, CLOSE_CHAT_TOOL
@@ -17,19 +17,34 @@ from pprint import pprint
logger = common_logger
redis_client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
-app = Flask(__name__)
+app = Quart(__name__)
+
+# filter out agent transfer messages using a function
+def is_agent_transfer_message(msg):
+ if (msg.get("role") == "assistant" and
+ msg.get("content") is None and
+ msg.get("tool_calls") is not None and
+ len(msg.get("tool_calls")) > 0 and
+ msg.get("tool_calls")[0].get("function").get("name") == "transfer_to_agent"):
+ return True
+ if (msg.get("role") == "tool" and
+ msg.get("tool_calls") is None and
+ msg.get("tool_call_id") is not None and
+ msg.get("tool_name") == "transfer_to_agent"):
+ return True
+ return False
@app.route("/health", methods=["GET"])
-def health():
+async def health():
return jsonify({"status": "ok"})
@app.route("/")
-def home():
+async def home():
return "Hello, World!"
def require_api_key(f):
@wraps(f)
- def decorated(*args, **kwargs):
+ async def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Missing or invalid authorization header'}), 401
@@ -39,16 +54,16 @@ def require_api_key(f):
if actual and token != actual:
return jsonify({'error': 'Invalid API key'}), 403
- return f(*args, **kwargs)
+ return await f(*args, **kwargs)
return decorated
@app.route("/chat", methods=["POST"])
@require_api_key
-def chat():
+async def chat():
logger.info('='*100)
logger.info(f"{'*'*100}Running server mode{'*'*100}")
try:
- data = request.get_json()
+ data = await request.get_json()
logger.info('Complete request:')
logger.info(data)
logger.info('-'*100)
@@ -56,9 +71,12 @@ def chat():
start_time = datetime.now()
config = read_json_from_file("./configs/default_config.json")
+ # filter out agent transfer messages
+ input_messages = [msg for msg in data.get("messages", []) if not is_agent_transfer_message(msg)]
+
logger.info('Beginning turn')
resp_messages, resp_tokens_used, resp_state = run_turn(
- messages=data.get("messages", []),
+ messages=input_messages,
start_agent_name=data.get("startAgent", ""),
agent_configs=data.get("agents", []),
tool_configs=data.get("tools", []),
@@ -94,19 +112,27 @@ def chat():
@app.route("/chat_stream_init", methods=["POST"])
@require_api_key
-def chat_stream_init():
+async def chat_stream_init():
# create a uuid for the stream
stream_id = str(uuid.uuid4())
# store the request data in redis with 10 minute TTL
- data = request.get_json()
+ data = await request.get_json()
redis_client.setex(f"stream_request_{stream_id}", 600, json.dumps(data))
- return jsonify({"stream_id": stream_id})
+ print('* stream init'*200)
+
+ return jsonify({"streamId": stream_id})
+
+def format_sse(data: dict, event: str = None) -> str:
+ msg = f"data: {json.dumps(data)}\n\n"
+ if event is not None:
+ msg = f"event: {event}\n{msg}"
+ return msg
@app.route("/chat_stream/", methods=["GET"])
@require_api_key
-def chat_stream(stream_id):
+async def chat_stream(stream_id):
# get the request data from redis
request_data = redis_client.get(f"stream_request_{stream_id}")
if not request_data:
@@ -114,17 +140,18 @@ def chat_stream(stream_id):
request_data = json.loads(request_data)
config = read_json_from_file("./configs/default_config.json")
+
+ # filter out agent transfer messages
+ input_messages = [msg for msg in request_data["messages"] if not is_agent_transfer_message(msg)]
# Preprocess messages to handle null content and role issues
- for msg in request_data["messages"]:
- # Handle null content in assistant messages with tool calls
+ for msg in input_messages:
if (msg.get("role") == "assistant" and
msg.get("content") is None and
msg.get("tool_calls") is not None and
len(msg.get("tool_calls")) > 0):
msg["content"] = "Calling tool"
- # Handle role issues
if msg.get("role") == "tool":
msg["role"] = "developer"
elif not msg.get("role"):
@@ -135,12 +162,11 @@ def chat_stream(stream_id):
print('*'*200)
pprint(request_data)
print('='*200)
-
- async def process_stream():
+ async def generate():
try:
async for event_type, event_data in run_turn_streamed(
- messages=request_data.get("messages", []),
+ messages=input_messages,
start_agent_name=request_data.get("startAgent", ""),
agent_configs=request_data.get("agents", []),
tool_configs=request_data.get("tools", []),
@@ -153,43 +179,16 @@ def chat_stream(stream_id):
print('*'*200)
print("Yielding message:")
print('*'*200)
- to_yield = f"event: message\ndata: {json.dumps(event_data)}\n\n"
- print(to_yield)
- print('='*200)
- yield to_yield
+ yield format_sse(event_data, "message")
elif event_type == 'done':
print('*'*200)
print("Yielding done:")
print('*'*200)
- to_yield = f"event: done\ndata: {json.dumps(event_data)}\n\n"
- print(to_yield)
- print('='*200)
- yield to_yield
+ yield format_sse(event_data, "done")
except Exception as e:
logger.error(f"Streaming error: {str(e)}")
- yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
-
- def generate():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- try:
- async def get_all_chunks():
- chunks = []
- async for chunk in process_stream():
- chunks.append(chunk)
- return chunks
-
- chunks = loop.run_until_complete(get_all_chunks())
- for chunk in chunks:
- yield chunk
-
- except Exception as e:
- logger.error(f"Error in generate: {e}")
- raise
- finally:
- loop.close()
+ yield format_sse({"error": str(e)}, "error")
return Response(generate(), mimetype='text/event-stream')
diff --git a/apps/rowboat_agents/src/graph/core.py b/apps/rowboat_agents/src/graph/core.py
index 371e9aee..af00f870 100644
--- a/apps/rowboat_agents/src/graph/core.py
+++ b/apps/rowboat_agents/src/graph/core.py
@@ -1,6 +1,7 @@
from copy import deepcopy
from datetime import datetime
-
+import json
+import uuid
import logging
from .helpers.access import (
get_agent_by_name,
@@ -285,16 +286,45 @@ async def run_turn_streamed(
# Update current agent when it changes
elif event.type == "agent_updated_stream_event":
current_agent = event.new_agent
+ tool_call_id = str(uuid.uuid4())
+
+ # yield the transfer invocation
message = {
- 'content': f"Agent changed to {current_agent.name}",
+ 'content': None,
'role': 'assistant',
'sender': current_agent.name,
- 'tool_calls': None,
+ 'tool_calls': [{
+ 'function': {
+ 'name': 'transfer_to_agent',
+ 'arguments': json.dumps({
+ 'assistant': event.new_agent.name
+ })
+ },
+ 'id': tool_call_id,
+ 'type': 'function'
+ }],
'tool_call_id': None,
+ 'tool_name': None,
'response_type': 'internal'
}
print("Yielding message: ", message)
yield ('message', message)
+
+ # yield the transfer result
+ message = {
+ 'content': json.dumps({
+ 'assistant': event.new_agent.name
+ }),
+ 'role': 'tool',
+ 'sender': None,
+ 'tool_calls': None,
+ 'tool_call_id': tool_call_id,
+ 'tool_name': 'transfer_to_agent',
+ }
+ print("Yielding message: ", message)
+ yield ('message', message)
+
+ current_agent = event.new_agent
continue
# Handle run items (tools, messages, etc)
From 695a961333917a5dc3ee9f7642b5db15586619c6 Mon Sep 17 00:00:00 2001
From: akhisud3195
Date: Wed, 26 Mar 2025 15:22:59 +0530
Subject: [PATCH 13/14] Fix tool call id
---
apps/rowboat_agents/src/graph/core.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/apps/rowboat_agents/src/graph/core.py b/apps/rowboat_agents/src/graph/core.py
index af00f870..b0f5b633 100644
--- a/apps/rowboat_agents/src/graph/core.py
+++ b/apps/rowboat_agents/src/graph/core.py
@@ -267,9 +267,9 @@ async def run_turn_streamed(
# Process streaming events
async for event in stream_result.stream_events():
- # print('='*50)
- # print("Received event: ", event)
- # print('-'*50)
+ print('='*50)
+ print("Received event: ", event)
+ print('-'*50)
# Handle raw response events and accumulate tokens
if event.type == "raw_response_event":
@@ -339,7 +339,7 @@ async def run_turn_streamed(
'name': event.item.raw_item.name,
'arguments': event.item.raw_item.arguments
},
- 'id': event.item.raw_item.id,
+ 'id': event.item.raw_item.call_id,
'type': 'function'
}],
'tool_call_id': None,
From 277a553ee10b8f476be38980cfa7002905f7749d Mon Sep 17 00:00:00 2001
From: Ramnique Singh <30795890+ramnique@users.noreply.github.com>
Date: Wed, 26 Mar 2025 17:01:49 +0530
Subject: [PATCH 14/14] docker fixes for agents
---
.../v1/stream-response/[streamId]/route.ts | 2 +-
apps/rowboat_agents/.dockerignore | 1 +
apps/rowboat_agents/Dockerfile | 4 +-
apps/rowboat_agents/poetry.lock | 42 ++++++++++++++++++-
apps/rowboat_agents/src/app/main.py | 9 ----
docker-compose.yml | 1 +
6 files changed, 46 insertions(+), 13 deletions(-)
diff --git a/apps/rowboat/app/api/v1/stream-response/[streamId]/route.ts b/apps/rowboat/app/api/v1/stream-response/[streamId]/route.ts
index dac45025..563db7bb 100644
--- a/apps/rowboat/app/api/v1/stream-response/[streamId]/route.ts
+++ b/apps/rowboat/app/api/v1/stream-response/[streamId]/route.ts
@@ -6,7 +6,7 @@ export async function GET(request: Request, { params }: { params: { streamId: st
// Fetch the upstream SSE stream.
const upstreamResponse = await fetch(upstreamUrl, {
headers: {
- 'Authorization': `Bearer ${process.env.AGENTS_API_KEY}`,
+ 'Authorization': `Bearer ${process.env.AGENTS_API_KEY || 'test'}`,
},
cache: 'no-store',
});
diff --git a/apps/rowboat_agents/.dockerignore b/apps/rowboat_agents/.dockerignore
index 4a45bc02..8b13b832 100644
--- a/apps/rowboat_agents/.dockerignore
+++ b/apps/rowboat_agents/.dockerignore
@@ -2,3 +2,4 @@
.env*
__pycache__/
venv/
+.venv/
\ No newline at end of file
diff --git a/apps/rowboat_agents/Dockerfile b/apps/rowboat_agents/Dockerfile
index 592a5444..019c6c2d 100644
--- a/apps/rowboat_agents/Dockerfile
+++ b/apps/rowboat_agents/Dockerfile
@@ -20,9 +20,9 @@ RUN poetry install --no-interaction --no-ansi
COPY . .
# Set environment variables
-ENV FLASK_APP=src.app.main
+ENV QUART_APP=src.app.main
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# Command to run Flask development server
-CMD ["flask", "run", "--host=0.0.0.0", "--port=3001"]
+CMD ["quart", "run", "--host=0.0.0.0", "--port=3001"]
diff --git a/apps/rowboat_agents/poetry.lock b/apps/rowboat_agents/poetry.lock
index 71efeadb..43b83c76 100644
--- a/apps/rowboat_agents/poetry.lock
+++ b/apps/rowboat_agents/poetry.lock
@@ -1,5 +1,18 @@
# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand.
+[[package]]
+name = "aiofiles"
+version = "24.1.0"
+description = "File support for asyncio."
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+markers = "python_version <= \"3.11\" or python_version == \"3.12\" or python_version >= \"3.13\""
+files = [
+ {file = "aiofiles-24.1.0-py3-none-any.whl", hash = "sha256:b4ec55f4195e3eb5d7abd1bf7e061763e864dd4954231fb8539a0ef8bb8260e5"},
+ {file = "aiofiles-24.1.0.tar.gz", hash = "sha256:22a075c9e5a3810f0c2e48f3008c94d68c65d763b9b03857924c99e57355166c"},
+]
+
[[package]]
name = "aiohappyeyeballs"
version = "2.6.1"
@@ -3037,6 +3050,33 @@ urllib3 = ">=1.26.14,<3"
fastembed = ["fastembed (==0.5.1)"]
fastembed-gpu = ["fastembed-gpu (==0.5.1)"]
+[[package]]
+name = "quart"
+version = "0.20.0"
+description = "A Python ASGI web framework with the same API as Flask"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+markers = "python_version <= \"3.11\" or python_version == \"3.12\" or python_version >= \"3.13\""
+files = [
+ {file = "quart-0.20.0-py3-none-any.whl", hash = "sha256:003c08f551746710acb757de49d9b768986fd431517d0eb127380b656b98b8f1"},
+ {file = "quart-0.20.0.tar.gz", hash = "sha256:08793c206ff832483586f5ae47018c7e40bdd75d886fee3fabbdaa70c2cf505d"},
+]
+
+[package.dependencies]
+aiofiles = "*"
+blinker = ">=1.6"
+click = ">=8.0"
+flask = ">=3.0"
+hypercorn = ">=0.11.2"
+itsdangerous = "*"
+jinja2 = "*"
+markupsafe = "*"
+werkzeug = ">=3.0"
+
+[package.extras]
+dotenv = ["python-dotenv"]
+
[[package]]
name = "rapidfuzz"
version = "3.12.2"
@@ -4091,4 +4131,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
-content-hash = "fc7b2d64a9f856b5a0c1aceb6be83ac18f8f9c6f95b8db63b28e69d385dcd38b"
+content-hash = "fa1d4cfd411dca631759ca36e0c7a02c1fe4c11b1ee121557a92cd616867e9a7"
diff --git a/apps/rowboat_agents/src/app/main.py b/apps/rowboat_agents/src/app/main.py
index 6526c9e3..e282fae4 100644
--- a/apps/rowboat_agents/src/app/main.py
+++ b/apps/rowboat_agents/src/app/main.py
@@ -120,8 +120,6 @@ async def chat_stream_init():
data = await request.get_json()
redis_client.setex(f"stream_request_{stream_id}", 600, json.dumps(data))
- print('* stream init'*200)
-
return jsonify({"streamId": stream_id})
def format_sse(data: dict, event: str = None) -> str:
@@ -157,11 +155,8 @@ async def chat_stream(stream_id):
elif not msg.get("role"):
msg["role"] = "user"
- print('*'*200)
print("Request:")
- print('*'*200)
pprint(request_data)
- print('='*200)
async def generate():
try:
@@ -176,14 +171,10 @@ async def chat_stream(stream_id):
complete_request=request_data
):
if event_type == 'message':
- print('*'*200)
print("Yielding message:")
- print('*'*200)
yield format_sse(event_data, "message")
elif event_type == 'done':
- print('*'*200)
print("Yielding done:")
- print('*'*200)
yield format_sse(event_data, "done")
except Exception as e:
diff --git a/docker-compose.yml b/docker-compose.yml
index cfd098ae..f54fba01 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -46,6 +46,7 @@ services:
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- API_KEY=${AGENTS_API_KEY}
+ - REDIS_URL=redis://redis:6379
restart: unless-stopped
copilot: