diff --git a/apps/cli/.rowboat/.gitignore b/apps/cli/.rowboat/.gitignore deleted file mode 100644 index d6b7ef32..00000000 --- a/apps/cli/.rowboat/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!.gitignore diff --git a/apps/cli/package-lock.json b/apps/cli/package-lock.json index 23f6872e..28c676f5 100644 --- a/apps/cli/package-lock.json +++ b/apps/cli/package-lock.json @@ -1,21 +1,25 @@ { - "name": "cli", - "version": "1.0.0", + "name": "@rowboatlabs/rowboatx", + "version": "0.3.0", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "cli", - "version": "1.0.0", - "license": "ISC", + "name": "@rowboatlabs/rowboatx", + "version": "0.3.0", + "license": "MIT", "dependencies": { "@ai-sdk/google": "^2.0.25", "@ai-sdk/openai": "^2.0.53", "@modelcontextprotocol/sdk": "^1.20.2", "ai": "^5.0.78", + "json-schema-to-zod": "^2.6.1", "nanoid": "^5.1.6", "zod": "^4.1.12" }, + "bin": { + "rowboatx": "bin/app.js" + }, "devDependencies": { "@types/node": "^24.9.1", "ts-node": "^10.9.2", @@ -859,6 +863,15 @@ "integrity": "sha512-es94M3nTIfsEPisRafak+HDLfHXnKBhV3vU5eqPcS3flIWqcxJWgXHXiey3YrpaNsanY5ei1VoYEbOzijuq9BA==", "license": "(AFL-2.1 OR BSD-3-Clause)" }, + "node_modules/json-schema-to-zod": { + "version": "2.6.1", + "resolved": "https://registry.npmjs.org/json-schema-to-zod/-/json-schema-to-zod-2.6.1.tgz", + "integrity": "sha512-uiHmWH21h9FjKJkRBntfVGTLpYlCZ1n98D0izIlByqQLqpmkQpNTBtfbdP04Na6+43lgsvrShFh2uWLkQDKJuQ==", + "license": "ISC", + "bin": { + "json-schema-to-zod": "dist/cjs/cli.js" + } + }, "node_modules/json-schema-traverse": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", diff --git a/apps/cli/package.json b/apps/cli/package.json index 50ceceb7..dfd4c3bb 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -29,6 +29,7 @@ "@ai-sdk/openai": "^2.0.53", "@modelcontextprotocol/sdk": "^1.20.2", "ai": "^5.0.78", + "json-schema-to-zod": "^2.6.1", "nanoid": "^5.1.6", "zod": "^4.1.12" } diff --git a/apps/cli/src/app.ts b/apps/cli/src/app.ts index 03f46280..4198972a 100644 --- a/apps/cli/src/app.ts +++ b/apps/cli/src/app.ts @@ -1,185 +1,15 @@ -import fs from "fs"; -import path from "path"; -import { WorkDir, McpServers } from "./application/config/config.js"; -import { Workflow } from "./application/entities/workflow.js"; -import { FunctionsRegistry } from "./application/registry/functions.js"; -import { AgentNode } from "./application/nodes/agent.js"; -import { MessageList, AssistantContentPart } from "./application/entities/message.js"; -import { z } from "zod"; -import { getMcpClient } from "./application/lib/mcp.js"; -import { streamText } from "ai"; -import { openai } from "@ai-sdk/openai"; -import { google } from "@ai-sdk/google"; +import { executeWorkflow } from "./application/lib/exec-workflow.js"; import { StreamRenderer } from "./application/lib/stream-renderer.js"; -import { StreamEvent } from "./application/entities/stream-event.js"; -import { AssistantMessage, Message } from "./application/entities/message.js"; -import { randomId } from "./application/lib/random-id.js"; -class RunLogger { - private logFile: string; - private fileHandle: fs.WriteStream; - - ensureRunsDir(workflowId: string) { - const runsDir = path.join(WorkDir, "runs", workflowId); - if (!fs.existsSync(runsDir)) { - fs.mkdirSync(runsDir, { recursive: true }); - } - } - - constructor(workflowId: string, runId: string) { - this.ensureRunsDir(workflowId); - this.logFile = path.join(WorkDir, "runs", `${workflowId}`, `${runId}.jsonl`); - this.fileHandle = fs.createWriteStream(this.logFile, { - flags: "a", - encoding: "utf8", - }); - } - - log(message: z.infer) { - this.fileHandle.write(JSON.stringify(message) + "\n"); - } - - close() { - this.fileHandle.close(); - } -} - -class StreamStepMessageBuilder { - private parts: z.infer[] = []; - private textBuffer: string = ""; - private reasoningBuffer: string = ""; - - flushBuffers() { - if (this.reasoningBuffer) { - this.parts.push({ type: "reasoning", text: this.reasoningBuffer }); - this.reasoningBuffer = ""; - } - if (this.textBuffer) { - this.parts.push({ type: "text", text: this.textBuffer }); - this.textBuffer = ""; - } - } - - ingest(event: z.infer) { - switch (event.type) { - case "reasoning-start": - case "reasoning-end": - case "text-start": - case "text-end": - this.flushBuffers(); - break; - case "reasoning-delta": - this.reasoningBuffer += event.delta; - break; - case "text-delta": - this.textBuffer += event.delta; - break; - case "tool-call": - this.parts.push({ - type: "tool-call", - toolCallId: event.toolCallId, - toolName: event.toolName, - arguments: event.input, - }); - break; - } - } - - get(): z.infer { - this.flushBuffers(); - return { - role: "assistant", - content: this.parts, - }; - } -} - -function loadWorkflow(id: string) { - const workflowPath = path.join(WorkDir, "workflows", `${id}.json`); - const workflow = fs.readFileSync(workflowPath, "utf8"); - return Workflow.parse(JSON.parse(workflow)); -} - -function loadFunction(id: string) { - const func = FunctionsRegistry[id]; - if (!func) { - throw new Error(`Function ${id} not found`); - } - return func; -} - -async function callMcpTool(serverName: string, toolName: string, args: Record) { - const server = McpServers.find(server => server.name === serverName); - if (!server) { - throw new Error(`MCP server ${serverName} not found`); - } - const client = await getMcpClient(server.url, server.name); - const response = await client.callTool({ name: toolName, arguments: args }); - return response; -} - -async function executeWorkflow(id: string) { - const workflow = loadWorkflow(id); - // console.log("got", JSON.stringify(workflow)); - - const runId = await randomId(); - const logger = new RunLogger(id, runId); - - const input: z.infer = [{ - role: "user", - content: "What is the current date?" - }]; - const msgs: z.infer = [...input]; - - try { - const renderer = new StreamRenderer(); - - for await (const step of workflow.steps) { - const node = step.type === "agent" ? new AgentNode(step.id) : loadFunction(step.id); - const messageBuilder = new StreamStepMessageBuilder(); - for await (const event of node.execute(msgs)) { - // console.log(" - event", JSON.stringify(event)); - messageBuilder.ingest(event); - renderer.render(event); - } - const msg = messageBuilder.get(); - logger.log(msg); - msgs.push(msg); - } - } finally { - logger.close(); - } - - console.log('\n\n', JSON.stringify(msgs, null, 2)); -} - -async function streamEventTest() { - const { fullStream } = streamText({ - model: openai("gpt-5"), - system: "You are a helpful assistant that reasons about the world. Provide a reason for invoking any tools", - messages: [{ role: "user", content: "what is the current date and time?" }], - tools: { - getDate: { - description: "Get the current date", - inputSchema: z.object({ - format: z.enum(["long", "short"]).default("long"), - }), - }, - getTime: { - description: "Get the current time", - inputSchema: z.object({ - format: z.enum(["long", "short"]).default("long"), - }), - }, - }, - }); +async function runWorkflow(id: string, userInput: string) { const renderer = new StreamRenderer(); - for await (const event of fullStream) { - renderer.render(event as any); + for await (const event of executeWorkflow(id, userInput)) { + renderer.render(event); } } -// streamEventTest(); +const workflowId = process.argv[2] ?? "example_workflow"; +const userInputMsg = process.argv[3] ?? ""; -executeWorkflow("example_workflow"); \ No newline at end of file +runWorkflow(workflowId, userInputMsg); \ No newline at end of file diff --git a/apps/cli/src/application/assistant/chat.ts b/apps/cli/src/application/assistant/chat.ts index e96a23ee..bd3a2d45 100644 --- a/apps/cli/src/application/assistant/chat.ts +++ b/apps/cli/src/application/assistant/chat.ts @@ -152,7 +152,7 @@ export async function startCopilot(): Promise { } const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); - console.log("Rowboat Copilot (type 'exit' to quit)"); + console.log("XRowboat Copilot (type 'exit' to quit)"); const debugMode = process.argv.includes("--debug") || process.env.COPILOT_DEBUG === "1"; const conversationHistory: ConversationMessage[] = []; diff --git a/apps/cli/src/application/assistant/commands.ts b/apps/cli/src/application/assistant/commands.ts index 0e8d97c8..9ccf86ba 100644 --- a/apps/cli/src/application/assistant/commands.ts +++ b/apps/cli/src/application/assistant/commands.ts @@ -407,13 +407,23 @@ export async function executeCommand(cmd: ChatCommandT): Promise } case "list_mcp_servers": { const config = readMcpConfig(); - const servers = config.mcpServers; + const servers = Object.keys(config.mcpServers); + + const list: string[] = []; + for (const server of servers) { + if ('url' in config.mcpServers[server]) { + list.push(`${server} → ${config.mcpServers[server].url}`); + } else { + list.push(`${server} → ${config.mcpServers[server].command}`); + } + } + return asCommandOutcome({ headline: servers.length === 0 ? "No MCP servers configured." : `Found ${servers.length} MCP server${servers.length === 1 ? "" : "s"}.`, - list: servers.map((server) => `${server.name} → ${server.url}`), + list, data: servers, }); } @@ -427,16 +437,14 @@ export async function executeCommand(cmd: ChatCommandT): Promise }); } const config = readMcpConfig(); - const withoutExisting = config.mcpServers.filter( - (server) => server.name !== serverConfig.name - ); - const updated = { - mcpServers: [...withoutExisting, { ...serverConfig }], + config.mcpServers[serverConfig.name] = { + url: serverConfig.url, + headers: {}, }; - writeMcpConfig(updated); + writeMcpConfig(config); return asCommandOutcome({ headline: `MCP server "${serverConfig.name}" saved.`, - data: updated.mcpServers, + data: config.mcpServers, }); } case "remove_mcp_server": { @@ -449,16 +457,14 @@ export async function executeCommand(cmd: ChatCommandT): Promise }); } const config = readMcpConfig(); - const remaining = config.mcpServers.filter( - (server) => server.name !== name - ); - const removed = remaining.length !== config.mcpServers.length; - writeMcpConfig({ mcpServers: remaining }); + delete config.mcpServers[name]; + writeMcpConfig(config); + const removed = name in config.mcpServers; return asCommandOutcome({ headline: removed ? `MCP server "${name}" removed.` : `MCP server "${name}" was not registered.`, - data: remaining, + data: config.mcpServers, }); } case "run_workflow": { diff --git a/apps/cli/src/application/assistant/mcp/service.ts b/apps/cli/src/application/assistant/mcp/service.ts index f5f27597..44014ec7 100644 --- a/apps/cli/src/application/assistant/mcp/service.ts +++ b/apps/cli/src/application/assistant/mcp/service.ts @@ -10,7 +10,7 @@ export function mcpConfigPath(): string { export function readMcpConfig(): z.infer { const p = mcpConfigPath(); - if (!fs.existsSync(p)) return { mcpServers: [] }; + if (!fs.existsSync(p)) return { mcpServers: {} }; const raw = fs.readFileSync(p, "utf8"); return McpServerConfig.parse(JSON.parse(raw)); } diff --git a/apps/cli/src/application/config/config.ts b/apps/cli/src/application/config/config.ts index 9ab37014..8ee404b9 100644 --- a/apps/cli/src/application/config/config.ts +++ b/apps/cli/src/application/config/config.ts @@ -7,19 +7,45 @@ import { homedir } from "os"; // Resolve app root relative to compiled file location (dist/...) export const WorkDir = path.join(homedir(), ".rowboat"); +const baseMcpConfig = { + mcpServers: { + firecrawl: { + command: "npx", + args: ["-y", "supergateway", "--stdio", "npx -y firecrawl-mcp"], + env: { + FIRECRAWL_API_KEY: "fc-aaacee4bdd164100a4d83af85bef6fdc", + }, + }, + test: { + url: "http://localhost:3000", + headers: { + "Authorization": "Bearer test", + }, + }, + } +} + +function ensureMcpConfig() { + const configPath = path.join(WorkDir, "config", "mcp.json"); + if (!fs.existsSync(configPath)) { + fs.writeFileSync(configPath, JSON.stringify(baseMcpConfig, null, 2)); + } +} + function ensureDirs() { const ensure = (p: string) => { if (!fs.existsSync(p)) fs.mkdirSync(p, { recursive: true }); }; ensure(WorkDir); ensure(path.join(WorkDir, "workflows")); ensure(path.join(WorkDir, "agents")); - ensure(path.join(WorkDir, "mcp")); + ensure(path.join(WorkDir, "config")); + ensureMcpConfig(); } ensureDirs(); function loadMcpServerConfig(): z.infer { - const configPath = path.join(WorkDir, "mcp", "servers.json"); - if (!fs.existsSync(configPath)) return { mcpServers: [] }; + const configPath = path.join(WorkDir, "config", "mcp.json"); + if (!fs.existsSync(configPath)) return { mcpServers: {} }; const config = fs.readFileSync(configPath, "utf8"); return McpServerConfig.parse(JSON.parse(config)); } diff --git a/apps/cli/src/application/entities/agent.ts b/apps/cli/src/application/entities/agent.ts index a8d493e9..adea0505 100644 --- a/apps/cli/src/application/entities/agent.ts +++ b/apps/cli/src/application/entities/agent.ts @@ -1,7 +1,34 @@ import { z } from "zod"; + +export const BaseAgentTool = z.object({ + name: z.string(), +}); + +export const BuiltinAgentTool = BaseAgentTool.extend({ + type: z.literal("builtin"), +}); + +export const McpAgentTool = BaseAgentTool.extend({ + type: z.literal("mcp"), + description: z.string(), + inputSchema: z.any(), + mcpServerName: z.string(), +}); + +export const WorkflowAgentTool = BaseAgentTool.extend({ + type: z.literal("workflow"), +}); + +export const AgentTool = z.discriminatedUnion("type", [ + BuiltinAgentTool, + McpAgentTool, + WorkflowAgentTool, +]); + export const Agent = z.object({ name: z.string(), model: z.string(), description: z.string(), instructions: z.string(), + tools: z.record(z.string(), AgentTool).optional(), }); diff --git a/apps/cli/src/application/entities/llm-step-event.ts b/apps/cli/src/application/entities/llm-step-event.ts new file mode 100644 index 00000000..65e7d8a5 --- /dev/null +++ b/apps/cli/src/application/entities/llm-step-event.ts @@ -0,0 +1,56 @@ +import { z } from "zod"; + +export const LlmStepStreamReasoningStartEvent = z.object({ + type: z.literal("reasoning-start"), +}); + +export const LlmStepStreamReasoningDeltaEvent = z.object({ + type: z.literal("reasoning-delta"), + delta: z.string(), +}); + +export const LlmStepStreamReasoningEndEvent = z.object({ + type: z.literal("reasoning-end"), +}); + +export const LlmStepStreamTextStartEvent = z.object({ + type: z.literal("text-start"), +}); + +export const LlmStepStreamTextDeltaEvent = z.object({ + type: z.literal("text-delta"), + delta: z.string(), +}); + +export const LlmStepStreamTextEndEvent = z.object({ + type: z.literal("text-end"), +}); + +export const LlmStepStreamToolCallEvent = z.object({ + type: z.literal("tool-call"), + toolCallId: z.string(), + toolName: z.string(), + input: z.any(), +}); + +export const LlmStepStreamUsageEvent = z.object({ + type: z.literal("usage"), + usage: z.object({ + inputTokens: z.number().optional(), + outputTokens: z.number().optional(), + totalTokens: z.number().optional(), + reasoningTokens: z.number().optional(), + cachedInputTokens: z.number().optional(), + }), +}); + +export const LlmStepStreamEvent = z.union([ + LlmStepStreamReasoningStartEvent, + LlmStepStreamReasoningDeltaEvent, + LlmStepStreamReasoningEndEvent, + LlmStepStreamTextStartEvent, + LlmStepStreamTextDeltaEvent, + LlmStepStreamTextEndEvent, + LlmStepStreamToolCallEvent, + LlmStepStreamUsageEvent, +]); \ No newline at end of file diff --git a/apps/cli/src/application/entities/mcp.ts b/apps/cli/src/application/entities/mcp.ts index 77aa63d3..4f6490cd 100644 --- a/apps/cli/src/application/entities/mcp.ts +++ b/apps/cli/src/application/entities/mcp.ts @@ -1,8 +1,16 @@ import z from "zod"; +const StdioMcpServerConfig = z.object({ + command: z.string(), + args: z.array(z.string()).optional(), + env: z.record(z.string(), z.string()).optional(), +}); + +const HttpMcpServerConfig = z.object({ + url: z.string(), + headers: z.record(z.string(), z.string()).optional(), +}); + export const McpServerConfig = z.object({ - mcpServers: z.array(z.object({ - name: z.string(), - url: z.string(), - })), + mcpServers: z.record(z.string(), z.union([StdioMcpServerConfig, HttpMcpServerConfig])), }); \ No newline at end of file diff --git a/apps/cli/src/application/entities/message.ts b/apps/cli/src/application/entities/message.ts index ef937e51..ce5d4b67 100644 --- a/apps/cli/src/application/entities/message.ts +++ b/apps/cli/src/application/entities/message.ts @@ -14,7 +14,7 @@ export const ToolCallPart = z.object({ type: z.literal("tool-call"), toolCallId: z.string(), toolName: z.string(), - arguments: z.string(), + arguments: z.any(), }); export const AssistantContentPart = z.union([ diff --git a/apps/cli/src/application/entities/stream-event.ts b/apps/cli/src/application/entities/stream-event.ts deleted file mode 100644 index 8898599f..00000000 --- a/apps/cli/src/application/entities/stream-event.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { z } from "zod"; - -export const ReasoningStartEvent = z.object({ - type: z.literal("reasoning-start"), -}); - -export const ReasoningDeltaEvent = z.object({ - type: z.literal("reasoning-delta"), - delta: z.string(), -}); - -export const ReasoningEndEvent = z.object({ - type: z.literal("reasoning-end"), -}); - -export const TextStartEvent = z.object({ - type: z.literal("text-start"), -}); - -export const TextDeltaEvent = z.object({ - type: z.literal("text-delta"), - delta: z.string(), -}); - -export const TextEndEvent = z.object({ - type: z.literal("text-end"), -}); - -export const ToolCallEvent = z.object({ - type: z.literal("tool-call"), - toolCallId: z.string(), - toolName: z.string(), - input: z.any(), -}); - -export const UsageEvent = z.object({ - type: z.literal("usage"), - usage: z.object({ - inputTokens: z.number().optional(), - outputTokens: z.number().optional(), - totalTokens: z.number().optional(), - reasoningTokens: z.number().optional(), - cachedInputTokens: z.number().optional(), - }), -}); - -export const StreamEvent = z.union([ - ReasoningStartEvent, - ReasoningDeltaEvent, - ReasoningEndEvent, - TextStartEvent, - TextDeltaEvent, - TextEndEvent, - ToolCallEvent, - UsageEvent, -]); \ No newline at end of file diff --git a/apps/cli/src/application/entities/workflow-event.ts b/apps/cli/src/application/entities/workflow-event.ts new file mode 100644 index 00000000..4811db11 --- /dev/null +++ b/apps/cli/src/application/entities/workflow-event.ts @@ -0,0 +1,69 @@ +import { z } from "zod"; +import { LlmStepStreamEvent } from "./llm-step-event.js"; +import { Workflow } from "./workflow.js"; +import { Message } from "./message.js"; + +export const WorkflowStreamStartEvent = z.object({ + type: z.literal("workflow-start"), + workflowId: z.string(), + workflow: Workflow, + background: z.boolean(), +}); + +export const WorkflowStreamStepStartEvent = z.object({ + type: z.literal("workflow-step-start"), + stepId: z.string(), + stepType: z.enum(["agent", "function"]), +}); + +export const WorkflowStreamStepStreamEventEvent = z.object({ + type: z.literal("workflow-step-stream-event"), + stepId: z.string(), + event: LlmStepStreamEvent, +}); + +export const WorkflowStreamStepMessageEvent = z.object({ + type: z.literal("workflow-step-message"), + stepId: z.string(), + message: Message, +}); + +export const WorkflowStreamStepToolInvocationEvent = z.object({ + type: z.literal("workflow-step-tool-invocation"), + stepId: z.string(), + toolName: z.string(), + input: z.string(), +}); + +export const WorkflowStreamStepToolResultEvent = z.object({ + type: z.literal("workflow-step-tool-result"), + stepId: z.string(), + toolName: z.string(), + result: z.any(), +}); + +export const WorkflowStreamStepEndEvent = z.object({ + type: z.literal("workflow-step-end"), + stepId: z.string(), +}); + +export const WorkflowStreamEndEvent = z.object({ + type: z.literal("workflow-end"), +}); + +export const WorkflowStreamErrorEvent = z.object({ + type: z.literal("workflow-error"), + error: z.string(), +}); + +export const WorkflowStreamEvent = z.union([ + WorkflowStreamStartEvent, + WorkflowStreamStepStartEvent, + WorkflowStreamStepStreamEventEvent, + WorkflowStreamStepMessageEvent, + WorkflowStreamStepToolInvocationEvent, + WorkflowStreamStepToolResultEvent, + WorkflowStreamStepEndEvent, + WorkflowStreamEndEvent, + WorkflowStreamErrorEvent, +]); \ No newline at end of file diff --git a/apps/cli/src/application/entities/workflow.ts b/apps/cli/src/application/entities/workflow.ts index 9304371e..804dad3c 100644 --- a/apps/cli/src/application/entities/workflow.ts +++ b/apps/cli/src/application/entities/workflow.ts @@ -10,7 +10,7 @@ const FunctionStep = z.object({ id: z.string(), }); -const Step = z.discriminatedUnion("type", [AgentStep, FunctionStep]); +export const Step = z.discriminatedUnion("type", [AgentStep, FunctionStep]); export const Workflow = z.object({ name: z.string(), diff --git a/apps/cli/src/application/functions/get_date.ts b/apps/cli/src/application/functions/get_date.ts index 9f421086..e8561d80 100644 --- a/apps/cli/src/application/functions/get_date.ts +++ b/apps/cli/src/application/functions/get_date.ts @@ -1,7 +1,9 @@ -import { Node, NodeOutputT } from "../nodes/node.js"; +import { z } from "zod"; +import { Step, StepOutputT } from "../lib/step.js"; +import { AgentTool } from "../entities/agent.js"; -export class GetDate implements Node { - async* execute(): NodeOutputT { +export class GetDate implements Step { + async* execute(): StepOutputT { yield { type: "text-start", }; @@ -13,4 +15,8 @@ export class GetDate implements Node { type: "text-end", }; } + + tools(): Record> { + return {}; + } } \ No newline at end of file diff --git a/apps/cli/src/application/nodes/agent.ts b/apps/cli/src/application/lib/agent.ts similarity index 52% rename from apps/cli/src/application/nodes/agent.ts rename to apps/cli/src/application/lib/agent.ts index 4e0be41a..ef8321e5 100644 --- a/apps/cli/src/application/nodes/agent.ts +++ b/apps/cli/src/application/lib/agent.ts @@ -1,12 +1,58 @@ -import { Message } from "../entities/message.js"; +import { Message, MessageList } from "../entities/message.js"; import { z } from "zod"; -import { Node, NodeInputT, NodeOutputT } from "./node.js"; +import { Step, StepInputT, StepOutputT } from "./step.js"; import { openai } from "@ai-sdk/openai"; -import { generateText, ModelMessage, stepCountIs, streamText } from "ai"; -import { Agent } from "../entities/agent.js"; +import { google } from "@ai-sdk/google"; +import { generateText, ModelMessage, stepCountIs, streamText, tool, Tool, ToolSet, jsonSchema } from "ai"; +import { Agent, AgentTool } from "../entities/agent.js"; import { WorkDir } from "../config/config.js"; import fs from "fs"; import path from "path"; +import { loadWorkflow } from "./utils.js"; + +const BashTool = tool({ + description: "Run a command in the shell", + inputSchema: z.object({ + command: z.string(), + }), +}); + +const AskHumanTool = tool({ + description: "Ask the human for input", + inputSchema: z.object({ + question: z.string(), + }), +}); + +function mapAgentTool(t: z.infer): Tool { + switch (t.type) { + case "mcp": + return tool({ + name: t.name, + description: t.description, + inputSchema: jsonSchema(t.inputSchema), + }); + case "workflow": + const workflow = loadWorkflow(t.name); + if (!workflow) { + throw new Error(`Workflow ${t.name} not found`); + } + return tool({ + name: t.name, + description: workflow.description, + inputSchema: z.object({ + message: z.string().describe("The message to send to the workflow"), + }), + }); + case "builtin": + switch (t.name) { + case "bash": + return BashTool; + default: + throw new Error(`Unknown builtin tool: ${t.name}`); + } + } +} function convertFromMessages(messages: z.infer[]): ModelMessage[] { const result: ModelMessage[] = []; @@ -51,34 +97,72 @@ function convertFromMessages(messages: z.infer[]): ModelMessage[ content: msg.content, }); break; + case "tool": + result.push({ + role: "tool", + content: [ + { + type: "tool-result", + toolCallId: msg.toolCallId, + toolName: msg.toolName, + output: { + type: "text", + value: msg.content, + }, + }, + ], + }); + break; } } return result; } -export class AgentNode implements Node { +export class AgentNode implements Step { private id: string; + private background: boolean; + private agent: z.infer; - constructor(id: string) { + constructor(id: string, background: boolean) { this.id = id; - } - - private loadAgent(id: string): z.infer { + this.background = background; const agentPath = path.join(WorkDir, "agents", `${id}.json`); const agent = fs.readFileSync(agentPath, "utf8"); - return Agent.parse(JSON.parse(agent)); + this.agent = Agent.parse(JSON.parse(agent)); + } + + tools(): Record> { + return this.agent.tools ?? {}; } - async* execute(input: NodeInputT): NodeOutputT { - const agent = this.loadAgent(this.id); - const { fullStream } = await streamText({ - model: openai(agent.model), + async* execute(input: StepInputT): StepOutputT { + // console.log("\n\n\t>>>>\t\tinput", JSON.stringify(input)); + const tools: ToolSet = {}; + if (!this.background) { + tools["ask-human"] = AskHumanTool; + } + for (const [name, tool] of Object.entries(this.agent.tools ?? {})) { + try { + tools[name] = mapAgentTool(tool); + } catch (error) { + console.error(`Error mapping tool ${name}:`, error); + continue; + } + } + + // console.log("\n\n\t>>>>\t\ttools", JSON.stringify(tools, null, 2)); + + const { fullStream } = streamText({ + model: openai("gpt-4.1"), + // model: google("gemini-2.5-pro"), messages: convertFromMessages(input), - system: agent.instructions, + system: this.agent.instructions, stopWhen: stepCountIs(1), + tools, }); for await (const event of fullStream) { + // console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event)); switch (event.type) { case "reasoning-start": yield { diff --git a/apps/cli/src/application/lib/exec-tool.ts b/apps/cli/src/application/lib/exec-tool.ts new file mode 100644 index 00000000..e703fd41 --- /dev/null +++ b/apps/cli/src/application/lib/exec-tool.ts @@ -0,0 +1,102 @@ +import { tool, Tool } from "ai"; +import { AgentTool } from "../entities/agent.js"; +import { z } from "zod"; +import { McpServers } from "../config/config.js"; +import { getMcpClient } from "./mcp.js"; +import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; +import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { Client } from "@modelcontextprotocol/sdk/client"; +import { executeCommand } from "./command-executor.js"; +import { loadWorkflow } from "./utils.js"; +import { AssistantMessage } from "../entities/message.js"; +import { executeWorkflow } from "./exec-workflow.js"; + +async function execMcpTool(agentTool: z.infer & { type: "mcp" }, input: any): Promise { + // load mcp configuration from the tool + const mcpConfig = McpServers[agentTool.mcpServerName]; + if (!mcpConfig) { + throw new Error(`MCP server ${agentTool.mcpServerName} not found`); + } + + // create transport + let transport: Transport; + if ("command" in mcpConfig) { + transport = new StdioClientTransport({ + command: mcpConfig.command, + args: mcpConfig.args, + env: mcpConfig.env, + }); + } else { + // first try streamable http transport + try { + transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url)); + } catch (error) { + // if that fails, try sse transport + transport = new SSEClientTransport(new URL(mcpConfig.url)); + } + } + + if (!transport) { + throw new Error(`No transport found for ${agentTool.mcpServerName}`); + } + + // create client + const client = new Client({ + name: 'rowboatx', + version: '1.0.0', + }); + await client.connect(transport); + + // call tool + const result = await client.callTool({ name: agentTool.name, arguments: input }); + client.close(); + transport.close(); + return result; +} + +async function execBashTool(agentTool: z.infer, input: any): Promise { + const result = await executeCommand(input.command as string); + return { + stdout: result.stdout, + stderr: result.stderr, + exitCode: result.exitCode, + }; +} + +async function execWorkflowTool(agentTool: z.infer & { type: "workflow" }, input: any): Promise { + let lastMsg: z.infer | null = null; + for await (const event of executeWorkflow(agentTool.name, input.message)) { + if (event.type === "workflow-step-message" && event.message.role === "assistant") { + lastMsg = event.message; + } + if (event.type === "workflow-error") { + throw new Error(event.error); + } + } + + if (!lastMsg) { + throw new Error("No message received from workflow"); + } + if (typeof lastMsg.content === "string") { + return lastMsg.content; + } + return lastMsg.content.reduce((acc, part) => { + if (part.type === "text") { + acc += part.text; + } + return acc; + }, ""); +} + +export async function execTool(agentTool: z.infer, input: any): Promise { + switch (agentTool.type) { + case "mcp": + return execMcpTool(agentTool, input); + case "workflow": + return execWorkflowTool(agentTool, input); + case "builtin": + return execBashTool(agentTool, input); + } +} \ No newline at end of file diff --git a/apps/cli/src/application/lib/exec-workflow.ts b/apps/cli/src/application/lib/exec-workflow.ts new file mode 100644 index 00000000..39c620d1 --- /dev/null +++ b/apps/cli/src/application/lib/exec-workflow.ts @@ -0,0 +1,215 @@ +import { loadWorkflow } from "./utils.js"; +import { randomId } from "./random-id.js"; +import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage } from "../entities/message.js"; +import { LlmStepStreamEvent } from "../entities/llm-step-event.js"; +import { AgentNode } from "./agent.js"; +import { z } from "zod"; +import path from "path"; +import { WorkDir } from "../config/config.js"; +import fs from "fs"; +import { FunctionsRegistry } from "../registry/functions.js"; +import { WorkflowStreamEvent } from "../entities/workflow-event.js"; +import { execTool } from "./exec-tool.js"; + +class RunLogger { + private logFile: string; + private fileHandle: fs.WriteStream; + + ensureRunsDir(workflowId: string) { + const runsDir = path.join(WorkDir, "runs", workflowId); + if (!fs.existsSync(runsDir)) { + fs.mkdirSync(runsDir, { recursive: true }); + } + } + + constructor(workflowId: string, runId: string) { + this.ensureRunsDir(workflowId); + this.logFile = path.join(WorkDir, "runs", `${workflowId}`, `${runId}.jsonl`); + this.fileHandle = fs.createWriteStream(this.logFile, { + flags: "a", + encoding: "utf8", + }); + } + + log(message: z.infer) { + this.fileHandle.write(JSON.stringify(message) + "\n"); + } + + close() { + this.fileHandle.close(); + } +} + +class StreamStepMessageBuilder { + private parts: z.infer[] = []; + private textBuffer: string = ""; + private reasoningBuffer: string = ""; + + flushBuffers() { + if (this.reasoningBuffer) { + this.parts.push({ type: "reasoning", text: this.reasoningBuffer }); + this.reasoningBuffer = ""; + } + if (this.textBuffer) { + this.parts.push({ type: "text", text: this.textBuffer }); + this.textBuffer = ""; + } + } + + ingest(event: z.infer) { + switch (event.type) { + case "reasoning-start": + case "reasoning-end": + case "text-start": + case "text-end": + this.flushBuffers(); + break; + case "reasoning-delta": + this.reasoningBuffer += event.delta; + break; + case "text-delta": + this.textBuffer += event.delta; + break; + case "tool-call": + this.parts.push({ + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + arguments: event.input, + }); + break; + } + } + + get(): z.infer { + this.flushBuffers(); + return { + role: "assistant", + content: this.parts, + }; + } +} + +function loadFunction(id: string) { + const func = FunctionsRegistry[id]; + if (!func) { + throw new Error(`Function ${id} not found`); + } + return func; +} + +export async function* executeWorkflow(id: string, input: string, background: boolean = false): AsyncGenerator, void, unknown> { + try { + const workflow = loadWorkflow(id); + const runId = await randomId(); + + yield { + type: "workflow-start", + workflowId: id, + workflow: workflow, + background: background, + }; + + const logger = new RunLogger(id, runId); + + const messages: z.infer = [{ + role: "user", + content: input ?? "" + }]; + + try { + let stepIndex = 0; + + while (true) { + const step = workflow.steps[stepIndex]; + const node = step.type === "agent" ? new AgentNode(step.id, background) : loadFunction(step.id); + const messageBuilder = new StreamStepMessageBuilder(); + + // stream response from agent + for await (const event of node.execute(messages)) { + // console.log(" - event", JSON.stringify(event)); + messageBuilder.ingest(event); + yield { + type: "workflow-step-stream-event", + stepId: step.id, + event: event, + }; + } + + // build and emit final message from agent response + const msg = messageBuilder.get(); + logger.log(msg); + messages.push(msg); + yield { + type: "workflow-step-message", + stepId: step.id, + message: msg, + }; + + // if the agent response contains tool calls, execute them + const tools = node.tools(); + let hasToolCalls = false; + if (msg.content instanceof Array) { + for (const part of msg.content) { + if (part.type === "tool-call") { + hasToolCalls = true; + if (!(part.toolName in tools)) { + throw new Error(`Tool ${part.toolName} not found`); + } + yield { + type: "workflow-step-tool-invocation", + stepId: step.id, + toolName: part.toolName, + input: part.arguments, + } + const result = await execTool(tools[part.toolName], part.arguments); + const resultMsg: z.infer = { + role: "tool", + content: JSON.stringify(result), + toolCallId: part.toolCallId, + toolName: part.toolName, + }; + logger.log(resultMsg); + messages.push(resultMsg); + yield { + type: "workflow-step-tool-result", + stepId: step.id, + toolName: part.toolName, + result: result, + }; + yield { + type: "workflow-step-message", + stepId: step.id, + message: resultMsg, + }; + } + } + } + + // if the agent response had tool calls, replay this agent + if (hasToolCalls) { + continue; + } + + // otherwise, move to the next step + stepIndex++; + if (stepIndex >= workflow.steps.length) { + break; + } + } + } finally { + logger.close(); + } + + // console.log('\n\n', JSON.stringify(messages, null, 2)); + } catch (error) { + yield { + type: "workflow-error", + error: error instanceof Error ? error.message : String(error), + }; + } finally { + yield { + type: "workflow-end", + }; + } +} \ No newline at end of file diff --git a/apps/cli/src/application/lib/step.ts b/apps/cli/src/application/lib/step.ts new file mode 100644 index 00000000..ec3c2146 --- /dev/null +++ b/apps/cli/src/application/lib/step.ts @@ -0,0 +1,13 @@ +import { MessageList } from "../entities/message.js"; +import { LlmStepStreamEvent } from "../entities/llm-step-event.js"; +import { z } from "zod"; +import { AgentTool } from "../entities/agent.js"; + +export type StepInputT = z.infer; +export type StepOutputT = AsyncGenerator, void, unknown>; + +export interface Step { + execute(input: StepInputT): StepOutputT; + + tools(): Record>; +} \ No newline at end of file diff --git a/apps/cli/src/application/lib/stream-renderer.ts b/apps/cli/src/application/lib/stream-renderer.ts index a8d69d7b..0bf06986 100644 --- a/apps/cli/src/application/lib/stream-renderer.ts +++ b/apps/cli/src/application/lib/stream-renderer.ts @@ -1,5 +1,6 @@ import { z } from "zod"; -import { StreamEvent } from "../entities/stream-event.js"; +import { WorkflowStreamEvent } from "../entities/workflow-event.js"; +import { LlmStepStreamEvent } from "../entities/llm-step-event.js"; export interface StreamRendererOptions { showHeaders?: boolean; @@ -23,7 +24,48 @@ export class StreamRenderer { }; } - render(event: z.infer) { + render(event: z.infer) { + switch (event.type) { + case "workflow-start": { + this.onWorkflowStart(event.workflowId, event.background); + break; + } + case "workflow-step-start": { + this.onStepStart(event.stepId, event.stepType); + break; + } + case "workflow-step-stream-event": { + this.renderLlmEvent(event.event); + break; + } + case "workflow-step-message": { + // this.onStepMessage(event.stepId, event.message); + break; + } + case "workflow-step-tool-invocation": { + this.onStepToolInvocation(event.stepId, event.toolName, event.input); + break; + } + case "workflow-step-tool-result": { + this.onStepToolResult(event.stepId, event.toolName, event.result); + break; + } + case "workflow-step-end": { + this.onStepEnd(event.stepId); + break; + } + case "workflow-end": { + this.onWorkflowEnd(); + break; + } + case "workflow-error": { + this.onWorkflowError(event.error); + break; + } + } + } + + private renderLlmEvent(event: z.infer) { switch (event.type) { case "reasoning-start": this.onReasoningStart(); @@ -52,6 +94,58 @@ export class StreamRenderer { } } + private onWorkflowStart(workflowId: string, background: boolean) { + this.write("\n"); + this.write(this.bold(`▶ Workflow ${workflowId}`)); + if (background) this.write(this.dim(" (background)")); + this.write("\n"); + } + + private onWorkflowEnd() { + this.write(this.bold("\n■ Workflow complete\n")); + } + + private onWorkflowError(error: string) { + this.write(this.red(`\n✖ Workflow error: ${error}\n`)); + } + + private onStepStart(stepId: string, stepType: "agent" | "function") { + this.write("\n"); + this.write(this.cyan(`─ Step ${stepId} [${stepType}]`)); + this.write("\n"); + } + + private onStepEnd(stepId: string) { + this.write(this.dim(`✓ Step ${stepId} finished\n`)); + } + + private onStepMessage(stepId: string, message: any) { + const role = message?.role ?? "message"; + const content = message?.content; + this.write(this.bold(`${role}: `)); + if (typeof content === "string") { + this.write(content + "\n"); + } else { + const pretty = this.truncate(JSON.stringify(message, null, this.options.jsonIndent)); + this.write(this.dim("\n" + this.indent(pretty) + "\n")); + } + } + + private onStepToolInvocation(stepId: string, toolName: string, input: string) { + this.write(this.cyan(`\n→ Tool invoke ${toolName}`)); + if (input && input.length) { + this.write("\n" + this.dim(this.indent(this.truncate(input))) + "\n"); + } else { + this.write("\n"); + } + } + + private onStepToolResult(stepId: string, toolName: string, result: unknown) { + const res = this.truncate(JSON.stringify(result, null, this.options.jsonIndent)); + this.write(this.cyan(`\n← Tool result ${toolName}\n`)); + this.write(this.dim(this.indent(res)) + "\n"); + } + private onReasoningStart() { if (this.reasoningActive) return; this.reasoningActive = true; @@ -146,6 +240,10 @@ export class StreamRenderer { private cyan(text: string): string { return "\x1b[36m" + text + "\x1b[0m"; } + + private red(text: string): string { + return "\x1b[31m" + text + "\x1b[0m"; + } } diff --git a/apps/cli/src/application/lib/utils.ts b/apps/cli/src/application/lib/utils.ts new file mode 100644 index 00000000..da211b36 --- /dev/null +++ b/apps/cli/src/application/lib/utils.ts @@ -0,0 +1,10 @@ +import fs from "fs"; +import path from "path"; +import { WorkDir } from "../config/config.js"; +import { Workflow } from "../entities/workflow.js"; + +export function loadWorkflow(id: string) { + const workflowPath = path.join(WorkDir, "workflows", `${id}.json`); + const workflow = fs.readFileSync(workflowPath, "utf8"); + return Workflow.parse(JSON.parse(workflow)); +} diff --git a/apps/cli/src/application/nodes/node.ts b/apps/cli/src/application/nodes/node.ts deleted file mode 100644 index 5d5d3186..00000000 --- a/apps/cli/src/application/nodes/node.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { MessageList } from "../entities/message.js"; -import { StreamEvent } from "../entities/stream-event.js"; -import { z } from "zod"; - -export type NodeInputT = z.infer; -export type NodeOutputT = AsyncGenerator, void, unknown>; - -export interface Node { - execute(input: NodeInputT): NodeOutputT; -} \ No newline at end of file diff --git a/apps/cli/src/application/registry/functions.ts b/apps/cli/src/application/registry/functions.ts index 3b131c30..1d4c1a9b 100644 --- a/apps/cli/src/application/registry/functions.ts +++ b/apps/cli/src/application/registry/functions.ts @@ -1,6 +1,6 @@ import { GetDate } from "../functions/get_date.js"; -import { Node } from "../nodes/node.js"; +import { Step } from "../lib/step.js"; -export const FunctionsRegistry: Record = { +export const FunctionsRegistry: Record = { get_date: new GetDate(), } as const; \ No newline at end of file