test(ingest): cover isolated diff reference and target gates

This commit is contained in:
Andrey Avtomonov 2026-05-17 22:13:54 +02:00
parent 3613fb3686
commit c61c50be11
2 changed files with 303 additions and 8 deletions

View file

@ -1,4 +1,4 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
@ -69,8 +69,29 @@ async function loadSourcesFromRoot(root: string) {
};
}
async function listGlobalWikiPageKeys(root: string): Promise<string[]> {
const dir = join(root, 'wiki/global');
const entries = await readdir(dir).catch(() => []);
return entries
.filter((entry) => entry.endsWith('.md'))
.map((entry) => entry.slice(0, -'.md'.length))
.sort();
}
function frontmatterList(yaml: string, key: string): string[] {
const pattern = new RegExp(`${key}:\\n((?: - .+\\n?)*)`);
return (
pattern
.exec(yaml)?.[1]
?.split('\n')
.map((line) => line.trim().replace(/^- /, ''))
.filter(Boolean) ?? []
);
}
function makeWikiService(root: string) {
return {
listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])),
readPage: vi.fn(async (_scope: string, _scopeId: string | null, key: string) => {
const path = join(root, 'wiki/global', `${key}.md`);
const raw = await readFile(path, 'utf-8').catch(() => null);
@ -78,15 +99,14 @@ function makeWikiService(root: string) {
return null;
}
const [, yaml = '', content = ''] = /^---\n([\s\S]*?)\n---\n?([\s\S]*)$/.exec(raw) ?? [];
const slRefs =
/sl_refs:\n((?: - .+\n?)*)/
.exec(yaml)?.[1]
?.split('\n')
.map((line) => line.trim().replace(/^- /, ''))
.filter(Boolean) ?? [];
return {
pageKey: key,
frontmatter: { summary: key, usage_mode: 'auto', sl_refs: slRefs },
frontmatter: {
summary: key,
usage_mode: 'auto',
refs: frontmatterList(yaml, 'refs'),
sl_refs: frontmatterList(yaml, 'sl_refs'),
},
content: content.trim(),
};
}),
@ -823,4 +843,277 @@ describe('IngestBundleRunner isolated diff path', () => {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects final wiki refs broken by another accepted WorkUnit before squash', async () => {
const runtime = await makeRealGitRuntime();
try {
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'wiki/global/source-page.md'),
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
);
await runtime.git.commitFiles(['wiki/global/source-page.md'], 'seed source page', 'KTX Test', 'system@ktx.local');
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'page-ref', rawFiles: ['pages/ref.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'page-delete', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] },
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.unitKey === 'page-ref') {
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'account-segments',
detail: 'Page with wiki ref',
rawPaths: ['pages/ref.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/account-segments.md'],
'wu page ref',
'KTX Test',
'system@ktx.local',
);
}
if (params.telemetryTags.unitKey === 'page-delete') {
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
currentSession.actions.push({
target: 'wiki',
type: 'removed',
key: 'source-page',
detail: 'Delete referenced page',
rawPaths: ['pages/delete.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/source-page.md'],
'wu delete source page',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['pages/ref.json', 'h1'],
['pages/delete.json', 'h2'],
]);
await expect(
runner.run({
jobId: 'job-wiki-ref-conflict',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-wiki-ref-conflict/trace.jsonl'), 'utf-8');
expect(trace).toContain('final_artifact_gates_failed');
expect(trace).toContain('account-segments -> source-page');
expect(trace).toContain('ingest_failed');
expect(trace).toContain('failure_report_created');
expect(trace).not.toContain('squash_finished');
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('account-segments -> source-page'),
details: expect.objectContaining({
changedWikiPageKeys: expect.arrayContaining(['account-segments']),
workUnitPatchTouchedPaths: expect.arrayContaining([
'wiki/global/account-segments.md',
'wiki/global/source-page.md',
]),
}),
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'finance-source', rawFiles: ['cards/finance.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/finance'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/finance/orders.yaml'),
'name: orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'orders');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'orders',
detail: 'Unauthorized target',
targetConnectionId: 'finance',
rawPaths: ['cards/finance.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/finance/orders.yaml'],
'wu unauthorized target',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/finance.json', 'h1']]);
const preRunHead = await runtime.git.revParseHead();
await expect(
runner.run({
jobId: 'job-unauthorized-wu-target',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/isolated diff textual conflict.*semantic-layer target connection not allowed/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-wu-target/trace.jsonl'), 'utf-8');
expect(trace).toContain('patch_policy_rejected');
expect(trace).toContain('semantic-layer/finance/orders.yaml');
expect(trace).toContain('allowedTargetConnectionIds');
expect(trace).toContain('failure_report_created');
expect(trace).not.toContain('squash_finished');
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'integration',
message: expect.stringContaining('semantic-layer target connection not allowed'),
});
expect(failureReport.body.failure.details).toMatchObject({
unitKey: 'finance-source',
allowedTargetConnectionIds: ['warehouse'],
touchedPaths: ['semantic-layer/finance/orders.yaml'],
reason: expect.stringContaining('semantic-layer/finance/orders.yaml (finance)'),
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects reconciliation mutations that touch unauthorized semantic-layer target connections before squash', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'valid-page', rawFiles: ['pages/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(join(root, 'wiki/global/valid-page.md'), '---\nsummary: Valid page\nusage_mode: auto\n---\n\nValid\n');
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'valid-page',
detail: 'Valid page',
rawPaths: ['pages/source.json'],
});
await currentSession.gitService.commitFiles(['wiki/global/valid-page.md'], 'wu valid page', 'KTX Test', 'system@ktx.local');
} else {
await mkdir(join(root, 'semantic-layer/finance'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/finance/reconcile_orders.yaml'),
'name: reconcile_orders\ngrain: [id]\ncolumns: [{name: id, type: string}]\njoins: []\nmeasures: []\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'finance', 'reconcile_orders');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'reconcile_orders',
detail: 'Unauthorized reconcile target',
targetConnectionId: 'finance',
rawPaths: ['pages/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/finance/reconcile_orders.yaml'],
'reconcile unauthorized target',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['pages/source.json', 'h1']]);
const preRunHead = await runtime.git.revParseHead();
await expect(
runner.run({
jobId: 'job-unauthorized-reconcile-target',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/semantic-layer target connection not allowed/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-unauthorized-reconcile-target/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('semantic_layer_target_policy_started');
expect(trace).toContain('semantic_layer_target_policy_failed');
expect(trace).toContain('semantic-layer/finance/reconcile_orders.yaml');
expect(trace).toContain('ingest_failed');
expect(trace).not.toContain('squash_finished');
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'target_policy',
message: expect.stringContaining('semantic-layer target connection not allowed'),
});
expect(failureReport.body.failure.details).toMatchObject({
allowedTargetConnectionIds: ['warehouse'],
touchedPaths: expect.arrayContaining(['semantic-layer/finance/reconcile_orders.yaml']),
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
});

View file

@ -2129,6 +2129,8 @@ export class IngestBundleRunner {
const finalArtifactGateTraceData = {
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
projectionTouchedPaths,
workUnitPatchTouchedPaths: workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []),
preReconciliationSha,
postReconciliationSha,
postReconciliationPaths,