fix(ingest): gate provenance before isolated diff squash

This commit is contained in:
Andrey Avtomonov 2026-05-17 22:02:32 +02:00
parent ea6dca4568
commit 977a610ea9
5 changed files with 440 additions and 110 deletions

View file

@ -609,6 +609,175 @@ describe('IngestBundleRunner isolated diff path', () => {
}
});
it('rejects invalid provenance raw paths before squash reaches main', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
const createdReports: any[] = [];
deps.reports.create = vi.fn(async (args: any) => {
createdReports.push(args);
return { id: `report-${createdReports.length}` };
});
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'card-valid-artifacts',
rawFiles: ['cards/source.json'],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr`.\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'mart_account_segments',
detail: 'Valid source',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'account-segments',
detail: 'Valid wiki with invalid provenance raw path',
rawPaths: ['cards/missing.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'valid artifacts with invalid provenance',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
const preRunHead = await runtime.git.revParseHead();
await expect(
runner.run({
jobId: 'job-invalid-provenance',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/provenance row references raw path outside this snapshot: cards\/missing\.json/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
expect(deps.provenance.insertMany).not.toHaveBeenCalled();
const failureReport = createdReports.find((report) => report.body.status === 'failed');
expect(failureReport.body.tracePath).toContain('job-invalid-provenance/trace.jsonl');
expect(failureReport.body.failure).toMatchObject({
phase: 'provenance_validation',
message: expect.stringContaining('cards/missing.json'),
});
expect(failureReport.body.failure.details).toMatchObject({
invalidRawPaths: ['cards/missing.json'],
currentRawPaths: ['cards/source.json'],
invalidRows: expect.arrayContaining([
expect.objectContaining({
row: expect.objectContaining({
rawPath: 'cards/missing.json',
artifactKind: 'wiki',
artifactKey: 'account-segments',
actionType: 'wiki_written',
}),
origin: expect.objectContaining({
source: 'work_unit_action',
unitKey: 'card-valid-artifacts',
actionIndex: 1,
unitRawFiles: ['cards/source.json'],
action: expect.objectContaining({
target: 'wiki',
type: 'created',
key: 'account-segments',
rawPaths: ['cards/missing.json'],
}),
}),
}),
]),
});
expect(failureReport.body.provenanceRows).toEqual(
expect.arrayContaining([
expect.objectContaining({ rawPath: 'cards/source.json', artifactKind: 'sl', artifactKey: 'mart_account_segments' }),
expect.objectContaining({ rawPath: 'cards/missing.json', artifactKind: 'wiki', artifactKey: 'account-segments' }),
]),
);
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'card-valid-artifacts',
rawFiles: ['cards/source.json'],
actions: expect.arrayContaining([
expect.objectContaining({
target: 'wiki',
key: 'account-segments',
rawPaths: ['cards/missing.json'],
}),
]),
}),
]),
);
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-invalid-provenance/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_finished',
'provenance_rows_validation_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
const validationFailure = events.find((event) => event.event === 'provenance_rows_validation_failed');
expect(validationFailure).toMatchObject({
phase: 'provenance',
data: {
invalidRawPaths: ['cards/missing.json'],
currentRawPaths: ['cards/source.json'],
invalidRows: expect.arrayContaining([
expect.objectContaining({
row: expect.objectContaining({ rawPath: 'cards/missing.json' }),
origin: expect.objectContaining({
source: 'work_unit_action',
unitKey: 'card-valid-artifacts',
actionIndex: 1,
}),
}),
]),
},
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
const runtime = await makeRealGitRuntime();
try {

View file

@ -167,10 +167,11 @@ const makeDeps = () => {
loadPrompt: vi.fn().mockResolvedValue('base-framing'),
};
const wikiService = {
forWorktree: vi.fn().mockReturnValue({}),
forWorktree: vi.fn(),
readPage: vi.fn().mockResolvedValue(null),
syncFromCommit: vi.fn().mockResolvedValue(undefined),
};
wikiService.forWorktree.mockReturnValue(wikiService);
const knowledgeSlRefs = {
syncFromWiki: vi.fn().mockResolvedValue({ inserted: 1, deleted: 0 }),
};
@ -178,7 +179,7 @@ const makeDeps = () => {
listPagesForUser: vi.fn().mockResolvedValue([]),
};
const semanticLayerService = {
forWorktree: vi.fn().mockReturnValue({}),
forWorktree: vi.fn(),
listFilesForConnection: vi
.fn()
.mockImplementation((connectionId: string) =>
@ -193,6 +194,7 @@ const makeDeps = () => {
}),
),
};
semanticLayerService.forWorktree.mockReturnValue(semanticLayerService);
const slSearchService = {
indexSources: vi.fn().mockResolvedValue(undefined),
};

View file

@ -22,6 +22,7 @@ import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-fl
import type {
ContextEvidenceIndexSummary,
IngestBundleRunnerDeps,
IngestProvenanceInsert,
IngestProvenanceRow,
IngestRunsPort,
IngestSessionWorktree,
@ -32,7 +33,9 @@ import {
buildStageIndexFromReportBody,
postProcessorSavedMemoryCounts,
type IngestReportPostProcessorOutcome,
type IngestReportProvenanceDetail,
type IngestReportSnapshot,
type IngestReportWorkUnit,
} from './reports.js';
import {
buildReconcileSystemPrompt,
@ -142,6 +145,40 @@ function rawPathsForAction(action: MemoryAction, fallbackRawPaths: string[]): st
return action.rawPaths && action.rawPaths.length > 0 ? [...new Set(action.rawPaths)] : fallbackRawPaths;
}
type ProvenanceRowOrigin =
| {
source: 'work_unit_action';
unitKey: string;
unitIndex: number;
unitRawFiles: string[];
actionIndex: number;
action: MemoryAction;
}
| {
source: 'reconciliation_action';
actionIndex: number;
action: MemoryAction;
}
| {
source: 'artifact_resolution';
resolutionIndex: number;
resolution: NonNullable<StageIndex['artifactResolutions']>[number];
}
| {
source: 'raw_snapshot_fallback';
rawPath: string;
};
interface ProvenanceRowDiagnostic {
row: IngestProvenanceInsert;
origin: ProvenanceRowOrigin;
}
interface ProvenancePlan {
rows: IngestProvenanceInsert[];
diagnostics: ProvenanceRowDiagnostic[];
}
export class IngestBundleRunner {
private readonly logger: KtxLogger;
private readonly chainByConnection = new Map<string, Promise<unknown>>();
@ -426,6 +463,157 @@ export class IngestBundleRunner {
return error instanceof Error ? error.message : String(error);
}
private buildProvenancePlan(input: {
job: IngestBundleJob;
syncId: string;
currentHashes: Map<string, string>;
stageIndex: StageIndex;
reconcileActions: MemoryAction[];
}): ProvenancePlan {
const rows: IngestProvenanceInsert[] = [];
const diagnostics: ProvenanceRowDiagnostic[] = [];
const actionToType = (action: MemoryAction): IngestProvenanceInsert['actionType'] => {
if (action.target === 'wiki') {
return 'wiki_written';
}
return action.type === 'created' ? 'source_created' : 'measure_added';
};
const producedPaths = new Set<string>();
const pushRow = (row: IngestProvenanceInsert, origin: ProvenanceRowOrigin): void => {
rows.push(row);
diagnostics.push({ row, origin });
producedPaths.add(row.rawPath);
};
const pushActionProvenance = (rawPath: string, action: MemoryAction, origin: ProvenanceRowOrigin): void => {
const hash = input.currentHashes.get(rawPath) ?? '';
pushRow(
{
connectionId: input.job.connectionId,
sourceKey: input.job.sourceKey,
syncId: input.syncId,
rawPath,
rawContentHash: hash,
artifactKind: action.target,
artifactKey: action.key,
targetConnectionId: action.target === 'sl' ? actionTargetConnectionId(action, input.job.connectionId) : null,
artifactContentHash: null,
actionType: actionToType(action),
},
origin,
);
};
input.stageIndex.workUnits.forEach((wu, unitIndex) => {
wu.actions.forEach((action, actionIndex) => {
for (const rawPath of rawPathsForAction(action, wu.rawFiles)) {
pushActionProvenance(rawPath, action, {
source: 'work_unit_action',
unitKey: wu.unitKey,
unitIndex,
unitRawFiles: wu.rawFiles,
actionIndex,
action,
});
}
});
});
input.reconcileActions.forEach((action, actionIndex) => {
for (const rawPath of action.rawPaths ?? []) {
pushActionProvenance(rawPath, action, {
source: 'reconciliation_action',
actionIndex,
action,
});
}
});
(input.stageIndex.artifactResolutions ?? []).forEach((resolution, resolutionIndex) => {
const hash = input.currentHashes.get(resolution.rawPath) ?? '';
pushRow(
{
connectionId: input.job.connectionId,
sourceKey: input.job.sourceKey,
syncId: input.syncId,
rawPath: resolution.rawPath,
rawContentHash: hash,
artifactKind: resolution.artifactKind,
artifactKey: resolution.artifactKey,
targetConnectionId: null,
artifactContentHash: null,
actionType: resolution.actionType,
},
{
source: 'artifact_resolution',
resolutionIndex,
resolution,
},
);
});
for (const [rawPath, hash] of input.currentHashes) {
if (producedPaths.has(rawPath)) {
continue;
}
pushRow(
{
connectionId: input.job.connectionId,
sourceKey: input.job.sourceKey,
syncId: input.syncId,
rawPath,
rawContentHash: hash,
artifactKind: null,
artifactKey: null,
targetConnectionId: null,
artifactContentHash: null,
actionType: 'skipped',
},
{ source: 'raw_snapshot_fallback', rawPath },
);
}
return { rows, diagnostics };
}
private toReportProvenanceRows(rows: IngestProvenanceInsert[]): IngestReportProvenanceDetail[] {
return rows.map(({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({
rawPath,
artifactKind,
artifactKey,
targetConnectionId: targetConnectionId ?? null,
actionType,
}));
}
private toReportWorkUnits(stageIndex: StageIndex): IngestReportWorkUnit[] {
return stageIndex.workUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: wu.rawFiles,
status: wu.status,
reason: wu.reason,
actions: wu.actions,
touchedSlSources: wu.touchedSlSources,
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
}));
}
private provenanceValidationTraceData(input: {
plan: ProvenancePlan;
currentRawPaths: Set<string>;
deletedRawPaths: Set<string>;
}): Record<string, unknown> {
const invalidRows = input.plan.diagnostics.filter(
({ row }) => !input.currentRawPaths.has(row.rawPath) && !input.deletedRawPaths.has(row.rawPath),
);
return {
rowCount: input.plan.rows.length,
currentRawPathCount: input.currentRawPaths.size,
deletedRawPathCount: input.deletedRawPaths.size,
currentRawPaths: [...input.currentRawPaths].sort(),
deletedRawPaths: [...input.deletedRawPaths].sort(),
invalidRawPaths: [...new Set(invalidRows.map(({ row }) => row.rawPath))].sort(),
invalidRows,
};
}
private wikiPageKeysFromPaths(paths: string[]): string[] {
return [
...new Set(
@ -673,6 +861,16 @@ export class IngestBundleRunner {
let latestWorkUnits: WorkUnitOutcome[] = [];
let latestFailedWorkUnits: string[] = [];
let latestReconciliationSkipped = true;
let latestReportWorkUnits: IngestReportWorkUnit[] = [];
let latestReconciliationActions: MemoryAction[] = [];
let latestConflictsResolved: StageIndex['conflictsResolved'] = [];
let latestEvictionsApplied: StageIndex['evictionsApplied'] = [];
let latestUnmappedFallbacks: StageIndex['unmappedFallbacks'] = [];
let latestArtifactResolutions: NonNullable<StageIndex['artifactResolutions']> = [];
let latestEvictionInputs: string[] = [];
let latestUnresolvedCards: UnresolvedCardInfo[] = [];
let latestReportProvenanceRows: IngestReportProvenanceDetail[] = [];
let activeFailureDetails: Record<string, unknown> | undefined;
let latestIsolatedDiffSummary:
| {
enabled: boolean;
@ -1495,6 +1693,7 @@ export class IngestBundleRunner {
slDisallowed: o.slDisallowed,
slDisallowedReason: o.slDisallowedReason,
}));
latestReportWorkUnits = this.toReportWorkUnits(stageIndex);
}
const carryForwardResult =
contextReport && this.deps.contextCandidateCarryforward
@ -1928,6 +2127,48 @@ export class IngestBundleRunner {
},
);
activePhase = 'provenance_validation';
latestReportWorkUnits = this.toReportWorkUnits(stageIndex);
latestReconciliationActions = reconcileActions;
latestConflictsResolved = stageIndex.conflictsResolved;
latestEvictionsApplied = stageIndex.evictionsApplied;
latestUnmappedFallbacks = stageIndex.unmappedFallbacks;
latestArtifactResolutions = stageIndex.artifactResolutions ?? [];
latestEvictionInputs = eviction?.deletedRawPaths ?? [];
latestUnresolvedCards = unresolvedCards ?? [];
const provenancePlan = this.buildProvenancePlan({
job,
syncId,
currentHashes,
stageIndex,
reconcileActions,
});
const provenanceRows = provenancePlan.rows;
const currentRawPaths = new Set(currentHashes.keys());
const deletedRawPaths = new Set(eviction?.deletedRawPaths ?? []);
const provenanceValidationData = this.provenanceValidationTraceData({
plan: provenancePlan,
currentRawPaths,
deletedRawPaths,
});
const reportProvenanceRows = this.toReportProvenanceRows(provenanceRows);
latestReportProvenanceRows = reportProvenanceRows;
activeFailureDetails = provenanceValidationData;
await traceTimed(
runTrace,
'provenance',
'provenance_rows_validation',
provenanceValidationData,
async () => {
validateProvenanceRawPaths({
rows: provenanceRows,
currentRawPaths,
deletedRawPaths,
});
},
);
activeFailureDetails = undefined;
// Stage 6 — squash commit
activePhase = 'squash';
const stage6 = ctx?.startPhase(0.04);
@ -2003,89 +2244,10 @@ export class IngestBundleRunner {
await stage5?.updateProgress(0.0, 'Recording history');
activePhase = 'provenance';
// Provenance rows: per-artifact when the WU emitted actions, plus a `skipped`
// fallback for raw files that produced nothing so the next DiffSet still sees
// them.
const provenanceRows: Parameters<typeof this.deps.provenance.insertMany>[0] = [];
const actionToType = (a: MemoryAction): 'source_created' | 'measure_added' | 'wiki_written' => {
if (a.target === 'wiki') {
return 'wiki_written';
}
// SL action: 'created' → source_created; 'updated' → measure_added (coarse-grained;
// action.detail preserves the finer distinction for the report body).
return a.type === 'created' ? 'source_created' : 'measure_added';
};
const producedPaths = new Set<string>();
const pushActionProvenance = (rawPath: string, action: MemoryAction): void => {
const hash = currentHashes.get(rawPath) ?? '';
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
rawPath,
rawContentHash: hash,
artifactKind: action.target,
artifactKey: action.key,
targetConnectionId: action.target === 'sl' ? actionTargetConnectionId(action, job.connectionId) : null,
artifactContentHash: null,
actionType: actionToType(action),
});
producedPaths.add(rawPath);
};
for (const wu of stageIndex.workUnits) {
for (const action of wu.actions) {
for (const rawPath of rawPathsForAction(action, wu.rawFiles)) {
pushActionProvenance(rawPath, action);
}
}
}
for (const action of reconcileActions) {
for (const rawPath of action.rawPaths ?? []) {
pushActionProvenance(rawPath, action);
}
}
for (const resolution of stageIndex.artifactResolutions ?? []) {
const hash = currentHashes.get(resolution.rawPath) ?? '';
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
rawPath: resolution.rawPath,
rawContentHash: hash,
artifactKind: resolution.artifactKind,
artifactKey: resolution.artifactKey,
targetConnectionId: null,
artifactContentHash: null,
actionType: resolution.actionType,
});
producedPaths.add(resolution.rawPath);
}
for (const [rawPath, hash] of currentHashes) {
if (producedPaths.has(rawPath)) {
continue;
}
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
rawPath,
rawContentHash: hash,
artifactKind: null,
artifactKey: null,
targetConnectionId: null,
artifactContentHash: null,
actionType: 'skipped',
});
}
validateProvenanceRawPaths({
rows: provenanceRows,
currentRawPaths: new Set(currentHashes.keys()),
deletedRawPaths: new Set(eviction?.deletedRawPaths ?? []),
});
await runTrace.event('debug', 'provenance', 'provenance_rows_validated', {
await this.deps.provenance.insertMany(provenanceRows);
await runTrace.event('debug', 'provenance', 'provenance_rows_inserted', {
rowCount: provenanceRows.length,
});
await this.deps.provenance.insertMany(provenanceRows);
memoryFlow?.emit({ type: 'provenance_recorded', rowCount: provenanceRows.length });
await stage5?.updateProgress(
1.0,
@ -2096,15 +2258,6 @@ export class IngestBundleRunner {
await stage7?.updateProgress(0.0, 'Wrapping up');
activePhase = 'report';
const reportProvenanceRows = provenanceRows.map(
({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({
rawPath,
artifactKind,
artifactKey,
targetConnectionId: targetConnectionId ?? null,
actionType,
}),
);
const reportToolTranscripts = Array.from(transcriptSummaries.values()).map((summary) => ({
unitKey: summary.unitKey,
path: summary.path,
@ -2307,30 +2460,34 @@ export class IngestBundleRunner {
failure: {
phase: activePhase,
message: this.errorMessage(error),
...(activeFailureDetails ? { details: activeFailureDetails } : {}),
},
workUnits: latestWorkUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: [],
status: wu.status,
reason: wu.reason,
actions: wu.actions,
touchedSlSources: wu.touchedSlSources,
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
})),
workUnits:
latestReportWorkUnits.length > 0
? latestReportWorkUnits
: latestWorkUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: [],
status: wu.status,
reason: wu.reason,
actions: wu.actions,
touchedSlSources: wu.touchedSlSources,
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
})),
failedWorkUnits: latestFailedWorkUnits,
reconciliationSkipped: latestReconciliationSkipped,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
artifactResolutions: [],
evictionInputs: [],
reconciliationActions: [],
conflictsResolved: latestConflictsResolved,
evictionsApplied: latestEvictionsApplied,
unmappedFallbacks: latestUnmappedFallbacks,
artifactResolutions: latestArtifactResolutions,
evictionInputs: latestEvictionInputs,
reconciliationActions: latestReconciliationActions,
evictionDecisions: [],
unresolvedCards: [],
unresolvedCards: latestUnresolvedCards,
supersededBy: null,
overrideOf: null,
provenanceRows: [],
provenanceRows: latestReportProvenanceRows,
toolTranscripts: Array.from(transcriptSummaries.values()).map((summary) => ({
unitKey: summary.unitKey,
path: summary.path,

View file

@ -126,6 +126,7 @@ const sourceFetchReportSchema = z.object({
const ingestReportFailureSchema = z.object({
phase: z.string().min(1),
message: z.string().min(1),
details: z.record(z.string(), z.unknown()).optional(),
});
export const ingestReportSnapshotSchema = z

View file

@ -51,6 +51,7 @@ export interface IngestReportPostProcessorOutcome {
export interface IngestReportFailure {
phase: string;
message: string;
details?: Record<string, unknown>;
}
export interface IngestReportBody {