show errors in activity tab for knowledge graph

This commit is contained in:
Arjun 2026-05-05 19:21:32 +05:30
parent c382e3ee8a
commit c6083de054
6 changed files with 171 additions and 34 deletions

View file

@ -1,6 +1,35 @@
import { bus } from "../runs/bus.js";
import { fetchRun } from "../runs/runs.js";
type RunRecord = Awaited<ReturnType<typeof fetchRun>>;
function extractRunErrors(run: RunRecord): string[] {
return run.log.flatMap((event) => event.type === "error" ? [event.error] : []);
}
export class RunFailedError extends Error {
readonly runId: string;
readonly errors: string[];
constructor(runId: string, errors: string[]) {
const firstError = errors.find(Boolean) ?? null;
super(firstError ? `Run ${runId} failed: ${firstError}` : `Run ${runId} failed`);
this.name = "RunFailedError";
this.runId = runId;
this.errors = errors;
}
}
export function getErrorDetails(error: unknown): string {
if (error instanceof RunFailedError) {
return error.errors.join("\n\n");
}
if (error instanceof Error) {
return error.message;
}
return String(error);
}
/**
* Extract the assistant's final text response from a run's log.
* @param runId
@ -28,13 +57,28 @@ export async function extractAgentResponse(runId: string): Promise<string | null
/**
* Wait for a run to complete by listening for run-processing-end event
*/
export async function waitForRunCompletion(runId: string): Promise<void> {
return new Promise(async (resolve) => {
const unsubscribe = await bus.subscribe('*', async (event) => {
if (event.type === 'run-processing-end' && event.runId === runId) {
unsubscribe();
resolve();
}
});
export async function waitForRunCompletion(
runId: string,
opts: { throwOnError?: boolean } = {},
): Promise<RunRecord> {
return new Promise((resolve, reject) => {
void (async () => {
const unsubscribe = await bus.subscribe('*', async (event) => {
if (event.type === 'run-processing-end' && event.runId === runId) {
unsubscribe();
try {
const run = await fetchRun(runId);
const errors = extractRunErrors(run);
if (opts.throwOnError && errors.length > 0) {
reject(new RunFailedError(runId, errors));
return;
}
resolve(run);
} catch (error) {
reject(error);
}
}
});
})().catch(reject);
});
}
}

View file

@ -4,7 +4,7 @@ import { google } from 'googleapis';
import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { loadUserConfig, updateUserEmail } from '../config/user_config.js';
import { GoogleClientFactory } from './google-client-factory.js';
@ -288,7 +288,7 @@ async function processAgentNotes(): Promise<void> {
subUseCase: 'agent_notes',
});
await createMessage(agentRun.id, message);
await waitForRunCompletion(agentRun.id);
await waitForRunCompletion(agentRun.id, { throwOnError: true });
// Mark everything as processed
for (const p of emailPaths) {
@ -326,7 +326,16 @@ async function processAgentNotes(): Promise<void> {
runId: serviceRun.runId,
level: 'error',
message: 'Error processing agent notes',
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
});
await serviceLogger.log({
type: 'run_complete',
service: serviceRun.service,
runId: serviceRun.runId,
level: 'error',
message: 'Agent notes processing failed',
durationMs: Date.now() - serviceRun.startedAt,
outcome: 'error',
});
}
}

View file

@ -3,7 +3,7 @@ import path from 'path';
import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger, type ServiceRunContext } from '../services/service_logger.js';
import {
loadState,
@ -312,8 +312,11 @@ async function createNotesFromBatch(
await createMessage(run.id, message);
// Wait for the run to complete
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, notesCreated, notesModified };
}
@ -428,7 +431,7 @@ async function buildGraphWithFiles(
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
context: { batchNumber },
});
}
@ -600,7 +603,7 @@ async function processVoiceMemosForKnowledge(): Promise<boolean> {
runId: run.runId,
level: 'error',
message: `Error processing voice memo batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
error: getErrorDetails(error),
context: { batchNumber },
});
}

View file

@ -4,7 +4,7 @@ import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import {
@ -112,8 +112,11 @@ async function labelEmailBatch(
});
await createMessage(run.id, message);
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, filesEdited };
}
@ -175,6 +178,7 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
const totalBatches = batches.length;
let totalEdited = 0;
let hadError = false;
let failedBatches = 0;
// Process batches with concurrency limit
for (let i = 0; i < batches.length; i += concurrency) {
@ -209,14 +213,16 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
return result.filesEdited.size;
} catch (error) {
hadError = true;
failedBatches++;
const errorDetails = getErrorDetails(error);
console.error(`[EmailLabeling] Error processing batch ${batchNumber}:`, error);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
message: `Email labeling batch ${batchNumber}/${totalBatches} failed`,
error: errorDetails,
context: { batchNumber },
});
return 0;
@ -238,12 +244,15 @@ export async function processUnlabeledEmails(concurrency: number = DEFAULT_CONCU
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Email labeling complete: ${totalEdited} files labeled`,
message: hadError
? `Email labeling finished with errors: ${totalEdited} files labeled`
: `Email labeling complete: ${totalEdited} files labeled`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
totalEmails: unlabeled.length,
filesLabeled: totalEdited,
failedBatches,
},
});

View file

@ -4,7 +4,7 @@ import { WorkDir } from '../config/config.js';
import { createRun, createMessage } from '../runs/runs.js';
import { getKgModel } from '../models/defaults.js';
import { bus } from '../runs/bus.js';
import { waitForRunCompletion } from '../agents/utils.js';
import { getErrorDetails, waitForRunCompletion } from '../agents/utils.js';
import { serviceLogger } from '../services/service_logger.js';
import { limitEventItems } from './limit_event_items.js';
import {
@ -125,8 +125,11 @@ async function tagNoteBatch(
});
await createMessage(run.id, message);
await waitForRunCompletion(run.id);
unsubscribe();
try {
await waitForRunCompletion(run.id, { throwOnError: true });
} finally {
unsubscribe();
}
return { runId: run.id, filesEdited };
}
@ -169,6 +172,7 @@ export async function processUntaggedNotes(): Promise<void> {
const totalBatches = Math.ceil(untagged.length / BATCH_SIZE);
let totalEdited = 0;
let hadError = false;
let failedBatches = 0;
for (let i = 0; i < untagged.length; i += BATCH_SIZE) {
const batchPaths = untagged.slice(i, i + BATCH_SIZE);
@ -217,14 +221,16 @@ export async function processUntaggedNotes(): Promise<void> {
console.log(`[NoteTagging] Batch ${batchNumber}/${totalBatches} complete, ${result.filesEdited.size} files tagged`);
} catch (error) {
hadError = true;
failedBatches++;
const errorDetails = getErrorDetails(error);
console.error(`[NoteTagging] Error processing batch ${batchNumber}:`, error);
await serviceLogger.log({
type: 'error',
service: run.service,
runId: run.runId,
level: 'error',
message: `Error processing batch ${batchNumber}`,
error: error instanceof Error ? error.message : String(error),
message: `Note tagging batch ${batchNumber}/${totalBatches} failed`,
error: errorDetails,
context: { batchNumber },
});
}
@ -238,12 +244,15 @@ export async function processUntaggedNotes(): Promise<void> {
service: run.service,
runId: run.runId,
level: hadError ? 'error' : 'info',
message: `Note tagging complete: ${totalEdited} notes tagged`,
message: hadError
? `Note tagging finished with errors: ${totalEdited} notes tagged`
: `Note tagging complete: ${totalEdited} notes tagged`,
durationMs: Date.now() - run.startedAt,
outcome: hadError ? 'error' : 'ok',
summary: {
totalNotes: untagged.length,
notesTagged: totalEdited,
failedBatches,
},
});