This commit is contained in:
arkml 2026-04-21 13:04:46 +05:30 committed by GitHub
commit b5dc5c9420
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 197 additions and 103 deletions

View file

@ -99,6 +99,7 @@ export class AgentRuntime implements IAgentRuntime {
private modelConfigRepo: IModelConfigRepo; private modelConfigRepo: IModelConfigRepo;
private runsLock: IRunsLock; private runsLock: IRunsLock;
private abortRegistry: IAbortRegistry; private abortRegistry: IAbortRegistry;
private rerunRequested: Set<string> = new Set();
constructor({ constructor({
runsRepo, runsRepo,
@ -129,9 +130,11 @@ export class AgentRuntime implements IAgentRuntime {
async trigger(runId: string): Promise<void> { async trigger(runId: string): Promise<void> {
if (!await this.runsLock.lock(runId)) { if (!await this.runsLock.lock(runId)) {
console.log(`unable to acquire lock on run ${runId}`); console.log(`unable to acquire lock on run ${runId}`);
this.rerunRequested.add(runId);
return; return;
} }
const signal = this.abortRegistry.createForRun(runId); const signal = this.abortRegistry.createForRun(runId);
let shouldRerun = false;
try { try {
await this.bus.publish({ await this.bus.publish({
runId, runId,
@ -194,6 +197,28 @@ export class AgentRuntime implements IAgentRuntime {
await this.runsRepo.appendEvents(runId, [stoppedEvent]); await this.runsRepo.appendEvents(runId, [stoppedEvent]);
await this.bus.publish(stoppedEvent); await this.bus.publish(stoppedEvent);
} }
} catch (error) {
if (isAbortError(error) || signal.aborted) {
const stoppedEvent: z.infer<typeof RunEvent> = {
runId,
type: "run-stopped",
reason: "user-requested",
subflow: [],
};
await this.runsRepo.appendEvents(runId, [stoppedEvent]);
await this.bus.publish(stoppedEvent);
} else {
const message = formatRunError(error);
console.error(`Run ${runId} failed:`, error);
const errorEvent: z.infer<typeof RunEvent> = {
runId,
type: "error",
error: message,
subflow: [],
};
await this.runsRepo.appendEvents(runId, [errorEvent]);
await this.bus.publish(errorEvent);
}
} finally { } finally {
this.abortRegistry.cleanup(runId); this.abortRegistry.cleanup(runId);
await this.runsLock.release(runId); await this.runsLock.release(runId);
@ -202,6 +227,10 @@ export class AgentRuntime implements IAgentRuntime {
type: "run-processing-end", type: "run-processing-end",
subflow: [], subflow: [],
}); });
shouldRerun = this.rerunRequested.delete(runId);
}
if (shouldRerun) {
void this.trigger(runId);
} }
} }
} }
@ -346,28 +375,70 @@ export class StreamStepMessageBuilder {
function formatLlmStreamError(rawError: unknown): string { function formatLlmStreamError(rawError: unknown): string {
let name: string | undefined; let name: string | undefined;
let message: string | undefined;
let responseBody: string | undefined; let responseBody: string | undefined;
if (rawError && typeof rawError === "object") { if (rawError && typeof rawError === "object") {
const err = rawError as Record<string, unknown>; const err = rawError as Record<string, unknown>;
const nested = (err.error && typeof err.error === "object") ? err.error as Record<string, unknown> : null; const nested = (err.error && typeof err.error === "object") ? err.error as Record<string, unknown> : null;
const nameValue = err.name ?? nested?.name; const nameValue = err.name ?? nested?.name;
const messageValue = err.message ?? nested?.message;
const responseBodyValue = err.responseBody ?? nested?.responseBody; const responseBodyValue = err.responseBody ?? nested?.responseBody;
if (nameValue !== undefined) { if (nameValue !== undefined) {
name = String(nameValue); name = String(nameValue);
} }
if (messageValue !== undefined) {
message = String(messageValue);
}
if (responseBodyValue !== undefined) { if (responseBodyValue !== undefined) {
responseBody = String(responseBodyValue); responseBody = String(responseBodyValue);
} }
} else if (typeof rawError === "string") { } else if (typeof rawError === "string") {
responseBody = rawError; message = rawError;
} }
const lines: string[] = []; const lines: string[] = [];
if (name) lines.push(`name: ${name}`); if (name) lines.push(`name: ${name}`);
if (message) lines.push(`message: ${message}`);
if (responseBody) lines.push(`responseBody: ${responseBody}`); if (responseBody) lines.push(`responseBody: ${responseBody}`);
return lines.length ? lines.join("\n") : "Model stream error"; return lines.length ? lines.join("\n") : "Model stream error";
} }
function formatRunError(error: unknown): string {
if (error instanceof Error) {
return error.stack || error.message || error.name;
}
if (typeof error === "string") {
return error;
}
try {
return JSON.stringify(error);
} catch {
return String(error);
}
}
function isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
function toJsonCompatible(value: unknown): unknown {
if (value === undefined) {
return null;
}
try {
const serialized = JSON.stringify(value);
if (serialized === undefined) {
return null;
}
return JSON.parse(serialized);
} catch (error) {
return {
success: false,
error: `Tool returned a non-serializable result: ${formatRunError(error)}`,
};
}
}
export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> { export async function loadAgent(id: string): Promise<z.infer<typeof Agent>> {
if (id === "copilot" || id === "rowboatx") { if (id === "copilot" || id === "rowboatx") {
return buildCopilotAgent(); return buildCopilotAgent();
@ -942,29 +1013,42 @@ export async function* streamAgent({
subflow: [], subflow: [],
}); });
let result: unknown = null; let result: unknown = null;
if (agent.tools![toolCall.toolName].type === "agent") { try {
const subflowState = state.subflowStates[toolCallId]; if (agent.tools![toolCall.toolName].type === "agent") {
for await (const event of streamAgent({ const subflowState = state.subflowStates[toolCallId];
state: subflowState, for await (const event of streamAgent({
idGenerator, state: subflowState,
runId, idGenerator,
messageQueue, runId,
modelConfigRepo, messageQueue,
signal, modelConfigRepo,
abortRegistry, signal,
})) { abortRegistry,
yield* processEvent({ })) {
...event, yield* processEvent({
subflow: [toolCallId, ...event.subflow], ...event,
}); subflow: [toolCallId, ...event.subflow],
});
}
if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) {
result = subflowState.finalResponse();
}
} else {
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry });
} }
if (!subflowState.getPendingAskHumans().length && !subflowState.getPendingPermissions().length) { } catch (error) {
result = subflowState.finalResponse(); if (isAbortError(error) || signal.aborted) {
throw error;
} }
} else { const message = formatRunError(error);
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry }); _logger.log('tool failed', message);
result = {
success: false,
error: message,
toolName: toolCall.toolName,
};
} }
const resultPayload = result === undefined ? null : result; const resultPayload = toJsonCompatible(result);
const resultMsg: z.infer<typeof ToolMessage> = { const resultMsg: z.infer<typeof ToolMessage> = {
role: "tool", role: "tool",
content: JSON.stringify(resultPayload), content: JSON.stringify(resultPayload),
@ -1189,85 +1273,95 @@ async function* streamLlm(
signal?: AbortSignal, signal?: AbortSignal,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> { ): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
const converted = convertFromMessages(messages); const converted = convertFromMessages(messages);
console.log(`! SENDING payload to model: `, JSON.stringify(converted)) console.log(`! SENDING payload to model: `, JSON.stringify(converted));
const { fullStream } = streamText({ try {
model, const { fullStream } = streamText({
messages: converted, model,
system: instructions, messages: converted,
tools, system: instructions,
stopWhen: stepCountIs(1), tools,
abortSignal: signal, stopWhen: stepCountIs(1),
}); abortSignal: signal,
for await (const event of fullStream) { });
// Check abort on every chunk for responsiveness for await (const event of fullStream) {
signal?.throwIfAborted(); // Check abort on every chunk for responsiveness
console.log("-> \t\tstream event", JSON.stringify(event)); signal?.throwIfAborted();
switch (event.type) { console.log("-> \t\tstream event", JSON.stringify(event));
case "error": switch (event.type) {
yield { case "error":
type: "error", yield {
error: formatLlmStreamError((event as { error?: unknown }).error ?? event), type: "error",
}; error: formatLlmStreamError((event as { error?: unknown }).error ?? event),
return; };
case "reasoning-start": return;
yield { case "reasoning-start":
type: "reasoning-start", yield {
providerOptions: event.providerMetadata, type: "reasoning-start",
}; providerOptions: event.providerMetadata,
break; };
case "reasoning-delta": break;
yield { case "reasoning-delta":
type: "reasoning-delta", yield {
delta: event.text, type: "reasoning-delta",
providerOptions: event.providerMetadata, delta: event.text,
}; providerOptions: event.providerMetadata,
break; };
case "reasoning-end": break;
yield { case "reasoning-end":
type: "reasoning-end", yield {
providerOptions: event.providerMetadata, type: "reasoning-end",
}; providerOptions: event.providerMetadata,
break; };
case "text-start": break;
yield { case "text-start":
type: "text-start", yield {
providerOptions: event.providerMetadata, type: "text-start",
}; providerOptions: event.providerMetadata,
break; };
case "text-end": break;
yield { case "text-end":
type: "text-end", yield {
providerOptions: event.providerMetadata, type: "text-end",
}; providerOptions: event.providerMetadata,
break; };
case "text-delta": break;
yield { case "text-delta":
type: "text-delta", yield {
delta: event.text, type: "text-delta",
providerOptions: event.providerMetadata, delta: event.text,
}; providerOptions: event.providerMetadata,
break; };
case "tool-call": break;
yield { case "tool-call":
type: "tool-call", yield {
toolCallId: event.toolCallId, type: "tool-call",
toolName: event.toolName, toolCallId: event.toolCallId,
input: event.input, toolName: event.toolName,
providerOptions: event.providerMetadata, input: event.input,
}; providerOptions: event.providerMetadata,
break; };
case "finish-step": break;
yield { case "finish-step":
type: "finish-step", yield {
usage: event.usage, type: "finish-step",
finishReason: event.finishReason, usage: event.usage,
providerOptions: event.providerMetadata, finishReason: event.finishReason,
}; providerOptions: event.providerMetadata,
break; };
default: break;
console.log('unknown stream event:', JSON.stringify(event)); default:
continue; console.log('unknown stream event:', JSON.stringify(event));
continue;
}
} }
} catch (error) {
if (isAbortError(error) || signal?.aborted) {
throw error;
}
yield {
type: "error",
error: formatLlmStreamError(error),
};
} }
} }
export const MappedToolCall = z.object({ export const MappedToolCall = z.object({

View file

@ -23,7 +23,7 @@ export async function createMessage(runId: string, message: UserMessageContentTy
const queue = container.resolve<IMessageQueue>('messageQueue'); const queue = container.resolve<IMessageQueue>('messageQueue');
const id = await queue.enqueue(runId, message, voiceInput, voiceOutput, searchEnabled, middlePaneContext); const id = await queue.enqueue(runId, message, voiceInput, voiceOutput, searchEnabled, middlePaneContext);
const runtime = container.resolve<IAgentRuntime>('agentRuntime'); const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId); void runtime.trigger(runId);
return id; return id;
} }
@ -57,7 +57,7 @@ export async function authorizePermission(runId: string, ev: z.infer<typeof Tool
}; };
await repo.appendEvents(runId, [event]); await repo.appendEvents(runId, [event]);
const runtime = container.resolve<IAgentRuntime>('agentRuntime'); const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId); void runtime.trigger(runId);
} }
export async function replyToHumanInputRequest(runId: string, ev: z.infer<typeof AskHumanResponsePayload>): Promise<void> { export async function replyToHumanInputRequest(runId: string, ev: z.infer<typeof AskHumanResponsePayload>): Promise<void> {
@ -69,7 +69,7 @@ export async function replyToHumanInputRequest(runId: string, ev: z.infer<typeof
}; };
await repo.appendEvents(runId, [event]); await repo.appendEvents(runId, [event]);
const runtime = container.resolve<IAgentRuntime>('agentRuntime'); const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId); void runtime.trigger(runId);
} }
export async function stop(runId: string, force: boolean = false): Promise<void> { export async function stop(runId: string, force: boolean = false): Promise<void> {
@ -110,4 +110,4 @@ export async function fetchRun(runId: string): Promise<z.infer<typeof Run>> {
export async function listRuns(cursor?: string): Promise<z.infer<typeof ListRunsResponse>> { export async function listRuns(cursor?: string): Promise<z.infer<typeof ListRunsResponse>> {
const repo = container.resolve<IRunsRepo>('runsRepo'); const repo = container.resolve<IRunsRepo>('runsRepo');
return repo.list(cursor); return repo.list(cursor);
} }