From f422d377727975a56a4ca438b5f6fe48cdb299c9 Mon Sep 17 00:00:00 2001 From: Arjun <6592213+arkml@users.noreply.github.com> Date: Tue, 21 Apr 2026 12:53:35 +0530 Subject: [PATCH] fix runtime issue --- apps/x/packages/core/src/agents/runtime.ts | 292 ++++++++++++++------- apps/x/packages/core/src/runs/runs.ts | 8 +- 2 files changed, 197 insertions(+), 103 deletions(-) diff --git a/apps/x/packages/core/src/agents/runtime.ts b/apps/x/packages/core/src/agents/runtime.ts index f978449b..42e142bb 100644 --- a/apps/x/packages/core/src/agents/runtime.ts +++ b/apps/x/packages/core/src/agents/runtime.ts @@ -99,6 +99,7 @@ export class AgentRuntime implements IAgentRuntime { private modelConfigRepo: IModelConfigRepo; private runsLock: IRunsLock; private abortRegistry: IAbortRegistry; + private rerunRequested: Set = new Set(); constructor({ runsRepo, @@ -129,9 +130,11 @@ export class AgentRuntime implements IAgentRuntime { async trigger(runId: string): Promise { if (!await this.runsLock.lock(runId)) { console.log(`unable to acquire lock on run ${runId}`); + this.rerunRequested.add(runId); return; } const signal = this.abortRegistry.createForRun(runId); + let shouldRerun = false; try { await this.bus.publish({ runId, @@ -194,6 +197,28 @@ export class AgentRuntime implements IAgentRuntime { await this.runsRepo.appendEvents(runId, [stoppedEvent]); await this.bus.publish(stoppedEvent); } + } catch (error) { + if (isAbortError(error) || signal.aborted) { + const stoppedEvent: z.infer = { + runId, + type: "run-stopped", + reason: "user-requested", + subflow: [], + }; + await this.runsRepo.appendEvents(runId, [stoppedEvent]); + await this.bus.publish(stoppedEvent); + } else { + const message = formatRunError(error); + console.error(`Run ${runId} failed:`, error); + const errorEvent: z.infer = { + runId, + type: "error", + error: message, + subflow: [], + }; + await this.runsRepo.appendEvents(runId, [errorEvent]); + await this.bus.publish(errorEvent); + } } finally { this.abortRegistry.cleanup(runId); await this.runsLock.release(runId); @@ -202,6 +227,10 @@ export class AgentRuntime implements IAgentRuntime { type: "run-processing-end", subflow: [], }); + shouldRerun = this.rerunRequested.delete(runId); + } + if (shouldRerun) { + void this.trigger(runId); } } } @@ -346,28 +375,70 @@ export class StreamStepMessageBuilder { function formatLlmStreamError(rawError: unknown): string { let name: string | undefined; + let message: string | undefined; let responseBody: string | undefined; if (rawError && typeof rawError === "object") { const err = rawError as Record; const nested = (err.error && typeof err.error === "object") ? err.error as Record : null; const nameValue = err.name ?? nested?.name; + const messageValue = err.message ?? nested?.message; const responseBodyValue = err.responseBody ?? nested?.responseBody; if (nameValue !== undefined) { name = String(nameValue); } + if (messageValue !== undefined) { + message = String(messageValue); + } if (responseBodyValue !== undefined) { responseBody = String(responseBodyValue); } } else if (typeof rawError === "string") { - responseBody = rawError; + message = rawError; } const lines: string[] = []; if (name) lines.push(`name: ${name}`); + if (message) lines.push(`message: ${message}`); if (responseBody) lines.push(`responseBody: ${responseBody}`); return lines.length ? lines.join("\n") : "Model stream error"; } +function formatRunError(error: unknown): string { + if (error instanceof Error) { + return error.stack || error.message || error.name; + } + if (typeof error === "string") { + return error; + } + try { + return JSON.stringify(error); + } catch { + return String(error); + } +} + +function isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === "AbortError"; +} + +function toJsonCompatible(value: unknown): unknown { + if (value === undefined) { + return null; + } + try { + const serialized = JSON.stringify(value); + if (serialized === undefined) { + return null; + } + return JSON.parse(serialized); + } catch (error) { + return { + success: false, + error: `Tool returned a non-serializable result: ${formatRunError(error)}`, + }; + } +} + export async function loadAgent(id: string): Promise> { if (id === "copilot" || id === "rowboatx") { return buildCopilotAgent(); @@ -942,29 +1013,42 @@ export async function* streamAgent({ subflow: [], }); let result: unknown = null; - if (agent.tools![toolCall.toolName].type === "agent") { - const subflowState = state.subflowStates[toolCallId]; - for await (const event of streamAgent({ - state: subflowState, - idGenerator, - runId, - messageQueue, - modelConfigRepo, - signal, - abortRegistry, - })) { - yield* processEvent({ - ...event, - subflow: [toolCallId, ...event.subflow], - }); + try { + if (agent.tools![toolCall.toolName].type === "agent") { + const subflowState = state.subflowStates[toolCallId]; + for await (const event of streamAgent({ + state: subflowState, + idGenerator, + runId, + messageQueue, + modelConfigRepo, + signal, + abortRegistry, + })) { + yield* processEvent({ + ...event, + subflow: [toolCallId, ...event.subflow], + }); + } + if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) { + result = subflowState.finalResponse(); + } + } else { + result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry }); } - if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) { - result = subflowState.finalResponse(); + } catch (error) { + if (isAbortError(error) || signal.aborted) { + throw error; } - } else { - result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry }); + const message = formatRunError(error); + _logger.log('tool failed', message); + result = { + success: false, + error: message, + toolName: toolCall.toolName, + }; } - const resultPayload = result === undefined ? null : result; + const resultPayload = toJsonCompatible(result); const resultMsg: z.infer = { role: "tool", content: JSON.stringify(resultPayload), @@ -1189,85 +1273,95 @@ async function* streamLlm( signal?: AbortSignal, ): AsyncGenerator, void, unknown> { const converted = convertFromMessages(messages); - console.log(`! SENDING payload to model: `, JSON.stringify(converted)) - const { fullStream } = streamText({ - model, - messages: converted, - system: instructions, - tools, - stopWhen: stepCountIs(1), - abortSignal: signal, - }); - for await (const event of fullStream) { - // Check abort on every chunk for responsiveness - signal?.throwIfAborted(); - console.log("-> \t\tstream event", JSON.stringify(event)); - switch (event.type) { - case "error": - yield { - type: "error", - error: formatLlmStreamError((event as { error?: unknown }).error ?? event), - }; - return; - case "reasoning-start": - yield { - type: "reasoning-start", - providerOptions: event.providerMetadata, - }; - break; - case "reasoning-delta": - yield { - type: "reasoning-delta", - delta: event.text, - providerOptions: event.providerMetadata, - }; - break; - case "reasoning-end": - yield { - type: "reasoning-end", - providerOptions: event.providerMetadata, - }; - break; - case "text-start": - yield { - type: "text-start", - providerOptions: event.providerMetadata, - }; - break; - case "text-end": - yield { - type: "text-end", - providerOptions: event.providerMetadata, - }; - break; - case "text-delta": - yield { - type: "text-delta", - delta: event.text, - providerOptions: event.providerMetadata, - }; - break; - case "tool-call": - yield { - type: "tool-call", - toolCallId: event.toolCallId, - toolName: event.toolName, - input: event.input, - providerOptions: event.providerMetadata, - }; - break; - case "finish-step": - yield { - type: "finish-step", - usage: event.usage, - finishReason: event.finishReason, - providerOptions: event.providerMetadata, - }; - break; - default: - console.log('unknown stream event:', JSON.stringify(event)); - continue; + console.log(`! SENDING payload to model: `, JSON.stringify(converted)); + try { + const { fullStream } = streamText({ + model, + messages: converted, + system: instructions, + tools, + stopWhen: stepCountIs(1), + abortSignal: signal, + }); + for await (const event of fullStream) { + // Check abort on every chunk for responsiveness + signal?.throwIfAborted(); + console.log("-> \t\tstream event", JSON.stringify(event)); + switch (event.type) { + case "error": + yield { + type: "error", + error: formatLlmStreamError((event as { error?: unknown }).error ?? event), + }; + return; + case "reasoning-start": + yield { + type: "reasoning-start", + providerOptions: event.providerMetadata, + }; + break; + case "reasoning-delta": + yield { + type: "reasoning-delta", + delta: event.text, + providerOptions: event.providerMetadata, + }; + break; + case "reasoning-end": + yield { + type: "reasoning-end", + providerOptions: event.providerMetadata, + }; + break; + case "text-start": + yield { + type: "text-start", + providerOptions: event.providerMetadata, + }; + break; + case "text-end": + yield { + type: "text-end", + providerOptions: event.providerMetadata, + }; + break; + case "text-delta": + yield { + type: "text-delta", + delta: event.text, + providerOptions: event.providerMetadata, + }; + break; + case "tool-call": + yield { + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + input: event.input, + providerOptions: event.providerMetadata, + }; + break; + case "finish-step": + yield { + type: "finish-step", + usage: event.usage, + finishReason: event.finishReason, + providerOptions: event.providerMetadata, + }; + break; + default: + console.log('unknown stream event:', JSON.stringify(event)); + continue; + } } + } catch (error) { + if (isAbortError(error) || signal?.aborted) { + throw error; + } + yield { + type: "error", + error: formatLlmStreamError(error), + }; } } export const MappedToolCall = z.object({ diff --git a/apps/x/packages/core/src/runs/runs.ts b/apps/x/packages/core/src/runs/runs.ts index 8ea4688b..aa6f4883 100644 --- a/apps/x/packages/core/src/runs/runs.ts +++ b/apps/x/packages/core/src/runs/runs.ts @@ -23,7 +23,7 @@ export async function createMessage(runId: string, message: UserMessageContentTy const queue = container.resolve('messageQueue'); const id = await queue.enqueue(runId, message, voiceInput, voiceOutput, searchEnabled, middlePaneContext); const runtime = container.resolve('agentRuntime'); - runtime.trigger(runId); + void runtime.trigger(runId); return id; } @@ -57,7 +57,7 @@ export async function authorizePermission(runId: string, ev: z.infer('agentRuntime'); - runtime.trigger(runId); + void runtime.trigger(runId); } export async function replyToHumanInputRequest(runId: string, ev: z.infer): Promise { @@ -69,7 +69,7 @@ export async function replyToHumanInputRequest(runId: string, ev: z.infer('agentRuntime'); - runtime.trigger(runId); + void runtime.trigger(runId); } export async function stop(runId: string, force: boolean = false): Promise { @@ -110,4 +110,4 @@ export async function fetchRun(runId: string): Promise> { export async function listRuns(cursor?: string): Promise> { const repo = container.resolve('runsRepo'); return repo.list(cursor); -} \ No newline at end of file +}