From 122ee00fac9c4c4983a8401ea580768fb9e3bb2d Mon Sep 17 00:00:00 2001 From: arkml Date: Wed, 9 Apr 2025 19:35:25 +0530 Subject: [PATCH] bubble up exceptions in tools --- .../rowboat_agents/src/graph/swarm_wrapper.py | 167 ++++++++---------- 1 file changed, 76 insertions(+), 91 deletions(-) diff --git a/apps/rowboat_agents/src/graph/swarm_wrapper.py b/apps/rowboat_agents/src/graph/swarm_wrapper.py index 254dc10b..4389d633 100644 --- a/apps/rowboat_agents/src/graph/swarm_wrapper.py +++ b/apps/rowboat_agents/src/graph/swarm_wrapper.py @@ -38,62 +38,45 @@ class NewResponse(BaseModel): error_msg: Optional[str] = "" async def mock_tool(tool_name: str, args: str, description: str, mock_instructions: str) -> str: - """ - Handles tool execution by either using mock instructions or generating a response. + try: + print(f"Mock tool called for: {tool_name}") - Args: - tool_name: The name of the tool - args: The arguments passed to the tool - tool_config: The configuration of the tool + messages = [ + {"role": "system", "content": f"You are simulating the execution of a tool called '{tool_name}'.Here is the description of the tool: {description}. Here are the instructions for the mock tool: {mock_instructions}. Generate a realistic response as if the tool was actually executed with the given parameters."}, + {"role": "user", "content": f"Generate a realistic response for the tool '{tool_name}' with these parameters: {args}. The response should be concise and focused on what the tool would actually return."} + ] - Returns: - The response from the tool - """ - print(f"Mock tool called for: {tool_name}") - - - messages = [ - {"role": "system", "content": f"You are simulating the execution of a tool called '{tool_name}'.Here is the description of the tool: {description}. Here are the instructions for the mock tool: {mock_instructions}. Generate a realistic response as if the tool was actually executed with the given parameters."}, - {"role": "user", "content": f"Generate a realistic response for the tool '{tool_name}' with these parameters: {args}. The response should be concise and focused on what the tool would actually return."} - ] - - print(f"Generating simulated response for tool: {tool_name}") - response_content = generate_openai_output(messages, output_type='text', model="gpt-4o") - return response_content + print(f"Generating simulated response for tool: {tool_name}") + response_content = generate_openai_output(messages, output_type='text', model="gpt-4o") + return response_content + except Exception as e: + logger.error(f"Error in mock_tool: {str(e)}") + return f"Error: {str(e)}" async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secret: str) -> str: - """ - Calls the webhook with the given tool name and arguments. - - Args: - tool_name (str): The name of the tool to call. - args (str): The arguments for the tool as a JSON string. - - Returns: - str: The response from the webhook, or an error message if the call fails. - """ - content_dict = { - "toolCall": { - "function": { - "name": tool_name, - "arguments": args + try: + print(f"Calling webhook for tool: {tool_name}") + content_dict = { + "toolCall": { + "function": { + "name": tool_name, + "arguments": args + } } } - } - request_body = { - "content": json.dumps(content_dict) - } + request_body = { + "content": json.dumps(content_dict) + } - # Prepare headers - headers = {} - if signing_secret: - content_str = request_body["content"] - body_hash = hashlib.sha256(content_str.encode('utf-8')).hexdigest() - payload = {"bodyHash": body_hash} - signature_jwt = jwt.encode(payload, signing_secret, algorithm="HS256") - headers["X-Signature-Jwt"] = signature_jwt + # Prepare headers + headers = {} + if signing_secret: + content_str = request_body["content"] + body_hash = hashlib.sha256(content_str.encode('utf-8')).hexdigest() + payload = {"bodyHash": body_hash} + signature_jwt = jwt.encode(payload, signing_secret, algorithm="HS256") + headers["X-Signature-Jwt"] = signature_jwt - try: async with aiohttp.ClientSession() as session: async with session.post(webhook_url, json=request_body, headers=headers) as response: if response.status == 200: @@ -104,58 +87,60 @@ async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secr print(f"Webhook error: {error_msg}") return f"Error: {error_msg}" except Exception as e: - print(f"Exception in call_webhook: {str(e)}") + logger.error(f"Exception in call_webhook: {str(e)}") return f"Error: Failed to call webhook - {str(e)}" async def call_mcp(tool_name: str, args: str, mcp_server_url: str) -> str: - """ - Calls the MCP with the given tool name and arguments. - """ + try: + print(f"MCP tool called for: {tool_name}") + async with sse_client(url=mcp_server_url) as streams: + async with ClientSession(*streams) as session: + await session.initialize() + jargs = json.loads(args) + response = await session.call_tool(tool_name, arguments=jargs) + json_output = json.dumps([item.__dict__ for item in response.content], indent=2) - async with sse_client(url=mcp_server_url) as streams: - async with ClientSession(*streams) as session: - await session.initialize() - jargs = json.loads(args) - response = await session.call_tool(tool_name, arguments=jargs) - json_output = json.dumps([item.__dict__ for item in response.content], indent=2) - - return json_output + return json_output + except Exception as e: + logger.error(f"Error in call_mcp: {str(e)}") + return f"Error: {str(e)}" async def catch_all(ctx: RunContextWrapper[Any], args: str, tool_name: str, tool_config: dict, complete_request: dict) -> str: - """ - Handles all tool calls by dispatching to appropriate functions. - """ - print(f"Catch all called for tool: {tool_name}") - print(f"Args: {args}") - print(f"Tool config: {tool_config}") - - # Create event loop for async operations try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + print(f"Catch all called for tool: {tool_name}") + print(f"Args: {args}") + print(f"Tool config: {tool_config}") - response_content = None - if tool_config.get("mockTool", False) or complete_request.get("testProfile", {}).get("mockTools", False): - # Call mock_tool to handle the response (it will decide whether to use mock instructions or generate a response) - if complete_request.get("testProfile", {}).get("mockPrompt", ""): - response_content = await mock_tool(tool_name, args, tool_config.get("description", ""), complete_request.get("testProfile", {}).get("mockPrompt", "")) + # Create event loop for async operations + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + response_content = None + if tool_config.get("mockTool", False) or complete_request.get("testProfile", {}).get("mockTools", False): + # Call mock_tool to handle the response (it will decide whether to use mock instructions or generate a response) + if complete_request.get("testProfile", {}).get("mockPrompt", ""): + response_content = await mock_tool(tool_name, args, tool_config.get("description", ""), complete_request.get("testProfile", {}).get("mockPrompt", "")) + else: + response_content = await mock_tool(tool_name, args, tool_config.get("description", ""), tool_config.get("mockInstructions", "")) + print(response_content) + elif tool_config.get("isMcp", False): + mcp_server_name = tool_config.get("mcpServerName", "") + mcp_servers = complete_request.get("mcpServers", {}) + mcp_server_url = next((server.get("url", "") for server in mcp_servers if server.get("name") == mcp_server_name), "") + response_content = await call_mcp(tool_name, args, mcp_server_url) else: - response_content = await mock_tool(tool_name, args, tool_config.get("description", ""), tool_config.get("mockInstructions", "")) - print(response_content) - elif tool_config.get("isMcp", False): - mcp_server_name = tool_config.get("mcpServerName", "") - mcp_servers = complete_request.get("mcpServers", {}) - mcp_server_url = next((server.get("url", "") for server in mcp_servers if server.get("name") == mcp_server_name), "") - response_content = await call_mcp(tool_name, args, mcp_server_url) - else: - collection = db["projects"] - doc = collection.find_one({"_id": complete_request.get("projectId", "")}) - signing_secret = doc.get("secret", "") - webhook_url = complete_request.get("toolWebhookUrl", "") - response_content = await call_webhook(tool_name, args, webhook_url, signing_secret) - return response_content + collection = db["projects"] + doc = collection.find_one({"_id": complete_request.get("projectId", "")}) + signing_secret = doc.get("secret", "") + webhook_url = complete_request.get("toolWebhookUrl", "") + response_content = await call_webhook(tool_name, args, webhook_url, signing_secret) + return response_content + except Exception as e: + logger.error(f"Error in catch_all: {str(e)}") + return f"Error: {str(e)}" def get_rag_tool(config: dict, complete_request: dict) -> FunctionTool: