import { jsonSchema, ModelMessage } from "ai";
import fs from "fs";
import path from "path";
import { ModelConfig, WorkDir } from "../config/config.js";
import { Agent, ToolAttachment } from "../entities/agent.js";
import { createInterface, Interface } from "node:readline/promises";
import { stdin as input, stdout as output } from "node:process";
import { AssistantContentPart, AssistantMessage, Message, MessageList, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js";
import { runIdGenerator } from "./run-id-gen.js";
import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai";
import { z } from "zod";
import { getProvider } from "./models.js";
import { LlmStepStreamEvent } from "../entities/llm-step-events.js";
import { execTool } from "./exec-tool.js";
import { RunEvent } from "../entities/run-events.js";
import { CopilotAgent } from "../assistant/agent.js";
import { BuiltinTools } from "./builtin-tools.js";

/**
 * Converts an agent tool attachment into an AI SDK tool definition.
 *
 * - `mcp`: uses the attachment's own name, description and JSON input schema.
 * - `agent`: loads the referenced agent and exposes it as a tool taking a
 *   single `message` string.
 * - `builtin`: looks the tool up in `BuiltinTools`.
 *
 * @param t The tool attachment to convert.
 * @returns The tool definition (no `execute` handler is attached here).
 * @throws Error if the referenced agent cannot be loaded or the builtin tool
 *   name is not present in `BuiltinTools`.
 */
export async function mapAgentTool(t: z.infer<typeof ToolAttachment>): Promise<Tool> {
    switch (t.type) {
        case "mcp":
            return tool({
                name: t.name,
                description: t.description,
                inputSchema: jsonSchema(t.inputSchema),
            });
        case "agent": {
            const agent = await loadAgent(t.name);
            if (!agent) {
                throw new Error(`Agent ${t.name} not found`);
            }
            return tool({
                name: t.name,
                description: agent.description,
                inputSchema: z.object({
                    message: z.string().describe("The message to send to the workflow"),
                }),
            });
        }
        case "builtin": {
            const match = BuiltinTools[t.name];
            if (!match) {
                throw new Error(`Unknown builtin tool: ${t.name}`);
            }
            return tool({
                description: match.description,
                inputSchema: match.inputSchema,
            });
        }
    }
}

/**
 * Appends run events as JSON lines to `<WorkDir>/runs/<runId>.jsonl`.
 */
export class RunLogger {
    private readonly logFile: string;
    private readonly fileHandle: fs.WriteStream;

    /** Creates the `<WorkDir>/runs` directory if it does not exist yet. */
    ensureRunsDir() {
        // `recursive: true` makes this a no-op when the directory already exists.
        fs.mkdirSync(path.join(WorkDir, "runs"), { recursive: true });
    }

    /**
     * @param runId Identifier of the run; used as the log file's base name.
     */
    constructor(runId: string) {
        this.ensureRunsDir();
        this.logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
        // Opened in append mode so that resuming a run extends its existing log.
        this.fileHandle = fs.createWriteStream(this.logFile, {
            flags: "a",
            encoding: "utf8",
        });
    }

    /**
     * Writes one event as a JSON line. Events of type `stream-event` are
     * skipped and never written to the log.
     */
    log(event: z.infer<typeof RunEvent>) {
        if (event.type !== "stream-event") {
            this.fileHandle.write(JSON.stringify(event) + "\n");
        }
    }

    /** Closes the underlying write stream. */
    close() {
        this.fileHandle.close();
    }
}

/**
 * Small helper that timestamps an event, hands it to a {@link RunLogger} and
 * yields it, so generator code can do both with a single `yield*`.
 */
export class LogAndYield {
    private readonly logger: RunLogger;

    constructor(logger: RunLogger) {
        this.logger = logger;
    }

    /**
     * Adds a `ts` field (current time, ISO 8601) to a copy of `event`, logs the
     * copy and yields it exactly once.
     */
    async *logAndYield(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
        const ev = {
            ...event,
            ts: new Date().toISOString(),
        };
        this.logger.log(ev);
        yield ev;
    }
}

/**
 * Accumulates the stream events of a single LLM step into one assistant
 * message made of text and tool-call parts.
 */
export class StreamStepMessageBuilder {
    private parts: z.infer<typeof AssistantContentPart>[] = [];
    private textBuffer: string = "";
    private reasoningBuffer: string = "";

    /**
     * Moves buffered text (if any) into `parts` as a `text` part.
     *
     * Reasoning is deliberately skipped: `reasoningBuffer` is accumulated by
     * `ingest` but never converted into a part.
     */
    flushBuffers() {
        if (this.textBuffer) {
            this.parts.push({ type: "text", text: this.textBuffer });
            this.textBuffer = "";
        }
    }

    /**
     * Feeds one stream event into the builder. Start/end markers flush the
     * text buffer, deltas are appended to their buffer, and tool calls are
     * recorded as `tool-call` parts. Other event types are ignored.
     */
    ingest(event: z.infer<typeof LlmStepStreamEvent>) {
        switch (event.type) {
            case "reasoning-start":
            case "reasoning-end":
            case "text-start":
            case "text-end":
                this.flushBuffers();
                break;
            case "reasoning-delta":
                this.reasoningBuffer += event.delta;
                break;
            case "text-delta":
                this.textBuffer += event.delta;
                break;
            case "tool-call":
                this.parts.push({
                    type: "tool-call",
                    toolCallId: event.toolCallId,
                    toolName: event.toolName,
                    arguments: event.input,
                });
                break;
        }
    }

    /** Flushes any pending text and returns the assembled assistant message. */
    get(): z.infer<typeof AssistantMessage> {
        this.flushBuffers();
        return {
            role: "assistant",
            content: this.parts,
        };
    }
}

/**
 * Reads `<WorkDir>/agents/<id>.json` and validates it against the `Agent`
 * schema.
 *
 * @param id Agent identifier; used as the file's base name.
 * @returns The parsed agent definition.
 * @throws If the file cannot be read, is not valid JSON, or fails schema
 *   validation.
 */
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
    const agentPath = path.join(WorkDir, "agents", `${id}.json`);
    const raw = await fs.promises.readFile(agentPath, "utf8");
    return Agent.parse(JSON.parse(raw));
}

/**
 * Converts the project's message records into AI SDK `ModelMessage`s.
 *
 * Assistant tool-call parts have `arguments` renamed to `input`; tool
 * messages become a single `tool-result` part with a text output.
 */
export function convertFromMessages(messages: z.infer<typeof Message>[]): ModelMessage[] {
    const result: ModelMessage[] = [];
    for (const msg of messages) {
        switch (msg.role) {
            case "assistant":
                if (typeof msg.content === 'string') {
                    result.push({
                        role: "assistant",
                        content: msg.content,
                    });
                } else {
                    result.push({
                        role: "assistant",
                        content: msg.content.map(part => {
                            switch (part.type) {
                                case 'text':
                                    return part;
                                case 'reasoning':
                                    return part;
                                case 'tool-call':
                                    return {
                                        type: 'tool-call',
                                        toolCallId: part.toolCallId,
                                        toolName: part.toolName,
                                        input: part.arguments,
                                    };
                            }
                        }),
                    });
                }
                break;
            case "system":
                result.push({
                    role: "system",
                    content: msg.content,
                });
                break;
            case "user":
                result.push({
                    role: "user",
                    content: msg.content,
                });
                break;
            case "tool":
                result.push({
                    role: "tool",
                    content: [
                        {
                            type: "tool-result",
                            toolCallId: msg.toolCallId,
                            toolName: msg.toolName,
                            output: {
                                type: "text",
                                value: msg.content,
                            },
                        },
                    ],
                });
                break;
        }
    }
    return result;
}

/** Rebuilds the message history of an existing run from its JSONL log. */
async function loadRunMessages(runId: string): Promise<z.infer<typeof MessageList>> {
    console.error("loading run", runId);
    const messages: z.infer<typeof MessageList> = [];
    const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`);
    const stream = fs.createReadStream(logFile, { encoding: "utf8" });
    const lines = createInterface({ input: stream, crlfDelay: Infinity });
    try {
        for await (const line of lines) {
            if (line.trim() === "") {
                continue;
            }
            const event = RunEvent.parse(JSON.parse(line));
            // Only `message` events contribute to the conversation state.
            if (event.type === "message") {
                messages.push(event.message);
            }
        }
    } finally {
        // Close both the line reader and the file stream, even on parse errors.
        lines.close();
        stream.close();
    }
    return messages;
}

/** Maps every tool attached to the agent; tools that fail to map are logged and left out. */
async function buildToolSet(agent: z.infer<typeof Agent>): Promise<ToolSet> {
    const tools: ToolSet = {};
    for (const [name, attachment] of Object.entries(agent.tools ?? {})) {
        try {
            tools[name] = await mapAgentTool(attachment);
        } catch (error) {
            console.error(`Error mapping tool ${name}:`, error);
        }
    }
    return tools;
}

/**
 * Runs an agent and yields every run event as it happens.
 *
 * When `opts.runId` is given, the message history is first rebuilt from that
 * run's log; otherwise a new run id is generated and a `start` event is
 * emitted. Each turn streams one LLM step, executes any requested tool calls,
 * and repeats the step until the assistant answers without tool calls. In
 * interactive mode the user is then prompted again ("q", "quit" or "exit"
 * ends the session); otherwise the generator finishes after one turn.
 *
 * @param opts.agent Agent id, or `"copilot"` for the built-in copilot agent.
 * @param opts.runId Existing run to resume (optional).
 * @param opts.input Initial user message (optional).
 * @param opts.interactive Whether to prompt for user input on stdin.
 * @throws Error if the agent cannot be loaded or the model calls a tool that
 *   is not in the agent's tool set.
 */
export async function* streamAgent(opts: {
    agent: string;
    runId?: string;
    input?: string;
    interactive?: boolean;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
    // load existing and assemble state if required
    const messages: z.infer<typeof MessageList> = opts.runId
        ? await loadRunMessages(opts.runId)
        : [];

    // create runId if not present
    const runId = opts.runId || runIdGenerator.next();

    // load agent data
    let agent: z.infer<typeof Agent> | null = null;
    if (opts.agent === "copilot") {
        agent = CopilotAgent;
    } else {
        agent = await loadAgent(opts.agent);
    }
    if (!agent) {
        throw new Error("unable to load agent");
    }

    // set up tools
    const tools = await buildToolSet(agent);

    // set up
    const logger = new RunLogger(runId);
    const ly = new LogAndYield(logger);
    let rl: Interface | null = null;

    // Everything after the logger is opened runs inside try/finally so the
    // log stream and the readline interface are released on every exit path,
    // including the consumer abandoning the generator at an early yield.
    try {
        const provider = getProvider(agent.provider);
        const model = provider(agent.model || ModelConfig.defaults.model);

        // emit start event if first time run
        if (!opts.runId) {
            yield* ly.logAndYield({
                type: "start",
                runId,
                agent: opts.agent,
                interactive: opts.interactive ?? false,
            });
        }

        // get first input if needed
        if (opts.interactive) {
            rl = createInterface({ input, output });
        }
        if (opts.input) {
            const m: z.infer<typeof UserMessage> = {
                role: "user",
                content: opts.input,
            };
            messages.push(m);
            yield* ly.logAndYield({
                type: "message",
                message: m,
            });
        }

        // loop b/w user and agent
        while (true) {
            // get input in interactive mode when last message is not user
            // (`rl` is only set in interactive mode)
            const lastMessage = messages[messages.length - 1];
            if (rl && lastMessage?.role !== "user") {
                const answer = await rl.question("You: ");

                // Exit condition
                if (["q", "quit", "exit"].includes(answer.toLowerCase())) {
                    console.log("\n👋 Goodbye!");
                    return;
                }

                const m: z.infer<typeof UserMessage> = {
                    role: "user",
                    content: answer,
                };
                messages.push(m);
                yield* ly.logAndYield({
                    type: "message",
                    message: m,
                });
            }

            // inner loop to handle tool calls
            while (true) {
                // stream agent response and build message
                const messageBuilder = new StreamStepMessageBuilder();
                for await (const event of streamLlm(
                    model,
                    messages,
                    agent.instructions,
                    tools,
                )) {
                    messageBuilder.ingest(event);
                    yield* ly.logAndYield({
                        type: "stream-event",
                        event: event,
                    });
                }

                // build and emit final message from agent response
                const msg = messageBuilder.get();
                messages.push(msg);
                yield* ly.logAndYield({
                    type: "message",
                    message: msg,
                });

                // collect tool calls requested by the assistant
                let msgToolCallParts: z.infer<typeof ToolCallPart>[] = [];
                if (Array.isArray(msg.content)) {
                    msgToolCallParts = msg.content.filter(part => part.type === "tool-call");
                }

                // validate and map all tool calls before executing any of them
                const mappedToolCalls: z.infer<typeof MappedToolCall>[] = [];
                for (const part of msgToolCallParts) {
                    const agentTool = agent.tools?.[part.toolName];
                    if (!tools[part.toolName] || !agentTool) {
                        throw new Error(`Tool ${part.toolName} not found`);
                    }
                    mappedToolCalls.push({
                        toolCall: part,
                        agentTool,
                    });
                }

                // execute tool calls one at a time, in the order requested
                for (const { agentTool, toolCall } of mappedToolCalls) {
                    yield* ly.logAndYield({
                        type: "tool-invocation",
                        toolName: toolCall.toolName,
                        input: JSON.stringify(toolCall.arguments),
                    });
                    const result = await execTool(agentTool, toolCall.arguments);
                    const resultMsg: z.infer<typeof ToolMessage> = {
                        role: "tool",
                        content: JSON.stringify(result),
                        toolCallId: toolCall.toolCallId,
                        toolName: toolCall.toolName,
                    };
                    messages.push(resultMsg);
                    yield* ly.logAndYield({
                        type: "tool-result",
                        toolName: toolCall.toolName,
                        result: result,
                    });
                    yield* ly.logAndYield({
                        type: "message",
                        message: resultMsg,
                    });
                }

                // if the agent response had no tool calls, this turn is done;
                // otherwise run another step so the model sees the results
                if (msgToolCallParts.length === 0) {
                    break;
                }
            }

            // if not interactive, return
            if (!opts.interactive) {
                break;
            }
        }
    } finally {
        rl?.close();
        logger.close();
    }
}

/**
 * Streams a single LLM step (`stopWhen: stepCountIs(1)`) and translates the
 * AI SDK stream parts into the project's step events. The SDK's `finish` part
 * is reported as a `usage` event; unrecognised part types are dropped.
 *
 * NOTE(review): `text-end` is not forwarded here, so the message builder only
 * flushes trailing text when `get()` is called — confirm this is intentional.
 */
async function* streamLlm(
    model: LanguageModel,
    messages: z.infer<typeof MessageList>,
    instructions: string,
    tools: ToolSet,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
    const { fullStream } = streamText({
        model,
        messages: convertFromMessages(messages),
        system: instructions,
        tools,
        stopWhen: stepCountIs(1),
        providerOptions: {
            openai: {
                reasoningEffort: "low",
                reasoningSummary: "auto",
            },
        }
    });
    for await (const event of fullStream) {
        switch (event.type) {
            case "reasoning-start":
                yield {
                    type: "reasoning-start",
                };
                break;
            case "reasoning-delta":
                yield {
                    type: "reasoning-delta",
                    delta: event.text,
                };
                break;
            case "reasoning-end":
                yield {
                    type: "reasoning-end",
                };
                break;
            case "text-start":
                yield {
                    type: "text-start",
                };
                break;
            case "text-delta":
                yield {
                    type: "text-delta",
                    delta: event.text,
                };
                break;
            case "tool-call":
                yield {
                    type: "tool-call",
                    toolCallId: event.toolCallId,
                    toolName: event.toolName,
                    input: event.input,
                };
                break;
            case "finish":
                yield {
                    type: "usage",
                    usage: event.totalUsage,
                };
                break;
            default:
                // other stream part types are intentionally ignored
                continue;
        }
    }
}

/** A tool call requested by the assistant, paired with the agent's attachment for that tool. */
export const MappedToolCall = z.object({
    toolCall: ToolCallPart,
    agentTool: ToolAttachment,
});