feat: MCP tool client infrastructure for agent extensibility

Add the full MCP tool pipeline enabling agents to invoke external tools
(like Brave Search) via MCP servers:

- Add ToolRequest/ToolResponse types and mcp-tool topics to @trustgraph/base
- Create McpToolService (FlowProcessor) that connects to external MCP servers
  via @modelcontextprotocol/sdk StreamableHTTP transport
- Add createMcpTool() to wire MCP tools into the agent's ReAct loop
- Implement config-driven tool registration in AgentService with backward-
  compatible fallback to hardcoded tools
- Add tool filtering by group and state (port of Python tool_filter.py)
- Register mcp-tool in gateway dispatcher and export from @trustgraph/flow
- Fix flow restart race condition: skip restart when flow definitions unchanged
- Update seed config with MCP server config and tool definitions
- Add run scripts for MCP tool service and Brave Search MCP server

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
elpresidank 2026-04-10 05:45:46 -05:00
parent f2b376abef
commit b854b56558
17 changed files with 600 additions and 17 deletions

View file

@ -0,0 +1 @@
export { McpToolService } from "./service.js";

View file

@ -0,0 +1,160 @@
/**
* MCP tool-calling service calls external MCP tool servers.
*
* Receives ToolRequest (name + JSON-encoded parameters) over NATS,
* connects to the appropriate MCP server via the MCP SDK,
* invokes the tool, and returns the result as a ToolResponse.
*
* MCP service configs are pushed via the config system under the "mcp" key.
*
* Python reference: trustgraph-flow/trustgraph/agent/mcp_tool/service.py
*/
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import {
FlowProcessor,
ConsumerSpec,
ProducerSpec,
type ProcessorConfig,
type FlowContext,
type ToolRequest,
type ToolResponse,
} from "@trustgraph/base";
interface McpServiceConfig {
url: string;
"remote-name"?: string;
"auth-token"?: string;
}
export class McpToolService extends FlowProcessor {
private mcpServices: Record<string, McpServiceConfig> = {};
constructor(config: ProcessorConfig) {
super(config);
this.registerSpecification(
new ConsumerSpec<ToolRequest>("request", this.onRequest.bind(this)),
);
this.registerSpecification(new ProducerSpec<ToolResponse>("response"));
this.registerConfigHandler(this.onMcpConfig.bind(this));
}
private async onMcpConfig(
config: Record<string, unknown>,
version: number,
): Promise<void> {
console.log(`[McpToolService] Got config version ${version}`);
if (!("mcp" in config) || typeof config.mcp !== "object" || config.mcp === null) {
this.mcpServices = {};
return;
}
const mcpConfig = config.mcp as Record<string, string>;
this.mcpServices = {};
for (const [name, value] of Object.entries(mcpConfig)) {
try {
this.mcpServices[name] = JSON.parse(value) as McpServiceConfig;
console.log(`[McpToolService] Registered MCP service: ${name}`);
} catch (err) {
console.error(`[McpToolService] Failed to parse MCP config for ${name}:`, err);
}
}
console.log(
`[McpToolService] ${Object.keys(this.mcpServices).length} MCP services configured`,
);
}
private async onRequest(
msg: ToolRequest,
properties: Record<string, string>,
flowCtx: FlowContext,
): Promise<void> {
const requestId = properties.id;
if (!requestId) return;
const responseProducer = flowCtx.flow.producer<ToolResponse>("response");
try {
const result = await this.invokeTool(
msg.name,
msg.parameters ? JSON.parse(msg.parameters) : {},
);
if (typeof result === "string") {
await responseProducer.send(requestId, { text: result });
} else {
await responseProducer.send(requestId, { object: JSON.stringify(result) });
}
} catch (err) {
console.error(`[McpToolService] Error invoking tool ${msg.name}:`, err);
const message = err instanceof Error ? err.message : String(err);
await responseProducer.send(requestId, {
error: { type: "tool-error", message },
});
}
}
private async invokeTool(
name: string,
parameters: Record<string, unknown>,
): Promise<string | unknown> {
if (!(name in this.mcpServices)) {
throw new Error(`MCP service "${name}" not known`);
}
const svcConfig = this.mcpServices[name];
if (!svcConfig.url) {
throw new Error(`MCP service "${name}" URL not defined`);
}
const remoteName = svcConfig["remote-name"] ?? name;
// Build headers with optional bearer token
const headers: Record<string, string> = {};
if (svcConfig["auth-token"]) {
headers["Authorization"] = `Bearer ${svcConfig["auth-token"]}`;
}
console.log(`[McpToolService] Invoking ${remoteName} at ${svcConfig.url}`);
// Connect to streamable HTTP MCP server
const transport = new StreamableHTTPClientTransport(
new URL(svcConfig.url),
{ requestInit: { headers } },
);
const client = new Client({ name: "trustgraph-mcp-client", version: "1.0.0" });
try {
await client.connect(transport);
const result = await client.callTool({
name: remoteName,
arguments: parameters,
});
// Extract response — prefer structured content, fall back to text
if (result.structuredContent) {
return result.structuredContent;
}
if (result.content && Array.isArray(result.content)) {
return result.content
.filter((c): c is { type: "text"; text: string } => c.type === "text")
.map((c) => c.text)
.join("");
}
return "No content";
} finally {
await transport.close();
}
}
}

View file

@ -9,6 +9,10 @@
* 4. Executes tools and feeds observations back to the LLM
* 5. Sends streaming AgentResponse chunks (thought, observation, answer, error)
*
* Tools can be registered statically (hardcoded fallback) or dynamically via
* config-push. When a "tool" section is present in config, tools are built
* from that config; otherwise the 3 default tools are used for backward compat.
*
* Python reference: trustgraph-flow/trustgraph/agent/react/service.py
*/
@ -29,19 +33,26 @@ import {
type DocumentRagResponse,
type TriplesQueryRequest,
type TriplesQueryResponse,
type ToolRequest,
type ToolResponse,
} from "@trustgraph/base";
import {
createKnowledgeQueryTool,
createDocumentQueryTool,
createTriplesQueryTool,
createMcpTool,
} from "./tools.js";
import { buildReActPrompt } from "./prompt.js";
import type { AgentTool } from "./types.js";
import { filterToolsByGroupAndState, getNextState } from "../tool-filter.js";
import type { AgentTool, ToolArg } from "./types.js";
const MAX_ITERATIONS = 10;
export class AgentService extends FlowProcessor {
/** Config-driven tools; null means "use hardcoded fallback". */
private configuredTools: AgentTool[] | null = null;
constructor(config: ProcessorConfig) {
super(config);
@ -83,9 +94,175 @@ export class AgentService extends FlowProcessor {
),
);
// MCP tool invocation client
this.registerSpecification(
new RequestResponseSpec<ToolRequest, ToolResponse>(
"mcp-tool",
"mcp-tool-request",
"mcp-tool-response",
),
);
// Register for config-push to build tools dynamically
this.registerConfigHandler(this.onToolsConfig.bind(this));
console.log("[AgentService] Service initialized");
}
// ---------- Config-driven tool registration ----------
private async onToolsConfig(
config: Record<string, unknown>,
version: number,
): Promise<void> {
console.log(`[AgentService] Loading tool configuration version ${version}`);
try {
if (!("tool" in config) || typeof config.tool !== "object" || config.tool === null) {
// No tool config — keep using hardcoded fallback
this.configuredTools = null;
console.log("[AgentService] No tool config found, using default tools");
return;
}
const toolConfig = config.tool as Record<string, string>;
const tools: AgentTool[] = [];
for (const [_toolId, toolValue] of Object.entries(toolConfig)) {
try {
const data = JSON.parse(toolValue) as Record<string, unknown>;
const implType = data["type"] as string;
const name = data["name"] as string;
const description = data["description"] as string ?? "";
if (!name) {
console.warn(`[AgentService] Skipping tool with no name: ${_toolId}`);
continue;
}
let tool: AgentTool | null = null;
switch (implType) {
case "knowledge-query":
// Will be wired to requestor at request time
tool = {
name,
description: description || "Query the knowledge graph for information about entities and their relationships.",
args: [{ name: "question", type: "string", description: "The question to ask" }],
config: data,
execute: async () => "", // placeholder — wired at request time
};
break;
case "document-query":
tool = {
name,
description: description || "Search documents for relevant information.",
args: [{ name: "question", type: "string", description: "The question to search for" }],
config: data,
execute: async () => "",
};
break;
case "triples-query":
tool = {
name,
description: description || "Query for specific triples in the knowledge graph.",
args: [
{ name: "subject", type: "string", description: "Subject entity (optional)" },
{ name: "predicate", type: "string", description: "Predicate/relationship (optional)" },
{ name: "object", type: "string", description: "Object entity (optional)" },
],
config: data,
execute: async () => "",
};
break;
case "mcp-tool": {
const configArgs = (data["arguments"] as Array<Record<string, string>>) ?? [];
const args: ToolArg[] = configArgs.map((a) => ({
name: a.name ?? "",
type: a.type ?? "string",
description: a.description ?? "",
}));
// Create a placeholder — will be wired to the MCP requestor at request time
tool = {
name,
description,
args,
config: data,
execute: async () => "", // placeholder
};
break;
}
default:
console.warn(`[AgentService] Unknown tool type "${implType}" for ${name}`);
continue;
}
if (tool) {
tools.push(tool);
console.log(`[AgentService] Registered tool: ${name} (${implType})`);
}
} catch (err) {
console.error(`[AgentService] Failed to parse tool config ${_toolId}:`, err);
}
}
this.configuredTools = tools.length > 0 ? tools : null;
console.log(`[AgentService] ${tools.length} tools loaded from config`);
} catch (err) {
console.error("[AgentService] Config reload failed:", err);
}
}
/**
* Wire up tool execute functions with live requestors from the flow context.
* Config-driven tools store placeholders; this replaces them with real impls.
*/
private wireTools(tools: AgentTool[], flowCtx: FlowContext, collection?: string): AgentTool[] {
return tools.map((tool) => {
const implType = tool.config?.["type"] as string | undefined;
switch (implType) {
case "knowledge-query": {
const live = createKnowledgeQueryTool(
flowCtx.flow.requestor<GraphRagRequest, GraphRagResponse>("graph-rag"),
collection,
);
return { ...tool, execute: live.execute };
}
case "document-query": {
const live = createDocumentQueryTool(
flowCtx.flow.requestor<DocumentRagRequest, DocumentRagResponse>("doc-rag"),
collection,
);
return { ...tool, execute: live.execute };
}
case "triples-query": {
const live = createTriplesQueryTool(
flowCtx.flow.requestor<TriplesQueryRequest, TriplesQueryResponse>("triples"),
collection,
);
return { ...tool, execute: live.execute };
}
case "mcp-tool": {
const live = createMcpTool(
flowCtx.flow.requestor<ToolRequest, ToolResponse>("mcp-tool"),
tool.name,
tool.description,
tool.args,
);
return { ...tool, execute: live.execute };
}
default:
return tool;
}
});
}
private async onRequest(
msg: AgentRequest,
properties: Record<string, string>,
@ -97,21 +274,31 @@ export class AgentService extends FlowProcessor {
const responseProducer = flowCtx.flow.producer<AgentResponse>("agent-response");
try {
// Build tools from flow requestors
const tools: AgentTool[] = [
createKnowledgeQueryTool(
flowCtx.flow.requestor<GraphRagRequest, GraphRagResponse>("graph-rag"),
msg.collection,
),
createDocumentQueryTool(
flowCtx.flow.requestor<DocumentRagRequest, DocumentRagResponse>("doc-rag"),
msg.collection,
),
createTriplesQueryTool(
flowCtx.flow.requestor<TriplesQueryRequest, TriplesQueryResponse>("triples"),
msg.collection,
),
];
// Build tools — config-driven or hardcoded fallback
let tools: AgentTool[];
if (this.configuredTools) {
tools = this.wireTools(this.configuredTools, flowCtx, msg.collection);
} else {
// Hardcoded fallback (backward compat)
tools = [
createKnowledgeQueryTool(
flowCtx.flow.requestor<GraphRagRequest, GraphRagResponse>("graph-rag"),
msg.collection,
),
createDocumentQueryTool(
flowCtx.flow.requestor<DocumentRagRequest, DocumentRagResponse>("doc-rag"),
msg.collection,
),
createTriplesQueryTool(
flowCtx.flow.requestor<TriplesQueryRequest, TriplesQueryResponse>("triples"),
msg.collection,
),
];
}
// Apply tool filtering by group and state
tools = filterToolsByGroupAndState(tools, msg.group, msg.state);
// Build the ReAct prompt
const { system, prompt: initialPrompt } = buildReActPrompt(

View file

@ -13,10 +13,12 @@ import type {
DocumentRagResponse,
TriplesQueryRequest,
TriplesQueryResponse,
ToolRequest,
ToolResponse,
Term,
} from "@trustgraph/base";
import type { AgentTool } from "./types.js";
import type { AgentTool, ToolArg } from "./types.js";
/**
* Format a Term to a human-readable string.
@ -197,3 +199,29 @@ export function createTriplesQueryTool(
},
};
}
/**
* Create an agent tool that delegates to the MCP tool service via NATS.
*
* The MCP tool service handles the actual MCP server connection;
* this function just wraps it as an AgentTool the ReAct agent can invoke.
*/
export function createMcpTool(
client: RequestResponse<ToolRequest, ToolResponse>,
toolName: string,
description: string,
args: ToolArg[],
): AgentTool {
return {
name: toolName,
description,
args,
async execute(input: string): Promise<string> {
const res = await client.request({ name: toolName, parameters: input });
if (res.error) return `Error: ${res.error.message}`;
if (res.text) return res.text;
if (res.object) return res.object;
return "No content";
},
};
}

View file

@ -13,6 +13,8 @@ export interface AgentTool {
description: string;
args: ToolArg[];
execute: (input: string) => Promise<string>;
/** Full tool config from config-push (used by tool filtering). */
config?: Record<string, unknown>;
}
export type ReActState =

View file

@ -0,0 +1,61 @@
/**
* Tool filtering logic for the TrustGraph tool group system.
*
* Filters available tools based on group membership and execution state.
*
* Python reference: trustgraph-flow/trustgraph/agent/tool_filter.py
*/
import type { AgentTool } from "./react/types.js";
/**
* Filter tools based on group membership and execution state.
*/
export function filterToolsByGroupAndState(
tools: AgentTool[],
requestedGroups?: string[],
currentState?: string,
): AgentTool[] {
const groups = requestedGroups ?? ["default"];
const state = currentState || "undefined";
return tools.filter((tool) => isToolAvailable(tool, groups, state));
}
function isToolAvailable(
tool: AgentTool,
requestedGroups: string[],
currentState: string,
): boolean {
const config = tool.config ?? {};
// Get tool groups (default to ["default"])
let toolGroups = config["group"] as string[] | string | undefined;
if (!toolGroups) toolGroups = ["default"];
if (!Array.isArray(toolGroups)) toolGroups = [toolGroups];
// Get tool applicable states (default to ["*"] = all states)
let applicableStates = config["applicable-states"] as string[] | string | undefined;
if (!applicableStates) applicableStates = ["*"];
if (!Array.isArray(applicableStates)) applicableStates = [applicableStates];
// Group match: wildcard in requested groups, or intersection non-empty
const groupMatch =
requestedGroups.includes("*") ||
toolGroups.some((g) => requestedGroups.includes(g));
// State match: wildcard in applicable states, or current state matches
const stateMatch =
applicableStates.includes("*") ||
applicableStates.includes(currentState);
return groupMatch && stateMatch;
}
/**
* Get the next state after successful tool execution.
*/
export function getNextState(tool: AgentTool, currentState: string): string {
const nextState = tool.config?.["state"] as string | undefined;
return nextState || currentState;
}

View file

@ -31,6 +31,7 @@ const FLOW_SERVICES: ReadonlyMap<string, { request: string; response: string }>
["graph-embeddings", { request: "graph-embeddings-request", response: "graph-embeddings-response" }],
["document-embeddings", { request: "doc-embeddings-request", response: "doc-embeddings-response" }],
["triples", { request: "triples-request", response: "triples-response" }],
["mcp-tool", { request: "mcp-tool-request", response: "mcp-tool-response" }],
]);
/**

View file

@ -48,6 +48,12 @@ export { ConfigService, type ConfigServiceConfig } from "./config/service.js";
// ReAct agent
export { AgentService } from "./agent/react/index.js";
// MCP tool service
export { McpToolService } from "./agent/mcp-tool/index.js";
// Tool filtering
export { filterToolsByGroupAndState, getNextState } from "./agent/tool-filter.js";
// Librarian service
export { LibrarianService, type LibrarianServiceConfig } from "./librarian/service.js";
export { CollectionManager, type CollectionEntry } from "./librarian/collection-manager.js";