import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; import { GitService } from '../../../src/context/core/git.service.js'; import { SessionWorktreeService } from '../../../src/context/core/session-worktree.service.js'; import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js'; import { addTouchedSlSource } from '../../../src/context/tools/touched-sl-sources.js'; import { IngestBundleRunner } from '../../../src/context/ingest/ingest-bundle.runner.js'; import type { IngestBundleRunnerDeps } from '../../../src/context/ingest/ports.js'; async function makeRealGitRuntime() { const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-runner-')); const configDir = join(homeDir, 'config'); const git = new GitService({ storage: { configDir, homeDir }, git: { userName: 'System User', userEmail: 'system@example.com', bootstrapMessage: 'init', bootstrapAuthor: 'system', bootstrapAuthorEmail: 'system@example.com', }, }); await git.onModuleInit(); const configService = new LocalGitFileStore({ rootDir: configDir, git }); const sessionWorktreeService = new SessionWorktreeService({ coreConfig: { storage: { configDir, homeDir }, git: { userName: 'System User', userEmail: 'system@example.com', bootstrapMessage: 'init', bootstrapAuthor: 'system', bootstrapAuthorEmail: 'system@example.com', }, }, gitService: git, configService, }); return { homeDir, configDir, git, configService, sessionWorktreeService }; } function rootOfConfig(configService: unknown, fallback: string): string { const rootDir = (configService as { rootDir?: unknown }).rootDir; return typeof rootDir === 'string' ? rootDir : fallback; } async function loadSourcesFromRoot(root: string) { const raw = await readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8').catch( () => '', ); const hasCents = raw.includes('total_contract_arr_cents'); const hasDollars = raw.includes('total_contract_arr'); return { sources: hasCents || hasDollars ? [ { name: 'mart_account_segments', grain: ['account_id'], columns: [{ name: 'account_id', type: 'string' }], joins: [], measures: [{ name: hasCents ? 'total_contract_arr_cents' : 'total_contract_arr', expr: 'sum(contract_arr)' }], table: 'analytics.mart_account_segments', }, ] : [], loadErrors: [], }; } async function listGlobalWikiPageKeys(root: string): Promise { const dir = join(root, 'wiki/global'); const entries = await readdir(dir).catch(() => []); return entries .filter((entry) => entry.endsWith('.md')) .map((entry) => entry.slice(0, -'.md'.length)) .sort(); } function frontmatterList(yaml: string, key: string): string[] { const pattern = new RegExp(`(?:^|\\n)${key}:\\n((?: - .+\\n?)*)`); return ( pattern .exec(yaml)?.[1] ?.split('\n') .map((line) => line.trim().replace(/^- /, '')) .filter(Boolean) ?? [] ); } function legacyFallbackSettingKey(): string { return ['sharedWorktree', 'SourceKeys'].join(''); } function legacySharedTraceEvent(): string { return ['shared', 'worktree', 'path', 'enabled'].join('_'); } function makeWikiService(root: string) { return { listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])), readPage: vi.fn(async (_scope: string, _scopeId: string | null, key: string) => { const path = join(root, 'wiki/global', `${key}.md`); const raw = await readFile(path, 'utf-8').catch(() => null); if (!raw) { return null; } const [, yaml = '', content = ''] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw) ?? []; return { pageKey: key, frontmatter: { summary: key, usage_mode: 'auto', refs: frontmatterList(yaml, 'refs'), sl_refs: frontmatterList(yaml, 'sl_refs'), }, content: content.trim(), }; }), writePage: vi.fn( async ( _scope: string, _scopeId: string | null, key: string, frontmatter: { summary?: string; usage_mode?: string; refs?: string[]; sl_refs?: string[] }, content: string, ) => { await mkdir(join(root, 'wiki/global'), { recursive: true }); const refs = (frontmatter.refs ?? []).map((ref) => ` - ${ref}`).join('\n'); const slRefs = (frontmatter.sl_refs ?? []).map((ref) => ` - ${ref}`).join('\n'); await writeFile( join(root, 'wiki/global', `${key}.md`), [ '---', `summary: ${frontmatter.summary ?? key}`, `usage_mode: ${frontmatter.usage_mode ?? 'auto'}`, 'refs:', refs, 'sl_refs:', slRefs, '---', '', content, '', ].join('\n'), ); }, ), syncFromCommit: vi.fn(), }; } function makeDeps( runtime: Awaited>, sourceKey = 'metabase', settings: Partial = {}, ) { const adapter: any = { source: sourceKey, skillNames: [], detect: vi.fn().mockResolvedValue(true), chunk: vi.fn().mockResolvedValue({ workUnits: [ { unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }, ], }), }; const wikiService = makeWikiService(runtime.configDir); const semanticLayerService: any = { loadAllSources: vi.fn(async () => loadSourcesFromRoot(runtime.configDir)), listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']), }; semanticLayerService.forWorktree = vi.fn((workdir: string) => ({ ...semanticLayerService, loadAllSources: vi.fn(async () => loadSourcesFromRoot(workdir)), listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']), })); const deps: IngestBundleRunnerDeps = { runs: { create: vi.fn().mockResolvedValue({ id: 'run-1' }), markCompleted: vi.fn(), markFailed: vi.fn() }, provenance: { insertMany: vi.fn(), findLatestHashesForCompletedSyncs: vi.fn().mockResolvedValue(new Map()), findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()), }, reports: { create: vi.fn().mockResolvedValue({ id: 'report-1' }), findByJobId: vi.fn().mockResolvedValue(null), markSuperseded: vi.fn() }, canonicalPins: { listPins: vi.fn().mockResolvedValue([]) }, registry: { get: vi.fn().mockReturnValue(adapter), register: vi.fn(), has: vi.fn(), list: vi.fn() }, diffSetService: { compute: vi.fn().mockResolvedValue({ added: ['cards/wiki.json', 'cards/source.json'], modified: [], deleted: [], unchanged: [] }), }, sessionWorktreeService: runtime.sessionWorktreeService, agentRunner: { runLoop: vi.fn() }, gitService: runtime.git, lockingService: { withLock: vi.fn(async (_key, fn) => fn()) }, storage: { homeDir: join(runtime.configDir, '.ktx'), systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' }, resolveUploadDir: (id) => join(runtime.homeDir, 'upload', id), resolvePullDir: (id) => join(runtime.homeDir, 'pull', id), resolveTranscriptDir: (id) => join(runtime.configDir, '.ktx/ingest-transcripts', id), resolveTracePath: (id) => join(runtime.configDir, '.ktx/ingest-traces', id, 'trace.jsonl'), }, settings: { memoryIngestionModel: 'test', probeRowCount: 1, ingestTraceLevel: 'trace', ...settings, }, skillsRegistry: { listSkills: vi.fn().mockResolvedValue([]), getSkill: vi.fn().mockResolvedValue(null), buildSkillsPrompt: vi.fn().mockReturnValue(''), stripFrontmatter: vi.fn((body) => body), } as never, promptService: { loadPrompt: vi.fn().mockResolvedValue('base') } as never, wikiService: { ...wikiService, forWorktree: vi.fn((workdir: string) => makeWikiService(workdir)) } as never, knowledgeIndex: { listPagesForUser: vi.fn().mockResolvedValue([]) }, knowledgeSlRefs: { syncFromWiki: vi.fn() }, semanticLayerService, slSearchService: { indexSources: vi.fn() } as never, slSourcesRepository: {} as never, slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) }, connections: { listEnabledConnections: vi.fn().mockResolvedValue([]), getConnectionById: vi.fn() } as never, toolsetFactory: { createIngestWuToolset: vi.fn(() => ({ toRuntimeTools: vi.fn(() => ({})) })) }, commitMessages: { enqueueForExternalCommit: vi.fn() }, embedding: { maxBatchSize: 64, computeEmbedding: vi.fn(), computeEmbeddingsBulk: vi.fn() }, }; return { deps, adapter }; } async function mockStageRawFiles( runner: IngestBundleRunner, runtime: Awaited>, hashes: [string, string][], sourceKey = 'metabase', ) { (runner as any).resolveStagedDir = vi.fn().mockResolvedValue(join(runtime.homeDir, 'stage')); (runner as any).stageRawFilesStage1 = vi.fn(async ({ worktreeRoot }: any) => { const rawDir = join(worktreeRoot, 'raw-sources/warehouse', sourceKey, 's'); await mkdir(rawDir, { recursive: true }); for (const [rawPath] of hashes) { await mkdir(join(rawDir, rawPath.split('/').slice(0, -1).join('/')), { recursive: true }); await writeFile(join(rawDir, rawPath), '{}'); } return { currentHashes: new Map(hashes), rawDirInWorktree: `raw-sources/warehouse/${sourceKey}/s` }; }); } describe('IngestBundleRunner isolated diff path', () => { it('routes an unlisted direct-writing source through isolated diffs by default', async () => { const runtime = await makeRealGitRuntime(); try { const sourceKey = 'custom-direct-source'; const { deps, adapter } = makeDeps(runtime, sourceKey); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'custom-wiki', rawFiles: ['custom/page.json'], peerFileIndex: [], dependencyPaths: [], }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName !== 'ingest-bundle-wu') { return { stopReason: 'natural' }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/custom-isolated.md'), '---\nsummary: Custom isolated write\nusage_mode: auto\n---\n\nCustom isolated write.\n', 'utf-8', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'custom-isolated', detail: 'Custom isolated write', rawPaths: ['custom/page.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/custom-isolated.md'], 'custom wiki', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['custom/page.json', 'h1']], sourceKey); await expect( runner.run({ jobId: 'job-custom-default', connectionId: 'warehouse', sourceKey, trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).resolves.toMatchObject({ jobId: 'job-custom-default', failedWorkUnits: [], workUnitCount: 1, }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-custom-default/trace.jsonl'), 'utf-8', ); expect(trace).toContain('isolated_diff_enabled'); expect(trace).toContain('work_unit_child_created'); expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; expect(reportBody?.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('does not support shared-worktree fallback settings', async () => { const runtime = await makeRealGitRuntime(); try { const sourceKey = 'legacy-source'; const staleSettings = { [legacyFallbackSettingKey()]: ['legacy-source'], } as Partial & Record; const { deps, adapter } = makeDeps(runtime, sourceKey, staleSettings); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'legacy-wiki', rawFiles: ['legacy/page.json'], peerFileIndex: [], dependencyPaths: [], }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName !== 'ingest-bundle-wu') { return { stopReason: 'natural' }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/legacy-isolated.md'), '---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n', 'utf-8', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'legacy-isolated', detail: 'Legacy isolated write', rawPaths: ['legacy/page.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/legacy-isolated.md'], 'legacy isolated wiki', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['legacy/page.json', 'h1']], sourceKey); await expect( runner.run({ jobId: 'job-legacy-isolated', connectionId: 'warehouse', sourceKey, trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).resolves.toMatchObject({ jobId: 'job-legacy-isolated', failedWorkUnits: [], workUnitCount: 1, }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl'), 'utf-8', ); expect(trace).toContain('isolated_diff_enabled'); expect(trace).toContain('work_unit_child_created'); expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; expect(reportBody?.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('does not integrate failed isolated WorkUnit patches', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime, 'fake'); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] }, ], }); deps.diffSetService.compute = vi.fn().mockResolvedValue({ added: ['good.raw', 'bad.raw'], modified: [], deleted: [], unchanged: [], }); deps.slValidator.validateSingleSource = vi.fn( async (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({ errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [], warnings: [], }), ) as never; let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName !== 'ingest-bundle-wu') { return { stopReason: 'natural' }; } const unitKey = params.telemetryTags.unitKey; const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); if (unitKey === 'wu-good') { await writeFile(join(root, 'semantic-layer/warehouse/good.yaml'), 'name: good\n', 'utf-8'); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'good'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'good', detail: 'good source', targetConnectionId: 'warehouse', rawPaths: ['good.raw'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/good.yaml'], 'test: add good source', 'KTX Test', 'system@ktx.local', ); } if (unitKey === 'wu-bad') { await writeFile(join(root, 'semantic-layer/warehouse/bad.yaml'), 'name: bad\n', 'utf-8'); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'bad'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'bad', detail: 'bad source', targetConnectionId: 'warehouse', rawPaths: ['bad.raw'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/bad.yaml'], 'test: add bad source', 'KTX Test', 'system@ktx.local', ); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles( runner, runtime, [ ['good.raw', 'good-hash'], ['bad.raw', 'bad-hash'], ], 'fake', ); const result = await runner.run({ jobId: 'job-failed-wu-isolated', connectionId: 'warehouse', sourceKey: 'fake', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }); expect(result.failedWorkUnits).toEqual(['wu-bad']); await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/good.yaml'), 'utf-8')).resolves.toContain( 'good', ); await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/bad.yaml'), 'utf-8')).rejects.toThrow(); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: { acceptedPatches?: number }; failedWorkUnits?: string[]; }; expect(reportBody.failedWorkUnits).toEqual(['wu-bad']); expect(reportBody.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1 }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl'), 'utf-8', ); expect(trace).toContain('work_unit_failed_before_patch'); expect(trace).toContain('patch_accepted'); expect(trace).not.toContain(legacySharedTraceEvent()); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)( 'routes %s direct writes through isolated child worktrees', async (sourceKey) => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime, sourceKey); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: `${sourceKey}-wiki`, rawFiles: [`${sourceKey}/page.json`], peerFileIndex: [], dependencyPaths: [], }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName !== 'ingest-bundle-wu') { return { stopReason: 'natural' }; } expect(params.telemetryTags).toMatchObject({ operationName: 'ingest-bundle-wu', source: sourceKey, unitKey: `${sourceKey}-wiki`, }); const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global', `${sourceKey}-isolated.md`), `---\nsummary: ${sourceKey} isolated write\nusage_mode: auto\n---\n\nIsolated ${sourceKey} write.\n`, 'utf-8', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: `${sourceKey}-isolated`, detail: `${sourceKey} isolated write`, rawPaths: [`${sourceKey}/page.json`], }); await currentSession.gitService.commitFiles( [`wiki/global/${sourceKey}-isolated.md`], `${sourceKey} wiki`, 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [[`${sourceKey}/page.json`, 'h1']], sourceKey); await expect( runner.run({ jobId: `job-${sourceKey}`, connectionId: 'warehouse', sourceKey, trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).resolves.toMatchObject({ jobId: `job-${sourceKey}`, failedWorkUnits: [], workUnitCount: 1, }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces', `job-${sourceKey}`, 'trace.jsonl'), 'utf-8', ); expect(trace).toContain('isolated_diff_enabled'); expect(trace).toContain('work_unit_child_created'); expect(trace).toContain('work_unit_patch_collected'); expect(trace).toContain('patch_apply_started'); expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; expect(reportBody?.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }, ); it('rejects the Metabase stale-measure wiki body regression before squash', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.project = vi.fn(async ({ workdir }) => { await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n', ); return { warnings: [], errors: [], touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }], changedWikiPageKeys: [], }; }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const root = rootOfConfig(currentSession.configService, runtime.configDir); if (params.telemetryTags.unitKey === 'card-wiki') { await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Account segments' }); await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local'); } if (params.telemetryTags.unitKey === 'card-source') { await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'mart_account_segments', detail: 'Dollar measure', targetConnectionId: 'warehouse', }); await currentSession.gitService.commitFiles(['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local'); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['cards/wiki.json', 'h1'], ['cards/source.json', 'h2'], ]); await expect( runner.run({ jobId: 'job-1', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }), ).rejects.toThrow(/total_contract_arr_cents/); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-1/trace.jsonl'), 'utf-8'); expect(trace).toContain('input_snapshot'); expect(trace).toContain('isolated_diff_enabled'); expect(trace).toContain('work_unit_child_created'); expect(trace).toContain('work_unit_patch_collected'); expect(trace).toContain('patch_apply_started'); expect(trace).toContain('final_artifact_gates_failed'); expect(trace).toContain('ingest_failed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects unchanged wiki body refs made stale by isolated semantic-layer changes', async () => { const runtime = await makeRealGitRuntime(); try { await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true }); await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); await writeFile( join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n', ); await writeFile( join(runtime.configDir, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n', ); await runtime.git.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'], 'seed existing wiki body ref', 'KTX Test', 'system@ktx.local', ); const preRunHead = await runtime.git.revParseHead(); const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'mart_account_segments', detail: 'Rename ARR measure', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source rename', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await expect( runner.run({ jobId: 'job-existing-body-stale', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/total_contract_arr_cents/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl'), 'utf-8')) .trim() .split('\n') .map((line) => JSON.parse(line)); expect(events.map((event) => event.event)).toEqual( expect.arrayContaining([ 'final_artifact_gates_started', 'final_artifact_gates_failed', 'ingest_failed', 'failure_report_created', ]), ); expect(events.map((event) => event.event)).not.toContain('squash_finished'); const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed'); expect(gateFailure).toMatchObject({ data: { wikiReferenceGateScope: { global: true, reasons: expect.arrayContaining(['semantic_layer_changed']), pageKeysValidated: expect.arrayContaining(['account-segments']), }, actionOrigins: expect.arrayContaining([ expect.objectContaining({ source: 'work_unit_action', unitKey: 'source-only', unitRawFiles: ['cards/source.json'], action: expect.objectContaining({ target: 'sl', type: 'updated', key: 'mart_account_segments', rawPaths: ['cards/source.json'], targetConnectionId: 'warehouse', }), }), ]), }, error: { message: expect.stringContaining('total_contract_arr_cents') }, }); const failureReport = (deps.reports.create as any).mock.calls .map((call: any[]) => call[0]) .find((report: any) => report.body.status === 'failed'); expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates', message: expect.stringContaining('total_contract_arr_cents'), details: expect.objectContaining({ wikiReferenceGateScope: expect.objectContaining({ global: true, reasons: expect.arrayContaining(['semantic_layer_changed']), pageKeysValidated: expect.arrayContaining(['account-segments']), }), touchedSlSources: expect.arrayContaining([ expect.objectContaining({ connectionId: 'warehouse', sourceName: 'mart_account_segments' }), ]), actionOrigins: expect.arrayContaining([ expect.objectContaining({ source: 'work_unit_action', unitKey: 'source-only', action: expect.objectContaining({ target: 'sl', type: 'updated', key: 'mart_account_segments', rawPaths: ['cards/source.json'], targetConnectionId: 'warehouse', }), }), ]), }), }); expect(failureReport.body.workUnits).toEqual( expect.arrayContaining([ expect.objectContaining({ unitKey: 'source-only', actions: expect.arrayContaining([ expect.objectContaining({ target: 'sl', type: 'updated', key: 'mart_account_segments', rawPaths: ['cards/source.json'], }), ]), }), ]), ); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('accepts two isolated work units that edit different wiki pages', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'page-a', rawFiles: ['pages/a.json'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'page-b', rawFiles: ['pages/b.json'], peerFileIndex: [], dependencyPaths: [] }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const unitKey = params.telemetryTags.unitKey; const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile(join(root, `wiki/global/${unitKey}.md`), `---\nsummary: ${unitKey}\nusage_mode: auto\n---\n\n${unitKey}\n`); currentSession.actions.push({ target: 'wiki', type: 'created', key: unitKey, detail: unitKey }); await currentSession.gitService.commitFiles([`wiki/global/${unitKey}.md`], `wu ${unitKey}`, 'KTX Test', 'system@ktx.local'); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['pages/a.json', 'h1'], ['pages/b.json', 'h2'], ]); const result = await runner.run({ jobId: 'job-clean', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }); expect(result.failedWorkUnits).toEqual([]); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-clean/trace.jsonl'), 'utf-8'); expect(trace.match(/patch_accepted/g)).toHaveLength(2); expect(trace).toContain('ingest_finished'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('classifies same-source patch application failure as a textual conflict', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'orders-a', rawFiles: ['orders/a.json'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'orders-b', rawFiles: ['orders/b.json'], peerFileIndex: [], dependencyPaths: [] }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') { return { stopReason: 'natural' }; } const suffix = params.telemetryTags.unitKey === 'orders-a' ? 'a' : 'b'; const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(root, 'semantic-layer/warehouse/orders.yaml'), `name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures:\n - name: order_count_${suffix}\n expr: count(*)\n`, ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'orders'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: suffix, targetConnectionId: 'warehouse' }); await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], `wu ${suffix}`, 'KTX Test', 'system@ktx.local'); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['orders/a.json', 'h1'], ['orders/b.json', 'h2'], ]); await expect( runner.run({ jobId: 'job-text-conflict', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }), ).rejects.toThrow(/isolated diff textual conflict/); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-text-conflict/trace.jsonl'), 'utf-8'); expect(trace).toContain('patch_textual_conflict'); expect(trace).toContain('textual_conflict_resolver_failed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('makes deterministic projection visible to child worktrees before WorkUnit synthesis', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'wiki-projected', rawFiles: ['projected/wiki.json'], peerFileIndex: [], dependencyPaths: [] }], }); adapter.project = vi.fn(async ({ workdir }) => { await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); return { warnings: [], errors: [], touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }], changedWikiPageKeys: [], }; }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await expect(readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8')).resolves.toContain( 'total_contract_arr', ); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/projected-orders.md'), '---\nsummary: Projected orders\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR `mart_account_segments.total_contract_arr`.\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'projected-orders', detail: 'Projected orders' }); await currentSession.gitService.commitFiles(['wiki/global/projected-orders.md'], 'wu projected wiki', 'KTX Test', 'system@ktx.local'); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['projected/wiki.json', 'h1']]); const result = await runner.run({ jobId: 'job-projection', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }); expect(result.failedWorkUnits).toEqual([]); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-projection/trace.jsonl'), 'utf-8'); expect(trace).toContain('deterministic_projection_finished'); expect(trace).toContain('deterministic_projection_committed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects Notion-style changed wiki pages with invalid sl_refs', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'notion-page', rawFiles: ['pages/notion.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') { return { stopReason: 'natural' as const }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile(join(root, 'wiki/global/notion-page.md'), '---\nsummary: Notion page\nusage_mode: auto\nsl_refs:\n - missing_source\n---\n\nBody\n'); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'notion-page', detail: 'Notion page' }); await currentSession.gitService.commitFiles(['wiki/global/notion-page.md'], 'wu notion', 'KTX Test', 'system@ktx.local'); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['pages/notion.json', 'h1']]); await expect( runner.run({ jobId: 'job-invalid-slrefs', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }), ).rejects.toThrow(/gate repair completed without editing an allowed path/); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('runs final artifact gates after reconciliation mutates the integration tree', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const root = rootOfConfig(currentSession.configService, runtime.configDir); if (params.telemetryTags.operationName === 'ingest-bundle-wu') { await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'mart_account_segments', detail: 'Source with renamed ARR measure', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local', ); } else { await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nReconcile wrote stale ARR `mart_account_segments.total_contract_arr_cents`.\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Stale reconcile wiki page', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'reconcile wiki', 'KTX Test', 'system@ktx.local'); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await expect( runner.run({ jobId: 'job-reconcile-stale', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/total_contract_arr_cents/); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-reconcile-stale/trace.jsonl'), 'utf-8'); expect(trace).toContain('reconciliation_finished'); expect(trace).toContain('final_artifact_gates_failed'); expect(trace).toContain('ingest_failed'); expect(await runtime.git.revParseHead()).not.toContain('reconcile wiki'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('stores a failure report and postmortem trace for final gate failures', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); const createdReports: any[] = []; deps.reports.create = vi.fn(async (args: any) => { createdReports.push(args); return { id: `report-${createdReports.length}` }; }); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const root = rootOfConfig(currentSession.configService, runtime.configDir); if (params.telemetryTags.unitKey === 'card-wiki') { await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Account segments', rawPaths: ['cards/wiki.json'], }); await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local'); } if (params.telemetryTags.unitKey === 'card-source') { await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'mart_account_segments', detail: 'Dollar measure', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local', ); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['cards/wiki.json', 'h1'], ['cards/source.json', 'h2'], ]); await expect( runner.run({ jobId: 'job-trace-failure', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/total_contract_arr_cents/); const failureReport = createdReports.find((report) => report.body.status === 'failed'); expect(failureReport.body.tracePath).toContain('job-trace-failure/trace.jsonl'); expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates' }); const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-trace-failure/trace.jsonl'), 'utf-8')) .trim() .split('\n') .map((line) => JSON.parse(line)); expect(events.map((event) => event.event)).toEqual( expect.arrayContaining([ 'ingest_started', 'input_snapshot', 'work_units_planned', 'isolated_diff_enabled', 'work_unit_child_created', 'work_unit_patch_collected', 'patch_apply_started', 'patch_accepted', 'reconciliation_finished', 'final_artifact_gates_failed', 'ingest_failed', 'failure_report_created', ]), ); const failed = events.find((event) => event.event === 'ingest_failed'); expect(failed).toMatchObject({ runId: 'run-1', syncId: expect.any(String), data: { phase: 'final_gates', tracePath: expect.stringContaining('trace.jsonl') }, error: { message: expect.stringContaining('total_contract_arr_cents') }, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects invalid provenance raw paths before squash reaches main', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); const createdReports: any[] = []; deps.reports.create = vi.fn(async (args: any) => { createdReports.push(args); return { id: `report-${createdReports.length}` }; }); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'card-valid-artifacts', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [], }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); await writeFile( join(root, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr`.\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'mart_account_segments', detail: 'Valid source', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Valid wiki with invalid provenance raw path', rawPaths: ['cards/missing.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'], 'valid artifacts with invalid provenance', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); const preRunHead = await runtime.git.revParseHead(); await expect( runner.run({ jobId: 'job-invalid-provenance', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/provenance row references raw path outside this snapshot: cards\/missing\.json/); expect(await runtime.git.revParseHead()).toBe(preRunHead); expect(deps.provenance.insertMany).not.toHaveBeenCalled(); const failureReport = createdReports.find((report) => report.body.status === 'failed'); expect(failureReport.body.tracePath).toContain('job-invalid-provenance/trace.jsonl'); expect(failureReport.body.failure).toMatchObject({ phase: 'provenance_validation', message: expect.stringContaining('cards/missing.json'), }); expect(failureReport.body.failure.details).toMatchObject({ invalidRawPaths: ['cards/missing.json'], currentRawPaths: ['cards/source.json'], invalidRows: expect.arrayContaining([ expect.objectContaining({ row: expect.objectContaining({ rawPath: 'cards/missing.json', artifactKind: 'wiki', artifactKey: 'account-segments', actionType: 'wiki_written', }), origin: expect.objectContaining({ source: 'work_unit_action', unitKey: 'card-valid-artifacts', actionIndex: 1, unitRawFiles: ['cards/source.json'], action: expect.objectContaining({ target: 'wiki', type: 'created', key: 'account-segments', rawPaths: ['cards/missing.json'], }), }), }), ]), }); expect(failureReport.body.provenanceRows).toEqual( expect.arrayContaining([ expect.objectContaining({ rawPath: 'cards/source.json', artifactKind: 'sl', artifactKey: 'mart_account_segments' }), expect.objectContaining({ rawPath: 'cards/missing.json', artifactKind: 'wiki', artifactKey: 'account-segments' }), ]), ); expect(failureReport.body.workUnits).toEqual( expect.arrayContaining([ expect.objectContaining({ unitKey: 'card-valid-artifacts', rawFiles: ['cards/source.json'], actions: expect.arrayContaining([ expect.objectContaining({ target: 'wiki', key: 'account-segments', rawPaths: ['cards/missing.json'], }), ]), }), ]), ); const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-invalid-provenance/trace.jsonl'), 'utf-8')) .trim() .split('\n') .map((line) => JSON.parse(line)); expect(events.map((event) => event.event)).toEqual( expect.arrayContaining([ 'final_artifact_gates_finished', 'provenance_rows_validation_failed', 'ingest_failed', 'failure_report_created', ]), ); expect(events.map((event) => event.event)).not.toContain('squash_finished'); const validationFailure = events.find((event) => event.event === 'provenance_rows_validation_failed'); expect(validationFailure).toMatchObject({ phase: 'provenance', data: { invalidRawPaths: ['cards/missing.json'], currentRawPaths: ['cards/source.json'], invalidRows: expect.arrayContaining([ expect.objectContaining({ row: expect.objectContaining({ rawPath: 'cards/missing.json' }), origin: expect.objectContaining({ source: 'work_unit_action', unitKey: 'card-valid-artifacts', actionIndex: 1, }), }), ]), }, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects slDisallowed patches that touch semantic-layer files', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'lookml-mismatch', rawFiles: ['views/orders.lkml'], peerFileIndex: [], dependencyPaths: [], slDisallowed: true, slDisallowedReason: 'lookml_connection_mismatch', }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); await writeFile( join(root, 'semantic-layer/warehouse/orders.yaml'), 'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n', ); currentSession.actions.push({ target: 'sl', type: 'created', key: 'orders', detail: 'forbidden', targetConnectionId: 'warehouse' }); await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], 'forbidden sl', 'KTX Test', 'system@ktx.local'); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['views/orders.lkml', 'h1']]); await expect( runner.run({ jobId: 'job-sl-disallowed', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }), ).rejects.toThrow(/isolated diff textual conflict/); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-sl-disallowed/trace.jsonl'), 'utf-8'); expect(trace).toContain('patch_policy_rejected'); expect(trace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/warehouse/orders.yaml'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => { const runtime = await makeRealGitRuntime(); try { await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); await writeFile( join(runtime.configDir, 'wiki/global/source-page.md'), '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n', ); await runtime.git.commitFiles(['wiki/global/source-page.md'], 'seed source page', 'KTX Test', 'system@ktx.local'); const preRunHead = await runtime.git.revParseHead(); const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [ { unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] }, { unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }, ], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const root = rootOfConfig(currentSession.configService, runtime.configDir); if (params.telemetryTags.unitKey === 'page-ref') { await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Page with wiki ref', rawPaths: ['pages/ref.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/account-segments.md'], 'wu page ref', 'KTX Test', 'system@ktx.local', ); } if (params.telemetryTags.unitKey === 'page-delete') { await rm(join(root, 'wiki/global/source-page.md'), { force: true }); currentSession.actions.push({ target: 'wiki', type: 'removed', key: 'source-page', detail: 'Delete referenced page', rawPaths: ['pages/delete.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/source-page.md'], 'wu delete source page', 'KTX Test', 'system@ktx.local', ); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['pages/ref.json', 'h1'], ['pages/delete.json', 'h2'], ]); await expect( runner.run({ jobId: 'job-wiki-ref-conflict', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'), 'utf-8'); expect(trace).toContain('final_artifact_gates_failed'); expect(trace).toContain('account-segments -> source-page'); expect(trace).toContain('ingest_failed'); expect(trace).toContain('failure_report_created'); expect(trace).not.toContain('squash_finished'); const failureReport = (deps.reports.create as any).mock.calls .map((call: any[]) => call[0]) .find((report: any) => report.body.status === 'failed'); expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates', message: expect.stringContaining('account-segments -> source-page'), details: expect.objectContaining({ changedWikiPageKeys: expect.arrayContaining(['account-segments']), workUnitPatchTouchedPaths: expect.arrayContaining([ 'wiki/global/account-segments.md', 'wiki/global/source-page.md', ]), }), }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => { const runtime = await makeRealGitRuntime(); try { await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); await writeFile( join(runtime.configDir, 'wiki/global/source-page.md'), '---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n', ); await writeFile( join(runtime.configDir, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n', ); await runtime.git.commitFiles( ['wiki/global/source-page.md', 'wiki/global/account-segments.md'], 'seed inbound wiki refs', 'KTX Test', 'system@ktx.local', ); const preRunHead = await runtime.git.revParseHead(); const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.unitKey !== 'delete-target-page') { return { stopReason: 'natural' }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await rm(join(root, 'wiki/global/source-page.md'), { force: true }); currentSession.actions.push({ target: 'wiki', type: 'removed', key: 'source-page', detail: 'Delete referenced page', rawPaths: ['pages/delete.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/source-page.md'], 'wu delete target page', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['pages/delete.json', 'h1']]); await expect( runner.run({ jobId: 'job-existing-wiki-ref-stale', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'), 'utf-8')) .trim() .split('\n') .map((line) => JSON.parse(line)); expect(events.map((event) => event.event)).toEqual( expect.arrayContaining([ 'final_artifact_gates_started', 'final_artifact_gates_failed', 'ingest_failed', 'failure_report_created', ]), ); expect(events.map((event) => event.event)).not.toContain('squash_finished'); const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed'); expect(gateFailure).toMatchObject({ data: { wikiReferenceGateScope: { global: true, reasons: expect.arrayContaining(['wiki_page_removed']), removedWikiPageKeys: expect.arrayContaining(['source-page']), pageKeysValidated: expect.arrayContaining(['account-segments']), }, actionOrigins: expect.arrayContaining([ expect.objectContaining({ source: 'work_unit_action', unitKey: 'delete-target-page', unitRawFiles: ['pages/delete.json'], action: expect.objectContaining({ target: 'wiki', type: 'removed', key: 'source-page', rawPaths: ['pages/delete.json'], }), }), ]), }, error: { message: expect.stringContaining('account-segments -> source-page') }, }); const failureReport = (deps.reports.create as any).mock.calls .map((call: any[]) => call[0]) .find((report: any) => report.body.status === 'failed'); expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates', message: expect.stringContaining('account-segments -> source-page'), details: expect.objectContaining({ wikiReferenceGateScope: expect.objectContaining({ global: true, reasons: expect.arrayContaining(['wiki_page_removed']), removedWikiPageKeys: expect.arrayContaining(['source-page']), pageKeysValidated: expect.arrayContaining(['account-segments']), }), changedWikiPageKeys: expect.arrayContaining(['source-page']), actionOrigins: expect.arrayContaining([ expect.objectContaining({ source: 'work_unit_action', unitKey: 'delete-target-page', action: expect.objectContaining({ target: 'wiki', type: 'removed', key: 'source-page', rawPaths: ['pages/delete.json'], }), }), ]), }), }); expect(failureReport.body.workUnits).toEqual( expect.arrayContaining([ expect.objectContaining({ unitKey: 'delete-target-page', actions: expect.arrayContaining([ expect.objectContaining({ target: 'wiki', type: 'removed', key: 'source-page', rawPaths: ['pages/delete.json'], }), ]), }), ]), ); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/finance'), { recursive: true }); await writeFile( join(root, 'semantic-layer/finance/orders.yaml'), 'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'orders'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'orders', detail: 'Unauthorized target', targetConnectionId: 'finance', rawPaths: ['cards/finance.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/finance/orders.yaml'], 'wu unauthorized target', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/finance.json', 'h1']]); const preRunHead = await runtime.git.revParseHead(); await expect( runner.run({ jobId: 'job-unauthorized-wu-target', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/isolated diff textual conflict.*semantic-layer target connection not allowed/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'), 'utf-8'); expect(trace).toContain('patch_policy_rejected'); expect(trace).toContain('semantic-layer/finance/orders.yaml'); expect(trace).toContain('allowedTargetConnectionIds'); expect(trace).toContain('ingest_failed'); expect(trace).toContain('failure_report_created'); expect(trace).not.toContain('squash_finished'); const failureReport = (deps.reports.create as any).mock.calls .map((call: any[]) => call[0]) .find((report: any) => report.body.status === 'failed'); expect(failureReport.body.failure).toMatchObject({ phase: 'integration', message: expect.stringContaining('semantic-layer target connection not allowed'), }); expect(failureReport.body.failure.details).toMatchObject({ unitKey: 'finance-source', allowedTargetConnectionIds: ['warehouse'], touchedPaths: ['semantic-layer/finance/orders.yaml'], reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'), }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { const root = rootOfConfig(currentSession.configService, runtime.configDir); if (params.telemetryTags.operationName === 'ingest-bundle-wu') { await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile(join(root, 'wiki/global/valid-page.md'), '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n'); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'valid-page', detail: 'Valid page', rawPaths: ['pages/source.json'], }); await currentSession.gitService.commitFiles(['wiki/global/valid-page.md'], 'wu valid page', 'KTX Test', 'system@ktx.local'); } else { await mkdir(join(root, 'semantic-layer/finance'), { recursive: true }); await writeFile( join(root, 'semantic-layer/finance/reconcile_orders.yaml'), 'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'reconcile_orders'); currentSession.actions.push({ target: 'sl', type: 'created', key: 'reconcile_orders', detail: 'Unauthorized reconcile target', targetConnectionId: 'finance', rawPaths: ['pages/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/finance/reconcile_orders.yaml'], 'reconcile unauthorized target', 'KTX Test', 'system@ktx.local', ); } return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['pages/source.json', 'h1']]); const preRunHead = await runtime.git.revParseHead(); await expect( runner.run({ jobId: 'job-unauthorized-reconcile-target', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/semantic-layer target connection not allowed/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'), 'utf-8', ); expect(trace).toContain('semantic_layer_target_policy_started'); expect(trace).toContain('semantic_layer_target_policy_failed'); expect(trace).toContain('allowedTargetConnectionIds'); expect(trace).toContain('semantic-layer/finance/reconcile_orders.yaml'); expect(trace).toContain('ingest_failed'); expect(trace).toContain('failure_report_created'); expect(trace).not.toContain('squash_finished'); const failureReport = (deps.reports.create as any).mock.calls .map((call: any[]) => call[0]) .find((report: any) => report.body.status === 'failed'); expect(failureReport.body.failure).toMatchObject({ phase: 'target_policy', message: expect.stringContaining('semantic-layer target connection not allowed'), }); expect(failureReport.body.failure.details).toMatchObject({ allowedTargetConnectionIds: ['warehouse'], touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']), }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('repairs additive same-source textual conflicts before final gates and squash', async () => { const runtime = await makeRealGitRuntime(); try { const { deps } = makeDeps(runtime); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') { const current = await params.toolSet.read_integration_file.execute({ path: 'semantic-layer/warehouse/mart_account_segments.yaml', }); expect(current.markdown).toContain('total_contract_arr_cents'); const patch = await params.toolSet.read_failed_patch.execute({}); expect(patch.markdown).toContain('account_count'); await params.toolSet.write_integration_file.execute({ path: 'semantic-layer/warehouse/mart_account_segments.yaml', content: 'name: mart_account_segments\n' + 'grain: [account_id]\n' + 'columns: [{name: account_id, type: string}]\n' + 'joins: []\n' + 'measures:\n' + ' - name: total_contract_arr_cents\n' + ' expr: sum(contract_arr)\n' + ' - name: account_count\n' + ' expr: count_distinct(account_id)\n', }); return { stopReason: 'natural' }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); if (params.telemetryTags.unitKey === 'card-wiki') { await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\n' + 'grain: [account_id]\n' + 'columns: [{name: account_id, type: string}]\n' + 'joins: []\n' + 'measures:\n' + ' - name: total_contract_arr_cents\n' + ' expr: sum(contract_arr)\n', ); } else if (params.telemetryTags.unitKey === 'card-source') { await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\n' + 'grain: [account_id]\n' + 'columns: [{name: account_id, type: string}]\n' + 'joins: []\n' + 'measures:\n' + ' - name: account_count\n' + ' expr: count_distinct(account_id)\n', ); } addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'mart_account_segments', detail: 'Updated account segments source', targetConnectionId: 'warehouse', }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], `wu ${params.telemetryTags.unitKey}`, 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [ ['cards/wiki.json', 'hash-a'], ['cards/source.json', 'hash-b'], ]); const result = await runner.run({ jobId: 'job-resolver-e2e', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'manual_resync', bundleRef: { kind: 'upload', uploadId: 'upload-1' }, }); expect(result.commitSha).toBeTruthy(); const source = await readFile(join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8'); expect(source).toContain('total_contract_arr_cents'); expect(source).toContain('account_count'); expect(deps.agentRunner.runLoop).toHaveBeenCalledWith( expect.objectContaining({ modelRole: 'repair', telemetryTags: expect.objectContaining({ operationName: 'ingest-isolated-diff-textual-resolver', unitKey: 'card-source', }), }), ); const successReport = (deps.reports.create as any).mock.calls.at(-1)?.[0]?.body; expect(successReport.isolatedDiff).toMatchObject({ acceptedPatches: 2, textualConflicts: 1, semanticConflicts: 0, resolverAttempts: 1, resolverRepairs: 1, resolverFailures: 0, }); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-resolver-e2e/trace.jsonl'), 'utf-8'); expect(trace).toContain('textual_conflict_resolver_repaired'); expect(trace).toContain('patch_accepted_after_textual_resolution'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('repairs final wiki body refs before squash when the repair agent edits the scoped page', async () => { const runtime = await makeRealGitRuntime(); try { await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true }); await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); await writeFile( join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n', ); await writeFile( join(runtime.configDir, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n', ); await runtime.git.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'], 'seed stale wiki body ref', 'KTX Test', 'system@ktx.local', ); const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') { const gateError = await params.toolSet.read_gate_error.execute({}); expect(gateError.markdown).toContain('total_contract_arr_cents'); const page = await params.toolSet.read_repair_file.execute({ path: 'wiki/global/account-segments.md', }); await params.toolSet.write_repair_file.execute({ path: 'wiki/global/account-segments.md', content: page.markdown.replace('total_contract_arr_cents', 'total_contract_arr'), }); return { stopReason: 'natural' as const }; } if (params.modelRole === 'reconcile') { return { stopReason: 'natural' as const }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'mart_account_segments', detail: 'Rename ARR measure', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source rename', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' as const }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); const result = await runner.run({ jobId: 'job-final-gate-repair', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }); expect(result.commitSha).toBeTruthy(); await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.toContain( 'mart_account_segments.total_contract_arr', ); await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.not.toContain( 'total_contract_arr_cents', ); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any; expect(reportCreate.body.isolatedDiff).toMatchObject({ gateRepairAttempts: 1, gateRepairs: 1, gateRepairFailures: 0, }); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair/trace.jsonl'), 'utf-8'); expect(trace).toContain('gate_repair_repaired'); expect(trace).toContain('final_artifact_gates_after_gate_repair_finished'); expect(trace).toContain('final_gate_repair_committed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('fails before squash when final gate repair makes no edit', async () => { const runtime = await makeRealGitRuntime(); try { await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true }); await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true }); await writeFile( join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n', ); await writeFile( join(runtime.configDir, 'wiki/global/account-segments.md'), '---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n', ); await runtime.git.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'], 'seed stale wiki body ref', 'KTX Test', 'system@ktx.local', ); const preRunHead = await runtime.git.revParseHead(); const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-gate-repair') { return { stopReason: 'natural' as const }; } if (params.modelRole === 'reconcile') { return { stopReason: 'natural' as const }; } const root = rootOfConfig(currentSession.configService, runtime.configDir); await writeFile( join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); currentSession.actions.push({ target: 'sl', type: 'updated', key: 'mart_account_segments', detail: 'Rename ARR measure', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source rename', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' as const }; }) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await expect( runner.run({ jobId: 'job-final-gate-repair-fails', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/gate repair completed without editing an allowed path/); expect(await runtime.git.revParseHead()).toBe(preRunHead); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any; expect(reportCreate.body.status).toBe('failed'); expect(reportCreate.body.isolatedDiff).toMatchObject({ gateRepairAttempts: 1, gateRepairs: 0, gateRepairFailures: 1, }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair-fails/trace.jsonl'), 'utf-8', ); expect(trace).toContain('gate_repair_failed'); expect(trace).not.toContain('squash_finished'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('runs finalization before wiki sl-ref repair and final gates', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); adapter.finalize = vi.fn(async ({ workdir }) => { await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true }); await mkdir(join(workdir, 'wiki/global'), { recursive: true }); await writeFile( join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', ); await writeFile( join(workdir, 'wiki/global/finalized-accounts.md'), '---\nsummary: Finalized accounts\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n - missing_source\n---\n\nAccounts use `mart_account_segments.total_contract_arr`.\n', ); return { warnings: [], errors: [], touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }], changedWikiPageKeys: ['finalized-accounts'], actions: [ { target: 'sl', type: 'created', key: 'mart_account_segments', detail: 'Finalized accounts', targetConnectionId: 'warehouse', rawPaths: ['cards/source.json'], }, { target: 'wiki', type: 'created', key: 'finalized-accounts', detail: 'Finalized wiki', rawPaths: ['cards/source.json'], }, ], }; }); deps.agentRunner.runLoop = vi.fn(async () => ({ stopReason: 'natural' as const })) as never; const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await runner.run({ jobId: 'job-finalization', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-finalization/trace.jsonl'), 'utf-8', ); expect(trace.indexOf('finalization_committed')).toBeLessThan(trace.indexOf('wiki_sl_refs_repaired')); expect(trace.indexOf('wiki_sl_refs_repaired')).toBeLessThan(trace.indexOf('final_artifact_gates')); await expect(readFile(join(runtime.configDir, 'wiki/global/finalized-accounts.md'), 'utf-8')).resolves.toContain( 'sl_refs:\n - mart_account_segments', ); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('fails when finalization edits a path already changed earlier in the run', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [{ unitKey: 'wiki-page', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], }); let currentSession: any = null; deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { currentSession = toolSession; return { toRuntimeTools: vi.fn(() => ({})) }; }); deps.agentRunner.runLoop = vi.fn(async () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( join(root, 'wiki/global/orders.md'), '---\nsummary: Orders\nusage_mode: auto\n---\n\nWU body\n', ); currentSession.actions.push({ target: 'wiki', type: 'created', key: 'orders', detail: 'WU orders', rawPaths: ['cards/source.json'], }); await currentSession.gitService.commitFiles( ['wiki/global/orders.md'], 'wu orders', 'KTX Test', 'system@ktx.local', ); return { stopReason: 'natural' as const }; }) as never; adapter.finalize = vi.fn(async ({ workdir }) => { await writeFile( join(workdir, 'wiki/global/orders.md'), '---\nsummary: Orders\nusage_mode: auto\n---\n\nFinalized body\n', ); return { warnings: [], errors: [], touchedSources: [], changedWikiPageKeys: ['orders'], actions: [{ target: 'wiki', type: 'updated', key: 'orders', detail: 'Conflicting finalization' }], }; }); const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await expect( runner.run({ jobId: 'job-finalization-overlap', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/finalization modified path\(s\) already changed earlier in this run: wiki\/global\/orders\.md/); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); it('rejects finalization writes to unauthorized semantic-layer targets', async () => { const runtime = await makeRealGitRuntime(); try { const { deps, adapter } = makeDeps(runtime); adapter.chunk.mockResolvedValue({ workUnits: [] }); adapter.finalize = vi.fn(async ({ workdir }) => { await mkdir(join(workdir, 'semantic-layer/other-warehouse'), { recursive: true }); await writeFile( join(workdir, 'semantic-layer/other-warehouse/orders.yaml'), 'name: orders\ngrain: [order_id]\ncolumns: [{name: order_id, type: string}]\njoins: []\nmeasures: []\n', ); return { warnings: [], errors: [], touchedSources: [{ connectionId: 'other-warehouse', sourceName: 'orders' }], changedWikiPageKeys: [], actions: [ { target: 'sl', type: 'created', key: 'orders', targetConnectionId: 'other-warehouse', detail: 'Forbidden target', rawPaths: ['cards/source.json'], }, ], }; }); const runner = new IngestBundleRunner(deps); await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); await expect( runner.run({ jobId: 'job-finalization-target-policy', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).rejects.toThrow(/semantic-layer target connection not allowed/); const trace = await readFile( join(runtime.configDir, '.ktx/ingest-traces/job-finalization-target-policy/trace.jsonl'), 'utf-8', ); expect(trace).toContain('finalization_committed'); expect(trace).toContain('semantic_layer_target_policy'); expect(trace).toContain('ingest_failed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); });