From 53c9a1eefa992b76bbf82ad0fbdd974d8c8cd56b Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 13 May 2026 01:58:59 +0200 Subject: [PATCH] feat: require ingest verification ledger --- .../src/ingest/ingest-bundle.runner.test.ts | 36 ++++++- .../stages/build-reconcile-context.test.ts | 51 ++++++++++ .../ingest/stages/build-reconcile-context.ts | 34 ++++--- .../ingest/stages/build-wu-context.test.ts | 34 +++++++ .../src/ingest/stages/build-wu-context.ts | 30 ++++-- .../ingest/tools/verification-ledger.tool.ts | 97 +++++++++++++++++++ 6 files changed, 258 insertions(+), 24 deletions(-) create mode 100644 packages/context/src/ingest/tools/verification-ledger.tool.ts diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index dc48b18c..6134fbe7 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -675,6 +675,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { }); } if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') { + await params.toolSet.record_verification_ledger.execute( + { + summary: 'Reconciliation emits no warehouse identifiers before fallback recording.', + verifiedIdentifiers: [], + unverifiedIdentifiers: [], + }, + { toolCallId: 'ledger-1', messages: [] }, + ); await params.toolSet.emit_conflict_resolution.execute( { kind: 'near_duplicate', @@ -843,6 +851,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { { path: 'a.yml', startLine: 1, endLine: 2 }, { toolCallId: 'read-1', messages: [] }, ); + await params.toolSet.record_verification_ledger.execute( + { + summary: 'Wiki write contains no warehouse identifiers.', + verifiedIdentifiers: [], + unverifiedIdentifiers: [], + }, + { toolCallId: 'ledger-1', messages: [] }, + ); await params.toolSet.wiki_write.execute( { key: 'knowledge/a.md', content: 'safe summary' }, { toolCallId: 'wiki-1', messages: [] }, @@ -882,9 +898,9 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { { unitKey: 'u1', path: '/tmp/ktx-test/run/wu-transcripts/j1/u1.jsonl', - toolCallCount: 2, + toolCallCount: 3, errorCount: 0, - toolNames: ['read_raw_span', 'wiki_write'], + toolNames: ['read_raw_span', 'record_verification_ledger', 'wiki_write'], }, ], }), @@ -896,6 +912,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { const deps = makeDeps(); deps.agentRunner.runLoop.mockImplementation(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-bundle-wu') { + await params.toolSet.record_verification_ledger.execute( + { + summary: 'Unmapped fallback records an unsupported conversion metric without verified warehouse identifiers.', + verifiedIdentifiers: [], + unverifiedIdentifiers: [], + }, + { toolCallId: 'ledger-1', messages: [] }, + ); await params.toolSet.emit_unmapped_fallback.execute( { rawPath: 'a.yml', @@ -952,6 +976,14 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { }); deps.agentRunner.runLoop.mockImplementation(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-bundle-reconcile') { + await params.toolSet.record_verification_ledger.execute( + { + summary: 'Reconciliation records conflict, eviction, and fallback decisions without warehouse identifiers.', + verifiedIdentifiers: [], + unverifiedIdentifiers: [], + }, + { toolCallId: 'ledger-1', messages: [] }, + ); await params.toolSet.emit_conflict_resolution.execute( { kind: 'near_duplicate', diff --git a/packages/context/src/ingest/stages/build-reconcile-context.test.ts b/packages/context/src/ingest/stages/build-reconcile-context.test.ts index 7db0bb23..9ac95356 100644 --- a/packages/context/src/ingest/stages/build-reconcile-context.test.ts +++ b/packages/context/src/ingest/stages/build-reconcile-context.test.ts @@ -107,6 +107,7 @@ describe('buildReconcileToolSet', () => { 'eviction_list', 'load_skill', 'read_raw_span', + 'record_verification_ledger', 'sl_write_source', 'stage_diff', 'stage_list', @@ -114,4 +115,54 @@ describe('buildReconcileToolSet', () => { ].sort(), ); }); + + it('requires the verification ledger before reconciliation write tools run', async () => { + const slWrite = vi.fn().mockResolvedValue({ markdown: 'written', structured: { success: true } }); + const toolSet = buildReconcileToolSet({ + loadSkillTool: { load_skill: { description: 'load', inputSchema: {} as any, execute: vi.fn() } } as any, + stageListTool: { stage_list: { description: 'stage list', inputSchema: {} as any, execute: vi.fn() } } as any, + stageDiffTool: { stage_diff: { description: 'stage diff', inputSchema: {} as any, execute: vi.fn() } } as any, + evictionListTool: { + eviction_list: { description: 'eviction list', inputSchema: {} as any, execute: vi.fn() }, + } as any, + emitConflictResolutionTool: { + emit_conflict_resolution: { description: 'conflict', inputSchema: {} as any, execute: vi.fn() }, + } as any, + emitEvictionDecisionTool: { + emit_eviction_decision: { description: 'eviction', inputSchema: {} as any, execute: vi.fn() }, + } as any, + emitArtifactResolutionTool: { + emit_artifact_resolution: { description: 'resolution', inputSchema: {} as any, execute: vi.fn() }, + } as any, + emitUnmappedFallbackTool: { + emit_unmapped_fallback: { description: 'fallback', inputSchema: {} as any, execute: vi.fn() }, + } as any, + readRawSpanTool: { read_raw_span: { description: 'raw span', inputSchema: {} as any, execute: vi.fn() } } as any, + toolsetTools: { sl_write_source: { description: 'sl write', inputSchema: {} as any, execute: slWrite } as any }, + }); + + const correction = await toolSet.sl_write_source.execute?.( + { connectionId: 'warehouse', sourceName: 'accounts' }, + { toolCallId: 't1' } as any, + ); + + expect(slWrite).not.toHaveBeenCalled(); + expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } }); + + await toolSet.record_verification_ledger.execute?.( + { + summary: 'Verified warehouse.accounts with entity_details.', + verifiedIdentifiers: ['warehouse.accounts'], + unverifiedIdentifiers: [], + }, + { toolCallId: 't2' } as any, + ); + const written = await toolSet.sl_write_source.execute?.( + { connectionId: 'warehouse', sourceName: 'accounts' }, + { toolCallId: 't3' } as any, + ); + + expect(slWrite).toHaveBeenCalledTimes(1); + expect(written).toMatchObject({ structured: { success: true } }); + }); }); diff --git a/packages/context/src/ingest/stages/build-reconcile-context.ts b/packages/context/src/ingest/stages/build-reconcile-context.ts index 30ff6341..9533acbd 100644 --- a/packages/context/src/ingest/stages/build-reconcile-context.ts +++ b/packages/context/src/ingest/stages/build-reconcile-context.ts @@ -1,5 +1,10 @@ import type { Tool, ToolSet } from 'ai'; import { buildCanonicalPinsPromptBlock, type CanonicalPin } from '../canonical-pins.js'; +import { + createVerificationLedgerState, + VERIFICATION_LEDGER_PROMPT, + withVerificationLedger, +} from '../tools/verification-ledger.tool.js'; import type { EvictionUnit } from '../types.js'; import type { StageIndex } from './stage-index.types.js'; @@ -12,6 +17,7 @@ export function buildReconcileSystemPrompt(params: { }): string { return [ params.baseFraming.trimEnd(), + VERIFICATION_LEDGER_PROMPT, params.skillsPrompt.trimEnd(), buildCanonicalPinsPromptBlock(params.canonicalPins), `\n\nsyncId: ${params.syncId}\nsource: ${params.sourceKey}\n`, @@ -188,16 +194,20 @@ export interface ReconcileToolSetInput { } export function buildReconcileToolSet(input: ReconcileToolSetInput): ToolSet { - return { - ...input.toolsetTools, - ...input.loadSkillTool, - ...input.stageListTool, - ...input.stageDiffTool, - ...input.evictionListTool, - ...input.emitConflictResolutionTool, - ...input.emitEvictionDecisionTool, - ...input.emitArtifactResolutionTool, - ...input.emitUnmappedFallbackTool, - ...input.readRawSpanTool, - }; + const state = createVerificationLedgerState(); + return withVerificationLedger( + { + ...input.toolsetTools, + ...input.loadSkillTool, + ...input.stageListTool, + ...input.stageDiffTool, + ...input.evictionListTool, + ...input.emitConflictResolutionTool, + ...input.emitEvictionDecisionTool, + ...input.emitArtifactResolutionTool, + ...input.emitUnmappedFallbackTool, + ...input.readRawSpanTool, + }, + state, + ); } diff --git a/packages/context/src/ingest/stages/build-wu-context.test.ts b/packages/context/src/ingest/stages/build-wu-context.test.ts index 13a3ff8f..db17154e 100644 --- a/packages/context/src/ingest/stages/build-wu-context.test.ts +++ b/packages/context/src/ingest/stages/build-wu-context.test.ts @@ -68,12 +68,45 @@ describe('buildWuToolSet', () => { 'load_skill', 'read_raw_file', 'read_raw_span', + 'record_verification_ledger', 'sl_write_source', 'wiki_search', ].sort(), ); }); + it('requires the verification ledger before write-capable tools run', async () => { + const wikiWrite = vi.fn().mockResolvedValue({ markdown: 'written', structured: { success: true } }); + const toolSet = buildWuToolSet({ + stagedDir: '/tmp/staged', + wu: { unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] }, + loadSkillTool: { load_skill: { description: 'load', inputSchema: {} as any, execute: vi.fn() } } as any, + emitUnmappedFallbackTool: { + emit_unmapped_fallback: { description: 'fallback', inputSchema: {} as any, execute: vi.fn() }, + } as any, + toolsetTools: { wiki_write: { description: 'write', inputSchema: {} as any, execute: wikiWrite } as any }, + }); + + const correction = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't1' } as any); + + expect(wikiWrite).not.toHaveBeenCalled(); + expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } }); + expect(String((correction as any).markdown)).toContain('record_verification_ledger'); + + await toolSet.record_verification_ledger.execute?.( + { + summary: 'No warehouse identifiers will be emitted in this wiki write.', + verifiedIdentifiers: [], + unverifiedIdentifiers: [], + }, + { toolCallId: 't2' } as any, + ); + const written = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't3' } as any); + + expect(wikiWrite).toHaveBeenCalledTimes(1); + expect(written).toMatchObject({ structured: { success: true } }); + }); + it('includes looker_query_to_sl only for Looker WorkUnits', () => { const toolSet = buildWuToolSet({ sourceKey: 'looker', @@ -93,6 +126,7 @@ describe('buildWuToolSet', () => { 'looker_query_to_sl', 'read_raw_file', 'read_raw_span', + 'record_verification_ledger', 'sl_write_source', 'wiki_search', ].sort(), diff --git a/packages/context/src/ingest/stages/build-wu-context.ts b/packages/context/src/ingest/stages/build-wu-context.ts index 6ba26fd7..bfa1bd9c 100644 --- a/packages/context/src/ingest/stages/build-wu-context.ts +++ b/packages/context/src/ingest/stages/build-wu-context.ts @@ -4,6 +4,11 @@ import { createLookerQueryToSlTool } from '../adapters/looker/tools/looker-query import type { IngestProvenanceRow } from '../ports.js'; import { createReadRawFileTool } from '../tools/read-raw-file.tool.js'; import { createReadRawSpanTool } from '../tools/read-raw-span.tool.js'; +import { + createVerificationLedgerState, + VERIFICATION_LEDGER_PROMPT, + withVerificationLedger, +} from '../tools/verification-ledger.tool.js'; import type { WorkUnit } from '../types.js'; const PEER_FILE_INDEX_PROMPT_LIMIT = 100; @@ -24,6 +29,7 @@ export function buildWuSystemPrompt(params: { }): string { const parts = [ params.baseFraming.trimEnd(), + VERIFICATION_LEDGER_PROMPT, params.skillsPrompt.trimEnd(), buildCanonicalPinsPromptBlock(params.canonicalPins ?? []), `\n\nsyncId: ${params.syncId}\nsource: ${params.sourceKey}\n`, @@ -100,15 +106,19 @@ function withoutWriteSlTools(toolset: ToolSet, wu: WorkUnit): ToolSet { export function buildWuToolSet(input: BuildWuToolSetInput): ToolSet { const allowedPaths = new Set([...input.wu.rawFiles, ...input.wu.dependencyPaths]); const lookerTools: ToolSet = input.sourceKey === 'looker' ? { looker_query_to_sl: createLookerQueryToSlTool() } : {}; - return withoutWriteSlTools( - { - ...input.toolsetTools, - ...lookerTools, - ...input.loadSkillTool, - ...input.emitUnmappedFallbackTool, - read_raw_file: createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }), - read_raw_span: createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }), - }, - input.wu, + const state = createVerificationLedgerState(); + return withVerificationLedger( + withoutWriteSlTools( + { + ...input.toolsetTools, + ...lookerTools, + ...input.loadSkillTool, + ...input.emitUnmappedFallbackTool, + read_raw_file: createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }), + read_raw_span: createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }), + }, + input.wu, + ), + state, ); } diff --git a/packages/context/src/ingest/tools/verification-ledger.tool.ts b/packages/context/src/ingest/tools/verification-ledger.tool.ts new file mode 100644 index 00000000..f99e79be --- /dev/null +++ b/packages/context/src/ingest/tools/verification-ledger.tool.ts @@ -0,0 +1,97 @@ +import { tool, type ToolExecuteFunction, type ToolExecutionOptions, type ToolSet } from 'ai'; +import { z } from 'zod'; + +const verificationLedgerInputSchema = z.object({ + summary: z.string().min(1).max(2000), + verifiedIdentifiers: z.array(z.string().min(1)).max(100).default([]), + unverifiedIdentifiers: z.array(z.string().min(1)).max(100).default([]), + notes: z.string().max(2000).optional(), +}); + +export interface VerificationLedgerEntry { + summary: string; + verifiedIdentifiers: string[]; + unverifiedIdentifiers: string[]; + notes?: string; +} + +export interface VerificationLedgerState { + entries: VerificationLedgerEntry[]; +} + +const WRITE_TOOL_NAMES = new Set([ + 'wiki_write', + 'wiki_remove', + 'sl_write_source', + 'sl_edit_source', + 'emit_unmapped_fallback', +]); + +export const VERIFICATION_LEDGER_PROMPT = ` +Before any write-capable tool call (wiki_write, wiki_remove, sl_write_source, sl_edit_source, emit_unmapped_fallback), call record_verification_ledger. +The ledger is a model-authored checkpoint, not a deterministic parser gate. Summarize the verification protocol from the loaded skill, list identifiers verified with discover_data/entity_details/sql_execution, and list anything intentionally left unverified. If the write contains no warehouse identifiers, say that explicitly. +If a write tool returns verification_ledger_required, complete the ledger and retry the write. +`; + +export function createVerificationLedgerState(): VerificationLedgerState { + return { entries: [] }; +} + +export function withVerificationLedger(tools: ToolSet, state: VerificationLedgerState): ToolSet { + const wrapped: ToolSet = {}; + for (const [name, original] of Object.entries(tools)) { + if (!WRITE_TOOL_NAMES.has(name) || typeof original.execute !== 'function') { + wrapped[name] = original; + continue; + } + const originalExecute = original.execute; + const guardedExecute: ToolExecuteFunction = async ( + input: unknown, + opts: ToolExecutionOptions, + ) => { + if (state.entries.length === 0) { + return verificationRequiredOutput(name); + } + return (originalExecute as ToolExecuteFunction)(input, opts); + }; + wrapped[name] = { ...original, execute: guardedExecute }; + } + wrapped.record_verification_ledger = createRecordVerificationLedgerTool(state); + return wrapped; +} + +function createRecordVerificationLedgerTool(state: VerificationLedgerState) { + return tool({ + description: + 'Record the pre-write verification ledger required by loaded ingest skills. Call this before wiki/SL/fallback writes to state what was verified, which tool calls support it, and what remains intentionally unverified.', + inputSchema: verificationLedgerInputSchema, + execute: async (input) => { + const entry = verificationLedgerInputSchema.parse(input); + state.entries.push(entry); + return { + markdown: + `Verification ledger recorded. Summary: ${entry.summary}\n` + + `Verified identifiers: ${entry.verifiedIdentifiers.length ? entry.verifiedIdentifiers.join(', ') : '(none)'}\n` + + `Unverified identifiers: ${ + entry.unverifiedIdentifiers.length ? entry.unverifiedIdentifiers.join(', ') : '(none)' + }`, + structured: { success: true, entry }, + }; + }, + }); +} + +function verificationRequiredOutput(toolName: string) { + return { + markdown: + `Pre-write verification required before calling ${toolName}. ` + + 'Call record_verification_ledger first. In the ledger, summarize the loaded skill protocol you followed, ' + + 'list identifiers verified via discover_data/entity_details/sql_execution, and list any identifiers intentionally left unverified. ' + + 'If the write contains no warehouse identifiers, say that explicitly in the ledger summary.', + structured: { + success: false, + reason: 'verification_ledger_required', + toolName, + }, + }; +}