add processing start / end events

This commit is contained in:
Ramnique Singh 2025-12-17 11:43:34 +05:30
parent f1219db4ac
commit 22906894ea
3 changed files with 93 additions and 63 deletions

View file

@ -21,6 +21,7 @@ import { IBus } from "../application/lib/bus.js";
import { IMessageQueue } from "../application/lib/message-queue.js"; import { IMessageQueue } from "../application/lib/message-queue.js";
import { IRunsRepo } from "../runs/repo.js"; import { IRunsRepo } from "../runs/repo.js";
import { IRunsLock } from "../runs/lock.js"; import { IRunsLock } from "../runs/lock.js";
import { PrefixLogger } from "../shared/prefix-logger.js";
export interface IAgentRuntime { export interface IAgentRuntime {
trigger(runId: string): Promise<void>; trigger(runId: string): Promise<void>;
@ -63,6 +64,11 @@ export class AgentRuntime implements IAgentRuntime {
return; return;
} }
try { try {
await this.bus.publish({
runId,
type: "run-processing-start",
subflow: [],
});
while (true) { while (true) {
let eventCount = 0; let eventCount = 0;
const run = await this.runsRepo.fetch(runId); const run = await this.runsRepo.fetch(runId);
@ -94,6 +100,11 @@ export class AgentRuntime implements IAgentRuntime {
} }
} finally { } finally {
await this.runsLock.release(runId); await this.runsLock.release(runId);
await this.bus.publish({
runId,
type: "run-processing-end",
subflow: [],
});
} }
} }
} }
@ -499,6 +510,8 @@ export async function* streamAgent({
messageQueue: IMessageQueue; messageQueue: IMessageQueue;
modelConfigRepo: IModelConfigRepo; modelConfigRepo: IModelConfigRepo;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> { }): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const logger = new PrefixLogger(`run-${runId}-${state.agentName}`);
async function* processEvent(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> { async function* processEvent(event: z.infer<typeof RunEvent>): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
state.ingest(event); state.ingest(event);
yield event; yield event;
@ -518,60 +531,28 @@ export async function* streamAgent({
// set up provider + model // set up provider + model
const provider = await getProvider(agent.provider); const provider = await getProvider(agent.provider);
const model = provider.languageModel(agent.model || modelConfig.defaults.model); const model = provider.languageModel(agent.model || modelConfig.defaults.model);
let loopCounter = 0; let loopCounter = 0;
console.log('here');
async function pendingMsgs() {
const pendingMsgs = [];
}
while (true) { while (true) {
// console.error(`loop counter: ${loopCounter++}`) loopCounter++;
// if last response is from assistant and text, get any pending msgs let loopLogger = logger.child(`iter-${loopCounter}`);
const lastMessage = state.messages[state.messages.length - 1]; loopLogger.log('starting loop iteration');
if (lastMessage
&& lastMessage.role === "assistant"
&& (typeof lastMessage.content === "string"
|| !lastMessage.content.some(part => part.type === "tool-call")
)
) {
let pending = 0;
while(true) {
const msg = await messageQueue.dequeue(runId);
if (!msg) {
break;
}
pending++;
yield *processEvent({
runId,
type: "message",
messageId: msg.messageId,
message: {
role: "user",
content: msg.message,
},
subflow: [],
});
}
// if no msgs found, return
if (!pending) {
return;
}
}
// execute any pending tool calls // execute any pending tool calls
for (const toolCallId of Object.keys(state.pendingToolCalls)) { for (const toolCallId of Object.keys(state.pendingToolCalls)) {
const toolCall = state.toolCallIdMap[toolCallId]; const toolCall = state.toolCallIdMap[toolCallId];
let _logger = loopLogger.child(`tc-${toolCallId}-${toolCall.toolName}`);
_logger.log('processing');
// if ask-human, skip // if ask-human, skip
if (toolCall.toolName === "ask-human") { if (toolCall.toolName === "ask-human") {
_logger.log('skipping, reason: ask-human');
continue; continue;
} }
// if tool has been denied, deny // if tool has been denied, deny
if (state.deniedToolCallIds[toolCallId]) { if (state.deniedToolCallIds[toolCallId]) {
_logger.log('returning denied tool message, reason: tool has been denied');
yield* processEvent({ yield* processEvent({
runId, runId,
messageId: await idGenerator.next(), messageId: await idGenerator.next(),
@ -587,12 +568,14 @@ export async function* streamAgent({
continue; continue;
} }
// if permission is pending on this tool call, allow execution // if permission is pending on this tool call, skip execution
if (state.pendingToolPermissionRequests[toolCallId]) { if (state.pendingToolPermissionRequests[toolCallId]) {
_logger.log('skipping, reason: permission is pending');
continue; continue;
} }
// execute approved tool // execute approved tool
_logger.log('executing tool');
yield* processEvent({ yield* processEvent({
runId, runId,
type: "tool-invocation", type: "tool-invocation",
@ -647,25 +630,19 @@ export async function* streamAgent({
} }
} }
// if pending state, exit // if waiting on user permission or ask-human, exit
if (state.getPendingAskHumans().length || state.getPendingPermissions().length) { if (state.getPendingAskHumans().length || state.getPendingPermissions().length) {
// console.error("pending asks or permissions, exiting (b.)") loopLogger.log('exiting loop, reason: pending asks or permissions');
return; return;
} }
// if current message state isn't runnable, exit // get any queued user messages
/*
if (state.messages.length === 0 || state.messages[state.messages.length - 1].role === "assistant") {
// console.error("current message state isn't runnable, exiting (c.)")
return;
}
*/
while (true) { while (true) {
const msg = await messageQueue.dequeue(runId); const msg = await messageQueue.dequeue(runId);
if (!msg) { if (!msg) {
break; break;
} }
loopLogger.log('dequeued user message', msg.messageId);
yield* processEvent({ yield* processEvent({
runId, runId,
type: "message", type: "message",
@ -678,7 +655,20 @@ export async function* streamAgent({
}); });
} }
// if last response is from assistant and text, exit
const lastMessage = state.messages[state.messages.length - 1];
if (lastMessage
&& lastMessage.role === "assistant"
&& (typeof lastMessage.content === "string"
|| !lastMessage.content.some(part => part.type === "tool-call")
)
) {
loopLogger.log('exiting loop, reason: last message is from assistant and text');
return;
}
// run one LLM turn. // run one LLM turn.
loopLogger.log('running llm turn');
// stream agent response and build message // stream agent response and build message
const messageBuilder = new StreamStepMessageBuilder(); const messageBuilder = new StreamStepMessageBuilder();
for await (const event of streamLlm( for await (const event of streamLlm(
@ -687,6 +677,7 @@ export async function* streamAgent({
agent.instructions, agent.instructions,
tools, tools,
)) { )) {
loopLogger.log('got llm-stream-event:', event.type)
messageBuilder.ingest(event); messageBuilder.ingest(event);
yield* processEvent({ yield* processEvent({
runId, runId,
@ -712,6 +703,7 @@ export async function* streamAgent({
if (part.type === "tool-call") { if (part.type === "tool-call") {
const underlyingTool = agent.tools![part.toolName]; const underlyingTool = agent.tools![part.toolName];
if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") { if (underlyingTool.type === "builtin" && underlyingTool.name === "ask-human") {
loopLogger.log('emitting ask-human-request, toolCallId:', part.toolCallId);
yield* processEvent({ yield* processEvent({
runId, runId,
type: "ask-human-request", type: "ask-human-request",
@ -723,6 +715,7 @@ export async function* streamAgent({
if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") { if (underlyingTool.type === "builtin" && underlyingTool.name === "executeCommand") {
// if command is blocked, then seek permission // if command is blocked, then seek permission
if (isBlocked(part.arguments.command)) { if (isBlocked(part.arguments.command)) {
loopLogger.log('emitting tool-permission-request, toolCallId:', part.toolCallId);
yield* processEvent({ yield* processEvent({
runId, runId,
type: "tool-permission-request", type: "tool-permission-request",
@ -732,6 +725,7 @@ export async function* streamAgent({
} }
} }
if (underlyingTool.type === "agent" && underlyingTool.name) { if (underlyingTool.type === "agent" && underlyingTool.name) {
loopLogger.log('emitting spawn-subflow, toolCallId:', part.toolCallId);
yield* processEvent({ yield* processEvent({
runId, runId,
type: "spawn-subflow", type: "spawn-subflow",

View file

@ -8,6 +8,14 @@ const BaseRunEvent = z.object({
subflow: z.array(z.string()), subflow: z.array(z.string()),
}); });
export const RunProcessingStartEvent = BaseRunEvent.extend({
type: z.literal("run-processing-start"),
});
export const RunProcessingEndEvent = BaseRunEvent.extend({
type: z.literal("run-processing-end"),
});
export const StartEvent = BaseRunEvent.extend({ export const StartEvent = BaseRunEvent.extend({
type: z.literal("start"), type: z.literal("start"),
agentName: z.string(), agentName: z.string(),
@ -73,6 +81,8 @@ export const RunErrorEvent = BaseRunEvent.extend({
}); });
export const RunEvent = z.union([ export const RunEvent = z.union([
RunProcessingStartEvent,
RunProcessingEndEvent,
StartEvent, StartEvent,
SpawnSubFlowEvent, SpawnSubFlowEvent,
LlmStreamEvent, LlmStreamEvent,

View file

@ -0,0 +1,26 @@
// create a PrefixLogger class that wraps console.log with a prefix
// and allows chaining with a parent logger
export class PrefixLogger {
private prefix: string;
private parent: PrefixLogger | null;
constructor(prefix: string, parent: PrefixLogger | null = null) {
this.prefix = prefix;
this.parent = parent;
}
log(...args: any[]) {
const timestamp = new Date().toISOString();
const prefix = '[' + this.prefix + ']';
if (this.parent) {
this.parent.log(prefix, ...args);
} else {
console.log(timestamp, prefix, ...args);
}
}
child(childPrefix: string): PrefixLogger {
return new PrefixLogger(childPrefix, this);
}
}