diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index deaa9d77..53d28f42 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -195,6 +195,9 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void io.stdout.write(`Report: ${report.id}\n`); io.stdout.write(`Run: ${report.runId}\n`); io.stdout.write(`Job: ${report.jobId}\n`); + if (report.body.tracePath) { + io.stdout.write(`Trace: ${report.body.tracePath}\n`); + } io.stdout.write(`Status: ${reportStatus(report)}\n`); io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`); io.stdout.write(`Connection: ${report.connectionId}\n`); diff --git a/packages/context/src/ingest/ingest-trace.test.ts b/packages/context/src/ingest/ingest-trace.test.ts new file mode 100644 index 00000000..88b56a37 --- /dev/null +++ b/packages/context/src/ingest/ingest-trace.test.ts @@ -0,0 +1,85 @@ +import { mkdtemp, readFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it, vi } from 'vitest'; +import { FileIngestTraceWriter, ingestTracePathForJob, traceTimed } from './ingest-trace.js'; + +describe('FileIngestTraceWriter', () => { + it('persists structured trace events as JSONL', async () => { + const root = await mkdtemp(join(tmpdir(), 'ktx-trace-')); + const tracePath = ingestTracePathForJob(root, 'job-1'); + const trace = new FileIngestTraceWriter({ + tracePath, + jobId: 'job-1', + connectionId: 'metabase-main', + sourceKey: 'metabase', + level: 'debug', + }); + + await trace.event('debug', 'snapshot', 'input_snapshot', { + baseSha: 'abc123', + rawFileCount: 2, + diffSummary: { added: 1, modified: 1, deleted: 0, unchanged: 3 }, + }); + + const lines = (await readFile(tracePath, 'utf-8')) + .trim() + .split('\n') + .map((line) => JSON.parse(line)); + expect(lines).toHaveLength(1); + expect(lines[0]).toMatchObject({ + schemaVersion: 1, + jobId: 'job-1', + connectionId: 'metabase-main', + sourceKey: 'metabase', + level: 'debug', + phase: 'snapshot', + event: 'input_snapshot', + data: { + baseSha: 'abc123', + rawFileCount: 2, + diffSummary: { added: 1, modified: 1, deleted: 0, unchanged: 3 }, + }, + }); + expect(typeof lines[0].at).toBe('string'); + }); + + it('records timing and error context for postmortem inspection', async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-05-17T12:00:00.000Z')); + const root = await mkdtemp(join(tmpdir(), 'ktx-trace-')); + const tracePath = ingestTracePathForJob(root, 'job-2'); + const trace = new FileIngestTraceWriter({ + tracePath, + jobId: 'job-2', + connectionId: 'c1', + sourceKey: 'fake', + level: 'trace', + }); + + await expect( + traceTimed(trace, 'integration', 'apply_patch', { unitKey: 'wu-1' }, async () => { + vi.advanceTimersByTime(17); + throw new Error('patch conflict'); + }), + ).rejects.toThrow('patch conflict'); + + const lines = (await readFile(tracePath, 'utf-8')) + .trim() + .split('\n') + .map((line) => JSON.parse(line)); + expect(lines.map((line) => line.event)).toEqual(['apply_patch_started', 'apply_patch_failed']); + expect(lines[1]).toMatchObject({ + level: 'error', + phase: 'integration', + data: { unitKey: 'wu-1' }, + error: { name: 'Error', message: 'patch conflict' }, + }); + expect(lines[1].durationMs).toBe(17); + vi.useRealTimers(); + }); + + it('uses the documented trace path layout', () => { + expect(ingestTracePathForJob('/project/.ktx', 'job-3')).toBe('/project/.ktx/ingest-traces/job-3/trace.jsonl'); + }); +}); diff --git a/packages/context/src/ingest/ingest-trace.ts b/packages/context/src/ingest/ingest-trace.ts new file mode 100644 index 00000000..eed0cfd5 --- /dev/null +++ b/packages/context/src/ingest/ingest-trace.ts @@ -0,0 +1,158 @@ +import { appendFile, mkdir } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; + +export type IngestTraceLevel = 'info' | 'debug' | 'trace' | 'error'; + +const TRACE_LEVEL_RANK: Record = { + error: 0, + info: 1, + debug: 2, + trace: 3, +}; + +export interface IngestTraceContext { + tracePath: string; + jobId: string; + connectionId: string; + sourceKey: string; + runId?: string; + syncId?: string; + level?: IngestTraceLevel; +} + +export interface IngestTraceEvent { + schemaVersion: 1; + at: string; + level: IngestTraceLevel; + jobId: string; + connectionId: string; + sourceKey: string; + runId?: string; + syncId?: string; + phase: string; + event: string; + durationMs?: number; + data?: Record; + error?: { + name: string; + message: string; + stack?: string; + }; +} + +export interface IngestTraceWriter { + readonly tracePath: string; + readonly context: IngestTraceContext; + withContext(context: Partial>): IngestTraceWriter; + event( + level: IngestTraceLevel, + phase: string, + event: string, + data?: Record, + error?: unknown, + durationMs?: number, + ): Promise; +} + +export function ingestTracePathForJob(homeDir: string, jobId: string): string { + return join(homeDir, 'ingest-traces', jobId, 'trace.jsonl'); +} + +function serializeError(error: unknown): IngestTraceEvent['error'] | undefined { + if (error === undefined || error === null) { + return undefined; + } + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + ...(error.stack ? { stack: error.stack } : {}), + }; + } + return { name: 'Error', message: String(error) }; +} + +function shouldWrite(configured: IngestTraceLevel, incoming: IngestTraceLevel): boolean { + return TRACE_LEVEL_RANK[incoming] <= TRACE_LEVEL_RANK[configured]; +} + +export class FileIngestTraceWriter implements IngestTraceWriter { + readonly tracePath: string; + readonly context: IngestTraceContext; + + constructor(context: IngestTraceContext) { + this.context = { ...context, level: context.level ?? 'debug' }; + this.tracePath = context.tracePath; + } + + withContext(context: Partial>): IngestTraceWriter { + return new FileIngestTraceWriter({ ...this.context, ...context, tracePath: this.tracePath }); + } + + async event( + level: IngestTraceLevel, + phase: string, + event: string, + data?: Record, + error?: unknown, + durationMs?: number, + ): Promise { + if (!shouldWrite(this.context.level ?? 'debug', level)) { + return; + } + const serializedError = serializeError(error); + const payload: IngestTraceEvent = { + schemaVersion: 1, + at: new Date().toISOString(), + level, + jobId: this.context.jobId, + connectionId: this.context.connectionId, + sourceKey: this.context.sourceKey, + ...(this.context.runId ? { runId: this.context.runId } : {}), + ...(this.context.syncId ? { syncId: this.context.syncId } : {}), + phase, + event, + ...(durationMs !== undefined ? { durationMs } : {}), + ...(data ? { data } : {}), + ...(serializedError ? { error: serializedError } : {}), + }; + await mkdir(dirname(this.tracePath), { recursive: true }); + await appendFile(this.tracePath, `${JSON.stringify(payload)}\n`, 'utf-8'); + } +} + +export class NoopIngestTraceWriter implements IngestTraceWriter { + readonly tracePath = ''; + readonly context: IngestTraceContext = { + tracePath: '', + jobId: '', + connectionId: '', + sourceKey: '', + level: 'error', + }; + + withContext(): IngestTraceWriter { + return this; + } + + async event(): Promise {} +} + +export async function traceTimed( + trace: IngestTraceWriter, + phase: string, + event: string, + data: Record, + fn: () => Promise, +): Promise { + await trace.event('debug', phase, `${event}_started`, data); + const started = Date.now(); + try { + const result = await fn(); + await trace.event('debug', phase, `${event}_finished`, data, undefined, Date.now() - started); + return result; + } catch (error) { + await trace.event('error', phase, `${event}_failed`, data, error, Date.now() - started); + throw error; + } +} diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 4f52684e..d8fffc3b 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -76,6 +76,7 @@ import { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evide import { HistoricSqlProjectionPostProcessor } from './adapters/historic-sql/post-processor.js'; import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './context-evidence/index.js'; import { DiffSetService } from './diff-set.service.js'; +import { ingestTracePathForJob } from './ingest-trace.js'; import { IngestBundleRunner } from './ingest-bundle.runner.js'; import { PageTriageService } from './page-triage/index.js'; import { createWarehouseVerificationTools } from './tools/warehouse-verification/index.js'; @@ -151,6 +152,10 @@ class LocalIngestStorage implements IngestStoragePort { resolveTranscriptDir(jobId: string): string { return join(this.project.projectDir, '.ktx/ingest-transcripts', jobId); } + + resolveTracePath(jobId: string): string { + return ingestTracePathForJob(this.homeDir, jobId); + } } class LocalIngestLock implements IngestLockPort { @@ -671,6 +676,8 @@ export function createLocalBundleIngestRuntime( workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget, workUnitFailureMode: options.project.config.ingest.workUnits.failureMode, + isolatedDiffSourceKeys: ['metabase'], + ingestTraceLevel: 'debug', }, skillsRegistry: new SkillsRegistryService({ skillsDir, logger }), promptService, diff --git a/packages/context/src/ingest/ports.ts b/packages/context/src/ingest/ports.ts index 6f0e9f1e..3527d29c 100644 --- a/packages/context/src/ingest/ports.ts +++ b/packages/context/src/ingest/ports.ts @@ -16,6 +16,7 @@ import type { import type { ToolContext, ToolSession, TouchedSlSource } from '../tools/index.js'; import type { KnowledgeIndexPort, KnowledgeWikiService } from '../wiki/index.js'; import type { CanonicalPin } from './canonical-pins.js'; +import type { IngestTraceLevel } from './ingest-trace.js'; import type { IngestReportSnapshot } from './reports.js'; import type { ReconcileCandidateForPrompt, @@ -142,6 +143,8 @@ export interface IngestSettingsPort { workUnitMaxConcurrency?: number; workUnitStepBudget?: number; workUnitFailureMode?: 'abort' | 'continue'; + isolatedDiffSourceKeys?: string[]; + ingestTraceLevel?: IngestTraceLevel; } export interface IngestGitAuthor { @@ -155,6 +158,7 @@ export interface IngestStoragePort { resolveUploadDir(uploadId: string): string; resolvePullDir(jobId: string): string; resolveTranscriptDir(jobId: string): string; + resolveTracePath(jobId: string): string; } export interface IngestCommitMessagePort { diff --git a/packages/context/src/ingest/report-snapshot.ts b/packages/context/src/ingest/report-snapshot.ts index de377dd5..9d2e1062 100644 --- a/packages/context/src/ingest/report-snapshot.ts +++ b/packages/context/src/ingest/report-snapshot.ts @@ -137,6 +137,18 @@ export const ingestReportSnapshotSchema = z diffSummary: ingestDiffSummarySchema, fetch: sourceFetchReportSchema.optional(), commitSha: z.string().nullable(), + tracePath: z.string().optional(), + isolatedDiff: z + .object({ + enabled: z.boolean(), + integrationWorktreePath: z.string().optional(), + ingestionBaseSha: z.string().optional(), + projectionSha: z.string().nullable().optional(), + acceptedPatches: z.number().int().min(0), + textualConflicts: z.number().int().min(0), + semanticConflicts: z.number().int().min(0), + }) + .optional(), workUnits: z.array( z.object({ unitKey: z.string().min(1), diff --git a/packages/context/src/ingest/reports.ts b/packages/context/src/ingest/reports.ts index 672c5bfb..95d610f2 100644 --- a/packages/context/src/ingest/reports.ts +++ b/packages/context/src/ingest/reports.ts @@ -53,6 +53,16 @@ export interface IngestReportBody { diffSummary: IngestDiffSummary; fetch?: SourceFetchReport; commitSha: string | null; + tracePath?: string; + isolatedDiff?: { + enabled: boolean; + integrationWorktreePath?: string; + ingestionBaseSha?: string; + projectionSha?: string | null; + acceptedPatches: number; + textualConflicts: number; + semanticConflicts: number; + }; workUnits: IngestReportWorkUnit[]; failedWorkUnits: string[]; reconciliationSkipped: boolean;