feat: persist ingest trace events

This commit is contained in:
Andrey Avtomonov 2026-05-17 21:21:23 +02:00
parent 89760b52c9
commit 01b7f54253
7 changed files with 279 additions and 0 deletions

View file

@ -195,6 +195,9 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
io.stdout.write(`Report: ${report.id}\n`);
io.stdout.write(`Run: ${report.runId}\n`);
io.stdout.write(`Job: ${report.jobId}\n`);
if (report.body.tracePath) {
io.stdout.write(`Trace: ${report.body.tracePath}\n`);
}
io.stdout.write(`Status: ${reportStatus(report)}\n`);
io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`);
io.stdout.write(`Connection: ${report.connectionId}\n`);

View file

@ -0,0 +1,85 @@
import { mkdtemp, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { FileIngestTraceWriter, ingestTracePathForJob, traceTimed } from './ingest-trace.js';
/** Reads a JSONL trace file and parses every line of its trimmed content into an event object. */
async function readTraceEvents(tracePath: string): Promise<Array<Record<string, unknown>>> {
  const raw = await readFile(tracePath, 'utf-8');
  return raw
    .trim()
    .split('\n')
    .map((line): Record<string, unknown> => JSON.parse(line));
}

describe('FileIngestTraceWriter', () => {
  it('persists structured trace events as JSONL', async () => {
    const root = await mkdtemp(join(tmpdir(), 'ktx-trace-'));
    const tracePath = ingestTracePathForJob(root, 'job-1');
    const trace = new FileIngestTraceWriter({
      tracePath,
      jobId: 'job-1',
      connectionId: 'metabase-main',
      sourceKey: 'metabase',
      level: 'debug',
    });

    await trace.event('debug', 'snapshot', 'input_snapshot', {
      baseSha: 'abc123',
      rawFileCount: 2,
      diffSummary: { added: 1, modified: 1, deleted: 0, unchanged: 3 },
    });

    const lines = await readTraceEvents(tracePath);
    expect(lines).toHaveLength(1);
    expect(lines[0]).toMatchObject({
      schemaVersion: 1,
      jobId: 'job-1',
      connectionId: 'metabase-main',
      sourceKey: 'metabase',
      level: 'debug',
      phase: 'snapshot',
      event: 'input_snapshot',
      data: {
        baseSha: 'abc123',
        rawFileCount: 2,
        diffSummary: { added: 1, modified: 1, deleted: 0, unchanged: 3 },
      },
    });
    expect(typeof lines[0]?.at).toBe('string');
  });

  it('records timing and error context for postmortem inspection', async () => {
    vi.useFakeTimers();
    // Restore real timers in `finally` so a failing assertion below cannot leak
    // the fake clock into tests that run afterwards.
    try {
      vi.setSystemTime(new Date('2026-05-17T12:00:00.000Z'));
      const root = await mkdtemp(join(tmpdir(), 'ktx-trace-'));
      const tracePath = ingestTracePathForJob(root, 'job-2');
      const trace = new FileIngestTraceWriter({
        tracePath,
        jobId: 'job-2',
        connectionId: 'c1',
        sourceKey: 'fake',
        level: 'trace',
      });

      await expect(
        traceTimed(trace, 'integration', 'apply_patch', { unitKey: 'wu-1' }, async () => {
          vi.advanceTimersByTime(17);
          throw new Error('patch conflict');
        }),
      ).rejects.toThrow('patch conflict');

      const lines = await readTraceEvents(tracePath);
      expect(lines.map((line) => line.event)).toEqual(['apply_patch_started', 'apply_patch_failed']);
      expect(lines[1]).toMatchObject({
        level: 'error',
        phase: 'integration',
        data: { unitKey: 'wu-1' },
        error: { name: 'Error', message: 'patch conflict' },
      });
      expect(lines[1]?.durationMs).toBe(17);
    } finally {
      vi.useRealTimers();
    }
  });

  it('uses the documented trace path layout', () => {
    expect(ingestTracePathForJob('/project/.ktx', 'job-3')).toBe('/project/.ktx/ingest-traces/job-3/trace.jsonl');
  });
});

View file

@ -0,0 +1,158 @@
import { appendFile, mkdir } from 'node:fs/promises';
import { dirname, join } from 'node:path';
/** Verbosity levels that an ingest trace event or writer can be tagged with. */
export type IngestTraceLevel = 'info' | 'debug' | 'trace' | 'error';
/**
 * Numeric rank per level, compared by `shouldWrite`: an event is written when its
 * rank is less than or equal to the rank of the writer's configured level. `error`
 * has rank 0, so it passes for every configured level; `trace` has rank 3, so it
 * passes only when the writer is configured at `trace`.
 */
const TRACE_LEVEL_RANK: Record<IngestTraceLevel, number> = {
error: 0,
info: 1,
debug: 2,
trace: 3,
};
/** Identifying fields a trace writer is constructed with and copies onto each event it writes. */
export interface IngestTraceContext {
// File that FileIngestTraceWriter appends JSON lines to.
tracePath: string;
jobId: string;
connectionId: string;
sourceKey: string;
// Optional ids; FileIngestTraceWriter includes them in an event only when they are truthy.
runId?: string;
syncId?: string;
// Configured verbosity; FileIngestTraceWriter falls back to 'debug' when omitted.
level?: IngestTraceLevel;
}
/** Shape of a single trace record; FileIngestTraceWriter serializes one of these per line. */
export interface IngestTraceEvent {
schemaVersion: 1;
// Timestamp string; FileIngestTraceWriter fills it with `new Date().toISOString()`.
at: string;
level: IngestTraceLevel;
jobId: string;
connectionId: string;
sourceKey: string;
runId?: string;
syncId?: string;
phase: string;
event: string;
// Elapsed milliseconds; traceTimed passes it for its `_finished` and `_failed` events.
durationMs?: number;
// Caller-supplied payload, written as-is.
data?: Record<string, unknown>;
// Serialized form of the `error` argument passed to `event`, as produced by serializeError.
error?: {
name: string;
message: string;
stack?: string;
};
}
/** Contract for trace sinks; implemented in this file by FileIngestTraceWriter and NoopIngestTraceWriter. */
export interface IngestTraceWriter {
readonly tracePath: string;
readonly context: IngestTraceContext;
/** Returns a writer for the given `runId` / `syncId` overrides. */
withContext(context: Partial<Pick<IngestTraceContext, 'runId' | 'syncId'>>): IngestTraceWriter;
/**
 * Records one trace event.
 *
 * @param level Level of this event.
 * @param phase Name of the phase the event belongs to.
 * @param event Event name.
 * @param data Optional structured payload.
 * @param error Optional thrown value to attach to the event.
 * @param durationMs Optional elapsed time in milliseconds.
 */
event(
level: IngestTraceLevel,
phase: string,
event: string,
data?: Record<string, unknown>,
error?: unknown,
durationMs?: number,
): Promise<void>;
}
/**
 * Builds the trace file location for a job:
 * `<homeDir>/ingest-traces/<jobId>/trace.jsonl`.
 *
 * @param homeDir Directory under which the `ingest-traces` folder lives.
 * @param jobId Job identifier used as the per-job folder name.
 * @returns The joined path to the job's `trace.jsonl` file.
 */
export function ingestTracePathForJob(homeDir: string, jobId: string): string {
  const segments = [homeDir, 'ingest-traces', jobId, 'trace.jsonl'];
  return join(...segments);
}
/**
 * Converts an arbitrary thrown value into the `error` shape stored on a trace event.
 *
 * - `undefined` / `null` yield `undefined` (no error recorded).
 * - `Error` instances keep their `name`, `message` and, when present, `stack`.
 * - Error-like objects (anything with a string `message`, e.g. errors from another
 *   realm or rejected plain objects) keep their string `name` / `message` / `stack`.
 * - Other objects are rendered as JSON so their fields stay readable instead of
 *   collapsing to `[object Object]`.
 * - Primitives are rendered with `String()`.
 *
 * @param error The value that was thrown or passed as an error.
 * @returns The serialized error, or `undefined` when there is nothing to record.
 */
function serializeError(error: unknown): IngestTraceEvent['error'] | undefined {
  if (error === undefined || error === null) {
    return undefined;
  }
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
      ...(error.stack ? { stack: error.stack } : {}),
    };
  }
  if (typeof error === 'object') {
    // Narrowed to a non-null object above; only these three fields are inspected.
    const fields = error as { name?: unknown; message?: unknown; stack?: unknown };
    if (typeof fields.message === 'string') {
      return {
        name: typeof fields.name === 'string' && fields.name !== '' ? fields.name : 'Error',
        message: fields.message,
        ...(typeof fields.stack === 'string' && fields.stack !== '' ? { stack: fields.stack } : {}),
      };
    }
    return { name: 'Error', message: describeThrownObject(error) };
  }
  return { name: 'Error', message: String(error) };
}

/** Renders a non-Error object as JSON, falling back to its `[object X]` tag when JSON is unavailable. */
function describeThrownObject(value: object): string {
  try {
    const json = JSON.stringify(value);
    // JSON.stringify returns undefined for values it cannot represent (e.g. toJSON() => undefined).
    if (typeof json === 'string') {
      return json;
    }
  } catch {
    // Circular references or BigInt fields: fall through to the tag below.
  }
  // Unlike String(value), this cannot throw for null-prototype objects.
  return Object.prototype.toString.call(value);
}
/**
 * Decides whether an event at level `incoming` passes a writer configured at
 * level `configured`, by comparing their ranks in `TRACE_LEVEL_RANK`.
 *
 * @returns `true` when the incoming rank does not exceed the configured rank.
 */
function shouldWrite(configured: IngestTraceLevel, incoming: IngestTraceLevel): boolean {
  const threshold = TRACE_LEVEL_RANK[configured];
  const severity = TRACE_LEVEL_RANK[incoming];
  return threshold >= severity;
}
/**
 * Trace writer that appends each accepted event to `tracePath` as one JSON line.
 */
export class FileIngestTraceWriter implements IngestTraceWriter {
  readonly tracePath: string;
  readonly context: IngestTraceContext;

  /** Stores a copy of `context`, defaulting `level` to `'debug'` when it is omitted. */
  constructor(context: IngestTraceContext) {
    const level = context.level ?? 'debug';
    this.context = { ...context, level };
    this.tracePath = context.tracePath;
  }

  /** Returns a new writer for the same file with `runId` / `syncId` overridden by `context`. */
  withContext(context: Partial<Pick<IngestTraceContext, 'runId' | 'syncId'>>): IngestTraceWriter {
    const merged: IngestTraceContext = { ...this.context, ...context, tracePath: this.tracePath };
    return new FileIngestTraceWriter(merged);
  }

  /**
   * Appends one event to the trace file when `level` passes the configured level;
   * otherwise does nothing. The parent directory is created on demand.
   */
  async event(
    level: IngestTraceLevel,
    phase: string,
    event: string,
    data?: Record<string, unknown>,
    error?: unknown,
    durationMs?: number,
  ): Promise<void> {
    const threshold = this.context.level ?? 'debug';
    if (shouldWrite(threshold, level)) {
      const errorInfo = serializeError(error);
      const { jobId, connectionId, sourceKey, runId, syncId } = this.context;
      // Property order is kept stable because it determines the serialized line.
      const record: IngestTraceEvent = {
        schemaVersion: 1,
        at: new Date().toISOString(),
        level,
        jobId,
        connectionId,
        sourceKey,
        ...(runId ? { runId } : {}),
        ...(syncId ? { syncId } : {}),
        phase,
        event,
        ...(durationMs !== undefined ? { durationMs } : {}),
        ...(data ? { data } : {}),
        ...(errorInfo ? { error: errorInfo } : {}),
      };
      const parentDir = dirname(this.tracePath);
      await mkdir(parentDir, { recursive: true });
      const line = `${JSON.stringify(record)}\n`;
      await appendFile(this.tracePath, line, 'utf-8');
    }
  }
}
/**
 * Trace writer that discards every event. Its path and context fields are empty
 * strings and its level is `'error'`.
 */
export class NoopIngestTraceWriter implements IngestTraceWriter {
  readonly tracePath = '';

  readonly context: IngestTraceContext = {
    tracePath: this.tracePath,
    jobId: '',
    connectionId: '',
    sourceKey: '',
    level: 'error',
  };

  /** Context overrides are ignored; the same instance is returned. */
  withContext(): IngestTraceWriter {
    return this;
  }

  /** Resolves immediately without writing anything. */
  event(): Promise<void> {
    return Promise.resolve();
  }
}
/**
 * Runs `fn` and brackets it with trace events: `${event}_started` before it runs,
 * then `${event}_finished` (level `debug`) on success or `${event}_failed`
 * (level `error`, with the thrown value attached) on failure. Both closing events
 * carry the elapsed wall-clock milliseconds measured with `Date.now()`.
 *
 * Only a failure of `fn` itself produces a `_failed` event. If writing the
 * `_failed` event throws, that secondary error is dropped so the caller still
 * receives the original error from `fn`.
 *
 * @param trace Writer that receives the events.
 * @param phase Phase name recorded on every event.
 * @param event Base event name; suffixes `_started` / `_finished` / `_failed` are appended.
 * @param data Payload recorded on every event.
 * @param fn The operation to time.
 * @returns Whatever `fn` resolves to.
 * @throws Rethrows the error thrown by `fn`; errors from writing the `_started`
 *   or `_finished` events propagate as-is.
 */
export async function traceTimed<T>(
  trace: IngestTraceWriter,
  phase: string,
  event: string,
  data: Record<string, unknown>,
  fn: () => Promise<T>,
): Promise<T> {
  await trace.event('debug', phase, `${event}_started`, data);
  const started = Date.now();
  let result: T;
  try {
    result = await fn();
  } catch (error) {
    const elapsedMs = Date.now() - started;
    try {
      await trace.event('error', phase, `${event}_failed`, data, error, elapsedMs);
    } catch {
      // A trace-write failure must not replace the operation's own error.
    }
    throw error;
  }
  // Written outside the try block so a trace-write failure here is not
  // misreported as a failure of the operation itself.
  await trace.event('debug', phase, `${event}_finished`, data, undefined, Date.now() - started);
  return result;
}

View file

@ -76,6 +76,7 @@ import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evide
import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js';
import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js';
import { DiffSetService } from './diff-set.service.js';
import { ingestTracePathForJob } from './ingest-trace.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import { PageTriageService } from './page-triage/index.js';
import { createWarehouseVerificationTools } from './tools/warehouse-verification/index.js';
@ -151,6 +152,10 @@ class LocalIngestStorage implements IngestStoragePort {
/** Returns the directory `<projectDir>/.ktx/ingest-transcripts/<jobId>` for the given job. */
resolveTranscriptDir(jobId: string): string {
  const projectRoot = this.project.projectDir;
  return join(projectRoot, '.ktx/ingest-transcripts', jobId);
}
/** Returns the trace file path for the given job, computed by `ingestTracePathForJob` from `this.homeDir`. */
resolveTracePath(jobId: string): string {
  const tracePath = ingestTracePathForJob(this.homeDir, jobId);
  return tracePath;
}
}
class LocalIngestLock implements IngestLockPort {
@ -671,6 +676,8 @@ export function createLocalBundleIngestRuntime(
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
isolatedDiffSourceKeys: ['metabase'],
ingestTraceLevel: 'debug',
},
skillsRegistry: new SkillsRegistryService({ skillsDir, logger }),
promptService,

View file

@ -16,6 +16,7 @@ import type {
import type { ToolContext, ToolSession, TouchedSlSource } from '../tools/index.js';
import type { KnowledgeIndexPort, KnowledgeWikiService } from '../wiki/index.js';
import type { CanonicalPin } from './canonical-pins.js';
import type { IngestTraceLevel } from './ingest-trace.js';
import type { IngestReportSnapshot } from './reports.js';
import type {
ReconcileCandidateForPrompt,
@ -142,6 +143,8 @@ export interface IngestSettingsPort {
workUnitMaxConcurrency?: number;
workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue';
isolatedDiffSourceKeys?: string[];
ingestTraceLevel?: IngestTraceLevel;
}
export interface IngestGitAuthor {
@ -155,6 +158,7 @@ export interface IngestStoragePort {
resolveUploadDir(uploadId: string): string;
resolvePullDir(jobId: string): string;
resolveTranscriptDir(jobId: string): string;
resolveTracePath(jobId: string): string;
}
export interface IngestCommitMessagePort {

View file

@ -137,6 +137,18 @@ export const ingestReportSnapshotSchema = z
diffSummary: ingestDiffSummarySchema,
fetch: sourceFetchReportSchema.optional(),
commitSha: z.string().nullable(),
tracePath: z.string().optional(),
isolatedDiff: z
.object({
enabled: z.boolean(),
integrationWorktreePath: z.string().optional(),
ingestionBaseSha: z.string().optional(),
projectionSha: z.string().nullable().optional(),
acceptedPatches: z.number().int().min(0),
textualConflicts: z.number().int().min(0),
semanticConflicts: z.number().int().min(0),
})
.optional(),
workUnits: z.array(
z.object({
unitKey: z.string().min(1),

View file

@ -53,6 +53,16 @@ export interface IngestReportBody {
diffSummary: IngestDiffSummary;
fetch?: SourceFetchReport;
commitSha: string | null;
tracePath?: string;
isolatedDiff?: {
enabled: boolean;
integrationWorktreePath?: string;
ingestionBaseSha?: string;
projectionSha?: string | null;
acceptedPatches: number;
textualConflicts: number;
semanticConflicts: number;
};
workUnits: IngestReportWorkUnit[];
failedWorkUnits: string[];
reconciliationSkipped: boolean;