diff --git a/apps/cli/bin/app.js b/apps/cli/bin/app.js index 172d56ed..457f2ab7 100755 --- a/apps/cli/bin/app.js +++ b/apps/cli/bin/app.js @@ -20,12 +20,18 @@ yargs(hideBin(process.argv)) .option("input", { type: "string", description: "The input to the agent", + }) + .option("no-interactive", { + type: "boolean", + description: "Do not interact with the user", + default: false, }), (argv) => { app({ agent: argv.agent, runId: argv.run_id, input: argv.input, + noInteractive: argv.noInteractive, }); } ) diff --git a/apps/cli/src/app.ts b/apps/cli/src/app.ts index 061bdcb2..eb00e35d 100644 --- a/apps/cli/src/app.ts +++ b/apps/cli/src/app.ts @@ -1,19 +1,137 @@ -import { streamAgent } from "./application/lib/agent.js"; +import { loadAgent, RunLogger, streamAgentTurn } from "./application/lib/agent.js"; import { StreamRenderer } from "./application/lib/stream-renderer.js"; +import { stdin as input, stdout as output } from "node:process"; +import fs from "fs"; +import path from "path"; +import { WorkDir } from "./application/config/config.js"; +import { RunEvent, RunStartEvent } from "./application/entities/run-events.js"; +import { createInterface, Interface } from "node:readline/promises"; +import { runIdGenerator } from "./application/lib/run-id-gen.js"; +import { Agent } from "./application/entities/agent.js"; +import { MessageList } from "./application/entities/message.js"; +import { z } from "zod"; +import { CopilotAgent } from "./application/assistant/agent.js"; export async function app(opts: { agent: string; runId?: string; input?: string; + noInteractive?: boolean; }) { + let inputCount = 0; + const messages: z.infer = []; const renderer = new StreamRenderer(); - for await (const event of streamAgent({ - ...opts, - interactive: true, - })) { - renderer.render(event); - if (event?.type === "error") { - process.exitCode = 1; + + // load existing and assemble state if required + let runId = opts.runId; + if (runId) { + console.error("loading run", runId); + let 
stream: fs.ReadStream | null = null; + let rl: Interface | null = null; + try { + const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`); + stream = fs.createReadStream(logFile, { encoding: "utf8" }); + rl = createInterface({ input: stream, crlfDelay: Infinity }); + for await (const line of rl) { + if (line.trim() === "") { + continue; + } + const parsed = JSON.parse(line); + const event = RunEvent.parse(parsed); + switch (event.type) { + case "message": + messages.push(event.message); + break; + } + } + } finally { + stream?.close(); } } + + // add user input + if (opts.input) { + messages.push({ + role: "user", + content: opts.input, + }); + inputCount++; + } + + // create runId if not present + if (!runId) { + runId = runIdGenerator.next(); + } + const logger = new RunLogger(runId); + + // load agent data + let agent: z.infer | null = null; + if (opts.agent === "copilot") { + agent = CopilotAgent; + } else { + agent = await loadAgent(opts.agent); + } + if (!agent) { + throw new Error("unable to load agent"); + } + + // emit start event if first time run + if (!opts.runId) { + const ev = { + type: "start", + runId, + agent: agent.name, + } as z.infer; + logger.log(ev); + renderer.render(ev); + } + + // loop between user and agent + let rl: Interface | null = null; + if (!opts.noInteractive) { + rl = createInterface({ input, output }); + } + let firstPass = true; + try { + while (true) { + let askInput = false; + if (firstPass) { + if (!opts.input) { + askInput = true; + } + firstPass = false; + } else { + askInput = true; + } + if (rl && askInput) { + const userInput = await rl.question("You: "); + if (["quit", "exit", "q"].includes(userInput.trim().toLowerCase())) { + console.error("Bye!"); + return; + } + inputCount++; + messages.push({ + role: "user", + content: userInput, + }); + } + for await (const event of streamAgentTurn({ + agent, + messages, + })) { + logger.log(event); + renderer.render(event); + if (event?.type === "error") { + process.exitCode 
= 1; + } + } + + if (opts.noInteractive) { + break; + } + } + } finally { + logger.close(); + rl?.close(); + } } \ No newline at end of file diff --git a/apps/cli/src/application/entities/run-events.ts b/apps/cli/src/application/entities/run-events.ts index 670b93e3..1ce6a7a6 100644 --- a/apps/cli/src/application/entities/run-events.ts +++ b/apps/cli/src/application/entities/run-events.ts @@ -11,7 +11,6 @@ export const RunStartEvent = BaseRunEvent.extend({ type: z.literal("start"), runId: z.string(), agent: z.string(), - interactive: z.boolean(), }); export const RunStepStartEvent = BaseRunEvent.extend({ diff --git a/apps/cli/src/application/lib/agent.ts b/apps/cli/src/application/lib/agent.ts index 1a612370..3661acaf 100644 --- a/apps/cli/src/application/lib/agent.ts +++ b/apps/cli/src/application/lib/agent.ts @@ -4,7 +4,6 @@ import path from "path"; import { ModelConfig, WorkDir } from "../config/config.js"; import { Agent, ToolAttachment } from "../entities/agent.js"; import { createInterface, Interface } from "node:readline/promises"; -import { stdin as input, stdout as output } from "node:process"; import { AssistantContentPart, AssistantMessage, Message, MessageList, ToolCallPart, ToolMessage, UserMessage } from "../entities/message.js"; import { runIdGenerator } from "./run-id-gen.js"; import { LanguageModel, stepCountIs, streamText, tool, Tool, ToolSet } from "ai"; @@ -79,23 +78,6 @@ export class RunLogger { } } -export class LogAndYield { - private logger: RunLogger - - constructor(logger: RunLogger) { - this.logger = logger; - } - - async *logAndYield(event: z.infer): AsyncGenerator, void, unknown> { - const ev = { - ...event, - ts: new Date().toISOString(), - } - this.logger.log(ev); - yield ev; - } -} - export class StreamStepMessageBuilder { private parts: z.infer[] = []; private textBuffer: string = ""; @@ -218,56 +200,11 @@ export function convertFromMessages(messages: z.infer[]): ModelM } -export async function* streamAgent(opts: { - agent: string; 
- runId?: string; - input?: string; - interactive?: boolean; +export async function* streamAgentTurn(opts: { + agent: z.infer; + messages: z.infer; }): AsyncGenerator, void, unknown> { - const messages: z.infer = []; - - // load existing and assemble state if required - let runId = opts.runId; - if (runId) { - console.error("loading run", runId); - let stream: fs.ReadStream | null = null; - let rl: Interface | null = null; - try { - const logFile = path.join(WorkDir, "runs", `${runId}.jsonl`); - stream = fs.createReadStream(logFile, { encoding: "utf8" }); - rl = createInterface({ input: stream, crlfDelay: Infinity }); - for await (const line of rl) { - if (line.trim() === "") { - continue; - } - const parsed = JSON.parse(line); - const event = RunEvent.parse(parsed); - switch (event.type) { - case "message": - messages.push(event.message); - break; - } - } - } finally { - stream?.close(); - } - } - - // create runId if not present - if (!runId) { - runId = runIdGenerator.next(); - } - - // load agent data - let agent: z.infer | null = null; - if (opts.agent === "copilot") { - agent = CopilotAgent; - } else { - agent = await loadAgent(opts.agent); - } - if (!agent) { - throw new Error("unable to load agent"); - } + const { agent, messages } = opts; // set up tools const tools: ToolSet = {}; @@ -281,149 +218,87 @@ export async function* streamAgent(opts: { } // set up - const logger = new RunLogger(runId); - const ly = new LogAndYield(logger); const provider = getProvider(agent.provider); const model = provider(agent.model || ModelConfig.defaults.model); - // emit start event if first time run - if (!opts.runId) { - yield* ly.logAndYield({ - type: "start", - runId, - agent: opts.agent, - interactive: opts.interactive ?? 
false, - }); - } - - // get first input if needed - let rl: Interface | null = null; - if (opts.interactive) { - rl = createInterface({ input, output }); - } - if (opts.input) { - const m: z.infer = { - role: "user", - content: opts.input, - }; - messages.push(m); - yield *ly.logAndYield({ - type: "message", - message: m, - }); - } - try { - // loop b/w user and agent - while (true) { - // get input in interactive mode when last message is not user - if (opts.interactive && (messages.length === 0 || messages[messages.length - 1].role !== "user")) { - const input = await rl!.question("You: "); - // Exit condition - if (["q", "quit", "exit"].includes(input.toLowerCase())) { - console.log("\n👋 Goodbye!"); - return; - } - - const m: z.infer = { - role: "user", - content: input, - }; - messages.push(m); - yield* ly.logAndYield({ - type: "message", - message: m, - }); - } - - // inner loop to handle tool calls - while (true) { - // stream agent response and build message - const messageBuilder = new StreamStepMessageBuilder(); - for await (const event of streamLlm( - model, - messages, - agent.instructions, - tools, - )) { - messageBuilder.ingest(event); - yield* ly.logAndYield({ - type: "stream-event", - event: event, - }); - } - - // build and emit final message from agent response - const msg = messageBuilder.get(); - messages.push(msg); - yield* ly.logAndYield({ - type: "message", - message: msg, - }); - - // handle tool calls - const mappedToolCalls: z.infer[] = []; - let msgToolCallParts: z.infer[] = []; - if (msg.content instanceof Array) { - msgToolCallParts = msg.content.filter(part => part.type === "tool-call"); - } - const hasToolCalls = msgToolCallParts.length > 0; - console.log(msgToolCallParts); - - // validate and map tool calls - for (const part of msgToolCallParts) { - const agentTool = tools[part.toolName]; - if (!agentTool) { - throw new Error(`Tool ${part.toolName} not found`); - } - mappedToolCalls.push({ - toolCall: part, - agentTool: 
agent.tools![part.toolName], - }); - } - - for (const call of mappedToolCalls) { - const { agentTool, toolCall } = call; - yield* ly.logAndYield({ - type: "tool-invocation", - toolName: toolCall.toolName, - input: JSON.stringify(toolCall.arguments), - }); - const result = await execTool(agentTool, toolCall.arguments); - const resultMsg: z.infer = { - role: "tool", - content: JSON.stringify(result), - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - }; - messages.push(resultMsg); - yield* ly.logAndYield({ - type: "tool-result", - toolName: toolCall.toolName, - result: result, - }); - yield* ly.logAndYield({ - type: "message", - message: resultMsg, - }); - } - - // if the agent response had tool calls, replay this agent - if (hasToolCalls) { - continue; - } - - // otherwise, break - break; - } - - // if not interactive, return - if (!opts.interactive) { - break; - } + // run one turn + while (true) { + // stream agent response and build message + const messageBuilder = new StreamStepMessageBuilder(); + for await (const event of streamLlm( + model, + messages, + agent.instructions, + tools, + )) { + messageBuilder.ingest(event); + yield { + type: "stream-event", + event: event, + }; } - } finally { - rl?.close(); - logger.close(); + + // build and emit final message from agent response + const msg = messageBuilder.get(); + messages.push(msg); + yield { + type: "message", + message: msg, + }; + + // handle tool calls + const mappedToolCalls: z.infer[] = []; + let msgToolCallParts: z.infer[] = []; + if (msg.content instanceof Array) { + msgToolCallParts = msg.content.filter(part => part.type === "tool-call"); + } + const hasToolCalls = msgToolCallParts.length > 0; + + // validate and map tool calls + for (const part of msgToolCallParts) { + const agentTool = tools[part.toolName]; + if (!agentTool) { + throw new Error(`Tool ${part.toolName} not found`); + } + mappedToolCalls.push({ + toolCall: part, + agentTool: agent.tools![part.toolName], + }); + } + + 
for (const call of mappedToolCalls) { + const { agentTool, toolCall } = call; + yield { + type: "tool-invocation", + toolName: toolCall.toolName, + input: JSON.stringify(toolCall.arguments), + }; + const result = await execTool(agentTool, toolCall.arguments); + const resultMsg: z.infer = { + role: "tool", + content: JSON.stringify(result), + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + }; + messages.push(resultMsg); + yield { + type: "tool-result", + toolName: toolCall.toolName, + result: result, + }; + yield { + type: "message", + message: resultMsg, + }; + } + + // if the agent response had tool calls, replay this agent + if (hasToolCalls) { + continue; + } + + // otherwise, break + break; } } diff --git a/apps/cli/src/application/lib/exec-tool.ts b/apps/cli/src/application/lib/exec-tool.ts index d0409365..3c78e619 100644 --- a/apps/cli/src/application/lib/exec-tool.ts +++ b/apps/cli/src/application/lib/exec-tool.ts @@ -8,7 +8,8 @@ import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { Client } from "@modelcontextprotocol/sdk/client"; import { AssistantMessage } from "../entities/message.js"; import { BuiltinTools } from "./builtin-tools.js"; -import { streamAgent } from "./agent.js"; +import { loadAgent, streamAgentTurn } from "./agent.js"; +import { app } from "@/app.js"; async function execMcpTool(agentTool: z.infer & { type: "mcp" }, input: any): Promise { // load mcp configuration from the tool @@ -55,9 +56,13 @@ async function execMcpTool(agentTool: z.infer & { type: " async function execAgentTool(agentTool: z.infer & { type: "agent" }, input: any): Promise { let lastMsg: z.infer | null = null; - for await (const event of streamAgent({ - agent: agentTool.name, - input: JSON.stringify(input), + const agent = await loadAgent(agentTool.name); + for await (const event of streamAgentTurn({ + agent, + messages: [{ + role: "user", + content: JSON.stringify(input), + }], })) { if (event.type === "message" && 
event.message.role === "assistant") { lastMsg = event.message; diff --git a/apps/cli/src/application/lib/stream-renderer.ts b/apps/cli/src/application/lib/stream-renderer.ts index cfd7ab1a..341be492 100644 --- a/apps/cli/src/application/lib/stream-renderer.ts +++ b/apps/cli/src/application/lib/stream-renderer.ts @@ -27,7 +27,7 @@ export class StreamRenderer { render(event: z.infer) { switch (event.type) { case "start": { - this.onStart(event.agent, event.runId, event.interactive); + this.onStart(event.agent, event.runId); break; } case "step-start": { @@ -94,10 +94,9 @@ export class StreamRenderer { } } - private onStart(agent: string, runId: string, interactive: boolean) { + private onStart(agent: string, runId: string) { this.write("\n"); this.write(this.bold(`▶ Agent ${agent} (run ${runId})`)); - if (!interactive) this.write(this.dim(" (--no-interactive)")); this.write("\n"); }