everything is an agent

This commit is contained in:
Ramnique Singh 2025-11-15 01:51:22 +05:30
parent 2d6a647c70
commit 80dae17fd1
24 changed files with 1261 additions and 1573 deletions

View file

@ -1,29 +1,22 @@
import { Message, MessageList } from "../entities/message.js";
import { z } from "zod";
import { Step, StepInputT, StepOutputT } from "./step.js";
import { ModelMessage, stepCountIs, streamText, tool, Tool, ToolSet, jsonSchema } from "ai";
import { Agent, AgentTool } from "../entities/agent.js";
import { ModelConfig, WorkDir } from "../config/config.js";
import { jsonSchema, ModelMessage } from "ai";
import fs from "fs";
import path from "path";
import { loadWorkflow } from "./utils.js";
import { ModelConfig, WorkDir } from "../config/config.js";
import { Agent, ToolAttachment } from "../entities/agent.js";
import { createInterface, Interface } from "node:readline/promises";
import { stdin as input, stdout as output } from "node:process";
import { AssistantContentPart, AssistantMessage, Message, MessageList, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
import { runIdGenerator } from "./run-id-gen.js";
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
import { z } from "zod";
import { getProvider } from "./models.js";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
import { execTool } from "./exec-tool.js";
import { RunEvent } from "../entities/run-events.js";
import { CopilotAgent } from "../assistant/agent.js";
import { BuiltinTools } from "./builtin-tools.js";
const BashTool = tool({
description: "Run a command in the shell",
inputSchema: z.object({
command: z.string(),
}),
});
const AskHumanTool = tool({
description: "Ask the human for input",
inputSchema: z.object({
question: z.string(),
}),
});
function mapAgentTool(t: z.infer<typeof AgentTool>): Tool {
export async function mapAgentTool(t: z.infer<typeof ToolAttachment>): Promise<Tool> {
switch (t.type) {
case "mcp":
return tool({
@ -31,31 +24,136 @@ function mapAgentTool(t: z.infer<typeof AgentTool>): Tool {
description: t.description,
inputSchema: jsonSchema(t.inputSchema),
});
case "workflow":
const workflow = loadWorkflow(t.name);
if (!workflow) {
throw new Error(`Workflow ${t.name} not found`);
case "agent":
const agent = await loadAgent(t.name);
if (!agent) {
throw new Error(`Agent ${t.name} not found`);
}
return tool({
name: t.name,
description: workflow.description,
description: agent.description,
inputSchema: z.object({
message: z.string().describe("The message to send to the workflow"),
}),
});
case "builtin":
switch (t.name) {
case "bash":
return BashTool;
case "ask-human":
return AskHumanTool;
default:
throw new Error(`Unknown builtin tool: ${t.name}`);
const match = BuiltinTools[t.name];
if (!match) {
throw new Error(`Unknown builtin tool: ${t.name}`);
}
return tool({
description: match.description,
inputSchema: match.inputSchema,
});
}
}
function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
export class RunLogger {
private logFile: string;
private fileHandle: fs.WriteStream;
ensureRunsDir() {
const runsDir = path.join(WorkDir, "runs");
if (!fs.existsSync(runsDir)) {
fs.mkdirSync(runsDir, { recursive: true });
}
}
constructor(runId: string) {
this.ensureRunsDir();
this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
this.fileHandle = fs.createWriteStream(this.logFile, {
flags: "a",
encoding: "utf8",
});
}
log(event: z.infer<typeof RunEvent>) {
if (event.type !== "stream-event") {
this.fileHandle.write(JSON.stringify(event) + "\n");
}
}
close() {
this.fileHandle.close();
}
}
export class LogAndYield {
private logger: RunLogger
constructor(logger: RunLogger) {
this.logger = logger;
}
async *logAndYield(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const ev = {
...event,
ts: new Date().toISOString(),
}
this.logger.log(ev);
yield ev;
}
}
export class StreamStepMessageBuilder {
private parts: z.infer<typeof AssistantContentPart>[] = [];
private textBuffer: string = "";
private reasoningBuffer: string = "";
flushBuffers() {
// skip reasoning
// if (this.reasoningBuffer) {
// this.parts.push({ type: "reasoning", text: this.reasoningBuffer });
// this.reasoningBuffer = "";
// }
if (this.textBuffer) {
this.parts.push({ type: "text", text: this.textBuffer });
this.textBuffer = "";
}
}
ingest(event: z.infer<typeof LlmStepStreamEvent>) {
switch (event.type) {
case "reasoning-start":
case "reasoning-end":
case "text-start":
case "text-end":
this.flushBuffers();
break;
case "reasoning-delta":
this.reasoningBuffer += event.delta;
break;
case "text-delta":
this.textBuffer += event.delta;
break;
case "tool-call":
this.parts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
arguments: event.input,
});
break;
}
}
get(): z.infer<typeof AssistantMessage> {
this.flushBuffers();
return {
role: "assistant",
content: this.parts,
};
}
}
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
const agentPath = path.join(WorkDir, "agents", `${id}.json`);
const agent = fs.readFileSync(agentPath, "utf8");
return Agent.parse(JSON.parse(agent));
}
export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
const result: ModelMessage[] = [];
for (const msg of messages) {
switch (msg.role) {
@ -119,100 +217,275 @@ function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[
return result;
}
export class AgentNode implements Step {
private id: string;
private asTool: boolean;
private agent: z.infer<typeof Agent>;
constructor(id: string, asTool: boolean) {
this.id = id;
this.asTool = asTool;
const agentPath = path.join(WorkDir, "agents", `${id}.json`);
const agent = fs.readFileSync(agentPath, "utf8");
this.agent = Agent.parse(JSON.parse(agent));
}
export async function* streamAgent(opts: {
agent: string;
runId?: string;
input?: string;
interactive?: boolean;
}) {
const messages: z.infer<typeof MessageList> = [];
tools(): Record<string, z.infer<typeof AgentTool>> {
return this.agent.tools ?? {};
}
async* execute(input: StepInputT): StepOutputT {
// console.log("\n\n\t>>>>\t\tinput", JSON.stringify(input));
const tools: ToolSet = {};
// if (!this.background) {
// tools["ask-human"] = AskHumanTool;
// }
for (const [name, tool] of Object.entries(this.agent.tools ?? {})) {
if (this.asTool && name === "ask-human") {
continue;
}
try {
tools[name] = mapAgentTool(tool);
} catch (error) {
console.error(`Error mapping tool ${name}:`, error);
continue;
}
}
// console.log("\n\n\t>>>>\t\ttools", JSON.stringify(tools, null, 2));
const provider = getProvider(this.agent.provider);
const { fullStream } = streamText({
model: provider(this.agent.model || ModelConfig.defaults.model),
messages: convertFromMessages(input),
system: this.agent.instructions,
stopWhen: stepCountIs(1),
tools,
});
for await (const event of fullStream) {
// console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event));
switch (event.type) {
case "reasoning-start":
yield {
type: "reasoning-start",
};
break;
case "reasoning-delta":
yield {
type: "reasoning-delta",
delta: event.text,
};
break;
case "reasoning-end":
yield {
type: "reasoning-end",
};
break;
case "text-start":
yield {
type: "text-start",
};
break;
case "text-delta":
yield {
type: "text-delta",
delta: event.text,
};
break;
case "tool-call":
yield {
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
};
break;
case "finish":
yield {
type: "usage",
usage: event.totalUsage,
};
break;
default:
// console.warn("Unknown event type", event);
// load existing and assemble state if required
if (opts.runId) {
console.error("loading run", opts.runId);
let stream: fs.ReadStream | null = null;
let rl: Interface | null = null;
try {
const logFile = path.join(WorkDir, "runs", `${opts.runId}.jsonl`);
stream = fs.createReadStream(logFile, { encoding: "utf8" });
rl = createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
if (line.trim() === "") {
continue;
}
const parsed = JSON.parse(line);
const event = RunEvent.parse(parsed);
switch (event.type) {
case "message":
messages.push(event.message);
break;
}
}
} finally {
stream?.close();
}
}
}
// create runId if not present
if (!opts.runId) {
opts.runId = runIdGenerator.next();
}
// load agent data
let agent: z.infer<typeof Agent> | null = null;
if (opts.agent === "copilot") {
agent = CopilotAgent;
} else {
agent = await loadAgent(opts.agent);
}
if (!agent) {
throw new Error("unable to load agent");
}
// set up tools
const tools: ToolSet = {};
for (const [name, tool] of Object.entries(agent.tools ?? {})) {
try {
tools[name] = await mapAgentTool(tool);
} catch (error) {
console.error(`Error mapping tool ${name}:`, error);
continue;
}
}
// set up
const logger = new RunLogger(opts.runId);
const ly = new LogAndYield(logger);
const provider = getProvider(agent.provider);
const model = provider(agent.model || ModelConfig.defaults.model);
// get first input if needed
let rl: Interface | null = null;
if (opts.interactive) {
rl = createInterface({ input, output });
}
if (opts.input) {
const m: z.infer<typeof UserMessage> = {
role: "user",
content: opts.input,
};
messages.push(m);
yield *ly.logAndYield({
type: "message",
message: m,
});
}
try {
// loop b/w user and agent
while (true) {
// get input in interactive mode when last message is not user
if (opts.interactive && (messages.length === 0 || messages[messages.length - 1].role !== "user")) {
const input = await rl!.question("You: ");
// Exit condition
if (["q", "quit", "exit"].includes(input.toLowerCase())) {
console.log("\n👋 Goodbye!");
return;
}
const m: z.infer<typeof UserMessage> = {
role: "user",
content: input,
};
messages.push(m);
yield* ly.logAndYield({
type: "message",
message: m,
});
}
// inner loop to handle tool calls
while (true) {
// stream agent response and build message
const messageBuilder = new StreamStepMessageBuilder();
for await (const event of streamLlm(
model,
messages,
agent.instructions,
tools,
)) {
messageBuilder.ingest(event);
yield* ly.logAndYield({
type: "stream-event",
event: event,
});
}
// build and emit final message from agent response
const msg = messageBuilder.get();
messages.push(msg);
yield* ly.logAndYield({
type: "message",
message: msg,
});
// handle tool calls
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
if (msg.content instanceof Array) {
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
}
const hasToolCalls = msgToolCallParts.length > 0;
console.log(msgToolCallParts);
// validate and map tool calls
for (const part of msgToolCallParts) {
const agentTool = tools[part.toolName];
if (!agentTool) {
throw new Error(`Tool ${part.toolName} not found`);
}
mappedToolCalls.push({
toolCall: part,
agentTool: agent.tools![part.toolName],
});
}
for (const call of mappedToolCalls) {
const { agentTool, toolCall } = call;
yield* ly.logAndYield({
type: "tool-invocation",
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
});
const result = await execTool(agentTool, toolCall.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
toolName: toolCall.toolName,
result: result,
});
yield* ly.logAndYield({
type: "message",
message: resultMsg,
});
}
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, break
break;
}
// if not interactive, return
if (!opts.interactive) {
break;
}
}
} finally {
rl?.close();
logger.close();
}
}
async function* streamLlm(
model: LanguageModel,
messages: z.infer<typeof MessageList>,
instructions: string,
tools: ToolSet,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
const { fullStream } = streamText({
model,
messages: convertFromMessages(messages),
system: instructions,
tools,
stopWhen: stepCountIs(1),
providerOptions: {
openai: {
reasoningEffort: "low",
reasoningSummary: "auto",
},
}
});
for await (const event of fullStream) {
// console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event));
switch (event.type) {
case "reasoning-start":
yield {
type: "reasoning-start",
};
break;
case "reasoning-delta":
yield {
type: "reasoning-delta",
delta: event.text,
};
break;
case "reasoning-end":
yield {
type: "reasoning-end",
};
break;
case "text-start":
yield {
type: "text-start",
};
break;
case "text-delta":
yield {
type: "text-delta",
delta: event.text,
};
break;
case "tool-call":
yield {
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
};
break;
case "finish":
yield {
type: "usage",
usage: event.totalUsage,
};
break;
default:
// console.warn("Unknown event type", event);
continue;
}
}
}
export const MappedToolCall = z.object({
toolCall: ToolCallPart,
agentTool: ToolAttachment,
});

View file

@ -0,0 +1,424 @@
import { z, ZodType } from "zod";
import * as fs from "fs/promises";
import * as path from "path";
import { WorkDir as BASE_DIR } from "../config/config.js";
import { executeCommand } from "./command-executor.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { Client } from "@modelcontextprotocol/sdk/client";
const BuiltinToolsSchema = z.record(z.string(), z.object({
description: z.string(),
inputSchema: z.custom<ZodType>(),
execute: z.function({
input: z.any(),
output: z.promise(z.any()),
}),
}));
export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
exploreDirectory: {
description: 'Recursively explore directory structure to understand existing workflows, agents, and file organization',
inputSchema: z.object({
subdirectory: z.string().optional().describe('Subdirectory to explore (optional, defaults to root)'),
maxDepth: z.number().optional().describe('Maximum depth to traverse (default: 3)'),
}),
execute: async ({ subdirectory, maxDepth = 3 }: { subdirectory?: string, maxDepth?: number }) => {
async function explore(dir: string, depth: number = 0): Promise<any> {
if (depth > maxDepth) return null;
try {
const entries = await fs.readdir(dir, { withFileTypes: true });
const result: any = { files: [], directories: {} };
for (const entry of entries) {
const fullPath = path.join(dir, entry.name);
if (entry.isFile()) {
const ext = path.extname(entry.name);
const size = (await fs.stat(fullPath)).size;
result.files.push({
name: entry.name,
type: ext || 'no-extension',
size: size,
relativePath: path.relative(BASE_DIR, fullPath),
});
} else if (entry.isDirectory()) {
result.directories[entry.name] = await explore(fullPath, depth + 1);
}
}
return result;
} catch (error) {
return { error: error instanceof Error ? error.message : 'Unknown error' };
}
}
const dirPath = subdirectory ? path.join(BASE_DIR, subdirectory) : BASE_DIR;
const structure = await explore(dirPath);
return {
success: true,
basePath: path.relative(BASE_DIR, dirPath) || '.',
structure,
};
},
},
readFile: {
description: 'Read and parse file contents. For JSON files, provides parsed structure.',
inputSchema: z.object({
filename: z.string().describe('The name of the file to read (relative to .rowboat directory)'),
}),
execute: async ({ filename }: { filename: string }) => {
try {
const filePath = path.join(BASE_DIR, filename);
const content = await fs.readFile(filePath, 'utf-8');
let parsed = null;
let fileType = path.extname(filename);
if (fileType === '.json') {
try {
parsed = JSON.parse(content);
} catch {
parsed = { error: 'Invalid JSON' };
}
}
return {
success: true,
filename,
fileType,
content,
parsed,
path: filePath,
size: content.length,
};
} catch (error) {
return {
success: false,
message: `Failed to read file: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
createFile: {
description: 'Create a new file with content. Automatically creates parent directories if needed.',
inputSchema: z.object({
filename: z.string().describe('The name of the file to create (relative to .rowboat directory)'),
content: z.string().describe('The content to write to the file'),
description: z.string().optional().describe('Optional description of why this file is being created'),
}),
execute: async ({ filename, content, description }: { filename: string, content: string, description?: string }) => {
try {
const filePath = path.join(BASE_DIR, filename);
const dir = path.dirname(filePath);
// Ensure directory exists
await fs.mkdir(dir, { recursive: true });
// Write file
await fs.writeFile(filePath, content, 'utf-8');
return {
success: true,
message: `File '${filename}' created successfully`,
description: description || 'No description provided',
path: filePath,
size: content.length,
};
} catch (error) {
return {
success: false,
message: `Failed to create file: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
updateFile: {
description: 'Update or overwrite the contents of an existing file',
inputSchema: z.object({
filename: z.string().describe('The name of the file to update (relative to .rowboat directory)'),
content: z.string().describe('The new content to write to the file'),
reason: z.string().optional().describe('Optional reason for the update'),
}),
execute: async ({ filename, content, reason }: { filename: string, content: string, reason?: string }) => {
try {
const filePath = path.join(BASE_DIR, filename);
// Check if file exists
await fs.access(filePath);
// Update file
await fs.writeFile(filePath, content, 'utf-8');
return {
success: true,
message: `File '${filename}' updated successfully`,
reason: reason || 'No reason provided',
path: filePath,
size: content.length,
};
} catch (error) {
return {
success: false,
message: `Failed to update file: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
deleteFile: {
description: 'Delete a file from the .rowboat directory',
inputSchema: z.object({
filename: z.string().describe('The name of the file to delete (relative to .rowboat directory)'),
}),
execute: async ({ filename }: { filename: string }) => {
try {
const filePath = path.join(BASE_DIR, filename);
await fs.unlink(filePath);
return {
success: true,
message: `File '${filename}' deleted successfully`,
path: filePath,
};
} catch (error) {
return {
success: false,
message: `Failed to delete file: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
listFiles: {
description: 'List all files and directories in the .rowboat directory or subdirectory',
inputSchema: z.object({
subdirectory: z.string().optional().describe('Optional subdirectory to list (relative to .rowboat directory)'),
}),
execute: async ({ subdirectory }: { subdirectory?: string }) => {
try {
const dirPath = subdirectory ? path.join(BASE_DIR, subdirectory) : BASE_DIR;
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const files = entries
.filter(entry => entry.isFile())
.map(entry => ({
name: entry.name,
type: path.extname(entry.name) || 'no-extension',
relativePath: path.relative(BASE_DIR, path.join(dirPath, entry.name)),
}));
const directories = entries
.filter(entry => entry.isDirectory())
.map(entry => entry.name);
return {
success: true,
path: dirPath,
relativePath: path.relative(BASE_DIR, dirPath) || '.',
files,
directories,
totalFiles: files.length,
totalDirectories: directories.length,
};
} catch (error) {
return {
success: false,
message: `Failed to list files: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
analyzeWorkflow: {
description: 'Read and analyze a workflow file to understand its structure, agents, and dependencies',
inputSchema: z.object({
workflowName: z.string().describe('Name of the workflow file to analyze (with or without .json extension)'),
}),
execute: async ({ workflowName }: { workflowName: string }) => {
try {
const filename = workflowName.endsWith('.json') ? workflowName : `${workflowName}.json`;
const filePath = path.join(BASE_DIR, 'workflows', filename);
const content = await fs.readFile(filePath, 'utf-8');
const workflow = JSON.parse(content);
// Extract key information
const analysis = {
name: workflow.name,
description: workflow.description || 'No description',
agentCount: workflow.agents ? workflow.agents.length : 0,
agents: workflow.agents || [],
tools: workflow.tools || {},
structure: workflow,
};
return {
success: true,
filePath: path.relative(BASE_DIR, filePath),
analysis,
};
} catch (error) {
return {
success: false,
message: `Failed to analyze workflow: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
listMcpServers: {
description: 'List all available MCP servers from the configuration',
inputSchema: z.object({}),
execute: async (): Promise<{ success: boolean, servers: any[], count: number, message: string }> => {
try {
const configPath = path.join(BASE_DIR, 'config', 'mcp.json');
// Check if config exists
try {
await fs.access(configPath);
} catch {
return {
success: true,
servers: [],
count: 0,
message: 'No MCP servers configured yet',
};
}
const content = await fs.readFile(configPath, 'utf-8');
const config = JSON.parse(content);
const servers = Object.keys(config.mcpServers || {}).map(name => {
const server = config.mcpServers[name];
return {
name,
type: 'command' in server ? 'stdio' : 'http',
command: server.command,
url: server.url,
};
});
return {
success: true,
servers,
count: servers.length,
message: `Found ${servers.length} MCP server(s)`,
};
} catch (error) {
return {
success: false,
servers: [],
count: 0,
message: `Failed to list MCP servers: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
listMcpTools: {
description: 'List all available tools from a specific MCP server',
inputSchema: z.object({
serverName: z.string().describe('Name of the MCP server to query'),
}),
execute: async ({ serverName }: { serverName: string }) => {
try {
const configPath = path.join(BASE_DIR, 'config', 'mcp.json');
const content = await fs.readFile(configPath, 'utf-8');
const config = JSON.parse(content);
const mcpConfig = config.mcpServers[serverName];
if (!mcpConfig) {
return {
success: false,
message: `MCP server '${serverName}' not found in configuration`,
};
}
// Create transport based on config type
let transport;
if ('command' in mcpConfig) {
transport = new StdioClientTransport({
command: mcpConfig.command,
args: mcpConfig.args || [],
env: mcpConfig.env || {},
});
} else {
try {
transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url));
} catch {
transport = new SSEClientTransport(new URL(mcpConfig.url));
}
}
// Create and connect client
const client = new Client({
name: 'rowboat-copilot',
version: '1.0.0',
});
await client.connect(transport);
// List available tools
const toolsList = await client.listTools();
// Close connection
client.close();
transport.close();
const tools = toolsList.tools.map((t: any) => ({
name: t.name,
description: t.description || 'No description',
inputSchema: t.inputSchema,
}));
return {
success: true,
serverName,
tools,
count: tools.length,
message: `Found ${tools.length} tool(s) in MCP server '${serverName}'`,
};
} catch (error) {
return {
success: false,
message: `Failed to list MCP tools: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
},
},
executeCommand: {
description: 'Execute a shell command and return the output. Use this to run bash/shell commands.',
inputSchema: z.object({
command: z.string().describe('The shell command to execute (e.g., "ls -la", "cat file.txt")'),
cwd: z.string().optional().describe('Working directory to execute the command in (defaults to .rowboat directory)'),
}),
execute: async ({ command, cwd }: { command: string, cwd?: string }) => {
try {
const workingDir = cwd ? path.join(BASE_DIR, cwd) : BASE_DIR;
const result = await executeCommand(command, { cwd: workingDir });
return {
success: result.exitCode === 0,
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.exitCode,
command,
workingDir,
};
} catch (error) {
return {
success: false,
message: `Failed to execute command: ${error instanceof Error ? error.message : 'Unknown error'}`,
command,
};
}
},
},
};

View file

@ -1,20 +1,16 @@
import { tool, Tool } from "ai";
import { AgentTool } from "../entities/agent.js";
import { ToolAttachment } from "../entities/agent.js";
import { z } from "zod";
import { McpServers } from "../config/config.js";
import { getMcpClient } from "./mcp.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { Client } from "@modelcontextprotocol/sdk/client";
import { executeCommand } from "./command-executor.js";
import { loadWorkflow } from "./utils.js";
import { AssistantMessage } from "../entities/message.js";
import { executeWorkflow } from "./exec-workflow.js";
import readline from "readline";
import { BuiltinTools } from "./builtin-tools.js";
import { streamAgent } from "./agent.js";
async function execMcpTool(agentTool: z.infer<typeof AgentTool> & { type: "mcp" }, input: any): Promise<any> {
async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "mcp" }, input: any): Promise<any> {
// load mcp configuration from the tool
const mcpConfig = McpServers[agentTool.mcpServerName];
if (!mcpConfig) {
@ -57,34 +53,12 @@ async function execMcpTool(agentTool: z.infer<typeof AgentTool> & { type: "mcp"
return result;
}
async function execBashTool(agentTool: z.infer<typeof AgentTool>, input: any): Promise<any> {
const result = await executeCommand(input.command as string);
return {
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.exitCode,
};
}
export async function execAskHumanTool(agentTool: z.infer<typeof AgentTool>, question: string): Promise<string> {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
let p = new Promise<string>((resolve, reject) => {
rl.question(`>> Provide answer to: ${question}:\n\n`, (answer) => {
resolve(answer);
rl.close();
});
});
const answer = await p;
return answer;
}
async function execWorkflowTool(agentTool: z.infer<typeof AgentTool> & { type: "workflow" }, input: any): Promise<any> {
async function execAgentTool(agentTool: z.infer<typeof ToolAttachment> & { type: "agent" }, input: any): Promise<any> {
let lastMsg: z.infer<typeof AssistantMessage> | null = null;
for await (const event of executeWorkflow(agentTool.name, input.message)) {
for await (const event of streamAgent({
agent: agentTool.name,
input: JSON.stringify(input),
})) {
if (event.type === "message" && event.message.role === "assistant") {
lastMsg = event.message;
}
@ -94,7 +68,7 @@ async function execWorkflowTool(agentTool: z.infer<typeof AgentTool> & { type: "
}
if (!lastMsg) {
throw new Error("No message received from workflow");
throw new Error("No message received from agent");
}
if (typeof lastMsg.content === "string") {
return lastMsg.content;
@ -107,18 +81,17 @@ async function execWorkflowTool(agentTool: z.infer<typeof AgentTool> & { type: "
}, "");
}
export async function execTool(agentTool: z.infer<typeof AgentTool>, input: any): Promise<any> {
export async function execTool(agentTool: z.infer<typeof ToolAttachment>, input: any): Promise<any> {
switch (agentTool.type) {
case "mcp":
return execMcpTool(agentTool, input);
case "workflow":
return execWorkflowTool(agentTool, input);
case "agent":
return execAgentTool(agentTool, input);
case "builtin":
switch (agentTool.name) {
case "bash":
return execBashTool(agentTool, input);
default:
throw new Error(`Unknown builtin tool: ${agentTool.name}`);
const builtinTool = BuiltinTools[agentTool.name];
if (!builtinTool || !builtinTool.execute) {
throw new Error(`Unsupported builtin tool: ${agentTool.name}`);
}
return builtinTool.execute(input);
}
}

View file

@ -1,449 +0,0 @@
import { loadWorkflow } from "./utils.js";
import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage, ToolCallPart } from "../entities/message.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { AgentNode } from "./agent.js";
import { z } from "zod";
import path from "path";
import { WorkDir } from "../config/config.js";
import fs from "fs";
import { createInterface, Interface } from "node:readline/promises";
import { FunctionsRegistry } from "../registry/functions.js";
import { RunEvent } from "../entities/workflow-event.js";
import { execAskHumanTool, execTool } from "./exec-tool.js";
import { AgentTool } from "../entities/agent.js";
import { runIdGenerator } from "./run-id-gen.js";
import { Workflow } from "../entities/workflow.js";
const MappedToolCall = z.object({
toolCall: ToolCallPart,
agentTool: AgentTool,
});
const State = z.object({
stepIndex: z.number(),
messages: MessageList,
workflow: Workflow.nullable(),
pendingToolCallId: z.string().nullable(),
});
class StateBuilder {
private state: z.infer<typeof State> = {
stepIndex: 0,
messages: [],
workflow: null,
pendingToolCallId: null,
};
ingest(event: z.infer<typeof RunEvent>) {
switch (event.type) {
case "start":
this.state.workflow = event.workflow;
break;
case "step-start":
this.state.stepIndex = event.stepIndex;
break;
case "message":
this.state.messages.push(event.message);
this.state.pendingToolCallId = null;
break;
case "pause-for-human-input":
this.state.pendingToolCallId = event.toolCallId;
break;
}
}
get(): z.infer<typeof State> {
return this.state;
}
}
class RunLogger {
private logFile: string;
private fileHandle: fs.WriteStream;
ensureRunsDir(workflowId: string) {
const runsDir = path.join(WorkDir, "runs", workflowId);
if (!fs.existsSync(runsDir)) {
fs.mkdirSync(runsDir, { recursive: true });
}
}
constructor(workflowId: string, runId: string) {
this.ensureRunsDir(workflowId);
this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
this.fileHandle = fs.createWriteStream(this.logFile, {
flags: "a",
encoding: "utf8",
});
}
log(event: z.infer<typeof RunEvent>) {
this.fileHandle.write(JSON.stringify(event) + "\n");
}
close() {
this.fileHandle.close();
}
}
class LogAndYield {
private logger: RunLogger
constructor(logger: RunLogger) {
this.logger = logger;
}
async *logAndYield(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const ev = {
...event,
ts: new Date().toISOString(),
}
this.logger.log(ev);
yield ev;
}
}
class StreamStepMessageBuilder {
private parts: z.infer<typeof AssistantContentPart>[] = [];
private textBuffer: string = "";
private reasoningBuffer: string = "";
flushBuffers() {
if (this.reasoningBuffer) {
this.parts.push({ type: "reasoning", text: this.reasoningBuffer });
this.reasoningBuffer = "";
}
if (this.textBuffer) {
this.parts.push({ type: "text", text: this.textBuffer });
this.textBuffer = "";
}
}
ingest(event: z.infer<typeof LlmStepStreamEvent>) {
switch (event.type) {
case "reasoning-start":
case "reasoning-end":
case "text-start":
case "text-end":
this.flushBuffers();
break;
case "reasoning-delta":
this.reasoningBuffer += event.delta;
break;
case "text-delta":
this.textBuffer += event.delta;
break;
case "tool-call":
this.parts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
arguments: event.input,
});
break;
}
}
get(): z.infer<typeof AssistantMessage> {
this.flushBuffers();
return {
role: "assistant",
content: this.parts,
};
}
}
function loadFunction(id: string) {
const func = FunctionsRegistry[id];
if (!func) {
throw new Error(`Function ${id} not found`);
}
return func;
}
export async function* executeWorkflow(id: string, input: string, interactive: boolean = true, asTool: boolean = false): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const runId = runIdGenerator.next();
yield* runFromState({
id,
runId,
state: {
stepIndex: 0,
messages: [{
role: "user",
content: input,
}],
workflow: null,
pendingToolCallId: null,
},
interactive,
asTool,
});
}
export async function* resumeWorkflow(runId: string, input: string, interactive: boolean = false): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
// read a run.jsonl file line by line and build state
const builder = new StateBuilder();
let rl: Interface | null = null;
let stream: fs.ReadStream | null = null;
try {
const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
stream = fs.createReadStream(logFile, { encoding: "utf8" });
rl = createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
if (line.trim() === "") {
continue;
}
// console.error('processing line', line);
const parsed = JSON.parse(line);
// console.error('parsed');
const event = RunEvent.parse(parsed);
// console.error('zod parsed');
builder.ingest(event);
}
} catch (error) {
// console.error("Failed to resume workflow:", error);
// yield {
// type: "error",
// error: error instanceof Error ? error.message : String(error),
// };
} finally {
rl?.close();
stream?.close();
}
const { workflow, messages, stepIndex, pendingToolCallId } = builder.get();
if (!workflow) {
throw new Error(`Workflow not found for run ${runId}`);
}
if (!pendingToolCallId) {
throw new Error(`No pending tool call found for run ${runId}`);
}
const stepId = workflow.steps[stepIndex].id;
// append user input as message
const logger = new RunLogger(workflow.name, runId);
const ly = new LogAndYield(logger);
yield *ly.logAndYield({
type: "resume"
});
// append user input as message
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(input),
toolCallId: pendingToolCallId,
toolName: "ask-human",
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId,
toolName: "ask-human",
result: input,
});
yield* ly.logAndYield({
type: "message",
stepId,
message: resultMsg,
});
yield* runFromState({
id: workflow.name,
runId,
state: {
stepIndex,
messages,
workflow,
pendingToolCallId,
},
interactive,
asTool: false,
});
}
async function* runFromState(opts: {
id: string;
runId: string;
state: z.infer<typeof State>;
interactive: boolean;
asTool: boolean;
}) {
const { id, runId, state, interactive, asTool } = opts;
let stepIndex = state.stepIndex;
let messages = [...state.messages];
let workflow = state.workflow;
const logger = new RunLogger(id, runId);
const ly = new LogAndYield(logger);
try {
if (!workflow) {
workflow = loadWorkflow(id);
yield* ly.logAndYield({
type: "start",
runId,
workflowId: id,
workflow,
interactive,
});
}
while (true) {
const step = workflow.steps[stepIndex];
const node = step.type === "agent" ? new AgentNode(step.id, asTool) : loadFunction(step.id);
yield* ly.logAndYield({
type: "step-start",
stepIndex,
stepId: step.id,
stepType: step.type,
});
const messageBuilder = new StreamStepMessageBuilder();
// stream response from agent
for await (const event of node.execute(messages)) {
// console.log(" - event", JSON.stringify(event));
messageBuilder.ingest(event);
yield* ly.logAndYield({
type: "stream-event",
stepId: step.id,
event: event,
});
}
// build and emit final message from agent response
const msg = messageBuilder.get();
messages.push(msg);
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: msg,
});
// handle tool calls
const tools = node.tools();
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
if (msg.content instanceof Array) {
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
}
const hasToolCalls = msgToolCallParts.length > 0;
// validate and map tool calls
for (const part of msgToolCallParts) {
const agentTool = tools[part.toolName];
if (!agentTool) {
throw new Error(`Tool ${part.toolName} not found`);
}
mappedToolCalls.push({
toolCall: part,
agentTool: agentTool,
});
}
// first, exec all tool calls other than ask-human
for (const call of mappedToolCalls) {
const { agentTool, toolCall } = call;
if (agentTool.type === "builtin" && agentTool.name === "ask-human") {
continue;
}
yield* ly.logAndYield({
type: "tool-invocation",
stepId: step.id,
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
});
const result = await execTool(agentTool, toolCall.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId: step.id,
toolName: toolCall.toolName,
result: result,
});
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: resultMsg,
});
}
// handle ask-tool call execution
for (const call of mappedToolCalls) {
const { agentTool, toolCall } = call;
if (agentTool.type !== "builtin" || agentTool.name !== "ask-human") {
continue;
}
yield* ly.logAndYield({
type: "tool-invocation",
stepId: step.id,
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
});
// if running in background mode, exit here
if (!interactive) {
yield* ly.logAndYield({
type: "pause-for-human-input",
toolCallId: toolCall.toolCallId,
});
return;
}
const result = await execAskHumanTool(agentTool, toolCall.arguments.question as string);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId: step.id,
toolName: toolCall.toolName,
result: result,
});
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: resultMsg,
});
}
yield* ly.logAndYield({
type: "step-end",
stepIndex,
});
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, move to the next step
stepIndex++;
if (stepIndex >= workflow.steps.length) {
yield* ly.logAndYield({
type: "end",
});
break;
}
}
// console.log('\n\n', JSON.stringify(messages, null, 2));
} catch (error) {
yield* ly.logAndYield({
type: "error",
error: error instanceof Error ? error.message : String(error),
});
} finally {
logger.close();
}
}

View file

@ -1,7 +1,7 @@
import { MessageList } from "../entities/message.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
import { z } from "zod";
import { AgentTool } from "../entities/agent.js";
import { ToolAttachment } from "../entities/agent.js";
export type StepInputT = z.infer<typeof MessageList>;
export type StepOutputT = AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown>;
@ -9,5 +9,5 @@ export type StepOutputT = AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, voi
export interface Step {
execute(input: StepInputT): StepOutputT;
tools(): Record<string, z.infer<typeof AgentTool>>;
tools(): Record<string, z.infer<typeof ToolAttachment>>;
}

View file

@ -1,6 +1,6 @@
import { z } from "zod";
import { RunEvent } from "../entities/workflow-event.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { RunEvent } from "../entities/run-events.js";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
export interface StreamRendererOptions {
showHeaders?: boolean;
@ -27,11 +27,11 @@ export class StreamRenderer {
render(event: z.infer<typeof RunEvent>) {
switch (event.type) {
case "start": {
this.onWorkflowStart(event.workflowId, event.runId, event.interactive);
this.onStart(event.agentId, event.runId, event.interactive);
break;
}
case "step-start": {
this.onStepStart(event.stepIndex, event.stepId, event.stepType);
this.onStepStart();
break;
}
case "stream-event": {
@ -43,23 +43,23 @@ export class StreamRenderer {
break;
}
case "tool-invocation": {
this.onStepToolInvocation(event.stepId, event.toolName, event.input);
this.onStepToolInvocation(event.toolName, event.input);
break;
}
case "tool-result": {
this.onStepToolResult(event.stepId, event.toolName, event.result);
this.onStepToolResult(event.toolName, event.result);
break;
}
case "step-end": {
this.onStepEnd(event.stepIndex);
this.onStepEnd();
break;
}
case "end": {
this.onWorkflowEnd();
this.onEnd();
break;
}
case "error": {
this.onWorkflowError(event.error);
this.onError(event.error);
break;
}
}
@ -94,29 +94,29 @@ export class StreamRenderer {
}
}
private onWorkflowStart(workflowId: string, runId: string, interactive: boolean) {
private onStart(workflowId: string, runId: string, interactive: boolean) {
this.write("\n");
this.write(this.bold(`▶ Workflow ${workflowId} (run ${runId})`));
if (!interactive) this.write(this.dim(" (--no-interactive)"));
this.write("\n");
}
private onWorkflowEnd() {
private onEnd() {
this.write(this.bold("\n■ Workflow complete\n"));
}
private onWorkflowError(error: string) {
private onError(error: string) {
this.write(this.red(`\n✖ Workflow error: ${error}\n`));
}
private onStepStart(stepIndex: number, stepId: string, stepType: "agent" | "function") {
private onStepStart() {
this.write("\n");
this.write(this.cyan(`─ Step ${stepIndex} [${stepType}]`));
this.write(this.cyan(`─ Step started`));
this.write("\n");
}
private onStepEnd(stepIndex: number) {
this.write(this.dim(`✓ Step ${stepIndex} finished\n`));
private onStepEnd() {
this.write(this.dim(`✓ Step finished\n`));
}
private onStepMessage(stepIndex: number, message: any) {
@ -131,7 +131,7 @@ export class StreamRenderer {
}
}
private onStepToolInvocation(stepId: string, toolName: string, input: string) {
private onStepToolInvocation(toolName: string, input: string) {
this.write(this.cyan(`\n→ Tool invoke ${toolName}`));
if (input && input.length) {
this.write("\n" + this.dim(this.indent(this.truncate(input))) + "\n");
@ -140,7 +140,7 @@ export class StreamRenderer {
}
}
private onStepToolResult(stepId: string, toolName: string, result: unknown) {
private onStepToolResult(toolName: string, result: unknown) {
const res = this.truncate(JSON.stringify(result, null, this.options.jsonIndent));
this.write(this.cyan(`\n← Tool result ${toolName}\n`));
this.write(this.dim(this.indent(res)) + "\n");

View file

@ -1,10 +0,0 @@
import fs from "fs";
import path from "path";
import { WorkDir } from "../config/config.js";
import { Workflow } from "../entities/workflow.js";
export function loadWorkflow(id: string) {
const workflowPath = path.join(WorkDir, "workflows", `${id}.json`);
const workflow = fs.readFileSync(workflowPath, "utf8");
return Workflow.parse(JSON.parse(workflow));
}