fix(ingest): trace isolated SL target policy gates

This commit is contained in:
Andrey Avtomonov 2026-05-17 22:12:53 +02:00
parent 9d756b2c6c
commit 3613fb3686
4 changed files with 108 additions and 9 deletions

View file

@ -17,6 +17,11 @@ export {
buildLiveDatabaseTableNaturalKey,
ktxSchemaSnapshotToExtractedSchema,
} from './adapters/live-database/extracted-schema.js';
export {
assertSemanticLayerTargetPathsAllowed,
findDisallowedSemanticLayerTargetPaths,
semanticLayerConnectionIdFromPath,
} from './semantic-layer-target-policy.js';
export { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js';
export type {
BuildLiveDatabaseManifestShardsInput,

View file

@ -48,6 +48,7 @@ import { executeWorkUnit, type WorkUnitOutcome } from './stages/stage-3-work-uni
import { runReconciliationStage4 } from './stages/stage-4-reconciliation.js';
import type { StageIndex } from './stages/stage-index.types.js';
import { validateWuTouchedSources } from './stages/validate-wu-sources.js';
import { assertSemanticLayerTargetPathsAllowed } from './semantic-layer-target-policy.js';
import { createEmitArtifactResolutionTool } from './tools/emit-artifact-resolution.tool.js';
import { createEmitConflictResolutionTool } from './tools/emit-conflict-resolution.tool.js';
import { createEmitEvictionDecisionTool } from './tools/emit-eviction-decision.tool.js';
@ -1210,6 +1211,7 @@ export class IngestBundleRunner {
this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`);
let projectionTouchedSources: TouchedSlSource[] = [];
let projectionChangedWikiPageKeys: string[] = [];
let projectionTouchedPaths: string[] = [];
if (!overrideReport && isolatedDiffEnabled) {
await runTrace.event('info', 'routing', 'isolated_diff_enabled', {
@ -1246,6 +1248,7 @@ export class IngestBundleRunner {
...projection.touchedSources.map((source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`),
...projection.changedWikiPageKeys.map((pageKey) => `wiki/global/${pageKey}.md`),
];
projectionTouchedPaths = projectionPaths;
const projectionCommit =
projectionPaths.length > 0
? await sessionWorktree.git.commitFiles(
@ -1392,6 +1395,12 @@ export class IngestBundleRunner {
if (!wu) {
continue;
}
const integrationFailureDetails = {
unitKey: outcome.unitKey,
patchPath: outcome.patchPath,
allowedTargetConnectionIds: slConnectionIds,
};
activeFailureDetails = integrationFailureDetails;
const integration = await integrateWorkUnitPatch({
unitKey: outcome.unitKey,
patchPath: outcome.patchPath,
@ -1399,6 +1408,7 @@ export class IngestBundleRunner {
trace: runTrace,
author: this.deps.storage.systemGitAuthor,
slDisallowed: wu.slDisallowed === true,
allowedTargetConnectionIds: new Set(slConnectionIds),
validateAppliedTree: async (touchedPaths) => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
@ -1432,14 +1442,25 @@ export class IngestBundleRunner {
isolatedDiffSummary.textualConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
activeFailureDetails = {
...integrationFailureDetails,
touchedPaths: integration.touchedPaths,
reason: integration.reason,
};
throw new Error(`isolated diff textual conflict in ${outcome.unitKey}: ${integration.reason}`);
}
if (integration.status === 'semantic_conflict') {
isolatedDiffSummary.semanticConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
activeFailureDetails = {
...integrationFailureDetails,
touchedPaths: integration.touchedPaths,
reason: integration.reason,
};
throw new Error(`isolated diff semantic conflict in ${outcome.unitKey}: ${integration.reason}`);
}
activeFailureDetails = undefined;
isolatedDiffSummary.acceptedPatches += 1;
}
@ -2083,20 +2104,44 @@ export class IngestBundleRunner {
...(postProcessorOutcome?.touchedSources ?? []),
]);
const finalTargetPolicyPaths = [
...projectionTouchedPaths,
...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []),
...postReconciliationPaths,
...(postProcessorOutcome?.touchedSources ?? []).map(
(source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`,
),
];
const targetPolicyTraceData = {
allowedTargetConnectionIds: slConnectionIds,
touchedPaths: [...new Set(finalTargetPolicyPaths)].sort(),
};
activePhase = 'target_policy';
activeFailureDetails = targetPolicyTraceData;
await traceTimed(runTrace, 'target_policy', 'semantic_layer_target_policy', targetPolicyTraceData, async () => {
assertSemanticLayerTargetPathsAllowed({
paths: finalTargetPolicyPaths,
allowedConnectionIds: new Set(slConnectionIds),
});
});
activeFailureDetails = undefined;
const finalArtifactGateTraceData = {
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
preReconciliationSha,
postReconciliationSha,
postReconciliationPaths,
reconciliationActionCount: reconcileActions.length,
wikiSlRefRepairCount: wikiSlRefRepairResult.repairs.length,
};
activePhase = 'final_gates';
activeFailureDetails = finalArtifactGateTraceData;
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates',
{
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
preReconciliationSha,
postReconciliationSha,
postReconciliationPaths,
reconciliationActionCount: reconcileActions.length,
wikiSlRefRepairCount: wikiSlRefRepairResult.repairs.length,
},
finalArtifactGateTraceData,
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
@ -2126,6 +2171,7 @@ export class IngestBundleRunner {
});
},
);
activeFailureDetails = undefined;
activePhase = 'provenance_validation';
latestReportWorkUnits = this.toReportWorkUnits(stageIndex);

View file

@ -52,6 +52,7 @@ describe('integrateWorkUnitPatch', () => {
author: { name: 'KTX Test', email: 'system@ktx.local' },
validateAppliedTree: vi.fn().mockResolvedValue(undefined),
slDisallowed: false,
allowedTargetConnectionIds: new Set(['c1']),
});
expect(result.status).toBe('accepted');
@ -84,6 +85,7 @@ describe('integrateWorkUnitPatch', () => {
author: { name: 'KTX Test', email: 'system@ktx.local' },
validateAppliedTree: vi.fn().mockRejectedValue(new Error('final artifact gates failed')),
slDisallowed: false,
allowedTargetConnectionIds: new Set(['c1']),
});
expect(result.status).toBe('semantic_conflict');
@ -118,6 +120,7 @@ describe('integrateWorkUnitPatch', () => {
author: { name: 'KTX Test', email: 'system@ktx.local' },
validateAppliedTree: vi.fn().mockResolvedValue(undefined),
slDisallowed: true,
allowedTargetConnectionIds: new Set(['c1']),
});
expect(result).toMatchObject({
@ -128,4 +131,46 @@ describe('integrateWorkUnitPatch', () => {
expect(rawTrace).toContain('patch_policy_rejected');
expect(rawTrace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/c1/orders.yaml');
});
it('classifies unauthorized semantic-layer targets as traced textual conflicts', async () => {
const { homeDir, git, baseSha } = await makeRepo();
const childDir = join(homeDir, 'child-target-policy');
await git.addWorktree(childDir, 'child-target-policy', baseSha);
const childGit = git.forWorktree(childDir);
await mkdir(join(childDir, 'semantic-layer/finance'), { recursive: true });
await writeFile(
join(childDir, 'semantic-layer/finance/orders.yaml'),
'name: orders\ncolumns: []\njoins: []\nmeasures: []\n',
);
await childGit.commitFiles(['semantic-layer/finance/orders.yaml'], 'unauthorized sl', 'System User', 'system@example.com');
const patchPath = join(homeDir, 'patches/unauthorized.patch');
await childGit.writeBinaryNoRenamePatch(baseSha, 'HEAD', patchPath);
const trace = new FileIngestTraceWriter({
tracePath: join(homeDir, '.ktx/ingest-traces/job-target-policy/trace.jsonl'),
jobId: 'job-target-policy',
connectionId: 'c1',
sourceKey: 'fake',
level: 'trace',
});
const result = await integrateWorkUnitPatch({
unitKey: 'wu-finance',
patchPath,
integrationGit: git,
trace,
author: { name: 'KTX Test', email: 'system@ktx.local' },
validateAppliedTree: vi.fn().mockResolvedValue(undefined),
slDisallowed: false,
allowedTargetConnectionIds: new Set(['warehouse']),
});
expect(result).toMatchObject({
status: 'textual_conflict',
touchedPaths: ['semantic-layer/finance/orders.yaml'],
});
const rawTrace = await readFile(trace.tracePath, 'utf-8');
expect(rawTrace).toContain('patch_policy_rejected');
expect(rawTrace).toContain('semantic-layer target connection not allowed');
expect(rawTrace).toContain('allowedTargetConnectionIds');
});
});

View file

@ -16,6 +16,7 @@ export interface IntegrateWorkUnitPatchInput {
trace: IngestTraceWriter;
author: { name: string; email: string };
slDisallowed: boolean;
allowedTargetConnectionIds: ReadonlySet<string>;
validateAppliedTree(touchedPaths: string[]): Promise<void>;
}
@ -40,12 +41,14 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
unitKey: input.unitKey,
patch,
slDisallowed: input.slDisallowed,
allowedTargetConnectionIds: input.allowedTargetConnectionIds,
});
} catch (error) {
await input.trace.event('error', 'integration', 'patch_policy_rejected', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths,
allowedTargetConnectionIds: [...input.allowedTargetConnectionIds].sort(),
reason: errorMessage(error),
});
return {