complete ask-human implementation

This commit is contained in:
Ramnique Singh 2025-11-11 12:32:46 +05:30
parent 9e89a81c8d
commit 54bdbe73c0
5 changed files with 529 additions and 181 deletions

View file

@ -1,15 +1,117 @@
import { executeWorkflow } from "./application/lib/exec-workflow.js";
import { executeWorkflow, resumeWorkflow } from "./application/lib/exec-workflow.js";
import { StreamRenderer } from "./application/lib/stream-renderer.js";
import { createInterface } from "node:readline/promises";
import { stdin as input, stdout as output } from "node:process";
type ParsedArgs = {
command: "run" | "resume" | "help" | null;
id: string | null;
interactive: boolean;
message: string;
};
async function runWorkflow(id: string, userInput: string) {
const renderer = new StreamRenderer();
for await (const event of executeWorkflow(id, userInput)) {
renderer.render(event);
function parseArgs(argv: string[]): ParsedArgs {
const args = argv.slice(2);
if (args.length === 0) {
return { command: "help", id: null, interactive: true, message: "" };
}
let command: ParsedArgs["command"] = null;
let id: string | null = null;
let interactive = true;
const messageParts: string[] = [];
if (args[0] !== "run" && args[0] !== "resume") {
command = "help";
return { command, id: null, interactive, message: "" };
}
command = args[0];
for (let i = 1; i < args.length; i++) {
const a = args[i];
if (a.startsWith("--")) {
if (a === "--no-interactive") {
interactive = false;
} else if (a.startsWith("--interactive")) {
const [, value] = a.split("=");
if (value === undefined) {
interactive = true;
} else {
interactive = value !== "false";
}
}
continue;
}
if (!id) {
id = a;
continue;
}
messageParts.push(a);
}
return { command, id, interactive, message: messageParts.join(" ") };
}
function printUsage(): void {
console.log([
"Usage:",
" rowboatx run <workflow_id> [message...] [--interactive | --no-interactive]",
" rowboatx resume <run_id> [message...] [--interactive | --no-interactive]",
"",
"Flags:",
" --interactive Run interactively (default: true)",
" --no-interactive Disable interactive prompts",
].join("\n"));
}
async function promptForResumeInput(): Promise<string> {
const rl = createInterface({ input, output });
try {
const answer = await rl.question("Enter input to resume the run: ");
return answer;
} finally {
rl.close();
}
}
const workflowId = process.argv[2] ?? "example_workflow";
const userInputMsg = process.argv[3] ?? "";
async function render(generator: AsyncGenerator<any, void, unknown>): Promise<void> {
const renderer = new StreamRenderer();
for await (const event of generator) {
renderer.render(event);
if (event?.type === "error") {
process.exitCode = 1;
}
}
}
runWorkflow(workflowId, userInputMsg);
async function main() {
const { command, id, interactive, message } = parseArgs(process.argv);
if (command === "help" || !command) {
printUsage();
return;
}
if (!id) {
printUsage();
process.exitCode = 1;
return;
}
switch (command) {
case "run": {
const initialInput = message ?? "";
await render(executeWorkflow(id, initialInput, interactive));
break;
}
case "resume": {
const resumeInput = message !== "" ? message : (interactive ? await promptForResumeInput() : "");
await render(resumeWorkflow(id, resumeInput, interactive));
break;
}
}
}
main().catch((err) => {
console.error("Failed:", err instanceof Error ? err.message : String(err));
process.exitCode = 1;
});

View file

@ -3,67 +3,84 @@ import { LlmStepStreamEvent } from "./llm-step-event.js";
import { Workflow } from "./workflow.js";
import { Message } from "./message.js";
export const WorkflowStreamStartEvent = z.object({
type: z.literal("workflow-start"),
workflowId: z.string(),
workflow: Workflow,
background: z.boolean(),
const BaseRunEvent = z.object({
ts: z.iso.datetime().optional(),
});
export const WorkflowStreamStepStartEvent = z.object({
type: z.literal("workflow-step-start"),
export const RunStartEvent = BaseRunEvent.extend({
type: z.literal("start"),
runId: z.string(),
workflowId: z.string(),
workflow: Workflow,
interactive: z.boolean(),
});
export const RunStepStartEvent = BaseRunEvent.extend({
type: z.literal("step-start"),
stepIndex: z.number(),
stepId: z.string(),
stepType: z.enum(["agent", "function"]),
});
export const WorkflowStreamStepStreamEventEvent = z.object({
type: z.literal("workflow-step-stream-event"),
export const RunStreamEvent = BaseRunEvent.extend({
type: z.literal("stream-event"),
stepId: z.string(),
event: LlmStepStreamEvent,
});
export const WorkflowStreamStepMessageEvent = z.object({
type: z.literal("workflow-step-message"),
export const RunMessageEvent = BaseRunEvent.extend({
type: z.literal("message"),
stepId: z.string(),
message: Message,
});
export const WorkflowStreamStepToolInvocationEvent = z.object({
type: z.literal("workflow-step-tool-invocation"),
export const RunToolInvocationEvent = BaseRunEvent.extend({
type: z.literal("tool-invocation"),
stepId: z.string(),
toolName: z.string(),
input: z.string(),
});
export const WorkflowStreamStepToolResultEvent = z.object({
type: z.literal("workflow-step-tool-result"),
export const RunToolResultEvent = BaseRunEvent.extend({
type: z.literal("tool-result"),
stepId: z.string(),
toolName: z.string(),
result: z.any(),
});
export const WorkflowStreamStepEndEvent = z.object({
type: z.literal("workflow-step-end"),
stepId: z.string(),
export const RunStepEndEvent = BaseRunEvent.extend({
type: z.literal("step-end"),
stepIndex: z.number(),
});
export const WorkflowStreamEndEvent = z.object({
type: z.literal("workflow-end"),
export const RunEndEvent = BaseRunEvent.extend({
type: z.literal("end"),
});
export const WorkflowStreamErrorEvent = z.object({
type: z.literal("workflow-error"),
export const RunPauseEvent = BaseRunEvent.extend({
type: z.literal("pause-for-human-input"),
toolCallId: z.string(),
});
export const RunResumeEvent = BaseRunEvent.extend({
type: z.literal("resume"),
});
export const RunErrorEvent = BaseRunEvent.extend({
type: z.literal("error"),
error: z.string(),
});
export const WorkflowStreamEvent = z.union([
WorkflowStreamStartEvent,
WorkflowStreamStepStartEvent,
WorkflowStreamStepStreamEventEvent,
WorkflowStreamStepMessageEvent,
WorkflowStreamStepToolInvocationEvent,
WorkflowStreamStepToolResultEvent,
WorkflowStreamStepEndEvent,
WorkflowStreamEndEvent,
WorkflowStreamErrorEvent,
export const RunEvent = z.union([
RunStartEvent,
RunStepStartEvent,
RunStreamEvent,
RunMessageEvent,
RunToolInvocationEvent,
RunToolResultEvent,
RunStepEndEvent,
RunEndEvent,
RunPauseEvent,
RunResumeEvent,
RunErrorEvent,
]);

View file

@ -66,14 +66,14 @@ async function execBashTool(agentTool: z.infer<typeof AgentTool>, input: any): P
};
}
async function execAskHumanTool(agentTool: z.infer<typeof AgentTool>, input: any): Promise<any> {
export async function execAskHumanTool(agentTool: z.infer<typeof AgentTool>, question: string): Promise<string> {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
let p = new Promise<string>((resolve, reject) => {
rl.question(`>> Provide answer to: ${input.question}:\n\n`, (answer) => {
rl.question(`>> Provide answer to: ${question}:\n\n`, (answer) => {
resolve(answer);
rl.close();
});
@ -85,10 +85,10 @@ async function execAskHumanTool(agentTool: z.infer<typeof AgentTool>, input: any
async function execWorkflowTool(agentTool: z.infer<typeof AgentTool> & { type: "workflow" }, input: any): Promise<any> {
let lastMsg: z.infer<typeof AssistantMessage> | null = null;
for await (const event of executeWorkflow(agentTool.name, input.message)) {
if (event.type === "workflow-step-message" && event.message.role === "assistant") {
if (event.type === "message" && event.message.role === "assistant") {
lastMsg = event.message;
}
if (event.type === "workflow-error") {
if (event.type === "error") {
throw new Error(event.error);
}
}
@ -117,8 +117,6 @@ export async function execTool(agentTool: z.infer<typeof AgentTool>, input: any)
switch (agentTool.name) {
case "bash":
return execBashTool(agentTool, input);
case "ask-human":
return execAskHumanTool(agentTool, input);
default:
throw new Error(`Unknown builtin tool: ${agentTool.name}`);
}

View file

@ -1,15 +1,61 @@
import { loadWorkflow } from "./utils.js";
import { randomId } from "./random-id.js";
import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage } from "../entities/message.js";
import { MessageList, AssistantMessage, AssistantContentPart, Message, ToolMessage, ToolCallPart } from "../entities/message.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
import { AgentNode } from "./agent.js";
import { z } from "zod";
import path from "path";
import { WorkDir } from "../config/config.js";
import fs from "fs";
import { createInterface, Interface } from "node:readline/promises";
import { FunctionsRegistry } from "../registry/functions.js";
import { WorkflowStreamEvent } from "../entities/workflow-event.js";
import { execTool } from "./exec-tool.js";
import { RunEvent } from "../entities/workflow-event.js";
import { execAskHumanTool, execTool } from "./exec-tool.js";
import { AgentTool } from "../entities/agent.js";
import { runIdGenerator } from "./run-id-gen.js";
import { Workflow } from "../entities/workflow.js";
const MappedToolCall = z.object({
toolCall: ToolCallPart,
agentTool: AgentTool,
});
const State = z.object({
stepIndex: z.number(),
messages: MessageList,
workflow: Workflow.nullable(),
pendingToolCallId: z.string().nullable(),
});
class StateBuilder {
private state: z.infer<typeof State> = {
stepIndex: 0,
messages: [],
workflow: null,
pendingToolCallId: null,
};
ingest(event: z.infer<typeof RunEvent>) {
switch (event.type) {
case "start":
this.state.workflow = event.workflow;
break;
case "step-start":
this.state.stepIndex = event.stepIndex;
break;
case "message":
this.state.messages.push(event.message);
this.state.pendingToolCallId = null;
break;
case "pause-for-human-input":
this.state.pendingToolCallId = event.toolCallId;
break;
}
}
get(): z.infer<typeof State> {
return this.state;
}
}
class RunLogger {
private logFile: string;
@ -24,15 +70,15 @@ class RunLogger {
constructor(workflowId: string, runId: string) {
this.ensureRunsDir(workflowId);
this.logFile = path.join(WorkDir, "runs", `${workflowId}`, `${runId}.jsonl`);
this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
this.fileHandle = fs.createWriteStream(this.logFile, {
flags: "a",
encoding: "utf8",
});
}
log(message: z.infer<typeof Message>) {
this.fileHandle.write(JSON.stringify(message) + "\n");
log(event: z.infer<typeof RunEvent>) {
this.fileHandle.write(JSON.stringify(event) + "\n");
}
close() {
@ -40,6 +86,23 @@ class RunLogger {
}
}
class LogAndYield {
private logger: RunLogger
constructor(logger: RunLogger) {
this.logger = logger;
}
async *logAndYield(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const ev = {
...event,
ts: new Date().toISOString(),
}
this.logger.log(ev);
yield ev;
}
}
class StreamStepMessageBuilder {
private parts: z.infer<typeof AssistantContentPart>[] = [];
private textBuffer: string = "";
@ -98,118 +161,286 @@ function loadFunction(id: string) {
return func;
}
export async function* executeWorkflow(id: string, input: string, background: boolean = false): AsyncGenerator<z.infer<typeof WorkflowStreamEvent>, void, unknown> {
export async function* executeWorkflow(id: string, input: string, interactive: boolean = true): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const runId = runIdGenerator.next();
yield* runFromState({
id,
runId,
state: {
stepIndex: 0,
messages: [{
role: "user",
content: input,
}],
workflow: null,
pendingToolCallId: null,
},
interactive,
});
}
export async function* resumeWorkflow(runId: string, input: string, interactive: boolean = false): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
// read a run.jsonl file line by line and build state
const builder = new StateBuilder();
let rl: Interface | null = null;
let stream: fs.ReadStream | null = null;
try {
const workflow = loadWorkflow(id);
const runId = await randomId();
yield {
type: "workflow-start",
workflowId: id,
workflow: workflow,
background: background,
};
const logger = new RunLogger(id, runId);
const messages: z.infer<typeof MessageList> = [{
role: "user",
content: input ?? ""
}];
try {
let stepIndex = 0;
while (true) {
const step = workflow.steps[stepIndex];
const node = step.type === "agent" ? new AgentNode(step.id, background) : loadFunction(step.id);
const messageBuilder = new StreamStepMessageBuilder();
// stream response from agent
for await (const event of node.execute(messages)) {
// console.log(" - event", JSON.stringify(event));
messageBuilder.ingest(event);
yield {
type: "workflow-step-stream-event",
stepId: step.id,
event: event,
};
}
// build and emit final message from agent response
const msg = messageBuilder.get();
logger.log(msg);
messages.push(msg);
yield {
type: "workflow-step-message",
stepId: step.id,
message: msg,
};
// if the agent response contains tool calls, execute them
const tools = node.tools();
let hasToolCalls = false;
if (msg.content instanceof Array) {
for (const part of msg.content) {
if (part.type === "tool-call") {
hasToolCalls = true;
if (!(part.toolName in tools)) {
throw new Error(`Tool ${part.toolName} not found`);
}
yield {
type: "workflow-step-tool-invocation",
stepId: step.id,
toolName: part.toolName,
input: part.arguments,
}
const result = await execTool(tools[part.toolName], part.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: part.toolCallId,
toolName: part.toolName,
};
logger.log(resultMsg);
messages.push(resultMsg);
yield {
type: "workflow-step-tool-result",
stepId: step.id,
toolName: part.toolName,
result: result,
};
yield {
type: "workflow-step-message",
stepId: step.id,
message: resultMsg,
};
}
}
}
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, move to the next step
stepIndex++;
if (stepIndex >= workflow.steps.length) {
break;
}
const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
stream = fs.createReadStream(logFile, { encoding: "utf8" });
rl = createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
if (line.trim() === "") {
continue;
}
} finally {
logger.close();
// console.error('processing line', line);
const parsed = JSON.parse(line);
// console.error('parsed');
const event = RunEvent.parse(parsed);
// console.error('zod parsed');
builder.ingest(event);
}
} catch (error) {
// console.error("Failed to resume workflow:", error);
// yield {
// type: "error",
// error: error instanceof Error ? error.message : String(error),
// };
} finally {
rl?.close();
stream?.close();
}
const { workflow, messages, stepIndex, pendingToolCallId } = builder.get();
if (!workflow) {
throw new Error(`Workflow not found for run ${runId}`);
}
if (!pendingToolCallId) {
throw new Error(`No pending tool call found for run ${runId}`);
}
const stepId = workflow.steps[stepIndex].id;
// append user input as message
const logger = new RunLogger(workflow.name, runId);
const ly = new LogAndYield(logger);
yield *ly.logAndYield({
type: "resume"
});
// append user input as message
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(input),
toolCallId: pendingToolCallId,
toolName: "ask-human",
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId,
toolName: "ask-human",
result: input,
});
yield* ly.logAndYield({
type: "message",
stepId,
message: resultMsg,
});
yield* runFromState({
id: workflow.name,
runId,
state: {
stepIndex,
messages,
workflow,
pendingToolCallId,
},
interactive,
});
}
async function* runFromState(opts: {
id: string;
runId: string;
state: z.infer<typeof State>;
interactive: boolean;
}) {
const { id, runId, state, interactive } = opts;
let stepIndex = state.stepIndex;
let messages = [...state.messages];
let workflow = state.workflow;
const logger = new RunLogger(id, runId);
const ly = new LogAndYield(logger);
try {
if (!workflow) {
workflow = loadWorkflow(id);
yield* ly.logAndYield({
type: "start",
runId,
workflowId: id,
workflow,
interactive,
});
}
while (true) {
const step = workflow.steps[stepIndex];
const node = step.type === "agent" ? new AgentNode(step.id, interactive) : loadFunction(step.id);
yield* ly.logAndYield({
type: "step-start",
stepIndex,
stepId: step.id,
stepType: step.type,
});
const messageBuilder = new StreamStepMessageBuilder();
// stream response from agent
for await (const event of node.execute(messages)) {
// console.log(" - event", JSON.stringify(event));
messageBuilder.ingest(event);
yield* ly.logAndYield({
type: "stream-event",
stepId: step.id,
event: event,
});
}
// build and emit final message from agent response
const msg = messageBuilder.get();
messages.push(msg);
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: msg,
});
// handle tool calls
const tools = node.tools();
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
if (msg.content instanceof Array) {
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
}
const hasToolCalls = msgToolCallParts.length > 0;
// validate and map tool calls
for (const part of msgToolCallParts) {
const agentTool = tools[part.toolName];
if (!agentTool) {
throw new Error(`Tool ${part.toolName} not found`);
}
mappedToolCalls.push({
toolCall: part,
agentTool: agentTool,
});
}
// first, exec all tool calls other than ask-human
for (const call of mappedToolCalls) {
const { agentTool, toolCall } = call;
if (agentTool.type === "builtin" && agentTool.name === "ask-human") {
continue;
}
yield* ly.logAndYield({
type: "tool-invocation",
stepId: step.id,
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
});
const result = await execTool(agentTool, toolCall.arguments);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId: step.id,
toolName: toolCall.toolName,
result: result,
});
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: resultMsg,
});
}
// handle ask-tool call execution
for (const call of mappedToolCalls) {
const { agentTool, toolCall } = call;
if (agentTool.type !== "builtin" || agentTool.name !== "ask-human") {
continue;
}
yield* ly.logAndYield({
type: "tool-invocation",
stepId: step.id,
toolName: toolCall.toolName,
input: JSON.stringify(toolCall.arguments),
});
// if running in background mode, exit here
if (!interactive) {
yield* ly.logAndYield({
type: "pause-for-human-input",
toolCallId: toolCall.toolCallId,
});
return;
}
const result = await execAskHumanTool(agentTool, toolCall.arguments.question as string);
const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool",
content: JSON.stringify(result),
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
};
messages.push(resultMsg);
yield* ly.logAndYield({
type: "tool-result",
stepId: step.id,
toolName: toolCall.toolName,
result: result,
});
yield* ly.logAndYield({
type: "message",
stepId: step.id,
message: resultMsg,
});
}
yield* ly.logAndYield({
type: "step-end",
stepIndex,
});
// if the agent response had tool calls, replay this agent
if (hasToolCalls) {
continue;
}
// otherwise, move to the next step
stepIndex++;
if (stepIndex >= workflow.steps.length) {
yield* ly.logAndYield({
type: "end",
});
break;
}
}
// console.log('\n\n', JSON.stringify(messages, null, 2));
} catch (error) {
yield {
type: "workflow-error",
yield* ly.logAndYield({
type: "error",
error: error instanceof Error ? error.message : String(error),
};
});
} finally {
yield {
type: "workflow-end",
};
logger.close();
}
}
}

View file

@ -1,5 +1,5 @@
import { z } from "zod";
import { WorkflowStreamEvent } from "../entities/workflow-event.js";
import { RunEvent } from "../entities/workflow-event.js";
import { LlmStepStreamEvent } from "../entities/llm-step-event.js";
export interface StreamRendererOptions {
@ -24,41 +24,41 @@ export class StreamRenderer {
};
}
render(event: z.infer<typeof WorkflowStreamEvent>) {
render(event: z.infer<typeof RunEvent>) {
switch (event.type) {
case "workflow-start": {
this.onWorkflowStart(event.workflowId, event.background);
case "start": {
this.onWorkflowStart(event.workflowId, event.runId, event.interactive);
break;
}
case "workflow-step-start": {
this.onStepStart(event.stepId, event.stepType);
case "step-start": {
this.onStepStart(event.stepIndex, event.stepId, event.stepType);
break;
}
case "workflow-step-stream-event": {
case "stream-event": {
this.renderLlmEvent(event.event);
break;
}
case "workflow-step-message": {
case "message": {
// this.onStepMessage(event.stepId, event.message);
break;
}
case "workflow-step-tool-invocation": {
case "tool-invocation": {
this.onStepToolInvocation(event.stepId, event.toolName, event.input);
break;
}
case "workflow-step-tool-result": {
case "tool-result": {
this.onStepToolResult(event.stepId, event.toolName, event.result);
break;
}
case "workflow-step-end": {
this.onStepEnd(event.stepId);
case "step-end": {
this.onStepEnd(event.stepIndex);
break;
}
case "workflow-end": {
case "end": {
this.onWorkflowEnd();
break;
}
case "workflow-error": {
case "error": {
this.onWorkflowError(event.error);
break;
}
@ -94,10 +94,10 @@ export class StreamRenderer {
}
}
private onWorkflowStart(workflowId: string, background: boolean) {
private onWorkflowStart(workflowId: string, runId: string, interactive: boolean) {
this.write("\n");
this.write(this.bold(`▶ Workflow ${workflowId}`));
if (background) this.write(this.dim(" (background)"));
this.write(this.bold(`▶ Workflow ${workflowId} (run ${runId})`));
if (!interactive) this.write(this.dim(" (--no-interactive)"));
this.write("\n");
}
@ -109,17 +109,17 @@ export class StreamRenderer {
this.write(this.red(`\n✖ Workflow error: ${error}\n`));
}
private onStepStart(stepId: string, stepType: "agent" | "function") {
private onStepStart(stepIndex: number, stepId: string, stepType: "agent" | "function") {
this.write("\n");
this.write(this.cyan(`─ Step ${stepId} [${stepType}]`));
this.write(this.cyan(`─ Step ${stepIndex} [${stepType}]`));
this.write("\n");
}
private onStepEnd(stepId: string) {
this.write(this.dim(`✓ Step ${stepId} finished\n`));
private onStepEnd(stepIndex: number) {
this.write(this.dim(`✓ Step ${stepIndex} finished\n`));
}
private onStepMessage(stepId: string, message: any) {
private onStepMessage(stepIndex: number, message: any) {
const role = message?.role ?? "message";
const content = message?.content;
this.write(this.bold(`${role}: `));