support streameable HTTP mcp

This commit is contained in:
Ramnique Singh 2025-06-08 16:23:51 +05:30
parent f25e3e2ed4
commit a79667b401
7 changed files with 100 additions and 85 deletions

View file

@ -388,7 +388,8 @@ export async function createMcpServerInstance(
const requestBody = {
serverName,
userId: projectId,
platformName
platformName,
connectionType: "StreamableHttp",
};
console.log('[Klavis API] Creating server instance:', requestBody);

View file

@ -2,12 +2,42 @@
import { z } from "zod";
import { WorkflowTool } from "../lib/types/workflow_types";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { projectAuthCheck } from "./project_actions";
import { projectsCollection, agentWorkflowsCollection } from "../lib/mongodb";
import { Project } from "../lib/types/project_types";
import { MCPServer, McpTool, McpServerTool, convertMcpServerToolToWorkflowTool } from "../lib/types/types";
import { ObjectId } from "mongodb";
import { MCPServer, McpServerTool, convertMcpServerToolToWorkflowTool } from "../lib/types/types";
async function getMcpClient(serverUrl: string, serverName: string): Promise<Client> {
let client: Client | undefined = undefined;
const baseUrl = new URL(serverUrl);
// Try to connect using Streamable HTTP transport
try {
client = new Client({
name: 'streamable-http-client',
version: '1.0.0'
});
const transport = new StreamableHTTPClientTransport(
new URL(baseUrl)
);
await client.connect(transport);
console.log(`[MCP] Connected using Streamable HTTP transport to ${serverName}`);
return client;
} catch (error) {
// If that fails with a 4xx error, try the older SSE transport
console.log(`[MCP] Streamable HTTP connection failed, falling back to SSE transport for ${serverName}`);
client = new Client({
name: 'sse-client',
version: '1.0.0'
});
const sseTransport = new SSEClientTransport(baseUrl);
await client.connect(sseTransport);
console.log(`[MCP] Connected using SSE transport to ${serverName}`);
return client;
}
}
export async function fetchMcpTools(projectId: string): Promise<z.infer<typeof WorkflowTool>[]> {
await projectAuthCheck(projectId);
@ -21,25 +51,9 @@ export async function fetchMcpTools(projectId: string): Promise<z.infer<typeof W
for (const mcpServer of mcpServers) {
if (!mcpServer.isActive) continue;
try {
const transport = new SSEClientTransport(new URL(mcpServer.serverUrl!));
const client = new Client(
{
name: "rowboat-client",
version: "1.0.0"
},
{
capabilities: {
prompts: {},
resources: {},
tools: {}
}
}
);
await client.connect(transport);
const client = await getMcpClient(mcpServer.serverUrl!, mcpServer.name);
// List tools
const result = await client.listTools();
@ -110,23 +124,7 @@ export async function fetchMcpToolsForServer(projectId: string, serverName: stri
url: mcpServer.serverUrl
});
const transport = new SSEClientTransport(new URL(mcpServer.serverUrl));
const client = new Client(
{
name: "rowboat-client",
version: "1.0.0"
},
{
capabilities: {
prompts: {},
resources: {},
tools: {}
}
}
);
await client.connect(transport);
console.log('[Klavis API] MCP connection established:', { serverName });
const client = await getMcpClient(mcpServer.serverUrl, mcpServer.name);
// List tools
const result = await client.listTools();
@ -394,23 +392,8 @@ export async function testMcpTool(
toolId
});
const transport = new SSEClientTransport(new URL(mcpServer.serverUrl));
const client = new Client(
{
name: "rowboat-client",
version: "1.0.0"
},
{
capabilities: {
prompts: {},
resources: {},
tools: {}
}
}
);
const client = await getMcpClient(mcpServer.serverUrl, mcpServer.name);
await client.connect(transport);
console.log('[MCP Test] Connected to server, calling tool:', {
toolId,
parameters

View file

@ -24,7 +24,7 @@
"@langchain/core": "^0.3.7",
"@langchain/textsplitters": "^0.1.0",
"@mendable/firecrawl-js": "^1.0.3",
"@modelcontextprotocol/sdk": "^1.7.0",
"@modelcontextprotocol/sdk": "^1.12.1",
"@primer/react": "^36.27.0",
"@qdrant/js-client-rest": "^1.13.0",
"ai": "^4.3.13",
@ -10034,13 +10034,15 @@
}
},
"node_modules/@modelcontextprotocol/sdk": {
"version": "1.9.0",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.9.0.tgz",
"integrity": "sha512-Jq2EUCQpe0iyO5FGpzVYDNFR6oR53AIrwph9yWl7uSc7IWUMsrmpmSaTGra5hQNunXpM+9oit85p924jWuHzUA==",
"version": "1.12.1",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.12.1.tgz",
"integrity": "sha512-KG1CZhZfWg+u8pxeM/mByJDScJSrjjxLc8fwQqbsS8xCjBmQfMNEBTotYdNanKekepnfRI85GtgQlctLFpcYPw==",
"license": "MIT",
"dependencies": {
"ajv": "^6.12.6",
"content-type": "^1.0.5",
"cors": "^2.8.5",
"cross-spawn": "^7.0.3",
"cross-spawn": "^7.0.5",
"eventsource": "^3.0.2",
"express": "^5.0.1",
"express-rate-limit": "^7.5.0",
@ -13694,7 +13696,6 @@
"version": "6.12.6",
"resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz",
"integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==",
"dev": true,
"dependencies": {
"fast-deep-equal": "^3.1.1",
"fast-json-stable-stringify": "^2.0.0",
@ -14616,9 +14617,10 @@
}
},
"node_modules/cross-spawn": {
"version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
"integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==",
"version": "7.0.6",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz",
"integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==",
"license": "MIT",
"dependencies": {
"path-key": "^3.1.0",
"shebang-command": "^2.0.0",
@ -15961,8 +15963,7 @@
"node_modules/fast-deep-equal": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz",
"integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==",
"dev": true
"integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q=="
},
"node_modules/fast-diff": {
"version": "1.3.0",
@ -15998,8 +15999,7 @@
"node_modules/fast-json-stable-stringify": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz",
"integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==",
"dev": true
"integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw=="
},
"node_modules/fast-levenshtein": {
"version": "2.0.6",
@ -17443,8 +17443,7 @@
"node_modules/json-schema-traverse": {
"version": "0.4.1",
"resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz",
"integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==",
"dev": true
"integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg=="
},
"node_modules/json-stable-stringify-without-jsonify": {
"version": "1.0.1",
@ -21640,7 +21639,6 @@
"version": "4.4.1",
"resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz",
"integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==",
"dev": true,
"dependencies": {
"punycode": "^2.1.0"
}

View file

@ -31,7 +31,7 @@
"@langchain/core": "^0.3.7",
"@langchain/textsplitters": "^0.1.0",
"@mendable/firecrawl-js": "^1.0.3",
"@modelcontextprotocol/sdk": "^1.7.0",
"@modelcontextprotocol/sdk": "^1.12.1",
"@primer/react": "^36.27.0",
"@qdrant/js-client-rest": "^1.13.0",
"ai": "^4.3.13",

View file

@ -1819,14 +1819,14 @@ files = [
[[package]]
name = "mcp"
version = "1.6.0"
version = "1.9.3"
description = "Model Context Protocol SDK"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "mcp-1.6.0-py3-none-any.whl", hash = "sha256:7bd24c6ea042dbec44c754f100984d186620d8b841ec30f1b19eda9b93a634d0"},
{file = "mcp-1.6.0.tar.gz", hash = "sha256:d9324876de2c5637369f43161cd71eebfd803df5a95e46225cab8d280e366723"},
{file = "mcp-1.9.3-py3-none-any.whl", hash = "sha256:69b0136d1ac9927402ed4cf221d4b8ff875e7132b0b06edd446448766f34f9b9"},
{file = "mcp-1.9.3.tar.gz", hash = "sha256:587ba38448e81885e5d1b84055cfcc0ca56d35cd0c58f50941cab01109405388"},
]
[package.dependencies]
@ -1835,9 +1835,10 @@ httpx = ">=0.27"
httpx-sse = ">=0.4"
pydantic = ">=2.7.2,<3.0.0"
pydantic-settings = ">=2.5.2"
python-multipart = ">=0.0.9"
sse-starlette = ">=1.6.1"
starlette = ">=0.27"
uvicorn = ">=0.23.1"
uvicorn = {version = ">=0.23.1", markers = "sys_platform != \"emscripten\""}
[package.extras]
cli = ["python-dotenv (>=1.0.0)", "typer (>=0.12.4)"]
@ -2903,6 +2904,18 @@ files = [
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-multipart"
version = "0.0.20"
description = "A streaming multipart parser for Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104"},
{file = "python_multipart-0.0.20.tar.gz", hash = "sha256:8dd0cab45b8e23064ae09147625994d090fa46f5b0d1e13af944c331a7fa9d13"},
]
[[package]]
name = "pytz"
version = "2024.2"
@ -3479,6 +3492,7 @@ description = "The lightning-fast ASGI server."
optional = false
python-versions = ">=3.9"
groups = ["main"]
markers = "sys_platform != \"emscripten\""
files = [
{file = "uvicorn-0.34.0-py3-none-any.whl", hash = "sha256:023dc038422502fa28a09c7a30bf2b6991512da7dcdb8fd35fe57cfc154126f4"},
{file = "uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9"},
@ -3993,4 +4007,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
content-hash = "9b132012b1e894f31b66796668c874f0c81ca3077c67e12878b00ccc3e8242ac"
content-hash = "30be3fa96b1ed90e3b6f9c9f7327fb18cf427b8c209948bd3bbb927f5bc5c8f9"

View file

@ -54,7 +54,7 @@ keyring = "^25.6.0"
lxml = "^5.3.0"
markdownify = "^0.13.1"
MarkupSafe = "^3.0.2"
mcp = "*"
mcp = "^1.9"
more-itertools = "^10.6.0"
motor = "*"
msgpack = "^1.1.0"

View file

@ -23,6 +23,7 @@ from typing import Any
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from pydantic import BaseModel
from typing import List, Optional, Dict
@ -98,16 +99,34 @@ async def call_webhook(tool_name: str, args: str, webhook_url: str, signing_secr
async def call_mcp(tool_name: str, args: str, mcp_server_url: str) -> str:
try:
print(f"MCP tool called for: {tool_name} with args: {args} at url: {mcp_server_url}")
async with sse_client(url=mcp_server_url) as streams:
async with ClientSession(*streams) as session:
await session.initialize()
jargs = json.loads(args)
response = await session.call_tool(tool_name, arguments=jargs)
json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2)
return json_output
# Try StreamableHTTP first
try:
print("Attempting to connect using StreamableHTTP...")
async with streamablehttp_client(mcp_server_url) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
jargs = json.loads(args)
response = await session.call_tool(tool_name, arguments=jargs)
json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2)
print("Successfully connected using StreamableHTTP")
return json_output
except Exception as streamable_error:
print(f"StreamableHTTP connection failed: {str(streamable_error)}")
print("Falling back to SSE...")
# Fallback to SSE
async with sse_client(url=mcp_server_url) as streams:
async with ClientSession(*streams) as session:
await session.initialize()
jargs = json.loads(args)
response = await session.call_tool(tool_name, arguments=jargs)
json_output = json.dumps(response.content, default=lambda x: x.__dict__ if hasattr(x, '__dict__') else str(x), indent=2)
print("Successfully connected using SSE fallback")
return json_output
except Exception as e:
print(f"Error in call_mcp: {str(e)}")
print(f"Error in call_mcp (both StreamableHTTP and SSE failed): {str(e)}")
return f"Error: {str(e)}"
async def catch_all(ctx: RunContextWrapper[Any], args: str, tool_name: str, tool_config: dict, complete_request: dict) -> str: