mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-26 00:46:23 +02:00
refactor agent execution
This commit is contained in:
parent
92b702d039
commit
a76cb6089c
6 changed files with 224 additions and 222 deletions
|
|
@ -1,19 +1,137 @@
|
|||
import { streamAgent } from "./application/lib/agent.js";
|
||||
import { loadAgent, RunLogger, streamAgentTurn } from "./application/lib/agent.js";
|
||||
import { StreamRenderer } from "./application/lib/stream-renderer.js";
|
||||
import { stdin as input, stdout as output } from "node:process";
|
||||
import fs from "fs";
|
||||
import path from "path";
|
||||
import { WorkDir } from "./application/config/config.js";
|
||||
import { RunEvent, RunStartEvent } from "./application/entities/run-events.js";
|
||||
import { createInterface, Interface } from "node:readline/promises";
|
||||
import { runIdGenerator } from "./application/lib/run-id-gen.js";
|
||||
import { Agent } from "./application/entities/agent.js";
|
||||
import { MessageList } from "./application/entities/message.js";
|
||||
import { z } from "zod";
|
||||
import { CopilotAgent } from "./application/assistant/agent.js";
|
||||
|
||||
export async function app(opts: {
|
||||
agent: string;
|
||||
runId?: string;
|
||||
input?: string;
|
||||
noInteractive?: boolean;
|
||||
}) {
|
||||
let inputCount = 0;
|
||||
const messages: z.infer<typeof MessageList> = [];
|
||||
const renderer = new StreamRenderer();
|
||||
for await (const event of streamAgent({
|
||||
...opts,
|
||||
interactive: true,
|
||||
})) {
|
||||
renderer.render(event);
|
||||
if (event?.type === "error") {
|
||||
process.exitCode = 1;
|
||||
|
||||
// load existing and assemble state if required
|
||||
let runId = opts.runId;
|
||||
if (runId) {
|
||||
console.error("loading run", runId);
|
||||
let stream: fs.ReadStream | null = null;
|
||||
let rl: Interface | null = null;
|
||||
try {
|
||||
const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
|
||||
stream = fs.createReadStream(logFile, { encoding: "utf8" });
|
||||
rl = createInterface({ input: stream, crlfDelay: Infinity });
|
||||
for await (const line of rl) {
|
||||
if (line.trim() === "") {
|
||||
continue;
|
||||
}
|
||||
const parsed = JSON.parse(line);
|
||||
const event = RunEvent.parse(parsed);
|
||||
switch (event.type) {
|
||||
case "message":
|
||||
messages.push(event.message);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
stream?.close();
|
||||
}
|
||||
}
|
||||
|
||||
// add user input
|
||||
if (opts.input) {
|
||||
messages.push({
|
||||
role: "user",
|
||||
content: opts.input,
|
||||
});
|
||||
inputCount++;
|
||||
}
|
||||
|
||||
// create runId if not present
|
||||
if (!runId) {
|
||||
runId = runIdGenerator.next();
|
||||
}
|
||||
const logger = new RunLogger(runId);
|
||||
|
||||
// load agent data
|
||||
let agent: z.infer<typeof Agent> | null = null;
|
||||
if (opts.agent === "copilot") {
|
||||
agent = CopilotAgent;
|
||||
} else {
|
||||
agent = await loadAgent(opts.agent);
|
||||
}
|
||||
if (!agent) {
|
||||
throw new Error("unable to load agent");
|
||||
}
|
||||
|
||||
// emit start event if first time run
|
||||
if (!opts.runId) {
|
||||
const ev = {
|
||||
type: "start",
|
||||
runId,
|
||||
agent: agent.name,
|
||||
} as z.infer<typeof RunStartEvent>;
|
||||
logger.log(ev);
|
||||
renderer.render(ev);
|
||||
}
|
||||
|
||||
// loop between user and agent
|
||||
let rl: Interface | null = null;
|
||||
if (!opts.noInteractive) {
|
||||
rl = createInterface({ input, output });
|
||||
}
|
||||
let firstPass = true;
|
||||
try {
|
||||
while (true) {
|
||||
let askInput = false;
|
||||
if (firstPass) {
|
||||
if (!opts.input) {
|
||||
askInput = true;
|
||||
}
|
||||
firstPass = false;
|
||||
} else {
|
||||
askInput = true;
|
||||
}
|
||||
if (rl && askInput) {
|
||||
const userInput = await rl.question("You: ");
|
||||
if (["quit", "exit", "q"].includes(userInput.trim().toLowerCase())) {
|
||||
console.error("Bye!");
|
||||
return;
|
||||
}
|
||||
inputCount++;
|
||||
messages.push({
|
||||
role: "user",
|
||||
content: userInput,
|
||||
});
|
||||
}
|
||||
for await (const event of streamAgentTurn({
|
||||
agent,
|
||||
messages,
|
||||
})) {
|
||||
logger.log(event);
|
||||
renderer.render(event);
|
||||
if (event?.type === "error") {
|
||||
process.exitCode = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (opts.noInteractive) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
logger.close();
|
||||
rl?.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -11,7 +11,6 @@ export const RunStartEvent = BaseRunEvent.extend({
|
|||
type: z.literal("start"),
|
||||
runId: z.string(),
|
||||
agent: z.string(),
|
||||
interactive: z.boolean(),
|
||||
});
|
||||
|
||||
export const RunStepStartEvent = BaseRunEvent.extend({
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import path from "path";
|
|||
import { ModelConfig, WorkDir } from "../config/config.js";
|
||||
import { Agent, ToolAttachment } from "../entities/agent.js";
|
||||
import { createInterface, Interface } from "node:readline/promises";
|
||||
import { stdin as input, stdout as output } from "node:process";
|
||||
import { AssistantContentPart, AssistantMessage, Message, MessageList, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
|
||||
import { runIdGenerator } from "./run-id-gen.js";
|
||||
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
|
||||
|
|
@ -79,23 +78,6 @@ export class RunLogger {
|
|||
}
|
||||
}
|
||||
|
||||
export class LogAndYield {
|
||||
private logger: RunLogger
|
||||
|
||||
constructor(logger: RunLogger) {
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
async *logAndYield(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
|
||||
const ev = {
|
||||
...event,
|
||||
ts: new Date().toISOString(),
|
||||
}
|
||||
this.logger.log(ev);
|
||||
yield ev;
|
||||
}
|
||||
}
|
||||
|
||||
export class StreamStepMessageBuilder {
|
||||
private parts: z.infer<typeof AssistantContentPart>[] = [];
|
||||
private textBuffer: string = "";
|
||||
|
|
@ -218,56 +200,11 @@ export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelM
|
|||
}
|
||||
|
||||
|
||||
export async function* streamAgent(opts: {
|
||||
agent: string;
|
||||
runId?: string;
|
||||
input?: string;
|
||||
interactive?: boolean;
|
||||
export async function* streamAgentTurn(opts: {
|
||||
agent: z.infer<typeof Agent>;
|
||||
messages: z.infer<typeof MessageList>;
|
||||
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
|
||||
const messages: z.infer<typeof MessageList> = [];
|
||||
|
||||
// load existing and assemble state if required
|
||||
let runId = opts.runId;
|
||||
if (runId) {
|
||||
console.error("loading run", runId);
|
||||
let stream: fs.ReadStream | null = null;
|
||||
let rl: Interface | null = null;
|
||||
try {
|
||||
const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
|
||||
stream = fs.createReadStream(logFile, { encoding: "utf8" });
|
||||
rl = createInterface({ input: stream, crlfDelay: Infinity });
|
||||
for await (const line of rl) {
|
||||
if (line.trim() === "") {
|
||||
continue;
|
||||
}
|
||||
const parsed = JSON.parse(line);
|
||||
const event = RunEvent.parse(parsed);
|
||||
switch (event.type) {
|
||||
case "message":
|
||||
messages.push(event.message);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
stream?.close();
|
||||
}
|
||||
}
|
||||
|
||||
// create runId if not present
|
||||
if (!runId) {
|
||||
runId = runIdGenerator.next();
|
||||
}
|
||||
|
||||
// load agent data
|
||||
let agent: z.infer<typeof Agent> | null = null;
|
||||
if (opts.agent === "copilot") {
|
||||
agent = CopilotAgent;
|
||||
} else {
|
||||
agent = await loadAgent(opts.agent);
|
||||
}
|
||||
if (!agent) {
|
||||
throw new Error("unable to load agent");
|
||||
}
|
||||
const { agent, messages } = opts;
|
||||
|
||||
// set up tools
|
||||
const tools: ToolSet = {};
|
||||
|
|
@ -281,149 +218,87 @@ export async function* streamAgent(opts: {
|
|||
}
|
||||
|
||||
// set up
|
||||
const logger = new RunLogger(runId);
|
||||
const ly = new LogAndYield(logger);
|
||||
const provider = getProvider(agent.provider);
|
||||
const model = provider(agent.model || ModelConfig.defaults.model);
|
||||
|
||||
// emit start event if first time run
|
||||
if (!opts.runId) {
|
||||
yield* ly.logAndYield({
|
||||
type: "start",
|
||||
runId,
|
||||
agent: opts.agent,
|
||||
interactive: opts.interactive ?? false,
|
||||
});
|
||||
}
|
||||
|
||||
// get first input if needed
|
||||
let rl: Interface | null = null;
|
||||
if (opts.interactive) {
|
||||
rl = createInterface({ input, output });
|
||||
}
|
||||
if (opts.input) {
|
||||
const m: z.infer<typeof UserMessage> = {
|
||||
role: "user",
|
||||
content: opts.input,
|
||||
};
|
||||
messages.push(m);
|
||||
yield *ly.logAndYield({
|
||||
type: "message",
|
||||
message: m,
|
||||
});
|
||||
}
|
||||
try {
|
||||
// loop b/w user and agent
|
||||
while (true) {
|
||||
// get input in interactive mode when last message is not user
|
||||
if (opts.interactive && (messages.length === 0 || messages[messages.length - 1].role !== "user")) {
|
||||
const input = await rl!.question("You: ");
|
||||
// Exit condition
|
||||
if (["q", "quit", "exit"].includes(input.toLowerCase())) {
|
||||
console.log("\n👋 Goodbye!");
|
||||
return;
|
||||
}
|
||||
|
||||
const m: z.infer<typeof UserMessage> = {
|
||||
role: "user",
|
||||
content: input,
|
||||
};
|
||||
messages.push(m);
|
||||
yield* ly.logAndYield({
|
||||
type: "message",
|
||||
message: m,
|
||||
});
|
||||
}
|
||||
|
||||
// inner loop to handle tool calls
|
||||
while (true) {
|
||||
// stream agent response and build message
|
||||
const messageBuilder = new StreamStepMessageBuilder();
|
||||
for await (const event of streamLlm(
|
||||
model,
|
||||
messages,
|
||||
agent.instructions,
|
||||
tools,
|
||||
)) {
|
||||
messageBuilder.ingest(event);
|
||||
yield* ly.logAndYield({
|
||||
type: "stream-event",
|
||||
event: event,
|
||||
});
|
||||
}
|
||||
|
||||
// build and emit final message from agent response
|
||||
const msg = messageBuilder.get();
|
||||
messages.push(msg);
|
||||
yield* ly.logAndYield({
|
||||
type: "message",
|
||||
message: msg,
|
||||
});
|
||||
|
||||
// handle tool calls
|
||||
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
|
||||
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
|
||||
if (msg.content instanceof Array) {
|
||||
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
|
||||
}
|
||||
const hasToolCalls = msgToolCallParts.length > 0;
|
||||
console.log(msgToolCallParts);
|
||||
|
||||
// validate and map tool calls
|
||||
for (const part of msgToolCallParts) {
|
||||
const agentTool = tools[part.toolName];
|
||||
if (!agentTool) {
|
||||
throw new Error(`Tool ${part.toolName} not found`);
|
||||
}
|
||||
mappedToolCalls.push({
|
||||
toolCall: part,
|
||||
agentTool: agent.tools![part.toolName],
|
||||
});
|
||||
}
|
||||
|
||||
for (const call of mappedToolCalls) {
|
||||
const { agentTool, toolCall } = call;
|
||||
yield* ly.logAndYield({
|
||||
type: "tool-invocation",
|
||||
toolName: toolCall.toolName,
|
||||
input: JSON.stringify(toolCall.arguments),
|
||||
});
|
||||
const result = await execTool(agentTool, toolCall.arguments);
|
||||
const resultMsg: z.infer<typeof ToolMessage> = {
|
||||
role: "tool",
|
||||
content: JSON.stringify(result),
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
};
|
||||
messages.push(resultMsg);
|
||||
yield* ly.logAndYield({
|
||||
type: "tool-result",
|
||||
toolName: toolCall.toolName,
|
||||
result: result,
|
||||
});
|
||||
yield* ly.logAndYield({
|
||||
type: "message",
|
||||
message: resultMsg,
|
||||
});
|
||||
}
|
||||
|
||||
// if the agent response had tool calls, replay this agent
|
||||
if (hasToolCalls) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// otherwise, break
|
||||
break;
|
||||
}
|
||||
|
||||
// if not interactive, return
|
||||
if (!opts.interactive) {
|
||||
break;
|
||||
}
|
||||
// run one turn
|
||||
while (true) {
|
||||
// stream agent response and build message
|
||||
const messageBuilder = new StreamStepMessageBuilder();
|
||||
for await (const event of streamLlm(
|
||||
model,
|
||||
messages,
|
||||
agent.instructions,
|
||||
tools,
|
||||
)) {
|
||||
messageBuilder.ingest(event);
|
||||
yield {
|
||||
type: "stream-event",
|
||||
event: event,
|
||||
};
|
||||
}
|
||||
} finally {
|
||||
rl?.close();
|
||||
logger.close();
|
||||
|
||||
// build and emit final message from agent response
|
||||
const msg = messageBuilder.get();
|
||||
messages.push(msg);
|
||||
yield {
|
||||
type: "message",
|
||||
message: msg,
|
||||
};
|
||||
|
||||
// handle tool calls
|
||||
const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
|
||||
let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
|
||||
if (msg.content instanceof Array) {
|
||||
msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
|
||||
}
|
||||
const hasToolCalls = msgToolCallParts.length > 0;
|
||||
|
||||
// validate and map tool calls
|
||||
for (const part of msgToolCallParts) {
|
||||
const agentTool = tools[part.toolName];
|
||||
if (!agentTool) {
|
||||
throw new Error(`Tool ${part.toolName} not found`);
|
||||
}
|
||||
mappedToolCalls.push({
|
||||
toolCall: part,
|
||||
agentTool: agent.tools![part.toolName],
|
||||
});
|
||||
}
|
||||
|
||||
for (const call of mappedToolCalls) {
|
||||
const { agentTool, toolCall } = call;
|
||||
yield {
|
||||
type: "tool-invocation",
|
||||
toolName: toolCall.toolName,
|
||||
input: JSON.stringify(toolCall.arguments),
|
||||
};
|
||||
const result = await execTool(agentTool, toolCall.arguments);
|
||||
const resultMsg: z.infer<typeof ToolMessage> = {
|
||||
role: "tool",
|
||||
content: JSON.stringify(result),
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
};
|
||||
messages.push(resultMsg);
|
||||
yield {
|
||||
type: "tool-result",
|
||||
toolName: toolCall.toolName,
|
||||
result: result,
|
||||
};
|
||||
yield {
|
||||
type: "message",
|
||||
message: resultMsg,
|
||||
};
|
||||
}
|
||||
|
||||
// if the agent response had tool calls, replay this agent
|
||||
if (hasToolCalls) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// otherwise, break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,8 @@ import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
|
|||
import { Client } from "@modelcontextprotocol/sdk/client";
|
||||
import { AssistantMessage } from "../entities/message.js";
|
||||
import { BuiltinTools } from "./builtin-tools.js";
|
||||
import { streamAgent } from "./agent.js";
|
||||
import { loadAgent, streamAgentTurn } from "./agent.js";
|
||||
import { app } from "@/app.js";
|
||||
|
||||
async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "mcp" }, input: any): Promise<any> {
|
||||
// load mcp configuration from the tool
|
||||
|
|
@ -55,9 +56,13 @@ async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "
|
|||
|
||||
async function execAgentTool(agentTool: z.infer<typeof ToolAttachment> & { type: "agent" }, input: any): Promise<any> {
|
||||
let lastMsg: z.infer<typeof AssistantMessage> | null = null;
|
||||
for await (const event of streamAgent({
|
||||
agent: agentTool.name,
|
||||
input: JSON.stringify(input),
|
||||
const agent = await loadAgent(agentTool.name);
|
||||
for await (const event of streamAgentTurn({
|
||||
agent,
|
||||
messages: [{
|
||||
role: "user",
|
||||
content: JSON.stringify(input),
|
||||
}],
|
||||
})) {
|
||||
if (event.type === "message" && event.message.role === "assistant") {
|
||||
lastMsg = event.message;
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ export class StreamRenderer {
|
|||
render(event: z.infer<typeof RunEvent>) {
|
||||
switch (event.type) {
|
||||
case "start": {
|
||||
this.onStart(event.agent, event.runId, event.interactive);
|
||||
this.onStart(event.agent, event.runId);
|
||||
break;
|
||||
}
|
||||
case "step-start": {
|
||||
|
|
@ -94,10 +94,9 @@ export class StreamRenderer {
|
|||
}
|
||||
}
|
||||
|
||||
private onStart(agent: string, runId: string, interactive: boolean) {
|
||||
private onStart(agent: string, runId: string) {
|
||||
this.write("\n");
|
||||
this.write(this.bold(`▶ Agent ${agent} (run ${runId})`));
|
||||
if (!interactive) this.write(this.dim(" (--no-interactive)"));
|
||||
this.write("\n");
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue