import { jsonSchema, ModelMessage, modelMessageSchema } from "ai";
import fs from "fs";
import path from "path";
import { WorkDir } from "../config/config.js";
import { Agent, ToolAttachment } from "./agents.js";
import { AssistantContentPart, AssistantMessage, Message, MessageList, ProviderOptions, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
import { z } from "zod";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
import { execTool } from "../application/lib/exec-tool.js";
import { MessageEvent, AskHumanRequestEvent, RunEvent, ToolInvocationEvent, ToolPermissionRequestEvent, ToolPermissionResponseEvent } from "../entities/run-events.js";
import { BuiltinTools } from "../application/lib/builtin-tools.js";
import { CopilotAgent } from "../application/assistant/agent.js";
import { isBlocked } from "../application/lib/command-executor.js";
import container from "../di/container.js";
import { IModelConfigRepo } from "../models/repo.js";
import { getProvider } from "../models/models.js";
import { IAgentsRepo } from "./repo.js";
import { IdGen, IMonotonicallyIncreasingIdGenerator } from "../application/lib/id-gen.js";
import { IBus } from "../application/lib/bus.js";
import { IMessageQueue } from "../application/lib/message-queue.js";
import { IRunsRepo } from "../runs/repo.js";
import { IRunsLock } from "../runs/lock.js";

/** Advances a persisted run by replaying its event log and streaming its agent. */
export interface IAgentRuntime {
    trigger(runId: string): Promise<void>;
}

/**
 * Default {@link IAgentRuntime}. While holding the per-run lock it repeatedly
 * rebuilds {@link AgentState} from the stored run log and streams the agent,
 * stopping once a pass emits no events.
 */
export class AgentRuntime implements IAgentRuntime {
    private runsRepo: IRunsRepo;
    private idGenerator: IMonotonicallyIncreasingIdGenerator;
    private bus: IBus;
    private messageQueue: IMessageQueue;
    private modelConfigRepo: IModelConfigRepo;
    private runsLock: IRunsLock;

    constructor({
        runsRepo,
        idGenerator,
        bus,
        messageQueue,
        modelConfigRepo,
        runsLock,
    }: {
        runsRepo: IRunsRepo;
        idGenerator: IMonotonicallyIncreasingIdGenerator;
        bus: IBus;
        messageQueue: IMessageQueue;
        modelConfigRepo: IModelConfigRepo;
        runsLock: IRunsLock;
    }) {
        this.runsRepo = runsRepo;
        this.idGenerator = idGenerator;
        this.bus = bus;
        this.messageQueue = messageQueue;
        this.modelConfigRepo = modelConfigRepo;
        this.runsLock = runsLock;
    }

    /**
     * Advances run `runId`. If the run lock cannot be acquired this logs and
     * returns without doing anything. Every emitted event is published on the
     * bus; all events except `llm-stream-event`s are also appended to the run log.
     *
     * @throws Error if the run does not exist.
     */
    async trigger(runId: string): Promise<void> {
        if (!await this.runsLock.lock(runId)) {
            console.log(`unable to acquire lock on run ${runId}`);
            return;
        }
        try {
            // Re-run from freshly replayed state until a pass emits nothing.
            while (true) {
                let eventCount = 0;
                const run = await this.runsRepo.fetch(runId);
                if (!run) {
                    throw new Error(`Run ${runId} not found`);
                }
                const state = new AgentState();
                for (const event of run.log) {
                    state.ingest(event);
                }
                for await (const event of streamAgent({
                    state,
                    idGenerator: this.idGenerator,
                    runId,
                    messageQueue: this.messageQueue,
                    modelConfigRepo: this.modelConfigRepo,
                })) {
                    eventCount++;
                    // Raw token-level stream events are broadcast but not persisted.
                    if (event.type !== "llm-stream-event") {
                        await this.runsRepo.appendEvents(runId, [event]);
                    }
                    await this.bus.publish(event);
                }
                if (!eventCount) {
                    break;
                }
            }
        } finally {
            await this.runsLock.release(runId);
        }
    }
}

/**
 * Converts an agent tool attachment into an AI SDK tool definition. Only the
 * schema is described (no `execute`); tool calls are executed by `streamAgent`.
 *
 * @throws Error if an `agent` attachment names an unknown agent, or a
 *   `builtin` attachment names an unknown builtin tool.
 */
export async function mapAgentTool(t: z.infer<typeof ToolAttachment>): Promise<Tool> {
    switch (t.type) {
        case "mcp":
            return tool({
                name: t.name,
                description: t.description,
                inputSchema: jsonSchema(t.inputSchema),
            });
        case "agent": {
            const agent = await loadAgent(t.name);
            if (!agent) {
                throw new Error(`Agent ${t.name} not found`);
            }
            return tool({
                name: t.name,
                description: agent.description,
                inputSchema: z.object({
                    message: z.string().describe("The message to send to the workflow"),
                }),
            });
        }
        case "builtin": {
            if (t.name === "ask-human") {
                return tool({
                    description: "Ask a human before proceeding",
                    inputSchema: z.object({
                        question: z.string().describe("The question to ask the human"),
                    }),
                });
            }
            const match = BuiltinTools[t.name];
            if (!match) {
                throw new Error(`Unknown builtin tool: ${t.name}`);
            }
            return tool({
                description: match.description,
                inputSchema: match.inputSchema,
            });
        }
    }
}

/** Appends non-streaming run events as JSON lines to `<WorkDir>/runs/<runId>.jsonl`. */
export class RunLogger {
    private logFile: string;
    private fileHandle: fs.WriteStream;

    ensureRunsDir() {
        // With `recursive: true` this is a no-op when the directory already exists.
        fs.mkdirSync(path.join(WorkDir, "runs"), { recursive: true });
    }

    constructor(runId: string) {
        this.ensureRunsDir();
        this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
        this.fileHandle = fs.createWriteStream(this.logFile, {
            flags: "a",
            encoding: "utf8",
        });
    }

    /** Writes `event` as one JSON line; `llm-stream-event`s are skipped. */
    log(event: z.infer<typeof RunEvent>) {
        if (event.type !== "llm-stream-event") {
            this.fileHandle.write(JSON.stringify(event) + "\n");
        }
    }

    close() {
        this.fileHandle.close();
    }
}

/**
 * Accumulates the LLM stream events of a single step into one assistant
 * message. Text deltas are concatenated into text parts, tool calls become
 * tool-call parts in stream order, and reasoning is currently discarded.
 */
export class StreamStepMessageBuilder {
    private parts: z.infer<typeof AssistantContentPart>[] = [];
    private textBuffer: string = "";
    private reasoningBuffer: string = "";
    private providerOptions: z.infer<typeof ProviderOptions> | undefined = undefined;

    /** Moves buffered text into `parts`; buffered reasoning is dropped. */
    flushBuffers() {
        // Reasoning is intentionally not persisted (to re-enable, push
        // { type: "reasoning", text: this.reasoningBuffer } here). Clear it so
        // it does not keep accumulating across reasoning blocks.
        this.reasoningBuffer = "";
        if (this.textBuffer) {
            this.parts.push({ type: "text", text: this.textBuffer });
            this.textBuffer = "";
        }
    }

    ingest(event: z.infer<typeof LlmStepStreamEvent>) {
        switch (event.type) {
            case "reasoning-start":
            case "reasoning-end":
            case "text-start":
            case "text-end":
                this.flushBuffers();
                break;
            case "reasoning-delta":
                this.reasoningBuffer += event.delta;
                break;
            case "text-delta":
                this.textBuffer += event.delta;
                break;
            case "tool-call":
                // Flush first so text streamed before this call stays ahead of it.
                this.flushBuffers();
                this.parts.push({
                    type: "tool-call",
                    toolCallId: event.toolCallId,
                    toolName: event.toolName,
                    arguments: event.input,
                    providerOptions: event.providerOptions,
                });
                break;
            case "finish-step":
                this.providerOptions = event.providerOptions;
                break;
        }
    }

    /** Returns the assistant message built so far (flushing pending text). */
    get(): z.infer<typeof AssistantMessage> {
        this.flushBuffers();
        return {
            role: "assistant",
            content: this.parts,
            providerOptions: this.providerOptions,
        };
    }
}

/**
 * Collapses every `ask-human` tool call in an assistant message into a single
 * call, placed after all other parts, whose `question` joins the individual
 * questions with newlines. Mutates `message.content`; string content is left
 * unchanged.
 */
function normaliseAskHumanToolCall(message: z.infer<typeof AssistantMessage>) {
    if (typeof message.content === "string") {
        return;
    }
    let askHumanToolCall: z.infer<typeof ToolCallPart> | null = null;
    const newParts: z.infer<typeof AssistantContentPart>[] = [];
    for (const part of message.content) {
        if (part.type !== "tool-call" || part.toolName !== "ask-human") {
            newParts.push(part);
            continue;
        }
        if (!askHumanToolCall) {
            // Copy so merging later questions never mutates the original part.
            askHumanToolCall = { ...part };
        } else {
            askHumanToolCall.arguments = {
                ...askHumanToolCall.arguments,
                question: askHumanToolCall.arguments.question + "\n" + part.arguments.question,
            };
        }
    }
    if (askHumanToolCall) {
        newParts.push(askHumanToolCall);
    }
    message.content = newParts;
}

/**
 * Resolves an agent definition. `copilot` and `rowboatx` map to the built-in
 * {@link CopilotAgent}; any other id is fetched from the agents repository.
 */
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
    if (id === "copilot" || id === "rowboatx") {
        return CopilotAgent;
    }
    const repo = container.resolve<IAgentsRepo>('agentsRepo');
    return await repo.fetch(id);
}

/**
 * Converts internal messages to AI SDK `ModelMessage`s. Tool messages become a
 * single `tool-result` part with text output, and tool-call `arguments` are
 * renamed to `input`.
 */
export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
    const result: ModelMessage[] = [];
    for (const msg of messages) {
        const { providerOptions } = msg;
        switch (msg.role) {
            case "assistant":
                if (typeof msg.content === 'string') {
                    result.push({
                        role: "assistant",
                        content: msg.content,
                        providerOptions,
                    });
                } else {
                    result.push({
                        role: "assistant",
                        content: msg.content.map(part => {
                            switch (part.type) {
                                case 'text':
                                    return part;
                                case 'reasoning':
                                    return part;
                                case 'tool-call':
                                    return {
                                        type: 'tool-call',
                                        toolCallId: part.toolCallId,
                                        toolName: part.toolName,
                                        input: part.arguments,
                                        providerOptions: part.providerOptions,
                                    };
                            }
                        }),
                        providerOptions,
                    });
                }
                break;
            case "system":
                result.push({
                    role: "system",
                    content: msg.content,
                    providerOptions,
                });
                break;
            case "user":
                result.push({
                    role: "user",
                    content: msg.content,
                    providerOptions,
                });
                break;
            case "tool":
                result.push({
                    role: "tool",
                    content: [
                        {
                            type: "tool-result",
                            toolCallId: msg.toolCallId,
                            toolName: msg.toolName,
                            output: {
                                type: "text",
                                value: msg.content,
                            },
                        },
                    ],
                    providerOptions,
                });
                break;
        }
    }
    // doing this because: https://github.com/OpenRouterTeam/ai-sdk-provider/issues/262
    return JSON.parse(JSON.stringify(result));
}

/**
 * Builds the AI SDK tool set for an agent. A tool that fails to map is logged
 * and left out rather than aborting the whole set.
 */
async function buildTools(agent: z.infer<typeof Agent>): Promise<ToolSet> {
    const tools: ToolSet = {};
    for (const [name, attachment] of Object.entries(agent.tools ?? {})) {
        try {
            tools[name] = await mapAgentTool(attachment);
        } catch (error) {
            console.error(`Error mapping tool ${name}:`, error);
        }
    }
    return tools;
}

/**
 * In-memory projection of a run's event log. Events whose `subflow` path is
 * non-empty are routed to a child state keyed by the first path element (the
 * tool call id that spawned the sub-agent).
 */
export class AgentState {
    runId: string | null = null;
    agent: z.infer<typeof Agent> | null = null;
    agentName: string | null = null;
    messages: z.infer<typeof MessageList> = [];
    lastAssistantMsg: z.infer<typeof AssistantMessage> | null = null;
    subflowStates: Record<string, AgentState> = {};
    toolCallIdMap: Record<string, z.infer<typeof ToolCallPart>> = {};
    pendingToolCalls: Record<string, true> = {};
    pendingToolPermissionRequests: Record<string, z.infer<typeof ToolPermissionRequestEvent>> = {};
    pendingAskHumanRequests: Record<string, z.infer<typeof AskHumanRequestEvent>> = {};
    allowedToolCallIds: Record<string, true> = {};
    deniedToolCallIds: Record<string, true> = {};

    /** All unanswered permission requests in this state and its subflows, with full subflow paths. */
    getPendingPermissions(): z.infer<typeof ToolPermissionRequestEvent>[] {
        const response: z.infer<typeof ToolPermissionRequestEvent>[] = [];
        for (const [id, subflowState] of Object.entries(this.subflowStates)) {
            for (const perm of subflowState.getPendingPermissions()) {
                response.push({
                    ...perm,
                    subflow: [id, ...perm.subflow],
                });
            }
        }
        for (const perm of Object.values(this.pendingToolPermissionRequests)) {
            response.push({
                ...perm,
                subflow: [],
            });
        }
        return response;
    }

    /** All unanswered ask-human requests in this state and its subflows, with full subflow paths. */
    getPendingAskHumans(): z.infer<typeof AskHumanRequestEvent>[] {
        const response: z.infer<typeof AskHumanRequestEvent>[] = [];
        for (const [id, subflowState] of Object.entries(this.subflowStates)) {
            for (const ask of subflowState.getPendingAskHumans()) {
                response.push({
                    ...ask,
                    subflow: [id, ...ask.subflow],
                });
            }
        }
        for (const ask of Object.values(this.pendingAskHumanRequests)) {
            response.push({
                ...ask,
                subflow: [],
            });
        }
        return response;
    }

    /** Concatenated text parts of the last assistant message ("" if there is none). */
    finalResponse(): string {
        if (!this.lastAssistantMsg) {
            return '';
        }
        if (typeof this.lastAssistantMsg.content === "string") {
            return this.lastAssistantMsg.content;
        }
        return this.lastAssistantMsg.content.reduce((acc, part) => {
            if (part.type === "text") {
                return acc + part.text;
            }
            return acc;
        }, "");
    }

    /** Applies one run event to this state (or to the addressed subflow state). */
    ingest(event: z.infer<typeof RunEvent>) {
        if (event.subflow.length > 0) {
            const { subflow, ...rest } = event;
            if (!this.subflowStates[subflow[0]]) {
                this.subflowStates[subflow[0]] = new AgentState();
            }
            this.subflowStates[subflow[0]].ingest({
                ...rest,
                subflow: subflow.slice(1),
            });
            return;
        }
        switch (event.type) {
            case "start":
                this.runId = event.runId;
                this.agentName = event.agentName;
                break;
            case "spawn-subflow": {
                // Seed the child state with its agent; otherwise the child has no
                // agentName and streaming it cannot load the sub-agent.
                if (!this.subflowStates[event.toolCallId]) {
                    this.subflowStates[event.toolCallId] = new AgentState();
                }
                const child = this.subflowStates[event.toolCallId];
                child.runId = event.runId;
                child.agentName = event.agentName;
                break;
            }
            case "message":
                this.messages.push(event.message);
                if (Array.isArray(event.message.content)) {
                    for (const part of event.message.content) {
                        if (part.type === "tool-call") {
                            this.toolCallIdMap[part.toolCallId] = part;
                            this.pendingToolCalls[part.toolCallId] = true;
                        }
                    }
                }
                if (event.message.role === "tool") {
                    const message = event.message as z.infer<typeof ToolMessage>;
                    delete this.pendingToolCalls[message.toolCallId];
                }
                if (event.message.role === "assistant") {
                    this.lastAssistantMsg = event.message;
                }
                break;
            case "tool-permission-request":
                this.pendingToolPermissionRequests[event.toolCall.toolCallId] = event;
                break;
            case "tool-permission-response":
                switch (event.response) {
                    case "approve":
                        this.allowedToolCallIds[event.toolCallId] = true;
                        break;
                    case "deny":
                        this.deniedToolCallIds[event.toolCallId] = true;
                        break;
                }
                delete this.pendingToolPermissionRequests[event.toolCallId];
                break;
            case "ask-human-request":
                this.pendingAskHumanRequests[event.toolCallId] = event;
                break;
            case "ask-human-response": {
                const request = this.pendingAskHumanRequests[event.toolCallId];
                if (!request) {
                    // A duplicate/unknown response must not crash log replay.
                    console.warn(`ignoring ask-human response for unknown tool call ${event.toolCallId}`);
                    break;
                }
                this.messages.push({
                    role: "tool",
                    content: JSON.stringify({
                        userResponse: event.response,
                    }),
                    toolCallId: request.toolCallId,
                    toolName: this.toolCallIdMap[request.toolCallId]?.toolName ?? "ask-human",
                });
                delete this.pendingAskHumanRequests[request.toolCallId];
                // The tool call is now answered, mirroring the "message" case.
                delete this.pendingToolCalls[request.toolCallId];
                break;
            }
        }
    }
}

/**
 * Drives the agent recorded in `state` until it finishes or blocks on a human
 * (ask-human or permission). Each iteration:
 *
 * 1. If the last message is a plain assistant reply, consumes queued user
 *    messages, or returns if there are none.
 * 2. Executes pending tool calls. Sub-agents are streamed recursively, and
 *    their events are re-emitted with the spawning tool call id prefixed to
 *    `subflow`.
 * 3. Returns if any ask-human or permission request is outstanding.
 * 4. Consumes queued user messages, runs one LLM step, and emits the resulting
 *    message plus any ask-human / permission / spawn-subflow events.
 *
 * Every yielded event has already been applied to `state`.
 *
 * @throws Error if no model config exists, or a sub-agent tool call has no
 *   subflow state.
 */
export async function* streamAgent({
    state,
    idGenerator,
    runId,
    messageQueue,
    modelConfigRepo,
}: {
    state: AgentState,
    idGenerator: IMonotonicallyIncreasingIdGenerator;
    runId: string;
    messageQueue: IMessageQueue;
    modelConfigRepo: IModelConfigRepo;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
    // Apply each event to local state before handing it to the caller.
    async function* processEvent(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
        state.ingest(event);
        yield event;
    }

    // Moves every queued user message for this run into the conversation and
    // returns how many were consumed.
    async function* drainMessageQueue(): AsyncGenerator<z.infer<typeof RunEvent>, number, unknown> {
        let drained = 0;
        while (true) {
            const msg = await messageQueue.dequeue(runId);
            if (!msg) {
                return drained;
            }
            drained++;
            yield* processEvent({
                runId,
                type: "message",
                messageId: msg.messageId,
                message: {
                    role: "user",
                    content: msg.message,
                },
                subflow: [],
            });
        }
    }

    const modelConfig = await modelConfigRepo.getConfig();
    if (!modelConfig) {
        throw new Error("Model config not found");
    }

    const agent = await loadAgent(state.agentName!);
    const tools = await buildTools(agent);
    const provider = await getProvider(agent.provider);
    const model = provider.languageModel(agent.model || modelConfig.defaults.model);

    while (true) {
        // A plain assistant reply (no tool calls) ends the run unless the user
        // has queued more messages.
        const lastMessage = state.messages[state.messages.length - 1];
        if (lastMessage
            && lastMessage.role === "assistant"
            && (typeof lastMessage.content === "string"
                || !lastMessage.content.some(part => part.type === "tool-call"))) {
            const drained = yield* drainMessageQueue();
            if (!drained) {
                return;
            }
        }

        // execute any pending tool calls
        for (const toolCallId of Object.keys(state.pendingToolCalls)) {
            const toolCall = state.toolCallIdMap[toolCallId];

            // ask-human calls are answered by an ask-human-response event, not executed
            if (toolCall.toolName === "ask-human") {
                continue;
            }

            if (state.deniedToolCallIds[toolCallId]) {
                yield* processEvent({
                    runId,
                    messageId: await idGenerator.next(),
                    type: "message",
                    message: {
                        role: "tool",
                        content: "Unable to execute this tool: Permission was denied.",
                        toolCallId: toolCallId,
                        toolName: toolCall.toolName,
                    },
                    subflow: [],
                });
                continue;
            }

            // still awaiting a permission decision; do not execute yet
            if (state.pendingToolPermissionRequests[toolCallId]) {
                continue;
            }

            const attachment = agent.tools?.[toolCall.toolName];
            if (!attachment) {
                // Answer the call so the conversation never holds a dangling tool call.
                yield* processEvent({
                    runId,
                    messageId: await idGenerator.next(),
                    type: "message",
                    message: {
                        role: "tool",
                        content: "Unable to execute this tool: Tool not found.",
                        toolCallId: toolCallId,
                        toolName: toolCall.toolName,
                    },
                    subflow: [],
                });
                continue;
            }

            yield* processEvent({
                runId,
                type: "tool-invocation",
                toolCallId,
                toolName: toolCall.toolName,
                input: JSON.stringify(toolCall.arguments),
                subflow: [],
            });

            let result: unknown = undefined;
            // Tracked separately from `result` so falsy results (e.g. an empty
            // sub-agent reply) still produce a tool message.
            let completed = false;
            if (attachment.type === "agent") {
                const subflowState = state.subflowStates[toolCallId];
                if (!subflowState) {
                    throw new Error(`No subflow state for tool call ${toolCallId}`);
                }
                for await (const event of streamAgent({
                    state: subflowState,
                    idGenerator,
                    runId,
                    messageQueue,
                    modelConfigRepo,
                })) {
                    yield* processEvent({
                        ...event,
                        subflow: [toolCallId, ...event.subflow],
                    });
                }
                // A sub-agent blocked on a human has no result yet.
                if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) {
                    result = subflowState.finalResponse();
                    completed = true;
                }
            } else {
                result = await execTool(attachment, toolCall.arguments);
                completed = true;
            }

            if (!completed) {
                continue;
            }
            const resultMsg: z.infer<typeof ToolMessage> = {
                role: "tool",
                content: JSON.stringify(result ?? null),
                toolCallId: toolCall.toolCallId,
                toolName: toolCall.toolName,
            };
            yield* processEvent({
                runId,
                type: "tool-result",
                toolCallId: toolCall.toolCallId,
                toolName: toolCall.toolName,
                result: result,
                subflow: [],
            });
            yield* processEvent({
                runId,
                messageId: await idGenerator.next(),
                type: "message",
                message: resultMsg,
                subflow: [],
            });
        }

        // blocked on a human (here or in a subflow): stop until they respond
        if (state.getPendingAskHumans().length || state.getPendingPermissions().length) {
            return;
        }

        yield* drainMessageQueue();

        // run one LLM step and build the assistant message from its stream
        const messageBuilder = new StreamStepMessageBuilder();
        for await (const event of streamLlm(
            model,
            state.messages,
            agent.instructions,
            tools,
        )) {
            messageBuilder.ingest(event);
            yield* processEvent({
                runId,
                type: "llm-stream-event",
                event: event,
                subflow: [],
            });
        }

        const message = messageBuilder.get();
        yield* processEvent({
            runId,
            messageId: await idGenerator.next(),
            type: "message",
            message,
            subflow: [],
        });

        // raise ask-human / permission / sub-agent events for the new tool calls
        if (Array.isArray(message.content)) {
            for (const part of message.content) {
                if (part.type !== "tool-call") {
                    continue;
                }
                const underlyingTool = agent.tools?.[part.toolName];
                if (!underlyingTool) {
                    // handled (answered as "not found") on the next iteration
                    continue;
                }
                if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") {
                    yield* processEvent({
                        runId,
                        type: "ask-human-request",
                        toolCallId: part.toolCallId,
                        query: part.arguments.question,
                        subflow: [],
                    });
                }
                if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") {
                    // blocked commands need explicit user permission
                    if (isBlocked(part.arguments.command)) {
                        yield* processEvent({
                            runId,
                            type: "tool-permission-request",
                            toolCall: part,
                            subflow: [],
                        });
                    }
                }
                if (underlyingTool.type === "agent" && underlyingTool.name) {
                    yield* processEvent({
                        runId,
                        type: "spawn-subflow",
                        agentName: underlyingTool.name,
                        toolCallId: part.toolCallId,
                        subflow: [],
                    });
                    yield* processEvent({
                        runId,
                        messageId: await idGenerator.next(),
                        type: "message",
                        message: {
                            role: "user",
                            content: part.arguments.message,
                        },
                        subflow: [part.toolCallId],
                    });
                }
            }
        }
    }
}

/**
 * Runs exactly one model step (`stopWhen: stepCountIs(1)`) and re-emits the
 * relevant AI SDK stream parts as internal LLM step events, mapping
 * `providerMetadata` to `providerOptions`. Other stream part types are ignored.
 */
async function* streamLlm(
    model: LanguageModel,
    messages: z.infer<typeof MessageList>,
    instructions: string,
    tools: ToolSet,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
    const { fullStream } = streamText({
        model,
        messages: convertFromMessages(messages),
        system: instructions,
        tools,
        stopWhen: stepCountIs(1),
    });
    for await (const event of fullStream) {
        switch (event.type) {
            case "reasoning-start":
                yield {
                    type: "reasoning-start",
                    providerOptions: event.providerMetadata,
                };
                break;
            case "reasoning-delta":
                yield {
                    type: "reasoning-delta",
                    delta: event.text,
                    providerOptions: event.providerMetadata,
                };
                break;
            case "reasoning-end":
                yield {
                    type: "reasoning-end",
                    providerOptions: event.providerMetadata,
                };
                break;
            case "text-start":
                yield {
                    type: "text-start",
                    providerOptions: event.providerMetadata,
                };
                break;
            case "text-delta":
                yield {
                    type: "text-delta",
                    delta: event.text,
                    providerOptions: event.providerMetadata,
                };
                break;
            case "tool-call":
                yield {
                    type: "tool-call",
                    toolCallId: event.toolCallId,
                    toolName: event.toolName,
                    input: event.input,
                    providerOptions: event.providerMetadata,
                };
                break;
            case "finish-step":
                yield {
                    type: "finish-step",
                    usage: event.usage,
                    finishReason: event.finishReason,
                    providerOptions: event.providerMetadata,
                };
                break;
            default:
                continue;
        }
    }
}

/** A tool call paired with the agent tool attachment it targets. */
export const MappedToolCall = z.object({
    toolCall: ToolCallPart,
    agentTool: ToolAttachment,
});