fix(ingest): gate global wiki references

This commit is contained in:
Andrey Avtomonov 2026-05-17 22:25:46 +02:00
parent 43f581154f
commit ba534fb8ca
2 changed files with 409 additions and 1 deletions

View file

@ -272,6 +272,164 @@ describe('IngestBundleRunner isolated diff path', () => {
}
});
// Regression test: a global wiki page that no work unit edits must still be
// re-validated when a work unit changes the semantic layer. Here the page body
// references `mart_account_segments.total_contract_arr_cents`, and the work unit
// rewrites the source so the measure is named `total_contract_arr` instead.
it('rejects unchanged wiki body refs made stale by isolated semantic-layer changes', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed the repo with the semantic-layer source and a wiki page whose body
// mentions the measure that is about to be renamed.
await mkdir(join(runtime.configDir, 'semantic-layer/warehouse'), { recursive: true });
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr_cents\n expr: sum(contract_arr)\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nExisting ARR uses `mart_account_segments.total_contract_arr_cents`.\n',
);
await runtime.git.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml', 'wiki/global/account-segments.md'],
'seed existing wiki body ref',
'KTX Test',
'system@ktx.local',
);
// Remember HEAD so we can assert below that the rejected run leaves it unchanged.
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'source-only', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }],
});
// Capture the session handed to the toolset factory so the fake agent loop
// below can write files, record actions and commit through it.
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
// Fake agent loop: rewrites the YAML with the measure named `total_contract_arr`,
// records the matching 'sl' update action, and commits only the YAML file.
// The wiki page is never touched by the work unit.
deps.agentRunner.runLoop = vi.fn(async () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
detail: 'Rename ARR measure',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
'wu source rename',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]);
// The run must reject with an error naming the measure the wiki page still references.
await expect(
runner.run({
jobId: 'job-existing-body-stale',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/total_contract_arr_cents/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
// Read the JSONL trace for this job: one JSON event per line.
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-body-stale/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
// The final artifact gates must have started and failed, and no squash may be recorded.
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_started',
'final_artifact_gates_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
// The gate-failure event must report a global validation scope caused by the
// semantic-layer change, list the untouched page among those validated, and
// attribute the 'sl' update to the originating work unit.
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
expect(gateFailure).toMatchObject({
data: {
wikiReferenceGateScope: {
global: true,
reasons: expect.arrayContaining(['semantic_layer_changed']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
},
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'source-only',
unitRawFiles: ['cards/source.json'],
action: expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
targetConnectionId: 'warehouse',
}),
}),
]),
},
error: { message: expect.stringContaining('total_contract_arr_cents') },
});
// The failure report (the `reports.create` call whose body status is 'failed')
// must carry the same scope, touched sources and action origins.
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('total_contract_arr_cents'),
details: expect.objectContaining({
wikiReferenceGateScope: expect.objectContaining({
global: true,
reasons: expect.arrayContaining(['semantic_layer_changed']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
}),
touchedSlSources: expect.arrayContaining([
expect.objectContaining({ connectionId: 'warehouse', sourceName: 'mart_account_segments' }),
]),
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'source-only',
action: expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
targetConnectionId: 'warehouse',
}),
}),
]),
}),
});
// The per-work-unit section of the report must also list the recorded action.
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'source-only',
actions: expect.arrayContaining([
expect.objectContaining({
target: 'sl',
type: 'updated',
key: 'mart_account_segments',
rawPaths: ['cards/source.json'],
}),
]),
}),
]),
);
} finally {
// Always remove the temporary runtime home, even when an assertion fails.
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('accepts two isolated work units that edit different wiki pages', async () => {
const runtime = await makeRealGitRuntime();
try {
@ -950,6 +1108,159 @@ describe('IngestBundleRunner isolated diff path', () => {
}
});
// Regression test: deleting a wiki page inside a work unit must fail the final
// gates when another page, which the work unit does not edit, still references
// it. Here `account-segments` lists `source-page` in its frontmatter `refs` and
// links to it in the body, and the work unit deletes `source-page`.
it('rejects unchanged inbound wiki refs broken by an isolated wiki deletion', async () => {
const runtime = await makeRealGitRuntime();
try {
// Seed two global wiki pages: the link target and the page that references it.
await mkdir(join(runtime.configDir, 'wiki/global'), { recursive: true });
await writeFile(
join(runtime.configDir, 'wiki/global/source-page.md'),
'---\nsummary: Source page\nusage_mode: auto\n---\n\nSource page\n',
);
await writeFile(
join(runtime.configDir, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\nrefs:\n - source-page\n---\n\nSee [[source-page]].\n',
);
await runtime.git.commitFiles(
['wiki/global/source-page.md', 'wiki/global/account-segments.md'],
'seed inbound wiki refs',
'KTX Test',
'system@ktx.local',
);
// Remember HEAD so we can assert below that the rejected run leaves it unchanged.
const preRunHead = await runtime.git.revParseHead();
const { deps, adapter } = makeDeps(runtime);
adapter.chunk.mockResolvedValue({
workUnits: [{ unitKey: 'delete-target-page', rawFiles: ['pages/delete.json'], peerFileIndex: [], dependencyPaths: [] }],
});
// Capture the session handed to the toolset factory so the fake agent loop
// below can delete files, record actions and commit through it.
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
// Only act for the work unit under test; any other loop invocation is a no-op.
if (params.telemetryTags.unitKey !== 'delete-target-page') {
return { stopReason: 'natural' };
}
// Delete the referenced page, record the 'removed' wiki action, and commit
// the deletion. The referencing page is left untouched.
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await rm(join(root, 'wiki/global/source-page.md'), { force: true });
currentSession.actions.push({
target: 'wiki',
type: 'removed',
key: 'source-page',
detail: 'Delete referenced page',
rawPaths: ['pages/delete.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/source-page.md'],
'wu delete target page',
'KTX Test',
'system@ktx.local',
);
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [['pages/delete.json', 'h1']]);
// The run must reject with an error naming the dangling edge (referrer -> missing target).
await expect(
runner.run({
jobId: 'job-existing-wiki-ref-stale',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/wiki references target missing page\(s\): account-segments -> source-page/);
expect(await runtime.git.revParseHead()).toBe(preRunHead);
// Read the JSONL trace for this job: one JSON event per line.
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-existing-wiki-ref-stale/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
// The final artifact gates must have started and failed, and no squash may be recorded.
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'final_artifact_gates_started',
'final_artifact_gates_failed',
'ingest_failed',
'failure_report_created',
]),
);
expect(events.map((event) => event.event)).not.toContain('squash_finished');
// The gate-failure event must report a global validation scope caused by the
// page removal, name the removed page, list the untouched referrer among the
// validated pages, and attribute the removal to the originating work unit.
const gateFailure = events.find((event) => event.event === 'final_artifact_gates_failed');
expect(gateFailure).toMatchObject({
data: {
wikiReferenceGateScope: {
global: true,
reasons: expect.arrayContaining(['wiki_page_removed']),
removedWikiPageKeys: expect.arrayContaining(['source-page']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
},
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'delete-target-page',
unitRawFiles: ['pages/delete.json'],
action: expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
}),
]),
},
error: { message: expect.stringContaining('account-segments -> source-page') },
});
// The failure report (the `reports.create` call whose body status is 'failed')
// must carry the same scope, changed page keys and action origins.
const failureReport = (deps.reports.create as any).mock.calls
.map((call: any[]) => call[0])
.find((report: any) => report.body.status === 'failed');
expect(failureReport.body.failure).toMatchObject({
phase: 'final_gates',
message: expect.stringContaining('account-segments -> source-page'),
details: expect.objectContaining({
wikiReferenceGateScope: expect.objectContaining({
global: true,
reasons: expect.arrayContaining(['wiki_page_removed']),
removedWikiPageKeys: expect.arrayContaining(['source-page']),
pageKeysValidated: expect.arrayContaining(['account-segments']),
}),
changedWikiPageKeys: expect.arrayContaining(['source-page']),
actionOrigins: expect.arrayContaining([
expect.objectContaining({
source: 'work_unit_action',
unitKey: 'delete-target-page',
action: expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
}),
]),
}),
});
// The per-work-unit section of the report must also list the recorded removal.
expect(failureReport.body.workUnits).toEqual(
expect.arrayContaining([
expect.objectContaining({
unitKey: 'delete-target-page',
actions: expect.arrayContaining([
expect.objectContaining({
target: 'wiki',
type: 'removed',
key: 'source-page',
rawPaths: ['pages/delete.json'],
}),
]),
}),
]),
);
} finally {
// Always remove the temporary runtime home, even when an assertion fails.
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects WorkUnit patches that touch unauthorized semantic-layer target connections', async () => {
const runtime = await makeRealGitRuntime();
try {

View file

@ -669,6 +669,90 @@ export class IngestBundleRunner {
});
}
/**
 * Gathers the keys of every action that targets the wiki with type `'removed'`
 * and passes them through `uniqueWikiPageKeys`.
 *
 * @param actions Actions to scan; entries with any other target or type are ignored.
 * @returns Whatever `uniqueWikiPageKeys` yields for the collected keys.
 */
private removedWikiPageKeysFromActions(actions: MemoryAction[]): string[] {
  const removedKeys = actions.flatMap((candidate) =>
    candidate.target === 'wiki' && candidate.type === 'removed' ? [candidate.key] : [],
  );
  return this.uniqueWikiPageKeys(removedKeys);
}
/**
 * Builds a flat list describing where each recorded action came from, for
 * inclusion in final-gate diagnostics.
 *
 * Work-unit actions come first (tagged `'work_unit_action'`, with the unit's key,
 * index and raw files), followed by reconciliation actions (tagged
 * `'reconciliation_action'`).
 *
 * @param input.stageIndex Stage index whose `workUnits` supply the per-unit actions.
 * @param input.reconcileActions Actions recorded outside any work unit.
 * @param input.fallbackConnectionId Passed to `actionTargetConnectionId` for `'sl'` actions.
 * @returns One origin record per action, in the order described above.
 */
private finalGateActionOrigins(input: {
  stageIndex: StageIndex;
  reconcileActions: MemoryAction[];
  fallbackConnectionId: string;
}) {
  const { stageIndex, reconcileActions, fallbackConnectionId } = input;

  // Summarises a single action; `targetConnectionId` is only attached to 'sl' actions.
  const describeAction = (action: MemoryAction, fallbackRawPaths: string[]) => {
    return {
      target: action.target,
      type: action.type,
      key: action.key,
      detail: action.detail,
      rawPaths: rawPathsForAction(action, fallbackRawPaths),
      ...(action.target === 'sl'
        ? { targetConnectionId: actionTargetConnectionId(action, fallbackConnectionId) }
        : {}),
    };
  };

  const workUnitOrigins = stageIndex.workUnits.flatMap((unit, unitIndex) => {
    return unit.actions.map((unitAction, actionIndex) => {
      return {
        source: 'work_unit_action',
        unitKey: unit.unitKey,
        unitIndex,
        unitRawFiles: unit.rawFiles,
        actionIndex,
        action: describeAction(unitAction, unit.rawFiles),
      };
    });
  });

  // Reconciliation actions have no owning work unit, so there are no fallback raw paths.
  const reconciliationOrigins = reconcileActions.map((reconcileAction, actionIndex) => {
    return {
      source: 'reconciliation_action',
      actionIndex,
      action: describeAction(reconcileAction, []),
    };
  });

  return [...workUnitOrigins, ...reconciliationOrigins];
}
/**
 * Decides which wiki page keys are returned for the final gates, and records why.
 *
 * The scope is just the changed pages unless the semantic layer was touched or
 * an action removed a wiki page. In either of those cases it widens to the
 * changed pages plus every key returned by `listPageKeys('GLOBAL', null)`.
 *
 * @param input.wikiService Wiki service used to list page keys when the scope widens.
 * @param input.changedWikiPageKeys Page keys already known to have changed.
 * @param input.touchedSlSources Touched semantic-layer sources; any entry widens the scope.
 * @param input.actions Recorded actions, scanned for wiki removals.
 * @returns The page keys to validate, plus a `trace` object explaining the decision.
 */
private async wikiPageKeysForFinalGates(input: {
  wikiService: ReturnType<KnowledgeWikiService['forWorktree']>;
  changedWikiPageKeys: string[];
  touchedSlSources: TouchedSlSource[];
  actions: MemoryAction[];
}): Promise<{
  pageKeys: string[];
  trace: {
    global: boolean;
    reasons: string[];
    changedWikiPageKeys: string[];
    removedWikiPageKeys: string[];
    pageKeysValidated: string[];
  };
}> {
  const changedKeys = this.uniqueWikiPageKeys(input.changedWikiPageKeys);
  const removedKeys = this.removedWikiPageKeysFromActions(input.actions);

  // Each trigger contributes at most one reason, in a fixed order.
  const reasons: string[] = [
    ...(input.touchedSlSources.length > 0 ? ['semantic_layer_changed'] : []),
    ...(removedKeys.length > 0 ? ['wiki_page_removed'] : []),
  ];
  const isGlobal = reasons.length > 0;

  // Only list the global pages when at least one trigger fired.
  const pageKeys = isGlobal
    ? this.uniqueWikiPageKeys([...changedKeys, ...(await input.wikiService.listPageKeys('GLOBAL', null))])
    : changedKeys;

  return {
    pageKeys,
    trace: {
      global: isGlobal,
      reasons,
      changedWikiPageKeys: changedKeys,
      removedWikiPageKeys: removedKeys,
      pageKeysValidated: pageKeys,
    },
  };
}
private async runWorkUnitInWorktree(input: {
job: IngestBundleJob;
syncId: string;
@ -2087,7 +2171,7 @@ export class IngestBundleRunner {
preReconciliationSha && postReconciliationSha && preReconciliationSha !== postReconciliationSha
? (await sessionWorktree.git.diffNameStatus(preReconciliationSha, postReconciliationSha)).map((entry) => entry.path)
: [];
const finalChangedWikiPageKeys = this.uniqueWikiPageKeys([
const baseFinalChangedWikiPageKeys = this.uniqueWikiPageKeys([
...(isolatedDiffEnabled ? projectionChangedWikiPageKeys : []),
...workUnitOutcomes
.flatMap((outcome) => outcome.patchTouchedPaths ?? [])
@ -2103,6 +2187,13 @@ export class IngestBundleRunner {
...this.touchedSlSourcesFromPaths(postReconciliationPaths),
...(postProcessorOutcome?.touchedSources ?? []),
]);
const finalWikiGateScope = await this.wikiPageKeysForFinalGates({
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
changedWikiPageKeys: baseFinalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
actions: [...stageIndex.workUnits.flatMap((wu) => wu.actions), ...reconcileActions],
});
const finalChangedWikiPageKeys = finalWikiGateScope.pageKeys;
const finalTargetPolicyPaths = [
...projectionTouchedPaths,
@ -2128,9 +2219,15 @@ export class IngestBundleRunner {
const finalArtifactGateTraceData = {
changedWikiPageKeys: finalChangedWikiPageKeys,
wikiReferenceGateScope: finalWikiGateScope.trace,
touchedSlSources: finalTouchedSlSources,
projectionTouchedPaths,
workUnitPatchTouchedPaths: workUnitOutcomes.flatMap((outcome) => outcome.patchTouchedPaths ?? []),
actionOrigins: this.finalGateActionOrigins({
stageIndex,
reconcileActions,
fallbackConnectionId: job.connectionId,
}),
preReconciliationSha,
postReconciliationSha,
postReconciliationPaths,