diff --git a/apps/cli/src/app.ts b/apps/cli/src/app.ts index 4198972a..2945956c 100644 --- a/apps/cli/src/app.ts +++ b/apps/cli/src/app.ts @@ -1,15 +1,117 @@ -import { executeWorkflow } from "./application/lib/exec-workflow.js"; +import { executeWorkflow, resumeWorkflow } from "./application/lib/exec-workflow.js"; import { StreamRenderer } from "./application/lib/stream-renderer.js"; +import { createInterface } from "node:readline/promises"; +import { stdin as input, stdout as output } from "node:process"; +type ParsedArgs = { + command: "run" | "resume" | "help" | null; + id: string | null; + interactive: boolean; + message: string; +}; -async function runWorkflow(id: string, userInput: string) { - const renderer = new StreamRenderer(); - for await (const event of executeWorkflow(id, userInput)) { - renderer.render(event); +function parseArgs(argv: string[]): ParsedArgs { + const args = argv.slice(2); + if (args.length === 0) { + return { command: "help", id: null, interactive: true, message: "" }; + } + + let command: ParsedArgs["command"] = null; + let id: string | null = null; + let interactive = true; + const messageParts: string[] = []; + + if (args[0] !== "run" && args[0] !== "resume") { + command = "help"; + return { command, id: null, interactive, message: "" }; + } + command = args[0]; + + for (let i = 1; i < args.length; i++) { + const a = args[i]; + if (a.startsWith("--")) { + if (a === "--no-interactive") { + interactive = false; + } else if (a.startsWith("--interactive")) { + const [, value] = a.split("="); + if (value === undefined) { + interactive = true; + } else { + interactive = value !== "false"; + } + } + continue; + } + if (!id) { + id = a; + continue; + } + messageParts.push(a); + } + + return { command, id, interactive, message: messageParts.join(" ") }; +} + +function printUsage(): void { + console.log([ + "Usage:", + " rowboatx run [message...] [--interactive | --no-interactive]", + " rowboatx resume [message...] [--interactive | --no-interactive]", + "", + "Flags:", + " --interactive Run interactively (default: true)", + " --no-interactive Disable interactive prompts", + ].join("\n")); +} + +async function promptForResumeInput(): Promise { + const rl = createInterface({ input, output }); + try { + const answer = await rl.question("Enter input to resume the run: "); + return answer; + } finally { + rl.close(); } } -const workflowId = process.argv[2] ?? "example_workflow"; -const userInputMsg = process.argv[3] ?? ""; +async function render(generator: AsyncGenerator): Promise { + const renderer = new StreamRenderer(); + for await (const event of generator) { + renderer.render(event); + if (event?.type === "error") { + process.exitCode = 1; + } + } +} -runWorkflow(workflowId, userInputMsg); \ No newline at end of file +async function main() { + const { command, id, interactive, message } = parseArgs(process.argv); + + if (command === "help" || !command) { + printUsage(); + return; + } + if (!id) { + printUsage(); + process.exitCode = 1; + return; + } + + switch (command) { + case "run": { + const initialInput = message ?? ""; + await render(executeWorkflow(id, initialInput, interactive)); + break; + } + case "resume": { + const resumeInput = message !== "" ? message : (interactive ? await promptForResumeInput() : ""); + await render(resumeWorkflow(id, resumeInput, interactive)); + break; + } + } +} + +main().catch((err) => { + console.error("Failed:", err instanceof Error ? err.message : String(err)); + process.exitCode = 1; +}); \ No newline at end of file diff --git a/apps/cli/src/application/entities/workflow-event.ts b/apps/cli/src/application/entities/workflow-event.ts index 4811db11..2e463836 100644 --- a/apps/cli/src/application/entities/workflow-event.ts +++ b/apps/cli/src/application/entities/workflow-event.ts @@ -3,67 +3,84 @@ import { LlmStepStreamEvent } from "./llm-step-event.js"; import { Workflow } from "./workflow.js"; import { Message } from "./message.js"; -export const WorkflowStreamStartEvent = z.object({ - type: z.literal("workflow-start"), - workflowId: z.string(), - workflow: Workflow, - background: z.boolean(), +const BaseRunEvent = z.object({ + ts: z.iso.datetime().optional(), }); -export const WorkflowStreamStepStartEvent = z.object({ - type: z.literal("workflow-step-start"), +export const RunStartEvent = BaseRunEvent.extend({ + type: z.literal("start"), + runId: z.string(), + workflowId: z.string(), + workflow: Workflow, + interactive: z.boolean(), +}); + +export const RunStepStartEvent = BaseRunEvent.extend({ + type: z.literal("step-start"), + stepIndex: z.number(), stepId: z.string(), stepType: z.enum(["agent", "function"]), }); -export const WorkflowStreamStepStreamEventEvent = z.object({ - type: z.literal("workflow-step-stream-event"), +export const RunStreamEvent = BaseRunEvent.extend({ + type: z.literal("stream-event"), stepId: z.string(), event: LlmStepStreamEvent, }); -export const WorkflowStreamStepMessageEvent = z.object({ - type: z.literal("workflow-step-message"), +export const RunMessageEvent = BaseRunEvent.extend({ + type: z.literal("message"), stepId: z.string(), message: Message, }); -export const WorkflowStreamStepToolInvocationEvent = z.object({ - type: z.literal("workflow-step-tool-invocation"), +export const RunToolInvocationEvent = BaseRunEvent.extend({ + type: z.literal("tool-invocation"), stepId: z.string(), toolName: z.string(), input: z.string(), }); -export const WorkflowStreamStepToolResultEvent = z.object({ - type: z.literal("workflow-step-tool-result"), +export const RunToolResultEvent = BaseRunEvent.extend({ + type: z.literal("tool-result"), stepId: z.string(), toolName: z.string(), result: z.any(), }); -export const WorkflowStreamStepEndEvent = z.object({ - type: z.literal("workflow-step-end"), - stepId: z.string(), +export const RunStepEndEvent = BaseRunEvent.extend({ + type: z.literal("step-end"), + stepIndex: z.number(), }); -export const WorkflowStreamEndEvent = z.object({ - type: z.literal("workflow-end"), +export const RunEndEvent = BaseRunEvent.extend({ + type: z.literal("end"), }); -export const WorkflowStreamErrorEvent = z.object({ - type: z.literal("workflow-error"), +export const RunPauseEvent = BaseRunEvent.extend({ + type: z.literal("pause-for-human-input"), + toolCallId: z.string(), +}); + +export const RunResumeEvent = BaseRunEvent.extend({ + type: z.literal("resume"), +}); + +export const RunErrorEvent = BaseRunEvent.extend({ + type: z.literal("error"), error: z.string(), }); -export const WorkflowStreamEvent = z.union([ - WorkflowStreamStartEvent, - WorkflowStreamStepStartEvent, - WorkflowStreamStepStreamEventEvent, - WorkflowStreamStepMessageEvent, - WorkflowStreamStepToolInvocationEvent, - WorkflowStreamStepToolResultEvent, - WorkflowStreamStepEndEvent, - WorkflowStreamEndEvent, - WorkflowStreamErrorEvent, +export const RunEvent = z.union([ + RunStartEvent, + RunStepStartEvent, + RunStreamEvent, + RunMessageEvent, + RunToolInvocationEvent, + RunToolResultEvent, + RunStepEndEvent, + RunEndEvent, + RunPauseEvent, + RunResumeEvent, + RunErrorEvent, ]); \ No newline at end of file diff --git a/apps/cli/src/application/lib/exec-tool.ts b/apps/cli/src/application/lib/exec-tool.ts index 1da589ad..b6485355 100644 --- a/apps/cli/src/application/lib/exec-tool.ts +++ b/apps/cli/src/application/lib/exec-tool.ts @@ -66,14 +66,14 @@ async function execBashTool(agentTool: z.infer, input: any): P }; } -async function execAskHumanTool(agentTool: z.infer, input: any): Promise { +export async function execAskHumanTool(agentTool: z.infer, question: string): Promise { const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); let p = new Promise((resolve, reject) => { - rl.question(`>> Provide answer to: ${input.question}:\n\n`, (answer) => { + rl.question(`>> Provide answer to: ${question}:\n\n`, (answer) => { resolve(answer); rl.close(); }); @@ -85,10 +85,10 @@ async function execAskHumanTool(agentTool: z.infer, input: any async function execWorkflowTool(agentTool: z.infer & { type: "workflow" }, input: any): Promise { let lastMsg: z.infer | null = null; for await (const event of executeWorkflow(agentTool.name, input.message)) { - if (event.type === "workflow-step-message" && event.message.role === "assistant") { + if (event.type === "message" && event.message.role === "assistant") { lastMsg = event.message; } - if (event.type === "workflow-error") { + if (event.type === "error") { throw new Error(event.error); } } @@ -117,8 +117,6 @@ export async function execTool(agentTool: z.infer, input: any) switch (agentTool.name) { case "bash": return execBashTool(agentTool, input); - case "ask-human": - return execAskHumanTool(agentTool, input); default: throw new Error(`Unknown builtin tool: ${agentTool.name}`); } diff --git a/apps/cli/src/application/lib/exec-workflow.ts b/apps/cli/src/application/lib/exec-workflow.ts index 39c620d1..8c7c0ea9 100644 --- a/apps/cli/src/application/lib/exec-workflow.ts +++ b/apps/cli/src/application/lib/exec-workflow.ts @@ -1,15 +1,61 @@ import { loadWorkflow } from "./utils.js"; -import { randomId } from "./random-id.js"; -import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage } from "../entities/message.js"; +import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage, ToolCallPart } from "../entities/message.js"; import { LlmStepStreamEvent } from "../entities/llm-step-event.js"; import { AgentNode } from "./agent.js"; import { z } from "zod"; import path from "path"; import { WorkDir } from "../config/config.js"; import fs from "fs"; +import { createInterface, Interface } from "node:readline/promises"; import { FunctionsRegistry } from "../registry/functions.js"; -import { WorkflowStreamEvent } from "../entities/workflow-event.js"; -import { execTool } from "./exec-tool.js"; +import { RunEvent } from "../entities/workflow-event.js"; +import { execAskHumanTool, execTool } from "./exec-tool.js"; +import { AgentTool } from "../entities/agent.js"; +import { runIdGenerator } from "./run-id-gen.js"; +import { Workflow } from "../entities/workflow.js"; + +const MappedToolCall = z.object({ + toolCall: ToolCallPart, + agentTool: AgentTool, +}); + +const State = z.object({ + stepIndex: z.number(), + messages: MessageList, + workflow: Workflow.nullable(), + pendingToolCallId: z.string().nullable(), +}); + +class StateBuilder { + private state: z.infer = { + stepIndex: 0, + messages: [], + workflow: null, + pendingToolCallId: null, + }; + + ingest(event: z.infer) { + switch (event.type) { + case "start": + this.state.workflow = event.workflow; + break; + case "step-start": + this.state.stepIndex = event.stepIndex; + break; + case "message": + this.state.messages.push(event.message); + this.state.pendingToolCallId = null; + break; + case "pause-for-human-input": + this.state.pendingToolCallId = event.toolCallId; + break; + } + } + + get(): z.infer { + return this.state; + } +} class RunLogger { private logFile: string; @@ -24,15 +70,15 @@ class RunLogger { constructor(workflowId: string, runId: string) { this.ensureRunsDir(workflowId); - this.logFile = path.join(WorkDir, "runs", `${workflowId}`, `${runId}.jsonl`); + this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`); this.fileHandle = fs.createWriteStream(this.logFile, { flags: "a", encoding: "utf8", }); } - log(message: z.infer) { - this.fileHandle.write(JSON.stringify(message) + "\n"); + log(event: z.infer) { + this.fileHandle.write(JSON.stringify(event) + "\n"); } close() { @@ -40,6 +86,23 @@ class RunLogger { } } +class LogAndYield { + private logger: RunLogger + + constructor(logger: RunLogger) { + this.logger = logger; + } + + async *logAndYield(event: z.infer): AsyncGenerator, void, unknown> { + const ev = { + ...event, + ts: new Date().toISOString(), + } + this.logger.log(ev); + yield ev; + } +} + class StreamStepMessageBuilder { private parts: z.infer[] = []; private textBuffer: string = ""; @@ -98,118 +161,286 @@ function loadFunction(id: string) { return func; } -export async function* executeWorkflow(id: string, input: string, background: boolean = false): AsyncGenerator, void, unknown> { +export async function* executeWorkflow(id: string, input: string, interactive: boolean = true): AsyncGenerator, void, unknown> { + const runId = runIdGenerator.next(); + yield* runFromState({ + id, + runId, + state: { + stepIndex: 0, + messages: [{ + role: "user", + content: input, + }], + workflow: null, + pendingToolCallId: null, + }, + interactive, + }); +} + +export async function* resumeWorkflow(runId: string, input: string, interactive: boolean = false): AsyncGenerator, void, unknown> { + // read a run.jsonl file line by line and build state + const builder = new StateBuilder(); + let rl: Interface | null = null; + let stream: fs.ReadStream | null = null; try { - const workflow = loadWorkflow(id); - const runId = await randomId(); - - yield { - type: "workflow-start", - workflowId: id, - workflow: workflow, - background: background, - }; - - const logger = new RunLogger(id, runId); - - const messages: z.infer = [{ - role: "user", - content: input ?? "" - }]; - - try { - let stepIndex = 0; - - while (true) { - const step = workflow.steps[stepIndex]; - const node = step.type === "agent" ? new AgentNode(step.id, background) : loadFunction(step.id); - const messageBuilder = new StreamStepMessageBuilder(); - - // stream response from agent - for await (const event of node.execute(messages)) { - // console.log(" - event", JSON.stringify(event)); - messageBuilder.ingest(event); - yield { - type: "workflow-step-stream-event", - stepId: step.id, - event: event, - }; - } - - // build and emit final message from agent response - const msg = messageBuilder.get(); - logger.log(msg); - messages.push(msg); - yield { - type: "workflow-step-message", - stepId: step.id, - message: msg, - }; - - // if the agent response contains tool calls, execute them - const tools = node.tools(); - let hasToolCalls = false; - if (msg.content instanceof Array) { - for (const part of msg.content) { - if (part.type === "tool-call") { - hasToolCalls = true; - if (!(part.toolName in tools)) { - throw new Error(`Tool ${part.toolName} not found`); - } - yield { - type: "workflow-step-tool-invocation", - stepId: step.id, - toolName: part.toolName, - input: part.arguments, - } - const result = await execTool(tools[part.toolName], part.arguments); - const resultMsg: z.infer = { - role: "tool", - content: JSON.stringify(result), - toolCallId: part.toolCallId, - toolName: part.toolName, - }; - logger.log(resultMsg); - messages.push(resultMsg); - yield { - type: "workflow-step-tool-result", - stepId: step.id, - toolName: part.toolName, - result: result, - }; - yield { - type: "workflow-step-message", - stepId: step.id, - message: resultMsg, - }; - } - } - } - - // if the agent response had tool calls, replay this agent - if (hasToolCalls) { - continue; - } - - // otherwise, move to the next step - stepIndex++; - if (stepIndex >= workflow.steps.length) { - break; - } + const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`); + stream = fs.createReadStream(logFile, { encoding: "utf8" }); + rl = createInterface({ input: stream, crlfDelay: Infinity }); + for await (const line of rl) { + if (line.trim() === "") { + continue; } - } finally { - logger.close(); + // console.error('processing line', line); + const parsed = JSON.parse(line); + // console.error('parsed'); + const event = RunEvent.parse(parsed); + // console.error('zod parsed'); + builder.ingest(event); + } + } catch (error) { + // console.error("Failed to resume workflow:", error); + // yield { + // type: "error", + // error: error instanceof Error ? error.message : String(error), + // }; + } finally { + rl?.close(); + stream?.close(); + } + + const { workflow, messages, stepIndex, pendingToolCallId } = builder.get(); + if (!workflow) { + throw new Error(`Workflow not found for run ${runId}`); + } + if (!pendingToolCallId) { + throw new Error(`No pending tool call found for run ${runId}`); + } + const stepId = workflow.steps[stepIndex].id; + + // append user input as message + const logger = new RunLogger(workflow.name, runId); + const ly = new LogAndYield(logger); + yield *ly.logAndYield({ + type: "resume" + }); + + // append user input as message + const resultMsg: z.infer = { + role: "tool", + content: JSON.stringify(input), + toolCallId: pendingToolCallId, + toolName: "ask-human", + }; + messages.push(resultMsg); + yield* ly.logAndYield({ + type: "tool-result", + stepId, + toolName: "ask-human", + result: input, + }); + yield* ly.logAndYield({ + type: "message", + stepId, + message: resultMsg, + }); + + yield* runFromState({ + id: workflow.name, + runId, + state: { + stepIndex, + messages, + workflow, + pendingToolCallId, + }, + interactive, + }); +} + +async function* runFromState(opts: { + id: string; + runId: string; + state: z.infer; + interactive: boolean; +}) { + const { id, runId, state, interactive } = opts; + let stepIndex = state.stepIndex; + let messages = [...state.messages]; + let workflow = state.workflow; + + const logger = new RunLogger(id, runId); + const ly = new LogAndYield(logger); + + try { + if (!workflow) { + workflow = loadWorkflow(id); + + yield* ly.logAndYield({ + type: "start", + runId, + workflowId: id, + workflow, + interactive, + }); } + while (true) { + const step = workflow.steps[stepIndex]; + const node = step.type === "agent" ? new AgentNode(step.id, interactive) : loadFunction(step.id); + + yield* ly.logAndYield({ + type: "step-start", + stepIndex, + stepId: step.id, + stepType: step.type, + }); + + const messageBuilder = new StreamStepMessageBuilder(); + + // stream response from agent + for await (const event of node.execute(messages)) { + // console.log(" - event", JSON.stringify(event)); + messageBuilder.ingest(event); + yield* ly.logAndYield({ + type: "stream-event", + stepId: step.id, + event: event, + }); + } + + // build and emit final message from agent response + const msg = messageBuilder.get(); + messages.push(msg); + yield* ly.logAndYield({ + type: "message", + stepId: step.id, + message: msg, + }); + + // handle tool calls + const tools = node.tools(); + const mappedToolCalls: z.infer[] = []; + let msgToolCallParts: z.infer[] = []; + if (msg.content instanceof Array) { + msgToolCallParts = msg.content.filter(part => part.type === "tool-call"); + } + const hasToolCalls = msgToolCallParts.length > 0; + + // validate and map tool calls + for (const part of msgToolCallParts) { + const agentTool = tools[part.toolName]; + if (!agentTool) { + throw new Error(`Tool ${part.toolName} not found`); + } + mappedToolCalls.push({ + toolCall: part, + agentTool: agentTool, + }); + } + + // first, exec all tool calls other than ask-human + for (const call of mappedToolCalls) { + const { agentTool, toolCall } = call; + if (agentTool.type === "builtin" && agentTool.name === "ask-human") { + continue; + } + yield* ly.logAndYield({ + type: "tool-invocation", + stepId: step.id, + toolName: toolCall.toolName, + input: JSON.stringify(toolCall.arguments), + }); + const result = await execTool(agentTool, toolCall.arguments); + const resultMsg: z.infer = { + role: "tool", + content: JSON.stringify(result), + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + }; + messages.push(resultMsg); + yield* ly.logAndYield({ + type: "tool-result", + stepId: step.id, + toolName: toolCall.toolName, + result: result, + }); + yield* ly.logAndYield({ + type: "message", + stepId: step.id, + message: resultMsg, + }); + } + + // handle ask-tool call execution + for (const call of mappedToolCalls) { + const { agentTool, toolCall } = call; + if (agentTool.type !== "builtin" || agentTool.name !== "ask-human") { + continue; + } + yield* ly.logAndYield({ + type: "tool-invocation", + stepId: step.id, + toolName: toolCall.toolName, + input: JSON.stringify(toolCall.arguments), + }); + + // if running in background mode, exit here + if (!interactive) { + yield* ly.logAndYield({ + type: "pause-for-human-input", + toolCallId: toolCall.toolCallId, + }); + return; + } + const result = await execAskHumanTool(agentTool, toolCall.arguments.question as string); + const resultMsg: z.infer = { + role: "tool", + content: JSON.stringify(result), + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + }; + messages.push(resultMsg); + yield* ly.logAndYield({ + type: "tool-result", + stepId: step.id, + toolName: toolCall.toolName, + result: result, + }); + yield* ly.logAndYield({ + type: "message", + stepId: step.id, + message: resultMsg, + }); + } + + yield* ly.logAndYield({ + type: "step-end", + stepIndex, + }); + + // if the agent response had tool calls, replay this agent + if (hasToolCalls) { + continue; + } + + // otherwise, move to the next step + stepIndex++; + if (stepIndex >= workflow.steps.length) { + yield* ly.logAndYield({ + type: "end", + }); + break; + } + } // console.log('\n\n', JSON.stringify(messages, null, 2)); } catch (error) { - yield { - type: "workflow-error", + yield* ly.logAndYield({ + type: "error", error: error instanceof Error ? error.message : String(error), - }; + }); } finally { - yield { - type: "workflow-end", - }; + logger.close(); } -} \ No newline at end of file +} diff --git a/apps/cli/src/application/lib/stream-renderer.ts b/apps/cli/src/application/lib/stream-renderer.ts index 0bf06986..b5a2cc57 100644 --- a/apps/cli/src/application/lib/stream-renderer.ts +++ b/apps/cli/src/application/lib/stream-renderer.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import { WorkflowStreamEvent } from "../entities/workflow-event.js"; +import { RunEvent } from "../entities/workflow-event.js"; import { LlmStepStreamEvent } from "../entities/llm-step-event.js"; export interface StreamRendererOptions { @@ -24,41 +24,41 @@ export class StreamRenderer { }; } - render(event: z.infer) { + render(event: z.infer) { switch (event.type) { - case "workflow-start": { - this.onWorkflowStart(event.workflowId, event.background); + case "start": { + this.onWorkflowStart(event.workflowId, event.runId, event.interactive); break; } - case "workflow-step-start": { - this.onStepStart(event.stepId, event.stepType); + case "step-start": { + this.onStepStart(event.stepIndex, event.stepId, event.stepType); break; } - case "workflow-step-stream-event": { + case "stream-event": { this.renderLlmEvent(event.event); break; } - case "workflow-step-message": { + case "message": { // this.onStepMessage(event.stepId, event.message); break; } - case "workflow-step-tool-invocation": { + case "tool-invocation": { this.onStepToolInvocation(event.stepId, event.toolName, event.input); break; } - case "workflow-step-tool-result": { + case "tool-result": { this.onStepToolResult(event.stepId, event.toolName, event.result); break; } - case "workflow-step-end": { - this.onStepEnd(event.stepId); + case "step-end": { + this.onStepEnd(event.stepIndex); break; } - case "workflow-end": { + case "end": { this.onWorkflowEnd(); break; } - case "workflow-error": { + case "error": { this.onWorkflowError(event.error); break; } @@ -94,10 +94,10 @@ export class StreamRenderer { } } - private onWorkflowStart(workflowId: string, background: boolean) { + private onWorkflowStart(workflowId: string, runId: string, interactive: boolean) { this.write("\n"); - this.write(this.bold(`▶ Workflow ${workflowId}`)); - if (background) this.write(this.dim(" (background)")); + this.write(this.bold(`▶ Workflow ${workflowId} (run ${runId})`)); + if (!interactive) this.write(this.dim(" (--no-interactive)")); this.write("\n"); } @@ -109,17 +109,17 @@ export class StreamRenderer { this.write(this.red(`\n✖ Workflow error: ${error}\n`)); } - private onStepStart(stepId: string, stepType: "agent" | "function") { + private onStepStart(stepIndex: number, stepId: string, stepType: "agent" | "function") { this.write("\n"); - this.write(this.cyan(`─ Step ${stepId} [${stepType}]`)); + this.write(this.cyan(`─ Step ${stepIndex} [${stepType}]`)); this.write("\n"); } - private onStepEnd(stepId: string) { - this.write(this.dim(`✓ Step ${stepId} finished\n`)); + private onStepEnd(stepIndex: number) { + this.write(this.dim(`✓ Step ${stepIndex} finished\n`)); } - private onStepMessage(stepId: string, message: any) { + private onStepMessage(stepIndex: number, message: any) { const role = message?.role ?? "message"; const content = message?.content; this.write(this.bold(`${role}: `));