set up basic workflow execution

This commit is contained in:
Ramnique Singh 2025-11-07 11:42:10 +05:30
parent 7758139893
commit c004bc5eb6
24 changed files with 794 additions and 298 deletions

View file

@ -0,0 +1,214 @@
import { Message, MessageList } from "../entities/message.js";
import { z } from "zod";
import { Step, StepInputT, StepOutputT } from "./step.js";
import { openai } from "@ai-sdk/openai";
import { google } from "@ai-sdk/google";
import { generateText, ModelMessage, stepCountIs, streamText, tool, Tool, ToolSet, jsonSchema } from "ai";
import { Agent, AgentTool } from "../entities/agent.js";
import { WorkDir } from "../config/config.js";
import fs from "fs";
import path from "path";
import { loadWorkflow } from "./utils.js";
const BashTool = tool({
description: "Run a command in the shell",
inputSchema: z.object({
command: z.string(),
}),
});
const AskHumanTool = tool({
description: "Ask the human for input",
inputSchema: z.object({
question: z.string(),
}),
});
function mapAgentTool(t: z.infer<typeof AgentTool>): Tool {
switch (t.type) {
case "mcp":
return tool({
name: t.name,
description: t.description,
inputSchema: jsonSchema(t.inputSchema),
});
case "workflow":
const workflow = loadWorkflow(t.name);
if (!workflow) {
throw new Error(`Workflow ${t.name} not found`);
}
return tool({
name: t.name,
description: workflow.description,
inputSchema: z.object({
message: z.string().describe("The message to send to the workflow"),
}),
});
case "builtin":
switch (t.name) {
case "bash":
return BashTool;
default:
throw new Error(`Unknown builtin tool: ${t.name}`);
}
}
}
function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
const result: ModelMessage[] = [];
for (const msg of messages) {
switch (msg.role) {
case "assistant":
if (typeof msg.content === 'string') {
result.push({
role: "assistant",
content: msg.content,
});
} else {
result.push({
role: "assistant",
content: msg.content.map(part => {
switch (part.type) {
case 'text':
return part;
case 'reasoning':
return part;
case 'tool-call':
return {
type: 'tool-call',
toolCallId: part.toolCallId,
toolName: part.toolName,
input: part.arguments,
};
}
}),
});
}
break;
case "system":
result.push({
role: "system",
content: msg.content,
});
break;
case "user":
result.push({
role: "user",
content: msg.content,
});
break;
case "tool":
result.push({
role: "tool",
content: [
{
type: "tool-result",
toolCallId: msg.toolCallId,
toolName: msg.toolName,
output: {
type: "text",
value: msg.content,
},
},
],
});
break;
}
}
return result;
}
export class AgentNode implements Step {
private id: string;
private background: boolean;
private agent: z.infer<typeof Agent>;
constructor(id: string, background: boolean) {
this.id = id;
this.background = background;
const agentPath = path.join(WorkDir, "agents", `${id}.json`);
const agent = fs.readFileSync(agentPath, "utf8");
this.agent = Agent.parse(JSON.parse(agent));
}
tools(): Record<string, z.infer<typeof AgentTool>> {
return this.agent.tools ?? {};
}
async* execute(input: StepInputT): StepOutputT {
// console.log("\n\n\t>>>>\t\tinput", JSON.stringify(input));
const tools: ToolSet = {};
if (!this.background) {
tools["ask-human"] = AskHumanTool;
}
for (const [name, tool] of Object.entries(this.agent.tools ?? {})) {
try {
tools[name] = mapAgentTool(tool);
} catch (error) {
console.error(`Error mapping tool ${name}:`, error);
continue;
}
}
// console.log("\n\n\t>>>>\t\ttools", JSON.stringify(tools, null, 2));
const { fullStream } = streamText({
model: openai("gpt-4.1"),
// model: google("gemini-2.5-pro"),
messages: convertFromMessages(input),
system: this.agent.instructions,
stopWhen: stepCountIs(1),
tools,
});
for await (const event of fullStream) {
// console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event));
switch (event.type) {
case "reasoning-start":
yield {
type: "reasoning-start",
};
break;
case "reasoning-delta":
yield {
type: "reasoning-delta",
delta: event.text,
};
break;
case "reasoning-end":
yield {
type: "reasoning-end",
};
break;
case "text-start":
yield {
type: "text-start",
};
break;
case "text-delta":
yield {
type: "text-delta",
delta: event.text,
};
break;
case "tool-call":
yield {
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
};
break;
case "finish":
yield {
type: "usage",
usage: event.totalUsage,
};
break;
default:
// console.warn("Unknown event type", event);
continue;
}
}
}
}

View file

@ -0,0 +1,102 @@
import { tool, Tool } from "ai";
import { AgentTool } from "../entities/agent.js";
import { z } from "zod";
import { McpServers } from "../config/config.js";
import { getMcpClient } from "./mcp.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { Client } from "@modelcontextprotocol/sdk/client";
import { executeCommand } from "./command-executor.js";
import { loadWorkflow } from "./utils.js";
import { AssistantMessage } from "../entities/message.js";
import { executeWorkflow } from "./exec-workflow.js";
async function execMcpTool(agentTool: z.infer<typeof AgentTool> & { type: "mcp" }, input: any): Promise<any> {
// load mcp configuration from the tool
const mcpConfig = McpServers[agentTool.mcpServerName];
if (!mcpConfig) {
throw new Error(`MCP server ${agentTool.mcpServerName} not found`);
}
// create transport
let transport: Transport;
if ("command" in mcpConfig) {
transport = new StdioClientTransport({
command: mcpConfig.command,
args: mcpConfig.args,
env: mcpConfig.env,
});
} else {
// first try streamable http transport
try {
transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url));
} catch (error) {
// if that fails, try sse transport
transport = new SSEClientTransport(new URL(mcpConfig.url));
}
}
if (!transport) {
throw new Error(`No transport found for ${agentTool.mcpServerName}`);
}
// create client
const client = new Client({
name: 'rowboatx',
version: '1.0.0',
});
await client.connect(transport);
// call tool
const result = await client.callTool({ name: agentTool.name, arguments: input });
client.close();
transport.close();
return result;
}
async function execBashTool(agentTool: z.infer<typeof AgentTool>, input: any): Promise<any> {
const result = await executeCommand(input.command as string);
return {
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.exitCode,
};
}
async function execWorkflowTool(agentTool: z.infer<typeof AgentTool> & { type: "workflow" }, input: any): Promise<any> {
let lastMsg: z.infer<typeof AssistantMessage> | null = null;
for await (const event of executeWorkflow(agentTool.name, input.message)) {
if (event.type === "workflow-step-message" && event.message.role === "assistant") {
lastMsg = event.message;
}
if (event.type === "workflow-error") {
throw new Error(event.error);
}
}
if (!lastMsg) {
throw new Error("No message received from workflow");
}
if (typeof lastMsg.content === "string") {
return lastMsg.content;
}
return lastMsg.content.reduce((acc, part) => {
if (part.type === "text") {
acc += part.text;
}
return acc;
}, "");
}
export async function execTool(agentTool: z.infer<typeof AgentTool>, input: any): Promise<any> {
switch (agentTool.type) {
case "mcp":
return execMcpTool(agentTool, input);
case "workflow":
return execWorkflowTool(agentTool, input);
case "builtin":
return execBashTool(agentTool, input);
}
}

View file

@ -0,0 +1,215 @@
import { loadWorkflow } from "./utils.js";
import { randomId } from "./random-id.js";
import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage } from "../entities/message.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { AgentNode } from "./agent.js";
import { z } from "zod";
import path from "path";
import { WorkDir } from "../config/config.js";
import fs from "fs";
import { FunctionsRegistry } from "../registry/functions.js";
import { WorkflowStreamEvent } from "../entities/workflow-event.js";
import { execTool } from "./exec-tool.js";
class RunLogger {
private logFile: string;
private fileHandle: fs.WriteStream;
ensureRunsDir(workflowId: string) {
const runsDir = path.join(WorkDir, "runs", workflowId);
if (!fs.existsSync(runsDir)) {
fs.mkdirSync(runsDir, { recursive: true });
}
}
constructor(workflowId: string, runId: string) {
this.ensureRunsDir(workflowId);
this.logFile = path.join(WorkDir, "runs", `${workflowId}`, `${runId}.jsonl`);
this.fileHandle = fs.createWriteStream(this.logFile, {
flags: "a",
encoding: "utf8",
});
}
log(message: z.infer<typeof Message>) {
this.fileHandle.write(JSON.stringify(message) + "\n");
}
close() {
this.fileHandle.close();
}
}
class StreamStepMessageBuilder {
private parts: z.infer<typeof AssistantContentPart>[] = [];
private textBuffer: string = "";
private reasoningBuffer: string = "";
flushBuffers() {
if (this.reasoningBuffer) {
this.parts.push({ type: "reasoning", text: this.reasoningBuffer });
this.reasoningBuffer = "";
}
if (this.textBuffer) {
this.parts.push({ type: "text", text: this.textBuffer });
this.textBuffer = "";
}
}
ingest(event: z.infer<typeof LlmStepStreamEvent>) {
switch (event.type) {
case "reasoning-start":
case "reasoning-end":
case "text-start":
case "text-end":
this.flushBuffers();
break;
case "reasoning-delta":
this.reasoningBuffer += event.delta;
break;
case "text-delta":
this.textBuffer += event.delta;
break;
case "tool-call":
this.parts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
arguments: event.input,
});
break;
}
}
get(): z.infer<typeof AssistantMessage> {
this.flushBuffers();
return {
role: "assistant",
content: this.parts,
};
}
}
function loadFunction(id: string) {
const func = FunctionsRegistry[id];
if (!func) {
throw new Error(`Function ${id} not found`);
}
return func;
}
export async function* executeWorkflow(id: string, input: string, background: boolean = false): AsyncGenerator<z.infer<typeof WorkflowStreamEvent>, void, unknown> {
try {
const workflow = loadWorkflow(id);
const runId = await randomId();
yield {
type: "workflow-start",
workflowId: id,
workflow: workflow,
background: background,
};
const logger = new RunLogger(id, runId);
const messages: z.infer<typeof MessageList> = [{
role: "user",
content: input ?? ""
}];
try {
let stepIndex = 0;
while (true) {
const step = workflow.steps[stepIndex];
const node = step.type === "agent" ? new AgentNode(step.id, background) : loadFunction(step.id);
const messageBuilder = new StreamStepMessageBuilder();
// stream response from agent
for await (const event of node.execute(messages)) {
// console.log(" - event", JSON.stringify(event));
messageBuilder.ingest(event);
yield {
type: "workflow-step-stream-event",
stepId: step.id,
event: event,
};
}
// build and emit final message from agent response
const msg = messageBuilder.get();
logger.log(msg);
messages.push(msg);
yield {
type: "workflow-step-message",
stepId: step.id,
message: msg,
};
// if the agent response contains tool calls, execute them
const tools = node.tools();
let hasToolCalls = false;
if (msg.content instanceof Array) {
for (const part of msg.content) {
if (part.type === "tool-call") {
hasToolCalls = true;
if (!(part.toolName in tools)) {
throw new Error(`Tool ${part.toolName} not found`);
}
yield {
type: "workflow-step-tool-invocation",
stepId: step.id,
toolName: part.toolName,
input: part.arguments,
}
const result = await execTool(tools[part.toolName], part.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: part.toolCallId,
toolName: part.toolName,
};
logger.log(resultMsg);
messages.push(resultMsg);
yield {
type: "workflow-step-tool-result",
stepId: step.id,
toolName: part.toolName,
result: result,
};
yield {
type: "workflow-step-message",
stepId: step.id,
message: resultMsg,
};
}
}
}
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, move to the next step
stepIndex++;
if (stepIndex >= workflow.steps.length) {
break;
}
}
} finally {
logger.close();
}
// console.log('\n\n', JSON.stringify(messages, null, 2));
} catch (error) {
yield {
type: "workflow-error",
error: error instanceof Error ? error.message : String(error),
};
} finally {
yield {
type: "workflow-end",
};
}
}

View file

@ -0,0 +1,13 @@
import { MessageList } from "../entities/message.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { z } from "zod";
import { AgentTool } from "../entities/agent.js";
export type StepInputT = z.infer<typeof MessageList>;
export type StepOutputT = AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown>;
export interface Step {
execute(input: StepInputT): StepOutputT;
tools(): Record<string, z.infer<typeof AgentTool>>;
}

View file

@ -1,5 +1,6 @@
import { z } from "zod";
import { StreamEvent } from "../entities/stream-event.js";
import { WorkflowStreamEvent } from "../entities/workflow-event.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
export interface StreamRendererOptions {
showHeaders?: boolean;
@ -23,7 +24,48 @@ export class StreamRenderer {
};
}
render(event: z.infer<typeof StreamEvent>) {
render(event: z.infer<typeof WorkflowStreamEvent>) {
switch (event.type) {
case "workflow-start": {
this.onWorkflowStart(event.workflowId, event.background);
break;
}
case "workflow-step-start": {
this.onStepStart(event.stepId, event.stepType);
break;
}
case "workflow-step-stream-event": {
this.renderLlmEvent(event.event);
break;
}
case "workflow-step-message": {
// this.onStepMessage(event.stepId, event.message);
break;
}
case "workflow-step-tool-invocation": {
this.onStepToolInvocation(event.stepId, event.toolName, event.input);
break;
}
case "workflow-step-tool-result": {
this.onStepToolResult(event.stepId, event.toolName, event.result);
break;
}
case "workflow-step-end": {
this.onStepEnd(event.stepId);
break;
}
case "workflow-end": {
this.onWorkflowEnd();
break;
}
case "workflow-error": {
this.onWorkflowError(event.error);
break;
}
}
}
private renderLlmEvent(event: z.infer<typeof LlmStepStreamEvent>) {
switch (event.type) {
case "reasoning-start":
this.onReasoningStart();
@ -52,6 +94,58 @@ export class StreamRenderer {
}
}
private onWorkflowStart(workflowId: string, background: boolean) {
this.write("\n");
this.write(this.bold(`▶ Workflow ${workflowId}`));
if (background) this.write(this.dim(" (background)"));
this.write("\n");
}
private onWorkflowEnd() {
this.write(this.bold("\n■ Workflow complete\n"));
}
private onWorkflowError(error: string) {
this.write(this.red(`\n✖ Workflow error: ${error}\n`));
}
private onStepStart(stepId: string, stepType: "agent" | "function") {
this.write("\n");
this.write(this.cyan(`─ Step ${stepId} [${stepType}]`));
this.write("\n");
}
private onStepEnd(stepId: string) {
this.write(this.dim(`✓ Step ${stepId} finished\n`));
}
private onStepMessage(stepId: string, message: any) {
const role = message?.role ?? "message";
const content = message?.content;
this.write(this.bold(`${role}: `));
if (typeof content === "string") {
this.write(content + "\n");
} else {
const pretty = this.truncate(JSON.stringify(message, null, this.options.jsonIndent));
this.write(this.dim("\n" + this.indent(pretty) + "\n"));
}
}
private onStepToolInvocation(stepId: string, toolName: string, input: string) {
this.write(this.cyan(`\n→ Tool invoke ${toolName}`));
if (input && input.length) {
this.write("\n" + this.dim(this.indent(this.truncate(input))) + "\n");
} else {
this.write("\n");
}
}
private onStepToolResult(stepId: string, toolName: string, result: unknown) {
const res = this.truncate(JSON.stringify(result, null, this.options.jsonIndent));
this.write(this.cyan(`\n← Tool result ${toolName}\n`));
this.write(this.dim(this.indent(res)) + "\n");
}
private onReasoningStart() {
if (this.reasoningActive) return;
this.reasoningActive = true;
@ -146,6 +240,10 @@ export class StreamRenderer {
private cyan(text: string): string {
return "\x1b[36m" + text + "\x1b[0m";
}
private red(text: string): string {
return "\x1b[31m" + text + "\x1b[0m";
}
}

View file

@ -0,0 +1,10 @@
import fs from "fs";
import path from "path";
import { WorkDir } from "../config/config.js";
import { Workflow } from "../entities/workflow.js";
export function loadWorkflow(id: string) {
const workflowPath = path.join(WorkDir, "workflows", `${id}.json`);
const workflow = fs.readFileSync(workflowPath, "utf8");
return Workflow.parse(JSON.parse(workflow));
}