test: cover isolated diff ingestion regressions

This commit is contained in:
Andrey Avtomonov 2026-05-17 21:29:08 +02:00
parent c481f1cce9
commit 1013bb65df

View file

@ -0,0 +1,464 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { GitService, SessionWorktreeService } from '../core/index.js';
import { LocalGitFileStore } from '../project/local-git-file-store.js';
import { addTouchedSlSource } from '../tools/index.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import type { IngestBundleRunnerDeps } from './ports.js';
async function makeRealGitRuntime() {
const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-runner-'));
const configDir = join(homeDir, 'config');
const git = new GitService({
storage: { configDir, homeDir },
git: {
userName: 'System User',
userEmail: 'system@example.com',
bootstrapMessage: 'init',
bootstrapAuthor: 'system',
bootstrapAuthorEmail: 'system@example.com',
},
});
await git.onModuleInit();
const configService = new LocalGitFileStore({ rootDir: configDir, git });
const sessionWorktreeService = new SessionWorktreeService({
coreConfig: {
storage: { configDir, homeDir },
git: {
userName: 'System User',
userEmail: 'system@example.com',
bootstrapMessage: 'init',
bootstrapAuthor: 'system',
bootstrapAuthorEmail: 'system@example.com',
},
},
gitService: git,
configService,
});
return { homeDir, configDir, git, configService, sessionWorktreeService };
}
function rootOfConfig(configService: unknown, fallback: string): string {
const rootDir = (configService as { rootDir?: unknown }).rootDir;
return typeof rootDir === 'string' ? rootDir : fallback;
}
async function loadSourcesFromRoot(root: string) {
const raw = await readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8').catch(
() => '',
);
const hasCents = raw.includes('total_contract_arr_cents');
const hasDollars = raw.includes('total_contract_arr');
return {
sources:
hasCents || hasDollars
? [
{
name: 'mart_account_segments',
grain: ['account_id'],
columns: [{ name: 'account_id', type: 'string' }],
joins: [],
measures: [{ name: hasCents ? 'total_contract_arr_cents' : 'total_contract_arr', expr: 'sum(contract_arr)' }],
table: 'analytics.mart_account_segments',
},
]
: [],
loadErrors: [],
};
}
function makeWikiService(root: string) {
return {
readPage: vi.fn(async (_scope: string, _scopeId: string | null, key: string) => {
const path = join(root, 'wiki/global', `${key}.md`);
const raw = await readFile(path, 'utf-8').catch(() => null);
if (!raw) {
return null;
}
const [, yaml = '', content = ''] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw) ?? [];
const slRefs =
/sl_refs:\n((?: - .+\n?)*)/
.exec(yaml)?.[1]
?.split('\n')
.map((line) => line.trim().replace(/^- /, ''))
.filter(Boolean) ?? [];
return {
pageKey: key,
frontmatter: { summary: key, usage_mode: 'auto', sl_refs: slRefs },
content: content.trim(),
};
}),
syncFromCommit: vi.fn(),
};
}
function makeDeps(runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>) {
const adapter = {
source: 'metabase',
skillNames: [],
detect: vi.fn().mockResolvedValue(true),
chunk: vi.fn().mockResolvedValue({
workUnits: [
{ unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
],
}),
};
const wikiService = makeWikiService(runtime.configDir);
const semanticLayerService: any = {
loadAllSources: vi.fn(async () => loadSourcesFromRoot(runtime.configDir)),
listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']),
};
semanticLayerService.forWorktree = vi.fn((workdir: string) => ({
...semanticLayerService,
loadAllSources: vi.fn(async () => loadSourcesFromRoot(workdir)),
listFilesForConnection: vi.fn().mockResolvedValue(['mart_account_segments.yaml']),
}));
const deps: IngestBundleRunnerDeps = {
runs: { create: vi.fn().mockResolvedValue({ id: 'run-1' }), markCompleted: vi.fn(), markFailed: vi.fn() },
provenance: {
insertMany: vi.fn(),
findLatestHashesForCompletedSyncs: vi.fn().mockResolvedValue(new Map()),
findLatestArtifactsForRawPaths: vi.fn().mockResolvedValue(new Map()),
},
reports: { create: vi.fn().mockResolvedValue({ id: 'report-1' }), findByJobId: vi.fn().mockResolvedValue(null), markSuperseded: vi.fn() },
canonicalPins: { listPins: vi.fn().mockResolvedValue([]) },
registry: { get: vi.fn().mockReturnValue(adapter), register: vi.fn(), has: vi.fn(), list: vi.fn() },
diffSetService: {
compute: vi.fn().mockResolvedValue({ added: ['cards/wiki.json', 'cards/source.json'], modified: [], deleted: [], unchanged: [] }),
},
sessionWorktreeService: runtime.sessionWorktreeService,
agentRunner: { runLoop: vi.fn() },
gitService: runtime.git,
lockingService: { withLock: vi.fn(async (_key, fn) => fn()) },
storage: {
homeDir: join(runtime.configDir, '.ktx'),
systemGitAuthor: { name: 'KTX Test', email: 'system@ktx.local' },
resolveUploadDir: (id) => join(runtime.homeDir, 'upload', id),
resolvePullDir: (id) => join(runtime.homeDir, 'pull', id),
resolveTranscriptDir: (id) => join(runtime.configDir, '.ktx/ingest-transcripts', id),
resolveTracePath: (id) => join(runtime.configDir, '.ktx/ingest-traces', id, 'trace.jsonl'),
},
settings: { memoryIngestionModel: 'test', probeRowCount: 1, isolatedDiffSourceKeys: ['metabase'], ingestTraceLevel: 'trace' },
skillsRegistry: {
listSkills: vi.fn().mockResolvedValue([]),
getSkill: vi.fn().mockResolvedValue(null),
buildSkillsPrompt: vi.fn().mockReturnValue(''),
stripFrontmatter: vi.fn((body) => body),
} as never,
promptService: { loadPrompt: vi.fn().mockResolvedValue('base') },
wikiService: { ...wikiService, forWorktree: vi.fn((workdir: string) => makeWikiService(workdir)) } as never,
knowledgeIndex: { listPagesForUser: vi.fn().mockResolvedValue([]) },
knowledgeSlRefs: { syncFromWiki: vi.fn() },
semanticLayerService,
slSearchService: { indexSources: vi.fn() },
slSourcesRepository: {} as never,
slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) },
connections: { listEnabledConnections: vi.fn().mockResolvedValue([]), getConnectionById: vi.fn() } as never,
toolsetFactory: { createIngestWuToolset: vi.fn(() => ({ toRuntimeTools: vi.fn(() => ({})) })) },
commitMessages: { enqueueForExternalCommit: vi.fn() },
embedding: { maxBatchSize: 64, computeEmbedding: vi.fn(), computeEmbeddingsBulk: vi.fn() },
};
return { deps, adapter };
}
async function mockStageRawFiles(runner: IngestBundleRunner, runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>, hashes: [string, string][]) {
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue(join(runtime.homeDir, 'stage'));
(runner as any).stageRawFilesStage1 = vi.fn(async ({ worktreeRoot }: any) => {
const rawDir = join(worktreeRoot, 'raw-sources/warehouse/metabase/s');
await mkdir(rawDir, { recursive: true });
for (const [rawPath] of hashes) {
await mkdir(join(rawDir, rawPath.split('/').slice(0, -1).join('/')), { recursive: true });
await writeFile(join(rawDir, rawPath), '{}');
}
return { currentHashes: new Map(hashes), rawDirInWorktree: 'raw-sources/warehouse/metabase/s' };
});
}
describe('IngestBundleRunner isolated diff path', () => {
it('rejects the Metabase stale-measure wiki body regression before squash', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.project = vi.fn(async ({ workdir }) => {
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
return {
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
changedWikiPageKeys: [],
};
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.unitKey === 'card-wiki') {
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
);
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'account-segments', detail: 'Account segments' });
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
}
if (params.telemetryTags.unitKey === 'card-source') {
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Dollar measure',
targetConnectionId: 'warehouse',
});
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/mart_account_segments.yaml'], 'wu source', 'KTX Test', 'system@ktx.local');
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['cards/wiki.json', 'h1'],
['cards/source.json', 'h2'],
]);
await expect(
runner.run({ jobId: 'job-1', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
).rejects.toThrow(/total_contract_arr_cents/);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-1/trace.jsonl'), 'utf-8');
expect(trace).toContain('input_snapshot');
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).toContain('work_unit_patch_collected');
expect(trace).toContain('patch_apply_started');
expect(trace).toContain('final_artifact_gates_failed');
expect(trace).toContain('ingest_failed');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('accepts two isolated work units that edit different wiki pages', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'page-a', rawFiles: ['pages/a.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'page-b', rawFiles: ['pages/b.json'], peerFileIndex: [], dependencyPaths: [] },
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const unitKey = params.telemetryTags.unitKey;
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(join(root, `wiki/global/${unitKey}.md`), `---\nsummary: ${unitKey}\nusage_mode: auto\n---\n\n${unitKey}\n`);
currentSession.actions.push({ target: 'wiki', type: 'created', key: unitKey, detail: unitKey });
await currentSession.gitService.commitFiles([`wiki/global/${unitKey}.md`], `wu ${unitKey}`, 'KTX Test', 'system@ktx.local');
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['pages/a.json', 'h1'],
['pages/b.json', 'h2'],
]);
const result = await runner.run({ jobId: 'job-clean', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } });
expect(result.failedWorkUnits).toEqual([]);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-clean/trace.jsonl'), 'utf-8');
expect(trace.match(/patch_accepted/g)).toHaveLength(2);
expect(trace).toContain('ingest_finished');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('classifies same-source patch application failure as a textual conflict', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'orders-a', rawFiles: ['orders/a.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'orders-b', rawFiles: ['orders/b.json'], peerFileIndex: [], dependencyPaths: [] },
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const suffix = params.telemetryTags.unitKey === 'orders-a' ? 'a' : 'b';
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/warehouse/orders.yaml'),
`name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures:\n - name: order_count_${suffix}\n expr: count(*)\n`,
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'orders');
currentSession.actions.push({ target: 'sl', type: 'updated', key: 'orders', detail: suffix, targetConnectionId: 'warehouse' });
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], `wu ${suffix}`, 'KTX Test', 'system@ktx.local');
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['orders/a.json', 'h1'],
['orders/b.json', 'h2'],
]);
await expect(
runner.run({ jobId: 'job-text-conflict', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
).rejects.toThrow(/isolated diff textual conflict/);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-text-conflict/trace.jsonl'), 'utf-8');
expect(trace).toContain('patch_textual_conflict');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('makes deterministic projection visible to child worktrees before WorkUnit synthesis', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'wiki-projected', rawFiles: ['projected/wiki.json'], peerFileIndex: [], dependencyPaths: [] }],
});
adapter.project = vi.fn(async ({ workdir }) => {
await mkdir(join(workdir, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(workdir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
return {
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }],
changedWikiPageKeys: [],
};
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await expect(readFile(join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), 'utf-8')).resolves.toContain(
'total_contract_arr',
);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/projected-orders.md'),
'---\nsummary: Projected orders\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nARR `mart_account_segments.total_contract_arr`.\n',
);
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'projected-orders', detail: 'Projected orders' });
await currentSession.gitService.commitFiles(['wiki/global/projected-orders.md'], 'wu projected wiki', 'KTX Test', 'system@ktx.local');
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['projected/wiki.json', 'h1']]);
const result = await runner.run({ jobId: 'job-projection', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } });
expect(result.failedWorkUnits).toEqual([]);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-projection/trace.jsonl'), 'utf-8');
expect(trace).toContain('deterministic_projection_finished');
expect(trace).toContain('deterministic_projection_committed');
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects Notion-style changed wiki pages with invalid sl_refs', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'notion-page', rawFiles: ['pages/notion.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(join(root, 'wiki/global/notion-page.md'), '---\nsummary: Notion page\nusage_mode: auto\nsl_refs:\n - missing_source\n---\n\nBody\n');
currentSession.actions.push({ target: 'wiki', type: 'created', key: 'notion-page', detail: 'Notion page' });
await currentSession.gitService.commitFiles(['wiki/global/notion-page.md'], 'wu notion', 'KTX Test', 'system@ktx.local');
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['pages/notion.json', 'h1']]);
await expect(
runner.run({ jobId: 'job-invalid-slrefs', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
).rejects.toThrow(/unknown sl_refs entry missing_source/);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'lookml-mismatch',
rawFiles: ['views/orders.lkml'],
peerFileIndex: [],
dependencyPaths: [],
slDisallowed: true,
slDisallowedReason: 'lookml_connection_mismatch',
},
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/warehouse/orders.yaml'),
'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
);
currentSession.actions.push({ target: 'sl', type: 'created', key: 'orders', detail: 'forbidden', targetConnectionId: 'warehouse' });
await currentSession.gitService.commitFiles(['semantic-layer/warehouse/orders.yaml'], 'forbidden sl', 'KTX Test', 'system@ktx.local');
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['views/orders.lkml', 'h1']]);
await expect(
runner.run({ jobId: 'job-sl-disallowed', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
).rejects.toThrow(/slDisallowed WorkUnit lookml-mismatch touched semantic-layer\/warehouse\/orders.yaml/);
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
});