diff --git a/apps/rowboat/app/actions/klavis_actions.ts b/apps/rowboat/app/actions/klavis_actions.ts index 8ed4812e..66d86a66 100644 --- a/apps/rowboat/app/actions/klavis_actions.ts +++ b/apps/rowboat/app/actions/klavis_actions.ts @@ -388,7 +388,8 @@ export async function createMcpServerInstance( const requestBody = { serverName, userId: projectId, - platformName + platformName, + connectionType: "StreamableHttp", }; console.log('[Klavis API] Creating server instance:', requestBody); diff --git a/apps/rowboat/app/actions/mcp_actions.ts b/apps/rowboat/app/actions/mcp_actions.ts index 6ca18356..c8003124 100644 --- a/apps/rowboat/app/actions/mcp_actions.ts +++ b/apps/rowboat/app/actions/mcp_actions.ts @@ -2,12 +2,42 @@ import { z } from "zod"; import { WorkflowTool } from "../lib/types/workflow_types"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; import { projectAuthCheck } from "./project_actions"; import { projectsCollection, agentWorkflowsCollection } from "../lib/mongodb"; import { Project } from "../lib/types/project_types"; -import { MCPServer, McpTool, McpServerTool, convertMcpServerToolToWorkflowTool } from "../lib/types/types"; -import { ObjectId } from "mongodb"; +import { MCPServer, McpServerTool, convertMcpServerToolToWorkflowTool } from "../lib/types/types"; + +async function getMcpClient(serverUrl: string, serverName: string): Promise { + let client: Client | undefined = undefined; + const baseUrl = new URL(serverUrl); + + // Try to connect using Streamable HTTP transport + try { + client = new Client({ + name: 'streamable-http-client', + version: '1.0.0' + }); + const transport = new StreamableHTTPClientTransport( + new URL(baseUrl) + ); + await client.connect(transport); + console.log(`[MCP] Connected using Streamable HTTP transport to ${serverName}`); + return client; + } catch (error) { + // If that fails with a 4xx error, try the older SSE transport + console.log(`[MCP] Streamable HTTP connection failed, falling back to SSE transport for ${serverName}`); + client = new Client({ + name: 'sse-client', + version: '1.0.0' + }); + const sseTransport = new SSEClientTransport(baseUrl); + await client.connect(sseTransport); + console.log(`[MCP] Connected using SSE transport to ${serverName}`); + return client; + } +} export async function fetchMcpTools(projectId: string): Promise[]> { await projectAuthCheck(projectId); @@ -21,25 +51,9 @@ export async function fetchMcpTools(projectId: string): Promise=0.23.1", markers = "sys_platform != \"emscripten\""} [package.extras] cli = ["python-dotenv (>=1.0.0)", "typer (>=0.12.4)"] @@ -2903,6 +2904,18 @@ files = [ [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-multipart" +version = "0.0.20" +description = "A streaming multipart parser for Python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104"}, + {file = "python_multipart-0.0.20.tar.gz", hash = "sha256:8dd0cab45b8e23064ae09147625994d090fa46f5b0d1e13af944c331a7fa9d13"}, +] + [[package]] name = "pytz" version = "2024.2" @@ -3479,6 +3492,7 @@ description = "The lightning-fast ASGI server." optional = false python-versions = ">=3.9" groups = ["main"] +markers = "sys_platform != \"emscripten\"" files = [ {file = "uvicorn-0.34.0-py3-none-any.whl", hash = "sha256:023dc038422502fa28a09c7a30bf2b6991512da7dcdb8fd35fe57cfc154126f4"}, {file = "uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9"}, @@ -3993,4 +4007,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "9b132012b1e894f31b66796668c874f0c81ca3077c67e12878b00ccc3e8242ac" +content-hash = "30be3fa96b1ed90e3b6f9c9f7327fb18cf427b8c209948bd3bbb927f5bc5c8f9" diff --git a/apps/rowboat_agents/pyproject.toml b/apps/rowboat_agents/pyproject.toml index 11edf8be..d7e8f4bb 100644 --- a/apps/rowboat_agents/pyproject.toml +++ b/apps/rowboat_agents/pyproject.toml @@ -54,7 +54,7 @@ keyring = "^25.6.0" lxml = "^5.3.0" markdownify = "^0.13.1" MarkupSafe = "^3.0.2" -mcp = "*" +mcp = "^1.9" more-itertools = "^10.6.0" motor = "*" msgpack = "^1.1.0" diff --git a/apps/rowboat_agents/src/graph/execute_turn.py b/apps/rowboat_agents/src/graph/execute_turn.py index 8eba3d4d..0c20b53b 100644 --- a/apps/rowboat_agents/src/graph/execute_turn.py +++ b/apps/rowboat_agents/src/graph/execute_turn.py @@ -23,6 +23,7 @@ from typing import Any import asyncio from mcp import ClientSession from mcp.client.sse import sse_client +from mcp.client.streamable_http import streamablehttp_client from pydantic import BaseModel from typing import List, Optional, Dict @@ -98,16 +99,34 @@ async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secr async def call_mcp(tool_name: str, args: str, mcp_server_url: str) -> str: try: print(f"MCP tool called for: {tool_name} with args: {args} at url: {mcp_server_url}") - async with sse_client(url=mcp_server_url) as streams: - async with ClientSession(*streams) as session: - await session.initialize() - jargs = json.loads(args) - response = await session.call_tool(tool_name, arguments=jargs) - json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2) - return json_output + # Try StreamableHTTP first + try: + print("Attempting to connect using StreamableHTTP...") + async with streamablehttp_client(mcp_server_url) as (read_stream, write_stream, _): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + jargs = json.loads(args) + response = await session.call_tool(tool_name, arguments=jargs) + json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2) + print("Successfully connected using StreamableHTTP") + return json_output + except Exception as streamable_error: + print(f"StreamableHTTP connection failed: {str(streamable_error)}") + print("Falling back to SSE...") + + # Fallback to SSE + async with sse_client(url=mcp_server_url) as streams: + async with ClientSession(*streams) as session: + await session.initialize() + jargs = json.loads(args) + response = await session.call_tool(tool_name, arguments=jargs) + json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2) + print("Successfully connected using SSE fallback") + return json_output + except Exception as e: - print(f"Error in call_mcp: {str(e)}") + print(f"Error in call_mcp (both StreamableHTTP and SSE failed): {str(e)}") return f"Error: {str(e)}" async def catch_all(ctx: RunContextWrapper[Any], args: str, tool_name: str, tool_config: dict, complete_request: dict) -> str: