diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index 4a9119ff..b4b0e7fc 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -1,4 +1,4 @@ -import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; @@ -69,8 +69,29 @@ async function loadSourcesFromRoot(root: string) { }; } +async function listGlobalWikiPageKeys(root: string): Promise { + const dir = join(root, 'wiki/global'); + const entries = await readdir(dir).catch(() => []); + return entries + .filter((entry) => entry.endsWith('.md')) + .map((entry) => entry.slice(0, -'.md'.length)) + .sort(); +} + +function frontmatterList(yaml: string, key: string): string[] { + const pattern = new RegExp(`${key}:\\n((?: - .+\\n?)*)`); + return ( + pattern + .exec(yaml)?.[1] + ?.split('\n') + .map((line) => line.trim().replace(/^- /, '')) + .filter(Boolean) ?? [] + ); +} + function makeWikiService(root: string) { return { + listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])), readPage: vi.fn(async (_scope: string, _scopeId: string | null, key: string) => { const path = join(root, 'wiki/global', `${key}.md`); const raw = await readFile(path, 'utf-8').catch(() => null); @@ -78,15 +99,14 @@ function makeWikiService(root: string) { return null; } const [, yaml = '', content = ''] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw) ?? []; - const slRefs = - /sl_refs:\n((?: - .+\n?)*)/ - .exec(yaml)?.[1] - ?.split('\n') - .map((line) => line.trim().replace(/^- /, '')) - .filter(Boolean) ?? []; return { pageKey: key, - frontmatter: { summary: key, usage_mode: 'auto', sl_refs: slRefs }, + frontmatter: { + summary: key, + usage_mode: 'auto', + refs: frontmatterList(yaml, 'refs'), + sl_refs: frontmatterList(yaml, 'sl_refs'), + }, content: content.trim(), }; }), @@ -823,4 +843,277 @@ describe('IngestBundleRunner isolated diff path', () => { await rm(runtime.homeDir, { recursive: true, force: true }); } }); + + it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => { + const runtime = await makeRealGitRuntime(); + try { + await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); + await writeFile( + join(runtime.configDir, 'wiki/global/source-page.md'), + '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n', + ); + await runtime.git.commitFiles(['wiki/global/source-page.md'], 'seed source page', 'KTX Test', 'system@ktx.local'); + const preRunHead = await runtime.git.revParseHead(); + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [ + { unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] }, + { unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }, + ], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + if (params.telemetryTags.unitKey === 'page-ref') { + await mkdir(join(root, 'wiki/global'), { recursive: true }); + await writeFile( + join(root, 'wiki/global/account-segments.md'), + '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n', + ); + currentSession.actions.push({ + target: 'wiki', + type: 'created', + key: 'account-segments', + detail: 'Page with wiki ref', + rawPaths: ['pages/ref.json'], + }); + await currentSession.gitService.commitFiles( + ['wiki/global/account-segments.md'], + 'wu page ref', + 'KTX Test', + 'system@ktx.local', + ); + } + if (params.telemetryTags.unitKey === 'page-delete') { + await rm(join(root, 'wiki/global/source-page.md'), { force: true }); + currentSession.actions.push({ + target: 'wiki', + type: 'removed', + key: 'source-page', + detail: 'Delete referenced page', + rawPaths: ['pages/delete.json'], + }); + await currentSession.gitService.commitFiles( + ['wiki/global/source-page.md'], + 'wu delete source page', + 'KTX Test', + 'system@ktx.local', + ); + } + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [ + ['pages/ref.json', 'h1'], + ['pages/delete.json', 'h2'], + ]); + + await expect( + runner.run({ + jobId: 'job-wiki-ref-conflict', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/); + + expect(await runtime.git.revParseHead()).toBe(preRunHead); + const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'), 'utf-8'); + expect(trace).toContain('final_artifact_gates_failed'); + expect(trace).toContain('account-segments -> source-page'); + expect(trace).toContain('ingest_failed'); + expect(trace).toContain('failure_report_created'); + expect(trace).not.toContain('squash_finished'); + + const failureReport = (deps.reports.create as any).mock.calls + .map((call: any[]) => call[0]) + .find((report: any) => report.body.status === 'failed'); + expect(failureReport.body.failure).toMatchObject({ + phase: 'final_gates', + message: expect.stringContaining('account-segments -> source-page'), + details: expect.objectContaining({ + changedWikiPageKeys: expect.arrayContaining(['account-segments']), + workUnitPatchTouchedPaths: expect.arrayContaining([ + 'wiki/global/account-segments.md', + 'wiki/global/source-page.md', + ]), + }), + }); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + + it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async () => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + await mkdir(join(root, 'semantic-layer/finance'), { recursive: true }); + await writeFile( + join(root, 'semantic-layer/finance/orders.yaml'), + 'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n', + ); + addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'orders'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'orders', + detail: 'Unauthorized target', + targetConnectionId: 'finance', + rawPaths: ['cards/finance.json'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/finance/orders.yaml'], + 'wu unauthorized target', + 'KTX Test', + 'system@ktx.local', + ); + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/finance.json', 'h1']]); + const preRunHead = await runtime.git.revParseHead(); + + await expect( + runner.run({ + jobId: 'job-unauthorized-wu-target', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/isolated diff textual conflict.*semantic-layer target connection not allowed/); + + expect(await runtime.git.revParseHead()).toBe(preRunHead); + const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'), 'utf-8'); + expect(trace).toContain('patch_policy_rejected'); + expect(trace).toContain('semantic-layer/finance/orders.yaml'); + expect(trace).toContain('allowedTargetConnectionIds'); + expect(trace).toContain('failure_report_created'); + expect(trace).not.toContain('squash_finished'); + + const failureReport = (deps.reports.create as any).mock.calls + .map((call: any[]) => call[0]) + .find((report: any) => report.body.status === 'failed'); + expect(failureReport.body.failure).toMatchObject({ + phase: 'integration', + message: expect.stringContaining('semantic-layer target connection not allowed'), + }); + expect(failureReport.body.failure.details).toMatchObject({ + unitKey: 'finance-source', + allowedTargetConnectionIds: ['warehouse'], + touchedPaths: ['semantic-layer/finance/orders.yaml'], + reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'), + }); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + + it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + if (params.telemetryTags.operationName === 'ingest-bundle-wu') { + await mkdir(join(root, 'wiki/global'), { recursive: true }); + await writeFile(join(root, 'wiki/global/valid-page.md'), '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n'); + currentSession.actions.push({ + target: 'wiki', + type: 'created', + key: 'valid-page', + detail: 'Valid page', + rawPaths: ['pages/source.json'], + }); + await currentSession.gitService.commitFiles(['wiki/global/valid-page.md'], 'wu valid page', 'KTX Test', 'system@ktx.local'); + } else { + await mkdir(join(root, 'semantic-layer/finance'), { recursive: true }); + await writeFile( + join(root, 'semantic-layer/finance/reconcile_orders.yaml'), + 'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n', + ); + addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'reconcile_orders'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'reconcile_orders', + detail: 'Unauthorized reconcile target', + targetConnectionId: 'finance', + rawPaths: ['pages/source.json'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/finance/reconcile_orders.yaml'], + 'reconcile unauthorized target', + 'KTX Test', + 'system@ktx.local', + ); + } + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['pages/source.json', 'h1']]); + const preRunHead = await runtime.git.revParseHead(); + + await expect( + runner.run({ + jobId: 'job-unauthorized-reconcile-target', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/semantic-layer target connection not allowed/); + + expect(await runtime.git.revParseHead()).toBe(preRunHead); + const trace = await readFile( + join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'), + 'utf-8', + ); + expect(trace).toContain('semantic_layer_target_policy_started'); + expect(trace).toContain('semantic_layer_target_policy_failed'); + expect(trace).toContain('semantic-layer/finance/reconcile_orders.yaml'); + expect(trace).toContain('ingest_failed'); + expect(trace).not.toContain('squash_finished'); + const failureReport = (deps.reports.create as any).mock.calls + .map((call: any[]) => call[0]) + .find((report: any) => report.body.status === 'failed'); + expect(failureReport.body.failure).toMatchObject({ + phase: 'target_policy', + message: expect.stringContaining('semantic-layer target connection not allowed'), + }); + expect(failureReport.body.failure.details).toMatchObject({ + allowedTargetConnectionIds: ['warehouse'], + touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']), + }); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); }); diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index d0c15ebe..c88e0ad2 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -2129,6 +2129,8 @@ export class IngestBundleRunner { const finalArtifactGateTraceData = { changedWikiPageKeys: finalChangedWikiPageKeys, touchedSlSources: finalTouchedSlSources, + projectionTouchedPaths, + workUnitPatchTouchedPaths: workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []), preReconciliationSha, postReconciliationSha, postReconciliationPaths,