diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index 53ca8718..a38eca29 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -272,6 +272,164 @@ describe('IngestBundleRunner isolated diff path', () => { } }); + it('rejects unchanged wiki body refs made stale by isolated semantic-layer changes', async () => { + const runtime = await makeRealGitRuntime(); + try { + await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true }); + await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); + await writeFile( + join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), + 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n', + ); + await writeFile( + join(runtime.configDir, 'wiki/global/account-segments.md'), + '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n', + ); + await runtime.git.commitFiles( + ['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'], + 'seed existing wiki body ref', + 'KTX Test', + 'system@ktx.local', + ); + const preRunHead = await runtime.git.revParseHead(); + + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async () => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + await writeFile( + join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), + 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', + ); + addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); + currentSession.actions.push({ + target: 'sl', + type: 'updated', + key: 'mart_account_segments', + detail: 'Rename ARR measure', + targetConnectionId: 'warehouse', + rawPaths: ['cards/source.json'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/warehouse/mart_account_segments.yaml'], + 'wu source rename', + 'KTX Test', + 'system@ktx.local', + ); + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); + + await expect( + runner.run({ + jobId: 'job-existing-body-stale', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/total_contract_arr_cents/); + + expect(await runtime.git.revParseHead()).toBe(preRunHead); + const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl'), 'utf-8')) + .trim() + .split('\n') + .map((line) => JSON.parse(line)); + expect(events.map((event) => event.event)).toEqual( + expect.arrayContaining([ + 'final_artifact_gates_started', + 'final_artifact_gates_failed', + 'ingest_failed', + 'failure_report_created', + ]), + ); + expect(events.map((event) => event.event)).not.toContain('squash_finished'); + const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed'); + expect(gateFailure).toMatchObject({ + data: { + wikiReferenceGateScope: { + global: true, + reasons: expect.arrayContaining(['semantic_layer_changed']), + pageKeysValidated: expect.arrayContaining(['account-segments']), + }, + actionOrigins: expect.arrayContaining([ + expect.objectContaining({ + source: 'work_unit_action', + unitKey: 'source-only', + unitRawFiles: ['cards/source.json'], + action: expect.objectContaining({ + target: 'sl', + type: 'updated', + key: 'mart_account_segments', + rawPaths: ['cards/source.json'], + targetConnectionId: 'warehouse', + }), + }), + ]), + }, + error: { message: expect.stringContaining('total_contract_arr_cents') }, + }); + + const failureReport = (deps.reports.create as any).mock.calls + .map((call: any[]) => call[0]) + .find((report: any) => report.body.status === 'failed'); + expect(failureReport.body.failure).toMatchObject({ + phase: 'final_gates', + message: expect.stringContaining('total_contract_arr_cents'), + details: expect.objectContaining({ + wikiReferenceGateScope: expect.objectContaining({ + global: true, + reasons: expect.arrayContaining(['semantic_layer_changed']), + pageKeysValidated: expect.arrayContaining(['account-segments']), + }), + touchedSlSources: expect.arrayContaining([ + expect.objectContaining({ connectionId: 'warehouse', sourceName: 'mart_account_segments' }), + ]), + actionOrigins: expect.arrayContaining([ + expect.objectContaining({ + source: 'work_unit_action', + unitKey: 'source-only', + action: expect.objectContaining({ + target: 'sl', + type: 'updated', + key: 'mart_account_segments', + rawPaths: ['cards/source.json'], + targetConnectionId: 'warehouse', + }), + }), + ]), + }), + }); + expect(failureReport.body.workUnits).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + unitKey: 'source-only', + actions: expect.arrayContaining([ + expect.objectContaining({ + target: 'sl', + type: 'updated', + key: 'mart_account_segments', + rawPaths: ['cards/source.json'], + }), + ]), + }), + ]), + ); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + it('accepts two isolated work units that edit different wiki pages', async () => { const runtime = await makeRealGitRuntime(); try { @@ -950,6 +1108,159 @@ describe('IngestBundleRunner isolated diff path', () => { } }); + it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => { + const runtime = await makeRealGitRuntime(); + try { + await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); + await writeFile( + join(runtime.configDir, 'wiki/global/source-page.md'), + '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n', + ); + await writeFile( + join(runtime.configDir, 'wiki/global/account-segments.md'), + '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n', + ); + await runtime.git.commitFiles( + ['wiki/global/source-page.md', 'wiki/global/account-segments.md'], + 'seed inbound wiki refs', + 'KTX Test', + 'system@ktx.local', + ); + const preRunHead = await runtime.git.revParseHead(); + + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + if (params.telemetryTags.unitKey !== 'delete-target-page') { + return { stopReason: 'natural' }; + } + const root = rootOfConfig(currentSession.configService, runtime.configDir); + await rm(join(root, 'wiki/global/source-page.md'), { force: true }); + currentSession.actions.push({ + target: 'wiki', + type: 'removed', + key: 'source-page', + detail: 'Delete referenced page', + rawPaths: ['pages/delete.json'], + }); + await currentSession.gitService.commitFiles( + ['wiki/global/source-page.md'], + 'wu delete target page', + 'KTX Test', + 'system@ktx.local', + ); + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['pages/delete.json', 'h1']]); + + await expect( + runner.run({ + jobId: 'job-existing-wiki-ref-stale', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/); + + expect(await runtime.git.revParseHead()).toBe(preRunHead); + const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'), 'utf-8')) + .trim() + .split('\n') + .map((line) => JSON.parse(line)); + expect(events.map((event) => event.event)).toEqual( + expect.arrayContaining([ + 'final_artifact_gates_started', + 'final_artifact_gates_failed', + 'ingest_failed', + 'failure_report_created', + ]), + ); + expect(events.map((event) => event.event)).not.toContain('squash_finished'); + const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed'); + expect(gateFailure).toMatchObject({ + data: { + wikiReferenceGateScope: { + global: true, + reasons: expect.arrayContaining(['wiki_page_removed']), + removedWikiPageKeys: expect.arrayContaining(['source-page']), + pageKeysValidated: expect.arrayContaining(['account-segments']), + }, + actionOrigins: expect.arrayContaining([ + expect.objectContaining({ + source: 'work_unit_action', + unitKey: 'delete-target-page', + unitRawFiles: ['pages/delete.json'], + action: expect.objectContaining({ + target: 'wiki', + type: 'removed', + key: 'source-page', + rawPaths: ['pages/delete.json'], + }), + }), + ]), + }, + error: { message: expect.stringContaining('account-segments -> source-page') }, + }); + + const failureReport = (deps.reports.create as any).mock.calls + .map((call: any[]) => call[0]) + .find((report: any) => report.body.status === 'failed'); + expect(failureReport.body.failure).toMatchObject({ + phase: 'final_gates', + message: expect.stringContaining('account-segments -> source-page'), + details: expect.objectContaining({ + wikiReferenceGateScope: expect.objectContaining({ + global: true, + reasons: expect.arrayContaining(['wiki_page_removed']), + removedWikiPageKeys: expect.arrayContaining(['source-page']), + pageKeysValidated: expect.arrayContaining(['account-segments']), + }), + changedWikiPageKeys: expect.arrayContaining(['source-page']), + actionOrigins: expect.arrayContaining([ + expect.objectContaining({ + source: 'work_unit_action', + unitKey: 'delete-target-page', + action: expect.objectContaining({ + target: 'wiki', + type: 'removed', + key: 'source-page', + rawPaths: ['pages/delete.json'], + }), + }), + ]), + }), + }); + expect(failureReport.body.workUnits).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + unitKey: 'delete-target-page', + actions: expect.arrayContaining([ + expect.objectContaining({ + target: 'wiki', + type: 'removed', + key: 'source-page', + rawPaths: ['pages/delete.json'], + }), + ]), + }), + ]), + ); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => { const runtime = await makeRealGitRuntime(); try { diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index c88e0ad2..68ddd83b 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -669,6 +669,90 @@ export class IngestBundleRunner { }); } + private removedWikiPageKeysFromActions(actions: MemoryAction[]): string[] { + return this.uniqueWikiPageKeys( + actions.filter((action) => action.target === 'wiki' && action.type === 'removed').map((action) => action.key), + ); + } + + private finalGateActionOrigins(input: { + stageIndex: StageIndex; + reconcileActions: MemoryAction[]; + fallbackConnectionId: string; + }) { + const actionContext = (action: MemoryAction, fallbackRawPaths: string[]) => ({ + target: action.target, + type: action.type, + key: action.key, + detail: action.detail, + rawPaths: rawPathsForAction(action, fallbackRawPaths), + ...(action.target === 'sl' ? { targetConnectionId: actionTargetConnectionId(action, input.fallbackConnectionId) } : {}), + }); + + return [ + ...input.stageIndex.workUnits.flatMap((workUnit, unitIndex) => + workUnit.actions.map((action, actionIndex) => ({ + source: 'work_unit_action', + unitKey: workUnit.unitKey, + unitIndex, + unitRawFiles: workUnit.rawFiles, + actionIndex, + action: actionContext(action, workUnit.rawFiles), + })), + ), + ...input.reconcileActions.map((action, actionIndex) => ({ + source: 'reconciliation_action', + actionIndex, + action: actionContext(action, []), + })), + ]; + } + + private async wikiPageKeysForFinalGates(input: { + wikiService: ReturnType; + changedWikiPageKeys: string[]; + touchedSlSources: TouchedSlSource[]; + actions: MemoryAction[]; + }): Promise<{ + pageKeys: string[]; + trace: { + global: boolean; + reasons: string[]; + changedWikiPageKeys: string[]; + removedWikiPageKeys: string[]; + pageKeysValidated: string[]; + }; + }> { + const changedWikiPageKeys = this.uniqueWikiPageKeys(input.changedWikiPageKeys); + const removedWikiPageKeys = this.removedWikiPageKeysFromActions(input.actions); + const reasons: string[] = []; + if (input.touchedSlSources.length > 0) { + reasons.push('semantic_layer_changed'); + } + if (removedWikiPageKeys.length > 0) { + reasons.push('wiki_page_removed'); + } + + let pageKeys = changedWikiPageKeys; + if (reasons.length > 0) { + pageKeys = this.uniqueWikiPageKeys([ + ...changedWikiPageKeys, + ...(await input.wikiService.listPageKeys('GLOBAL', null)), + ]); + } + + return { + pageKeys, + trace: { + global: reasons.length > 0, + reasons, + changedWikiPageKeys, + removedWikiPageKeys, + pageKeysValidated: pageKeys, + }, + }; + } + private async runWorkUnitInWorktree(input: { job: IngestBundleJob; syncId: string; @@ -2087,7 +2171,7 @@ export class IngestBundleRunner { preReconciliationSha && postReconciliationSha && preReconciliationSha !== postReconciliationSha ? (await sessionWorktree.git.diffNameStatus(preReconciliationSha, postReconciliationSha)).map((entry) => entry.path) : []; - const finalChangedWikiPageKeys = this.uniqueWikiPageKeys([ + const baseFinalChangedWikiPageKeys = this.uniqueWikiPageKeys([ ...(isolatedDiffEnabled ? projectionChangedWikiPageKeys : []), ...workUnitOutcomes .flatMap((outcome) => outcome.patchTouchedPaths ?? []) @@ -2103,6 +2187,13 @@ export class IngestBundleRunner { ...this.touchedSlSourcesFromPaths(postReconciliationPaths), ...(postProcessorOutcome?.touchedSources ?? []), ]); + const finalWikiGateScope = await this.wikiPageKeysForFinalGates({ + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + changedWikiPageKeys: baseFinalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + actions: [...stageIndex.workUnits.flatMap((wu) => wu.actions), ...reconcileActions], + }); + const finalChangedWikiPageKeys = finalWikiGateScope.pageKeys; const finalTargetPolicyPaths = [ ...projectionTouchedPaths, @@ -2128,9 +2219,15 @@ export class IngestBundleRunner { const finalArtifactGateTraceData = { changedWikiPageKeys: finalChangedWikiPageKeys, + wikiReferenceGateScope: finalWikiGateScope.trace, touchedSlSources: finalTouchedSlSources, projectionTouchedPaths, workUnitPatchTouchedPaths: workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []), + actionOrigins: this.finalGateActionOrigins({ + stageIndex, + reconcileActions, + fallbackConnectionId: job.connectionId, + }), preReconciliationSha, postReconciliationSha, postReconciliationPaths,