diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index a7328574..450306dc 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -17,6 +17,11 @@ export { buildLiveDatabaseTableNaturalKey, ktxSchemaSnapshotToExtractedSchema, } from './adapters/live-database/extracted-schema.js'; +export { + assertSemanticLayerTargetPathsAllowed, + findDisallowedSemanticLayerTargetPaths, + semanticLayerConnectionIdFromPath, +} from './semantic-layer-target-policy.js'; export { LiveDatabaseSourceAdapter } from './adapters/live-database/live-database.adapter.js'; export type { BuildLiveDatabaseManifestShardsInput, diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index db78830e..d0c15ebe 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -48,6 +48,7 @@ import { executeWorkUnit, type WorkUnitOutcome } from './stages/stage-3-work-uni import { runReconciliationStage4 } from './stages/stage-4-reconciliation.js'; import type { StageIndex } from './stages/stage-index.types.js'; import { validateWuTouchedSources } from './stages/validate-wu-sources.js'; +import { assertSemanticLayerTargetPathsAllowed } from './semantic-layer-target-policy.js'; import { createEmitArtifactResolutionTool } from './tools/emit-artifact-resolution.tool.js'; import { createEmitConflictResolutionTool } from './tools/emit-conflict-resolution.tool.js'; import { createEmitEvictionDecisionTool } from './tools/emit-eviction-decision.tool.js'; @@ -1210,6 +1211,7 @@ export class IngestBundleRunner { this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`); let projectionTouchedSources: TouchedSlSource[] = []; let projectionChangedWikiPageKeys: string[] = []; + let projectionTouchedPaths: string[] = []; if (!overrideReport && isolatedDiffEnabled) { await runTrace.event('info', 'routing', 'isolated_diff_enabled', { @@ -1246,6 +1248,7 @@ export class IngestBundleRunner { ...projection.touchedSources.map((source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`), ...projection.changedWikiPageKeys.map((pageKey) => `wiki/global/${pageKey}.md`), ]; + projectionTouchedPaths = projectionPaths; const projectionCommit = projectionPaths.length > 0 ? await sessionWorktree.git.commitFiles( @@ -1392,6 +1395,12 @@ export class IngestBundleRunner { if (!wu) { continue; } + const integrationFailureDetails = { + unitKey: outcome.unitKey, + patchPath: outcome.patchPath, + allowedTargetConnectionIds: slConnectionIds, + }; + activeFailureDetails = integrationFailureDetails; const integration = await integrateWorkUnitPatch({ unitKey: outcome.unitKey, patchPath: outcome.patchPath, @@ -1399,6 +1408,7 @@ export class IngestBundleRunner { trace: runTrace, author: this.deps.storage.systemGitAuthor, slDisallowed: wu.slDisallowed === true, + allowedTargetConnectionIds: new Set(slConnectionIds), validateAppliedTree: async (touchedPaths) => { await validateFinalIngestArtifacts({ connectionIds: slConnectionIds, @@ -1432,14 +1442,25 @@ export class IngestBundleRunner { isolatedDiffSummary.textualConflicts += 1; await this.deps.runs.markFailed(runRow.id); cleanupOutcome = 'conflict'; + activeFailureDetails = { + ...integrationFailureDetails, + touchedPaths: integration.touchedPaths, + reason: integration.reason, + }; throw new Error(`isolated diff textual conflict in ${outcome.unitKey}: ${integration.reason}`); } if (integration.status === 'semantic_conflict') { isolatedDiffSummary.semanticConflicts += 1; await this.deps.runs.markFailed(runRow.id); cleanupOutcome = 'conflict'; + activeFailureDetails = { + ...integrationFailureDetails, + touchedPaths: integration.touchedPaths, + reason: integration.reason, + }; throw new Error(`isolated diff semantic conflict in ${outcome.unitKey}: ${integration.reason}`); } + activeFailureDetails = undefined; isolatedDiffSummary.acceptedPatches += 1; } @@ -2083,20 +2104,44 @@ export class IngestBundleRunner { ...(postProcessorOutcome?.touchedSources ?? []), ]); + const finalTargetPolicyPaths = [ + ...projectionTouchedPaths, + ...workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []), + ...postReconciliationPaths, + ...(postProcessorOutcome?.touchedSources ?? []).map( + (source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`, + ), + ]; + const targetPolicyTraceData = { + allowedTargetConnectionIds: slConnectionIds, + touchedPaths: [...new Set(finalTargetPolicyPaths)].sort(), + }; + activePhase = 'target_policy'; + activeFailureDetails = targetPolicyTraceData; + await traceTimed(runTrace, 'target_policy', 'semantic_layer_target_policy', targetPolicyTraceData, async () => { + assertSemanticLayerTargetPathsAllowed({ + paths: finalTargetPolicyPaths, + allowedConnectionIds: new Set(slConnectionIds), + }); + }); + activeFailureDetails = undefined; + + const finalArtifactGateTraceData = { + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + preReconciliationSha, + postReconciliationSha, + postReconciliationPaths, + reconciliationActionCount: reconcileActions.length, + wikiSlRefRepairCount: wikiSlRefRepairResult.repairs.length, + }; activePhase = 'final_gates'; + activeFailureDetails = finalArtifactGateTraceData; await traceTimed( runTrace, 'final_gates', 'final_artifact_gates', - { - changedWikiPageKeys: finalChangedWikiPageKeys, - touchedSlSources: finalTouchedSlSources, - preReconciliationSha, - postReconciliationSha, - postReconciliationPaths, - reconciliationActionCount: reconcileActions.length, - wikiSlRefRepairCount: wikiSlRefRepairResult.repairs.length, - }, + finalArtifactGateTraceData, async () => { await validateFinalIngestArtifacts({ connectionIds: repairConnectionIds, @@ -2126,6 +2171,7 @@ export class IngestBundleRunner { }); }, ); + activeFailureDetails = undefined; activePhase = 'provenance_validation'; latestReportWorkUnits = this.toReportWorkUnits(stageIndex); diff --git a/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts b/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts index 02a7defe..e1bf1120 100644 --- a/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts +++ b/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts @@ -52,6 +52,7 @@ describe('integrateWorkUnitPatch', () => { author: { name: 'KTX Test', email: 'system@ktx.local' }, validateAppliedTree: vi.fn().mockResolvedValue(undefined), slDisallowed: false, + allowedTargetConnectionIds: new Set(['c1']), }); expect(result.status).toBe('accepted'); @@ -84,6 +85,7 @@ describe('integrateWorkUnitPatch', () => { author: { name: 'KTX Test', email: 'system@ktx.local' }, validateAppliedTree: vi.fn().mockRejectedValue(new Error('final artifact gates failed')), slDisallowed: false, + allowedTargetConnectionIds: new Set(['c1']), }); expect(result.status).toBe('semantic_conflict'); @@ -118,6 +120,7 @@ describe('integrateWorkUnitPatch', () => { author: { name: 'KTX Test', email: 'system@ktx.local' }, validateAppliedTree: vi.fn().mockResolvedValue(undefined), slDisallowed: true, + allowedTargetConnectionIds: new Set(['c1']), }); expect(result).toMatchObject({ @@ -128,4 +131,46 @@ describe('integrateWorkUnitPatch', () => { expect(rawTrace).toContain('patch_policy_rejected'); expect(rawTrace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/c1/orders.yaml'); }); + + it('classifies unauthorized semantic-layer targets as traced textual conflicts', async () => { + const { homeDir, git, baseSha } = await makeRepo(); + const childDir = join(homeDir, 'child-target-policy'); + await git.addWorktree(childDir, 'child-target-policy', baseSha); + const childGit = git.forWorktree(childDir); + await mkdir(join(childDir, 'semantic-layer/finance'), { recursive: true }); + await writeFile( + join(childDir, 'semantic-layer/finance/orders.yaml'), + 'name: orders\ncolumns: []\njoins: []\nmeasures: []\n', + ); + await childGit.commitFiles(['semantic-layer/finance/orders.yaml'], 'unauthorized sl', 'System User', 'system@example.com'); + const patchPath = join(homeDir, 'patches/unauthorized.patch'); + await childGit.writeBinaryNoRenamePatch(baseSha, 'HEAD', patchPath); + const trace = new FileIngestTraceWriter({ + tracePath: join(homeDir, '.ktx/ingest-traces/job-target-policy/trace.jsonl'), + jobId: 'job-target-policy', + connectionId: 'c1', + sourceKey: 'fake', + level: 'trace', + }); + + const result = await integrateWorkUnitPatch({ + unitKey: 'wu-finance', + patchPath, + integrationGit: git, + trace, + author: { name: 'KTX Test', email: 'system@ktx.local' }, + validateAppliedTree: vi.fn().mockResolvedValue(undefined), + slDisallowed: false, + allowedTargetConnectionIds: new Set(['warehouse']), + }); + + expect(result).toMatchObject({ + status: 'textual_conflict', + touchedPaths: ['semantic-layer/finance/orders.yaml'], + }); + const rawTrace = await readFile(trace.tracePath, 'utf-8'); + expect(rawTrace).toContain('patch_policy_rejected'); + expect(rawTrace).toContain('semantic-layer target connection not allowed'); + expect(rawTrace).toContain('allowedTargetConnectionIds'); + }); }); diff --git a/packages/context/src/ingest/isolated-diff/patch-integrator.ts b/packages/context/src/ingest/isolated-diff/patch-integrator.ts index 54239ec3..4c6e46f5 100644 --- a/packages/context/src/ingest/isolated-diff/patch-integrator.ts +++ b/packages/context/src/ingest/isolated-diff/patch-integrator.ts @@ -16,6 +16,7 @@ export interface IntegrateWorkUnitPatchInput { trace: IngestTraceWriter; author: { name: string; email: string }; slDisallowed: boolean; + allowedTargetConnectionIds: ReadonlySet; validateAppliedTree(touchedPaths: string[]): Promise; } @@ -40,12 +41,14 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) unitKey: input.unitKey, patch, slDisallowed: input.slDisallowed, + allowedTargetConnectionIds: input.allowedTargetConnectionIds, }); } catch (error) { await input.trace.event('error', 'integration', 'patch_policy_rejected', { unitKey: input.unitKey, patchPath: input.patchPath, touchedPaths, + allowedTargetConnectionIds: [...input.allowedTargetConnectionIds].sort(), reason: errorMessage(error), }); return {