feat: require ingest verification ledger

This commit is contained in:
Andrey Avtomonov 2026-05-13 01:58:59 +02:00
parent 65faa96b58
commit 53c9a1eefa
6 changed files with 258 additions and 24 deletions

View file

@ -675,6 +675,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
});
}
if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') {
await params.toolSet.record_verification_ledger.execute(
{
summary: 'Reconciliation emits no warehouse identifiers before fallback recording.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 'ledger-1', messages: [] },
);
await params.toolSet.emit_conflict_resolution.execute(
{
kind: 'near_duplicate',
@ -843,6 +851,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
{ path: 'a.yml', startLine: 1, endLine: 2 },
{ toolCallId: 'read-1', messages: [] },
);
await params.toolSet.record_verification_ledger.execute(
{
summary: 'Wiki write contains no warehouse identifiers.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 'ledger-1', messages: [] },
);
await params.toolSet.wiki_write.execute(
{ key: 'knowledge/a.md', content: 'safe summary' },
{ toolCallId: 'wiki-1', messages: [] },
@ -882,9 +898,9 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
{
unitKey: 'u1',
path: '/tmp/ktx-test/run/wu-transcripts/j1/u1.jsonl',
toolCallCount: 2,
toolCallCount: 3,
errorCount: 0,
toolNames: ['read_raw_span', 'wiki_write'],
toolNames: ['read_raw_span', 'record_verification_ledger', 'wiki_write'],
},
],
}),
@ -896,6 +912,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
const deps = makeDeps();
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
await params.toolSet.record_verification_ledger.execute(
{
summary: 'Unmapped fallback records an unsupported conversion metric without verified warehouse identifiers.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 'ledger-1', messages: [] },
);
await params.toolSet.emit_unmapped_fallback.execute(
{
rawPath: 'a.yml',
@ -952,6 +976,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
});
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') {
await params.toolSet.record_verification_ledger.execute(
{
summary: 'Reconciliation records conflict, eviction, and fallback decisions without warehouse identifiers.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 'ledger-1', messages: [] },
);
await params.toolSet.emit_conflict_resolution.execute(
{
kind: 'near_duplicate',

View file

@ -107,6 +107,7 @@ describe('buildReconcileToolSet', () => {
'eviction_list',
'load_skill',
'read_raw_span',
'record_verification_ledger',
'sl_write_source',
'stage_diff',
'stage_list',
@ -114,4 +115,54 @@ describe('buildReconcileToolSet', () => {
].sort(),
);
});
it('requires the verification ledger before reconciliation write tools run', async () => {
const slWrite = vi.fn().mockResolvedValue({ markdown: 'written', structured: { success: true } });
const toolSet = buildReconcileToolSet({
loadSkillTool: { load_skill: { description: 'load', inputSchema: {} as any, execute: vi.fn() } } as any,
stageListTool: { stage_list: { description: 'stage list', inputSchema: {} as any, execute: vi.fn() } } as any,
stageDiffTool: { stage_diff: { description: 'stage diff', inputSchema: {} as any, execute: vi.fn() } } as any,
evictionListTool: {
eviction_list: { description: 'eviction list', inputSchema: {} as any, execute: vi.fn() },
} as any,
emitConflictResolutionTool: {
emit_conflict_resolution: { description: 'conflict', inputSchema: {} as any, execute: vi.fn() },
} as any,
emitEvictionDecisionTool: {
emit_eviction_decision: { description: 'eviction', inputSchema: {} as any, execute: vi.fn() },
} as any,
emitArtifactResolutionTool: {
emit_artifact_resolution: { description: 'resolution', inputSchema: {} as any, execute: vi.fn() },
} as any,
emitUnmappedFallbackTool: {
emit_unmapped_fallback: { description: 'fallback', inputSchema: {} as any, execute: vi.fn() },
} as any,
readRawSpanTool: { read_raw_span: { description: 'raw span', inputSchema: {} as any, execute: vi.fn() } } as any,
toolsetTools: { sl_write_source: { description: 'sl write', inputSchema: {} as any, execute: slWrite } as any },
});
const correction = await toolSet.sl_write_source.execute?.(
{ connectionId: 'warehouse', sourceName: 'accounts' },
{ toolCallId: 't1' } as any,
);
expect(slWrite).not.toHaveBeenCalled();
expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } });
await toolSet.record_verification_ledger.execute?.(
{
summary: 'Verified warehouse.accounts with entity_details.',
verifiedIdentifiers: ['warehouse.accounts'],
unverifiedIdentifiers: [],
},
{ toolCallId: 't2' } as any,
);
const written = await toolSet.sl_write_source.execute?.(
{ connectionId: 'warehouse', sourceName: 'accounts' },
{ toolCallId: 't3' } as any,
);
expect(slWrite).toHaveBeenCalledTimes(1);
expect(written).toMatchObject({ structured: { success: true } });
});
});

View file

@ -1,5 +1,10 @@
import type { Tool, ToolSet } from 'ai';
import { buildCanonicalPinsPromptBlock, type CanonicalPin } from '../canonical-pins.js';
import {
createVerificationLedgerState,
VERIFICATION_LEDGER_PROMPT,
withVerificationLedger,
} from '../tools/verification-ledger.tool.js';
import type { EvictionUnit } from '../types.js';
import type { StageIndex } from './stage-index.types.js';
@ -12,6 +17,7 @@ export function buildReconcileSystemPrompt(params: {
}): string {
return [
params.baseFraming.trimEnd(),
VERIFICATION_LEDGER_PROMPT,
params.skillsPrompt.trimEnd(),
buildCanonicalPinsPromptBlock(params.canonicalPins),
`\n<context>\nsyncId: ${params.syncId}\nsource: ${params.sourceKey}\n</context>`,
@ -188,16 +194,20 @@ export interface ReconcileToolSetInput {
}
export function buildReconcileToolSet(input: ReconcileToolSetInput): ToolSet {
return {
...input.toolsetTools,
...input.loadSkillTool,
...input.stageListTool,
...input.stageDiffTool,
...input.evictionListTool,
...input.emitConflictResolutionTool,
...input.emitEvictionDecisionTool,
...input.emitArtifactResolutionTool,
...input.emitUnmappedFallbackTool,
...input.readRawSpanTool,
};
const state = createVerificationLedgerState();
return withVerificationLedger(
{
...input.toolsetTools,
...input.loadSkillTool,
...input.stageListTool,
...input.stageDiffTool,
...input.evictionListTool,
...input.emitConflictResolutionTool,
...input.emitEvictionDecisionTool,
...input.emitArtifactResolutionTool,
...input.emitUnmappedFallbackTool,
...input.readRawSpanTool,
},
state,
);
}

View file

@ -68,12 +68,45 @@ describe('buildWuToolSet', () => {
'load_skill',
'read_raw_file',
'read_raw_span',
'record_verification_ledger',
'sl_write_source',
'wiki_search',
].sort(),
);
});
it('requires the verification ledger before write-capable tools run', async () => {
const wikiWrite = vi.fn().mockResolvedValue({ markdown: 'written', structured: { success: true } });
const toolSet = buildWuToolSet({
stagedDir: '/tmp/staged',
wu: { unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] },
loadSkillTool: { load_skill: { description: 'load', inputSchema: {} as any, execute: vi.fn() } } as any,
emitUnmappedFallbackTool: {
emit_unmapped_fallback: { description: 'fallback', inputSchema: {} as any, execute: vi.fn() },
} as any,
toolsetTools: { wiki_write: { description: 'write', inputSchema: {} as any, execute: wikiWrite } as any },
});
const correction = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't1' } as any);
expect(wikiWrite).not.toHaveBeenCalled();
expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } });
expect(String((correction as any).markdown)).toContain('record_verification_ledger');
await toolSet.record_verification_ledger.execute?.(
{
summary: 'No warehouse identifiers will be emitted in this wiki write.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 't2' } as any,
);
const written = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't3' } as any);
expect(wikiWrite).toHaveBeenCalledTimes(1);
expect(written).toMatchObject({ structured: { success: true } });
});
it('includes looker_query_to_sl only for Looker WorkUnits', () => {
const toolSet = buildWuToolSet({
sourceKey: 'looker',
@ -93,6 +126,7 @@ describe('buildWuToolSet', () => {
'looker_query_to_sl',
'read_raw_file',
'read_raw_span',
'record_verification_ledger',
'sl_write_source',
'wiki_search',
].sort(),

View file

@ -4,6 +4,11 @@ import { createLookerQueryToSlTool } from '../adapters/looker/tools/looker-query
import type { IngestProvenanceRow } from '../ports.js';
import { createReadRawFileTool } from '../tools/read-raw-file.tool.js';
import { createReadRawSpanTool } from '../tools/read-raw-span.tool.js';
import {
createVerificationLedgerState,
VERIFICATION_LEDGER_PROMPT,
withVerificationLedger,
} from '../tools/verification-ledger.tool.js';
import type { WorkUnit } from '../types.js';
const PEER_FILE_INDEX_PROMPT_LIMIT = 100;
@ -24,6 +29,7 @@ export function buildWuSystemPrompt(params: {
}): string {
const parts = [
params.baseFraming.trimEnd(),
VERIFICATION_LEDGER_PROMPT,
params.skillsPrompt.trimEnd(),
buildCanonicalPinsPromptBlock(params.canonicalPins ?? []),
`\n<context>\nsyncId: ${params.syncId}\nsource: ${params.sourceKey}\n</context>`,
@ -100,15 +106,19 @@ function withoutWriteSlTools(toolset: ToolSet, wu: WorkUnit): ToolSet {
export function buildWuToolSet(input: BuildWuToolSetInput): ToolSet {
const allowedPaths = new Set<string>([...input.wu.rawFiles, ...input.wu.dependencyPaths]);
const lookerTools: ToolSet = input.sourceKey === 'looker' ? { looker_query_to_sl: createLookerQueryToSlTool() } : {};
return withoutWriteSlTools(
{
...input.toolsetTools,
...lookerTools,
...input.loadSkillTool,
...input.emitUnmappedFallbackTool,
read_raw_file: createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }),
read_raw_span: createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }),
},
input.wu,
const state = createVerificationLedgerState();
return withVerificationLedger(
withoutWriteSlTools(
{
...input.toolsetTools,
...lookerTools,
...input.loadSkillTool,
...input.emitUnmappedFallbackTool,
read_raw_file: createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }),
read_raw_span: createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }),
},
input.wu,
),
state,
);
}

View file

@ -0,0 +1,97 @@
import { tool, type ToolExecuteFunction, type ToolExecutionOptions, type ToolSet } from 'ai';
import { z } from 'zod';
const verificationLedgerInputSchema = z.object({
summary: z.string().min(1).max(2000),
verifiedIdentifiers: z.array(z.string().min(1)).max(100).default([]),
unverifiedIdentifiers: z.array(z.string().min(1)).max(100).default([]),
notes: z.string().max(2000).optional(),
});
export interface VerificationLedgerEntry {
summary: string;
verifiedIdentifiers: string[];
unverifiedIdentifiers: string[];
notes?: string;
}
export interface VerificationLedgerState {
entries: VerificationLedgerEntry[];
}
const WRITE_TOOL_NAMES = new Set([
'wiki_write',
'wiki_remove',
'sl_write_source',
'sl_edit_source',
'emit_unmapped_fallback',
]);
export const VERIFICATION_LEDGER_PROMPT = `<pre_write_verification>
Before any write-capable tool call (wiki_write, wiki_remove, sl_write_source, sl_edit_source, emit_unmapped_fallback), call record_verification_ledger.
The ledger is a model-authored checkpoint, not a deterministic parser gate. Summarize the verification protocol from the loaded skill, list identifiers verified with discover_data/entity_details/sql_execution, and list anything intentionally left unverified. If the write contains no warehouse identifiers, say that explicitly.
If a write tool returns verification_ledger_required, complete the ledger and retry the write.
</pre_write_verification>`;
export function createVerificationLedgerState(): VerificationLedgerState {
return { entries: [] };
}
export function withVerificationLedger(tools: ToolSet, state: VerificationLedgerState): ToolSet {
const wrapped: ToolSet = {};
for (const [name, original] of Object.entries(tools)) {
if (!WRITE_TOOL_NAMES.has(name) || typeof original.execute !== 'function') {
wrapped[name] = original;
continue;
}
const originalExecute = original.execute;
const guardedExecute: ToolExecuteFunction<unknown, unknown> = async (
input: unknown,
opts: ToolExecutionOptions,
) => {
if (state.entries.length === 0) {
return verificationRequiredOutput(name);
}
return (originalExecute as ToolExecuteFunction<unknown, unknown>)(input, opts);
};
wrapped[name] = { ...original, execute: guardedExecute };
}
wrapped.record_verification_ledger = createRecordVerificationLedgerTool(state);
return wrapped;
}
function createRecordVerificationLedgerTool(state: VerificationLedgerState) {
return tool({
description:
'Record the pre-write verification ledger required by loaded ingest skills. Call this before wiki/SL/fallback writes to state what was verified, which tool calls support it, and what remains intentionally unverified.',
inputSchema: verificationLedgerInputSchema,
execute: async (input) => {
const entry = verificationLedgerInputSchema.parse(input);
state.entries.push(entry);
return {
markdown:
`Verification ledger recorded. Summary: ${entry.summary}\n` +
`Verified identifiers: ${entry.verifiedIdentifiers.length ? entry.verifiedIdentifiers.join(', ') : '(none)'}\n` +
`Unverified identifiers: ${
entry.unverifiedIdentifiers.length ? entry.unverifiedIdentifiers.join(', ') : '(none)'
}`,
structured: { success: true, entry },
};
},
});
}
function verificationRequiredOutput(toolName: string) {
return {
markdown:
`Pre-write verification required before calling ${toolName}. ` +
'Call record_verification_ledger first. In the ledger, summarize the loaded skill protocol you followed, ' +
'list identifiers verified via discover_data/entity_details/sql_execution, and list any identifiers intentionally left unverified. ' +
'If the write contains no warehouse identifiers, say that explicitly in the ledger summary.',
structured: {
success: false,
reason: 'verification_ledger_required',
toolName,
},
};
}