diff --git a/apps/cli/src/agents/runtime.ts b/apps/cli/src/agents/runtime.ts index b665730b..92c3fea7 100644 --- a/apps/cli/src/agents/runtime.ts +++ b/apps/cli/src/agents/runtime.ts @@ -21,6 +21,7 @@ import { IBus } from "../application/lib/bus.js"; import { IMessageQueue } from "../application/lib/message-queue.js"; import { IRunsRepo } from "../runs/repo.js"; import { IRunsLock } from "../runs/lock.js"; +import { PrefixLogger } from "../shared/prefix-logger.js"; export interface IAgentRuntime { trigger(runId: string): Promise; @@ -63,6 +64,11 @@ export class AgentRuntime implements IAgentRuntime { return; } try { + await this.bus.publish({ + runId, + type: "run-processing-start", + subflow: [], + }); while (true) { let eventCount = 0; const run = await this.runsRepo.fetch(runId); @@ -94,6 +100,11 @@ export class AgentRuntime implements IAgentRuntime { } } finally { await this.runsLock.release(runId); + await this.bus.publish({ + runId, + type: "run-processing-end", + subflow: [], + }); } } } @@ -499,6 +510,8 @@ export async function* streamAgent({ messageQueue: IMessageQueue; modelConfigRepo: IModelConfigRepo; }): AsyncGenerator, void, unknown> { + const logger = new PrefixLogger(`run-${runId}-${state.agentName}`); + async function* processEvent(event: z.infer): AsyncGenerator, void, unknown> { state.ingest(event); yield event; @@ -518,61 +531,29 @@ export async function* streamAgent({ // set up provider + model const provider = await getProvider(agent.provider); const model = provider.languageModel(agent.model || modelConfig.defaults.model); + let loopCounter = 0; - - console.log('here'); - - async function pendingMsgs() { - const pendingMsgs = []; - - } - while (true) { - // console.error(`loop counter: ${loopCounter++}`) - // if last response is from assistant and text, get any pending msgs - const lastMessage = state.messages[state.messages.length - 1]; - if (lastMessage - && lastMessage.role === "assistant" - && (typeof lastMessage.content === "string" - || !lastMessage.content.some(part => part.type === "tool-call") - ) - ) { - let pending = 0; - while(true) { - const msg = await messageQueue.dequeue(runId); - if (!msg) { - break; - } - pending++; - yield *processEvent({ - runId, - type: "message", - messageId: msg.messageId, - message: { - role: "user", - content: msg.message, - }, - subflow: [], - }); - } - // if no msgs found, return - if (!pending) { - return; - } - } + loopCounter++; + let loopLogger = logger.child(`iter-${loopCounter}`); + loopLogger.log('starting loop iteration'); // execute any pending tool calls for (const toolCallId of Object.keys(state.pendingToolCalls)) { const toolCall = state.toolCallIdMap[toolCallId]; + let _logger = loopLogger.child(`tc-${toolCallId}-${toolCall.toolName}`); + _logger.log('processing'); // if ask-human, skip if (toolCall.toolName === "ask-human") { + _logger.log('skipping, reason: ask-human'); continue; } // if tool has been denied, deny if (state.deniedToolCallIds[toolCallId]) { - yield *processEvent({ + _logger.log('returning denied tool message, reason: tool has been denied'); + yield* processEvent({ runId, messageId: await idGenerator.next(), type: "message", @@ -587,13 +568,15 @@ export async function* streamAgent({ continue; } - // if permission is pending on this tool call, allow execution + // if permission is pending on this tool call, skip execution if (state.pendingToolPermissionRequests[toolCallId]) { + _logger.log('skipping, reason: permission is pending'); continue; } // execute approved tool - yield *processEvent({ + _logger.log('executing tool'); + yield* processEvent({ runId, type: "tool-invocation", toolCallId, @@ -611,7 +594,7 @@ export async function* streamAgent({ messageQueue, modelConfigRepo, })) { - yield *processEvent({ + yield* processEvent({ ...event, subflow: [toolCallId, ...event.subflow], }); @@ -637,7 +620,7 @@ export async function* streamAgent({ result: result, subflow: [], }); - yield *processEvent({ + yield* processEvent({ runId, messageId: await idGenerator.next(), type: "message", @@ -647,26 +630,20 @@ export async function* streamAgent({ } } - // if pending state, exit + // if waiting on user permission or ask-human, exit if (state.getPendingAskHumans().length || state.getPendingPermissions().length) { - // console.error("pending asks or permissions, exiting (b.)") + loopLogger.log('exiting loop, reason: pending asks or permissions'); return; } - // if current message state isn't runnable, exit - /* - if (state.messages.length === 0 || state.messages[state.messages.length - 1].role === "assistant") { - // console.error("current message state isn't runnable, exiting (c.)") - return; - } - */ - - while(true) { + // get any queued user messages + while (true) { const msg = await messageQueue.dequeue(runId); if (!msg) { break; } - yield *processEvent({ + loopLogger.log('dequeued user message', msg.messageId); + yield* processEvent({ runId, type: "message", messageId: msg.messageId, @@ -678,7 +655,20 @@ export async function* streamAgent({ }); } + // if last response is from assistant and text, exit + const lastMessage = state.messages[state.messages.length - 1]; + if (lastMessage + && lastMessage.role === "assistant" + && (typeof lastMessage.content === "string" + || !lastMessage.content.some(part => part.type === "tool-call") + ) + ) { + loopLogger.log('exiting loop, reason: last message is from assistant and text'); + return; + } + // run one LLM turn. + loopLogger.log('running llm turn'); // stream agent response and build message const messageBuilder = new StreamStepMessageBuilder(); for await (const event of streamLlm( @@ -687,8 +677,9 @@ export async function* streamAgent({ agent.instructions, tools, )) { + loopLogger.log('got llm-stream-event:', event.type) messageBuilder.ingest(event); - yield *processEvent({ + yield* processEvent({ runId, type: "llm-stream-event", event: event, @@ -698,7 +689,7 @@ export async function* streamAgent({ // build and emit final message from agent response const message = messageBuilder.get(); - yield *processEvent({ + yield* processEvent({ runId, messageId: await idGenerator.next(), type: "message", @@ -712,7 +703,8 @@ export async function* streamAgent({ if (part.type === "tool-call") { const underlyingTool = agent.tools![part.toolName]; if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") { - yield *processEvent({ + loopLogger.log('emitting ask-human-request, toolCallId:', part.toolCallId); + yield* processEvent({ runId, type: "ask-human-request", toolCallId: part.toolCallId, @@ -723,7 +715,8 @@ export async function* streamAgent({ if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") { // if command is blocked, then seek permission if (isBlocked(part.arguments.command)) { - yield *processEvent({ + loopLogger.log('emitting tool-permission-request, toolCallId:', part.toolCallId); + yield* processEvent({ runId, type: "tool-permission-request", toolCall: part, @@ -732,14 +725,15 @@ export async function* streamAgent({ } } if (underlyingTool.type === "agent" && underlyingTool.name) { - yield *processEvent({ + loopLogger.log('emitting spawn-subflow, toolCallId:', part.toolCallId); + yield* processEvent({ runId, type: "spawn-subflow", agentName: underlyingTool.name, toolCallId: part.toolCallId, subflow: [], }); - yield *processEvent({ + yield* processEvent({ runId, messageId: await idGenerator.next(), type: "message", diff --git a/apps/cli/src/entities/run-events.ts b/apps/cli/src/entities/run-events.ts index cfbd0e45..63180354 100644 --- a/apps/cli/src/entities/run-events.ts +++ b/apps/cli/src/entities/run-events.ts @@ -8,6 +8,14 @@ const BaseRunEvent = z.object({ subflow: z.array(z.string()), }); +export const RunProcessingStartEvent = BaseRunEvent.extend({ + type: z.literal("run-processing-start"), +}); + +export const RunProcessingEndEvent = BaseRunEvent.extend({ + type: z.literal("run-processing-end"), +}); + export const StartEvent = BaseRunEvent.extend({ type: z.literal("start"), agentName: z.string(), @@ -73,6 +81,8 @@ export const RunErrorEvent = BaseRunEvent.extend({ }); export const RunEvent = z.union([ + RunProcessingStartEvent, + RunProcessingEndEvent, StartEvent, SpawnSubFlowEvent, LlmStreamEvent, diff --git a/apps/cli/src/shared/prefix-logger.ts b/apps/cli/src/shared/prefix-logger.ts new file mode 100644 index 00000000..8d199b56 --- /dev/null +++ b/apps/cli/src/shared/prefix-logger.ts @@ -0,0 +1,26 @@ +// create a PrefixLogger class that wraps console.log with a prefix +// and allows chaining with a parent logger +export class PrefixLogger { + private prefix: string; + private parent: PrefixLogger | null; + + constructor(prefix: string, parent: PrefixLogger | null = null) { + this.prefix = prefix; + this.parent = parent; + } + + log(...args: any[]) { + const timestamp = new Date().toISOString(); + const prefix = '[' + this.prefix + ']'; + + if (this.parent) { + this.parent.log(prefix, ...args); + } else { + console.log(timestamp, prefix, ...args); + } + } + + child(childPrefix: string): PrefixLogger { + return new PrefixLogger(childPrefix, this); + } +} \ No newline at end of file