mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-26 08:56:22 +02:00
server for rowboatx
This commit is contained in:
parent
ae877e70ae
commit
9ad6331fbc
38 changed files with 2223 additions and 1088 deletions
35
apps/cli/src/agents/agents.ts
Normal file
35
apps/cli/src/agents/agents.ts
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
import { z } from "zod";
|
||||
|
||||
export const BaseTool = z.object({
|
||||
name: z.string(),
|
||||
});
|
||||
|
||||
export const BuiltinTool = BaseTool.extend({
|
||||
type: z.literal("builtin"),
|
||||
});
|
||||
|
||||
export const McpTool = BaseTool.extend({
|
||||
type: z.literal("mcp"),
|
||||
description: z.string(),
|
||||
inputSchema: z.any(),
|
||||
mcpServerName: z.string(),
|
||||
});
|
||||
|
||||
export const AgentAsATool = BaseTool.extend({
|
||||
type: z.literal("agent"),
|
||||
});
|
||||
|
||||
export const ToolAttachment = z.discriminatedUnion("type", [
|
||||
BuiltinTool,
|
||||
McpTool,
|
||||
AgentAsATool,
|
||||
]);
|
||||
|
||||
export const Agent = z.object({
|
||||
name: z.string(),
|
||||
provider: z.string().optional(),
|
||||
model: z.string().optional(),
|
||||
description: z.string(),
|
||||
instructions: z.string(),
|
||||
tools: z.record(z.string(), ToolAttachment).optional(),
|
||||
});
|
||||
45
apps/cli/src/agents/repo.ts
Normal file
45
apps/cli/src/agents/repo.ts
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
import { WorkDir } from "../config/config.js";
|
||||
import fs from "fs/promises";
|
||||
import path from "path";
|
||||
import z from "zod";
|
||||
import { Agent } from "./agents.js";
|
||||
|
||||
export interface IAgentsRepo {
|
||||
list(): Promise<z.infer<typeof Agent>[]>;
|
||||
fetch(id: string): Promise<z.infer<typeof Agent>>;
|
||||
create(agent: z.infer<typeof Agent>): Promise<void>;
|
||||
update(id: string, agent: z.infer<typeof Agent>): Promise<void>;
|
||||
delete(id: string): Promise<void>;
|
||||
}
|
||||
|
||||
export class FSAgentsRepo implements IAgentsRepo {
|
||||
async list(): Promise<z.infer<typeof Agent>[]> {
|
||||
const result: z.infer<typeof Agent>[] = [];
|
||||
// list all json files in workdir/agents/
|
||||
const agentsDir = path.join(WorkDir, "agents");
|
||||
const files = await fs.readdir(agentsDir);
|
||||
|
||||
for (const file of files) {
|
||||
const contents = await fs.readFile(path.join(agentsDir, file), "utf8");
|
||||
result.push(Agent.parse(JSON.parse(contents)));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async fetch(id: string): Promise<z.infer<typeof Agent>> {
|
||||
const contents = await fs.readFile(path.join(WorkDir, "agents", `${id}.json`), "utf8");
|
||||
return Agent.parse(JSON.parse(contents));
|
||||
}
|
||||
|
||||
async create(agent: z.infer<typeof Agent>): Promise<void> {
|
||||
await fs.writeFile(path.join(WorkDir, "agents", `${agent.name}.json`), JSON.stringify(agent, null, 2));
|
||||
}
|
||||
|
||||
async update(id: string, agent: z.infer<typeof Agent>): Promise<void> {
|
||||
await fs.writeFile(path.join(WorkDir, "agents", `${id}.json`), JSON.stringify(agent, null, 2));
|
||||
}
|
||||
|
||||
async delete(id: string): Promise<void> {
|
||||
await fs.unlink(path.join(WorkDir, "agents", `${id}.json`));
|
||||
}
|
||||
}
|
||||
824
apps/cli/src/agents/runtime.ts
Normal file
824
apps/cli/src/agents/runtime.ts
Normal file
|
|
@ -0,0 +1,824 @@
|
|||
import { jsonSchema, ModelMessage, modelMessageSchema } from "ai";
|
||||
import fs from "fs";
|
||||
import path from "path";
|
||||
import { WorkDir } from "../config/config.js";
|
||||
import { Agent, ToolAttachment } from "./agents.js";
|
||||
import { AssistantContentPart, AssistantMessage, Message, MessageList, ProviderOptions, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
|
||||
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
|
||||
import { z } from "zod";
|
||||
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
|
||||
import { execTool } from "../application/lib/exec-tool.js";
|
||||
import { MessageEvent, AskHumanRequestEvent, RunEvent, ToolInvocationEvent, ToolPermissionRequestEvent, ToolPermissionResponseEvent } from "../entities/run-events.js";
|
||||
import { BuiltinTools } from "../application/lib/builtin-tools.js";
|
||||
import { CopilotAgent } from "../application/assistant/agent.js";
|
||||
import { isBlocked } from "../application/lib/command-executor.js";
|
||||
import container from "../di/container.js";
|
||||
import { IModelConfigRepo } from "../models/repo.js";
|
||||
import { getProvider } from "../models/models.js";
|
||||
import { IAgentsRepo } from "./repo.js";
|
||||
import { IdGen, IMonotonicallyIncreasingIdGenerator } from "../application/lib/id-gen.js";
|
||||
import { IBus } from "../application/lib/bus.js";
|
||||
import { IMessageQueue } from "../application/lib/message-queue.js";
|
||||
import { IRunsRepo } from "../runs/repo.js";
|
||||
import { IRunsLock } from "../runs/lock.js";
|
||||
|
||||
export interface IAgentRuntime {
|
||||
trigger(runId: string): Promise<void>;
|
||||
}
|
||||
|
||||
export class AgentRuntime implements IAgentRuntime {
|
||||
private runsRepo: IRunsRepo;
|
||||
private idGenerator: IMonotonicallyIncreasingIdGenerator;
|
||||
private bus: IBus;
|
||||
private messageQueue: IMessageQueue;
|
||||
private modelConfigRepo: IModelConfigRepo;
|
||||
private runsLock: IRunsLock;
|
||||
|
||||
constructor({
|
||||
runsRepo,
|
||||
idGenerator,
|
||||
bus,
|
||||
messageQueue,
|
||||
modelConfigRepo,
|
||||
runsLock,
|
||||
}: {
|
||||
runsRepo: IRunsRepo;
|
||||
idGenerator: IMonotonicallyIncreasingIdGenerator;
|
||||
bus: IBus;
|
||||
messageQueue: IMessageQueue;
|
||||
modelConfigRepo: IModelConfigRepo;
|
||||
runsLock: IRunsLock;
|
||||
}) {
|
||||
this.runsRepo = runsRepo;
|
||||
this.idGenerator = idGenerator;
|
||||
this.bus = bus;
|
||||
this.messageQueue = messageQueue;
|
||||
this.modelConfigRepo = modelConfigRepo;
|
||||
this.runsLock = runsLock;
|
||||
}
|
||||
|
||||
async trigger(runId: string): Promise<void> {
|
||||
if (!await this.runsLock.lock(runId)) {
|
||||
console.log(`unable to acquire lock on run ${runId}`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
while (true) {
|
||||
let eventCount = 0;
|
||||
const run = await this.runsRepo.fetch(runId);
|
||||
if (!run) {
|
||||
throw new Error(`Run ${runId} not found`);
|
||||
}
|
||||
const state = new AgentState();
|
||||
for (const event of run.log) {
|
||||
state.ingest(event);
|
||||
}
|
||||
for await (const event of streamAgent({
|
||||
state,
|
||||
idGenerator: this.idGenerator,
|
||||
runId,
|
||||
messageQueue: this.messageQueue,
|
||||
modelConfigRepo: this.modelConfigRepo,
|
||||
})) {
|
||||
eventCount++;
|
||||
if (event.type !== "llm-stream-event") {
|
||||
await this.runsRepo.appendEvents(runId, [event]);
|
||||
}
|
||||
await this.bus.publish(event);
|
||||
}
|
||||
|
||||
// if no events, break
|
||||
if (!eventCount) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await this.runsLock.release(runId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function mapAgentTool(t: z.infer<typeof ToolAttachment>): Promise<Tool> {
|
||||
switch (t.type) {
|
||||
case "mcp":
|
||||
return tool({
|
||||
name: t.name,
|
||||
description: t.description,
|
||||
inputSchema: jsonSchema(t.inputSchema),
|
||||
});
|
||||
case "agent":
|
||||
const agent = await loadAgent(t.name);
|
||||
if (!agent) {
|
||||
throw new Error(`Agent ${t.name} not found`);
|
||||
}
|
||||
return tool({
|
||||
name: t.name,
|
||||
description: agent.description,
|
||||
inputSchema: z.object({
|
||||
message: z.string().describe("The message to send to the workflow"),
|
||||
}),
|
||||
});
|
||||
case "builtin":
|
||||
if (t.name === "ask-human") {
|
||||
return tool({
|
||||
description: "Ask a human before proceeding",
|
||||
inputSchema: z.object({
|
||||
question: z.string().describe("The question to ask the human"),
|
||||
}),
|
||||
});
|
||||
}
|
||||
const match = BuiltinTools[t.name];
|
||||
if (!match) {
|
||||
throw new Error(`Unknown builtin tool: ${t.name}`);
|
||||
}
|
||||
return tool({
|
||||
description: match.description,
|
||||
inputSchema: match.inputSchema,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class RunLogger {
|
||||
private logFile: string;
|
||||
private fileHandle: fs.WriteStream;
|
||||
|
||||
ensureRunsDir() {
|
||||
const runsDir = path.join(WorkDir, "runs");
|
||||
if (!fs.existsSync(runsDir)) {
|
||||
fs.mkdirSync(runsDir, { recursive: true });
|
||||
}
|
||||
}
|
||||
|
||||
constructor(runId: string) {
|
||||
this.ensureRunsDir();
|
||||
this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
|
||||
this.fileHandle = fs.createWriteStream(this.logFile, {
|
||||
flags: "a",
|
||||
encoding: "utf8",
|
||||
});
|
||||
}
|
||||
|
||||
log(event: z.infer<typeof RunEvent>) {
|
||||
if (event.type !== "llm-stream-event") {
|
||||
this.fileHandle.write(JSON.stringify(event) + "\n");
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
this.fileHandle.close();
|
||||
}
|
||||
}
|
||||
|
||||
export class StreamStepMessageBuilder {
|
||||
private parts: z.infer<typeof AssistantContentPart>[] = [];
|
||||
private textBuffer: string = "";
|
||||
private reasoningBuffer: string = "";
|
||||
private providerOptions: z.infer<typeof ProviderOptions> | undefined = undefined;
|
||||
|
||||
flushBuffers() {
|
||||
// skip reasoning
|
||||
// if (this.reasoningBuffer) {
|
||||
// this.parts.push({ type: "reasoning", text: this.reasoningBuffer });
|
||||
// this.reasoningBuffer = "";
|
||||
// }
|
||||
if (this.textBuffer) {
|
||||
this.parts.push({ type: "text", text: this.textBuffer });
|
||||
this.textBuffer = "";
|
||||
}
|
||||
}
|
||||
|
||||
ingest(event: z.infer<typeof LlmStepStreamEvent>) {
|
||||
switch (event.type) {
|
||||
case "reasoning-start":
|
||||
case "reasoning-end":
|
||||
case "text-start":
|
||||
case "text-end":
|
||||
this.flushBuffers();
|
||||
break;
|
||||
case "reasoning-delta":
|
||||
this.reasoningBuffer += event.delta;
|
||||
break;
|
||||
case "text-delta":
|
||||
this.textBuffer += event.delta;
|
||||
break;
|
||||
case "tool-call":
|
||||
this.parts.push({
|
||||
type: "tool-call",
|
||||
toolCallId: event.toolCallId,
|
||||
toolName: event.toolName,
|
||||
arguments: event.input,
|
||||
providerOptions: event.providerOptions,
|
||||
});
|
||||
break;
|
||||
case "finish-step":
|
||||
this.providerOptions = event.providerOptions;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
get(): z.infer<typeof AssistantMessage> {
|
||||
this.flushBuffers();
|
||||
return {
|
||||
role: "assistant",
|
||||
content: this.parts,
|
||||
providerOptions: this.providerOptions,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function normaliseAskHumanToolCall(message: z.infer<typeof AssistantMessage>) {
|
||||
if (typeof message.content === "string") {
|
||||
return;
|
||||
}
|
||||
let askHumanToolCall: z.infer<typeof ToolCallPart> | null = null;
|
||||
const newParts = [];
|
||||
for (const part of message.content as z.infer<typeof AssistantContentPart>[]) {
|
||||
if (part.type === "tool-call" && part.toolName === "ask-human") {
|
||||
if (!askHumanToolCall) {
|
||||
askHumanToolCall = part;
|
||||
} else {
|
||||
(askHumanToolCall as z.infer<typeof ToolCallPart>).arguments += "\n" + part.arguments;
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
newParts.push(part);
|
||||
}
|
||||
}
|
||||
if (askHumanToolCall) {
|
||||
newParts.push(askHumanToolCall);
|
||||
}
|
||||
message.content = newParts;
|
||||
}
|
||||
|
||||
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
|
||||
if (id === "copilot") {
|
||||
return CopilotAgent;
|
||||
}
|
||||
const repo = container.resolve<IAgentsRepo>('agentsRepo');
|
||||
return await repo.fetch(id);
|
||||
}
|
||||
|
||||
export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
|
||||
const result: ModelMessage[] = [];
|
||||
for (const msg of messages) {
|
||||
const { providerOptions } = msg;
|
||||
switch (msg.role) {
|
||||
case "assistant":
|
||||
if (typeof msg.content === 'string') {
|
||||
result.push({
|
||||
role: "assistant",
|
||||
content: msg.content,
|
||||
providerOptions,
|
||||
});
|
||||
} else {
|
||||
result.push({
|
||||
role: "assistant",
|
||||
content: msg.content.map(part => {
|
||||
switch (part.type) {
|
||||
case 'text':
|
||||
return part;
|
||||
case 'reasoning':
|
||||
return part;
|
||||
case 'tool-call':
|
||||
return {
|
||||
type: 'tool-call',
|
||||
toolCallId: part.toolCallId,
|
||||
toolName: part.toolName,
|
||||
input: part.arguments,
|
||||
providerOptions: part.providerOptions,
|
||||
};
|
||||
}
|
||||
}),
|
||||
providerOptions,
|
||||
});
|
||||
}
|
||||
break;
|
||||
case "system":
|
||||
result.push({
|
||||
role: "system",
|
||||
content: msg.content,
|
||||
providerOptions,
|
||||
});
|
||||
break;
|
||||
case "user":
|
||||
result.push({
|
||||
role: "user",
|
||||
content: msg.content,
|
||||
providerOptions,
|
||||
});
|
||||
break;
|
||||
case "tool":
|
||||
result.push({
|
||||
role: "tool",
|
||||
content: [
|
||||
{
|
||||
type: "tool-result",
|
||||
toolCallId: msg.toolCallId,
|
||||
toolName: msg.toolName,
|
||||
output: {
|
||||
type: "text",
|
||||
value: msg.content,
|
||||
},
|
||||
},
|
||||
],
|
||||
providerOptions,
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
// doing this because: https://github.com/OpenRouterTeam/ai-sdk-provider/issues/262
|
||||
return JSON.parse(JSON.stringify(result));
|
||||
}
|
||||
|
||||
async function buildTools(agent: z.infer<typeof Agent>): Promise<ToolSet> {
|
||||
const tools: ToolSet = {};
|
||||
for (const [name, tool] of Object.entries(agent.tools ?? {})) {
|
||||
try {
|
||||
tools[name] = await mapAgentTool(tool);
|
||||
} catch (error) {
|
||||
console.error(`Error mapping tool ${name}:`, error);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return tools;
|
||||
}
|
||||
|
||||
export class AgentState {
|
||||
runId: string | null = null;
|
||||
agent: z.infer<typeof Agent> | null = null;
|
||||
agentName: string | null = null;
|
||||
messages: z.infer<typeof MessageList> = [];
|
||||
lastAssistantMsg: z.infer<typeof AssistantMessage> | null = null;
|
||||
subflowStates: Record<string, AgentState> = {};
|
||||
toolCallIdMap: Record<string, z.infer<typeof ToolCallPart>> = {};
|
||||
pendingToolCalls: Record<string, true> = {};
|
||||
pendingToolPermissionRequests: Record<string, z.infer<typeof ToolPermissionRequestEvent>> = {};
|
||||
pendingAskHumanRequests: Record<string, z.infer<typeof AskHumanRequestEvent>> = {};
|
||||
allowedToolCallIds: Record<string, true> = {};
|
||||
deniedToolCallIds: Record<string, true> = {};
|
||||
|
||||
getPendingPermissions(): z.infer<typeof ToolPermissionRequestEvent>[] {
|
||||
const response: z.infer<typeof ToolPermissionRequestEvent>[] = [];
|
||||
for (const [id, subflowState] of Object.entries(this.subflowStates)) {
|
||||
for (const perm of subflowState.getPendingPermissions()) {
|
||||
response.push({
|
||||
...perm,
|
||||
subflow: [id, ...perm.subflow],
|
||||
});
|
||||
}
|
||||
}
|
||||
for (const perm of Object.values(this.pendingToolPermissionRequests)) {
|
||||
response.push({
|
||||
...perm,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
getPendingAskHumans(): z.infer<typeof AskHumanRequestEvent>[] {
|
||||
const response: z.infer<typeof AskHumanRequestEvent>[] = [];
|
||||
for (const [id, subflowState] of Object.entries(this.subflowStates)) {
|
||||
for (const ask of subflowState.getPendingAskHumans()) {
|
||||
response.push({
|
||||
...ask,
|
||||
subflow: [id, ...ask.subflow],
|
||||
});
|
||||
}
|
||||
}
|
||||
for (const ask of Object.values(this.pendingAskHumanRequests)) {
|
||||
response.push({
|
||||
...ask,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
finalResponse(): string {
|
||||
if (!this.lastAssistantMsg) {
|
||||
return '';
|
||||
}
|
||||
if (typeof this.lastAssistantMsg.content === "string") {
|
||||
return this.lastAssistantMsg.content;
|
||||
}
|
||||
return this.lastAssistantMsg.content.reduce((acc, part) => {
|
||||
if (part.type === "text") {
|
||||
return acc + part.text;
|
||||
}
|
||||
return acc;
|
||||
}, "");
|
||||
}
|
||||
|
||||
ingest(event: z.infer<typeof RunEvent>) {
|
||||
if (event.subflow.length > 0) {
|
||||
const { subflow, ...rest } = event;
|
||||
if (!this.subflowStates[subflow[0]]) {
|
||||
this.subflowStates[subflow[0]] = new AgentState();
|
||||
}
|
||||
this.subflowStates[subflow[0]].ingest({
|
||||
...rest,
|
||||
subflow: subflow.slice(1),
|
||||
});
|
||||
return;
|
||||
}
|
||||
switch (event.type) {
|
||||
case "start":
|
||||
this.runId = event.runId;
|
||||
this.agentName = event.agentName;
|
||||
break;
|
||||
case "message":
|
||||
this.messages.push(event.message);
|
||||
if (event.message.content instanceof Array) {
|
||||
for (const part of event.message.content) {
|
||||
if (part.type === "tool-call") {
|
||||
this.toolCallIdMap[part.toolCallId] = part;
|
||||
this.pendingToolCalls[part.toolCallId] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (event.message.role === "tool") {
|
||||
const message = event.message as z.infer<typeof ToolMessage>;
|
||||
delete this.pendingToolCalls[message.toolCallId];
|
||||
}
|
||||
if (event.message.role === "assistant") {
|
||||
this.lastAssistantMsg = event.message;
|
||||
}
|
||||
break;
|
||||
case "tool-permission-request":
|
||||
this.pendingToolPermissionRequests[event.toolCall.toolCallId] = event;
|
||||
break;
|
||||
case "tool-permission-response":
|
||||
switch (event.response) {
|
||||
case "approve":
|
||||
this.allowedToolCallIds[event.toolCallId] = true;
|
||||
break;
|
||||
case "deny":
|
||||
this.deniedToolCallIds[event.toolCallId] = true;
|
||||
break;
|
||||
}
|
||||
delete this.pendingToolPermissionRequests[event.toolCallId];
|
||||
break;
|
||||
case "ask-human-request":
|
||||
this.pendingAskHumanRequests[event.toolCallId] = event;
|
||||
break;
|
||||
case "ask-human-response":
|
||||
// console.error('im here', this.agentName, this.runId, event.subflow);
|
||||
const ogEvent = this.pendingAskHumanRequests[event.toolCallId];
|
||||
this.messages.push({
|
||||
role: "tool",
|
||||
content: JSON.stringify({
|
||||
userResponse: event.response,
|
||||
}),
|
||||
toolCallId: ogEvent.toolCallId,
|
||||
toolName: this.toolCallIdMap[ogEvent.toolCallId]!.toolName,
|
||||
});
|
||||
delete this.pendingAskHumanRequests[ogEvent.toolCallId];
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function* streamAgent({
|
||||
state,
|
||||
idGenerator,
|
||||
runId,
|
||||
messageQueue,
|
||||
modelConfigRepo,
|
||||
}: {
|
||||
state: AgentState,
|
||||
idGenerator: IMonotonicallyIncreasingIdGenerator;
|
||||
runId: string;
|
||||
messageQueue: IMessageQueue;
|
||||
modelConfigRepo: IModelConfigRepo;
|
||||
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
|
||||
async function* processEvent(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
|
||||
state.ingest(event);
|
||||
yield event;
|
||||
}
|
||||
|
||||
const modelConfig = await modelConfigRepo.getConfig();
|
||||
if (!modelConfig) {
|
||||
throw new Error("Model config not found");
|
||||
}
|
||||
|
||||
// set up agent
|
||||
const agent = await loadAgent(state.agentName!);
|
||||
|
||||
// set up tools
|
||||
const tools = await buildTools(agent);
|
||||
|
||||
// set up provider + model
|
||||
const provider = await getProvider(agent.provider);
|
||||
const model = provider.languageModel(agent.model || modelConfig.defaults.model);
|
||||
let loopCounter = 0;
|
||||
|
||||
console.log('here');
|
||||
|
||||
async function pendingMsgs() {
|
||||
const pendingMsgs = [];
|
||||
|
||||
}
|
||||
|
||||
while (true) {
|
||||
// console.error(`loop counter: ${loopCounter++}`)
|
||||
// if last response is from assistant and text, get any pending msgs
|
||||
const lastMessage = state.messages[state.messages.length - 1];
|
||||
if (lastMessage
|
||||
&& lastMessage.role === "assistant"
|
||||
&& (typeof lastMessage.content === "string"
|
||||
|| !lastMessage.content.some(part => part.type === "tool-call")
|
||||
)
|
||||
) {
|
||||
let pending = 0;
|
||||
while(true) {
|
||||
const msg = await messageQueue.dequeue(runId);
|
||||
if (!msg) {
|
||||
break;
|
||||
}
|
||||
pending++;
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "message",
|
||||
messageId: msg.messageId,
|
||||
message: {
|
||||
role: "user",
|
||||
content: msg.message,
|
||||
},
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
// if no msgs found, return
|
||||
if (!pending) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// execute any pending tool calls
|
||||
for (const toolCallId of Object.keys(state.pendingToolCalls)) {
|
||||
const toolCall = state.toolCallIdMap[toolCallId];
|
||||
|
||||
// if ask-human, skip
|
||||
if (toolCall.toolName === "ask-human") {
|
||||
continue;
|
||||
}
|
||||
|
||||
// if tool has been denied, deny
|
||||
if (state.deniedToolCallIds[toolCallId]) {
|
||||
yield *processEvent({
|
||||
runId,
|
||||
messageId: await idGenerator.next(),
|
||||
type: "message",
|
||||
message: {
|
||||
role: "tool",
|
||||
content: "Unable to execute this tool: Permission was denied.",
|
||||
toolCallId: toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
},
|
||||
subflow: [],
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// if permission is pending on this tool call, allow execution
|
||||
if (state.pendingToolPermissionRequests[toolCallId]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// execute approved tool
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "tool-invocation",
|
||||
toolName: toolCall.toolName,
|
||||
input: JSON.stringify(toolCall.arguments),
|
||||
subflow: [],
|
||||
});
|
||||
let result: any = null;
|
||||
if (agent.tools![toolCall.toolName].type === "agent") {
|
||||
let subflowState = state.subflowStates[toolCallId];
|
||||
for await (const event of streamAgent({
|
||||
state: subflowState,
|
||||
idGenerator,
|
||||
runId,
|
||||
messageQueue,
|
||||
modelConfigRepo,
|
||||
})) {
|
||||
yield *processEvent({
|
||||
...event,
|
||||
subflow: [toolCallId, ...event.subflow],
|
||||
});
|
||||
}
|
||||
if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) {
|
||||
result = subflowState.finalResponse();
|
||||
}
|
||||
} else {
|
||||
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments);
|
||||
}
|
||||
if (result) {
|
||||
const resultMsg: z.infer<typeof ToolMessage> = {
|
||||
role: "tool",
|
||||
content: JSON.stringify(result),
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
};
|
||||
yield* processEvent({
|
||||
runId,
|
||||
type: "tool-result",
|
||||
toolName: toolCall.toolName,
|
||||
result: result,
|
||||
subflow: [],
|
||||
});
|
||||
yield *processEvent({
|
||||
runId,
|
||||
messageId: await idGenerator.next(),
|
||||
type: "message",
|
||||
message: resultMsg,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// if pending state, exit
|
||||
if (state.getPendingAskHumans().length || state.getPendingPermissions().length) {
|
||||
// console.error("pending asks or permissions, exiting (b.)")
|
||||
return;
|
||||
}
|
||||
|
||||
// if current message state isn't runnable, exit
|
||||
/*
|
||||
if (state.messages.length === 0 || state.messages[state.messages.length - 1].role === "assistant") {
|
||||
// console.error("current message state isn't runnable, exiting (c.)")
|
||||
return;
|
||||
}
|
||||
*/
|
||||
|
||||
while(true) {
|
||||
const msg = await messageQueue.dequeue(runId);
|
||||
if (!msg) {
|
||||
break;
|
||||
}
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "message",
|
||||
messageId: msg.messageId,
|
||||
message: {
|
||||
role: "user",
|
||||
content: msg.message,
|
||||
},
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
|
||||
// run one LLM turn.
|
||||
// stream agent response and build message
|
||||
const messageBuilder = new StreamStepMessageBuilder();
|
||||
for await (const event of streamLlm(
|
||||
model,
|
||||
state.messages,
|
||||
agent.instructions,
|
||||
tools,
|
||||
)) {
|
||||
messageBuilder.ingest(event);
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "llm-stream-event",
|
||||
event: event,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
|
||||
// build and emit final message from agent response
|
||||
const message = messageBuilder.get();
|
||||
yield *processEvent({
|
||||
runId,
|
||||
messageId: await idGenerator.next(),
|
||||
type: "message",
|
||||
message,
|
||||
subflow: [],
|
||||
});
|
||||
|
||||
// if there were any ask-human calls, emit those events
|
||||
if (message.content instanceof Array) {
|
||||
for (const part of message.content) {
|
||||
if (part.type === "tool-call") {
|
||||
const underlyingTool = agent.tools![part.toolName];
|
||||
if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") {
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "ask-human-request",
|
||||
toolCallId: part.toolCallId,
|
||||
query: part.arguments.question,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") {
|
||||
// if command is blocked, then seek permission
|
||||
if (isBlocked(part.arguments.command)) {
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "tool-permission-request",
|
||||
toolCall: part,
|
||||
subflow: [],
|
||||
});
|
||||
}
|
||||
}
|
||||
if (underlyingTool.type === "agent" && underlyingTool.name) {
|
||||
yield *processEvent({
|
||||
runId,
|
||||
type: "spawn-subflow",
|
||||
agentName: underlyingTool.name,
|
||||
toolCallId: part.toolCallId,
|
||||
subflow: [],
|
||||
});
|
||||
yield *processEvent({
|
||||
runId,
|
||||
messageId: await idGenerator.next(),
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: part.arguments.message,
|
||||
},
|
||||
subflow: [part.toolCallId],
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function* streamLlm(
|
||||
model: LanguageModel,
|
||||
messages: z.infer<typeof MessageList>,
|
||||
instructions: string,
|
||||
tools: ToolSet,
|
||||
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
|
||||
const { fullStream } = streamText({
|
||||
model,
|
||||
messages: convertFromMessages(messages),
|
||||
system: instructions,
|
||||
tools,
|
||||
stopWhen: stepCountIs(1),
|
||||
});
|
||||
for await (const event of fullStream) {
|
||||
// console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event));
|
||||
switch (event.type) {
|
||||
case "reasoning-start":
|
||||
yield {
|
||||
type: "reasoning-start",
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "reasoning-delta":
|
||||
yield {
|
||||
type: "reasoning-delta",
|
||||
delta: event.text,
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "reasoning-end":
|
||||
yield {
|
||||
type: "reasoning-end",
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "text-start":
|
||||
yield {
|
||||
type: "text-start",
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "text-delta":
|
||||
yield {
|
||||
type: "text-delta",
|
||||
delta: event.text,
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "tool-call":
|
||||
yield {
|
||||
type: "tool-call",
|
||||
toolCallId: event.toolCallId,
|
||||
toolName: event.toolName,
|
||||
input: event.input,
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
case "finish-step":
|
||||
yield {
|
||||
type: "finish-step",
|
||||
usage: event.usage,
|
||||
finishReason: event.finishReason,
|
||||
providerOptions: event.providerMetadata,
|
||||
};
|
||||
break;
|
||||
default:
|
||||
// console.warn("Unknown event type", event);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
export const MappedToolCall = z.object({
|
||||
toolCall: ToolCallPart,
|
||||
agentTool: ToolAttachment,
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue