From aaa928e768dbcba2482861b0b7994ddf73a86cd9 Mon Sep 17 00:00:00 2001 From: Luca Martial Date: Mon, 11 May 2026 23:31:01 -0700 Subject: [PATCH] fix(ingest): scope fallback recovery checks --- .../src/ingest/ingest-bundle.runner.test.ts | 13 +++++ .../src/ingest/ingest-bundle.runner.ts | 7 +-- .../tools/tool-transcript-summary.test.ts | 56 +++++++++++++++++++ .../ingest/tools/tool-transcript-summary.ts | 52 ++++++++++++++++- 4 files changed, 122 insertions(+), 6 deletions(-) diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index a02f1022..a4513f63 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -1590,6 +1590,19 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['dbt-main', 'postgres-warehouse']); }); + it('does not resolve qualified fallback table refs by source name alone', async () => { + const deps = makeDeps(); + deps.semanticLayerService.loadAllSources.mockResolvedValue([{ name: 'orders', table: 'sales.orders' }]); + const runner = buildRunner(deps); + + await expect( + (runner as any).tableRefExistsInSemanticLayer(deps.semanticLayerService, ['warehouse'], 'finance.orders'), + ).resolves.toBe(false); + await expect( + (runner as any).tableRefExistsInSemanticLayer(deps.semanticLayerService, ['warehouse'], 'sales.orders'), + ).resolves.toBe(true); + }); + it('passes relevant canonical pins into the reconciliation system prompt', async () => { const deps = makeDeps(); deps.diffSetService.compute.mockResolvedValue({ diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 822b9487..31b444bf 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -105,18 +105,17 @@ function semanticSourceMatchesTableRef(source: SemanticLayerSource, tableRef: st return false; } + const refIsQualified = normalizedRef.includes('.'); const normalizedSourceName = normalizeTableReference(source.name); if (normalizedSourceName === normalizedRef) { return true; } const table = typeof source.table === 'string' ? normalizeTableReference(source.table) : ''; - if (table && table === normalizedRef) { + if (table && (table === normalizedRef || table.endsWith(`.${normalizedRef}`))) { return true; } - - const refIsQualified = normalizedRef.includes('.'); - if (refIsQualified && normalizedSourceName === finalReferenceSegment(normalizedRef)) { + if (!refIsQualified && table && finalReferenceSegment(table) === normalizedRef) { return true; } diff --git a/packages/context/src/ingest/tools/tool-transcript-summary.test.ts b/packages/context/src/ingest/tools/tool-transcript-summary.test.ts index 9ca01626..bc836e97 100644 --- a/packages/context/src/ingest/tools/tool-transcript-summary.test.ts +++ b/packages/context/src/ingest/tools/tool-transcript-summary.test.ts @@ -95,6 +95,30 @@ describe('tool transcript summaries', () => { it('treats explicit unmapped fallback as recovery for guarded SL write failures', () => { const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl'); + recordToolTranscriptEntry( + summary, + entry({ + toolName: 'sl_write_source', + input: { connectionId: 'dbt-main', sourceName: 'stg_accounts' }, + output: { structured: { success: false, sourceName: 'stg_accounts' } }, + }), + ); + recordToolTranscriptEntry( + summary, + entry({ + toolName: 'emit_unmapped_fallback', + input: { rawPath: 'models/schema.yml', reason: 'no_physical_table', tableRef: 'stg_accounts', fallback: 'wiki_only' }, + output: 'recorded unmapped fallback for models/schema.yml (wiki_only)', + }), + ); + + expect(summary.errorCount).toBe(1); + expect(summary.fatalErrorCount).toBe(0); + }); + + it('treats an untargeted unmapped fallback as recovery when there is only one pending SL failure', () => { + const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl'); + recordToolTranscriptEntry( summary, entry({ @@ -116,6 +140,38 @@ describe('tool transcript summaries', () => { expect(summary.fatalErrorCount).toBe(0); }); + it('keeps unrelated SL write failures fatal when one source gets an unmapped fallback', () => { + const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl'); + + recordToolTranscriptEntry( + summary, + entry({ + toolName: 'sl_write_source', + input: { connectionId: 'dbt-main', sourceName: 'stg_accounts' }, + output: { structured: { success: false, sourceName: 'stg_accounts' } }, + }), + ); + recordToolTranscriptEntry( + summary, + entry({ + toolName: 'sl_write_source', + input: { connectionId: 'dbt-main', sourceName: 'stg_orders' }, + output: { structured: { success: false, sourceName: 'stg_orders' } }, + }), + ); + recordToolTranscriptEntry( + summary, + entry({ + toolName: 'emit_unmapped_fallback', + input: { rawPath: 'models/schema.yml', reason: 'no_physical_table', tableRef: 'stg_accounts', fallback: 'wiki_only' }, + output: 'recorded unmapped fallback for models/schema.yml (wiki_only)', + }), + ); + + expect(summary.errorCount).toBe(2); + expect(summary.fatalErrorCount).toBe(1); + }); + it('keeps thrown tool errors fatal even after a successful write', () => { const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl'); diff --git a/packages/context/src/ingest/tools/tool-transcript-summary.ts b/packages/context/src/ingest/tools/tool-transcript-summary.ts index 2c2593d2..de7ee668 100644 --- a/packages/context/src/ingest/tools/tool-transcript-summary.ts +++ b/packages/context/src/ingest/tools/tool-transcript-summary.ts @@ -51,8 +51,13 @@ export function recordToolTranscriptEntry(summary: MutableToolTranscriptSummary, summary.recoverableFailureCounts.delete(recoveryKey); } if (entry.toolName === 'emit_unmapped_fallback') { - for (const key of [...summary.recoverableFailureCounts.keys()]) { - if (key.startsWith('sl:')) { + const fallbackTarget = fallbackSlTargetKey(entry); + const pendingSlKeys = [...summary.recoverableFailureCounts.keys()].filter((key) => key.startsWith('sl:')); + for (const key of pendingSlKeys) { + if ( + (fallbackTarget && slFailureKeyMatchesFallback(key, fallbackTarget)) || + (!fallbackTarget && pendingSlKeys.length === 1) + ) { summary.recoverableFailureCounts.delete(key); } } @@ -120,6 +125,49 @@ function slTargetKey(entry: ToolCallLogEntry): string | null { return `sl:${connectionId}:${sourceName}`; } +function fallbackSlTargetKey(entry: ToolCallLogEntry): { connectionId?: string; sourceName: string } | null { + const tableRef = stringField(entry.input, 'tableRef'); + if (!tableRef) { + return null; + } + const sourceName = finalReferenceSegment(tableRef); + if (!sourceName) { + return null; + } + const connectionId = stringField(entry.input, 'connectionId'); + return { + sourceName, + ...(connectionId ? { connectionId } : {}), + }; +} + +function slFailureKeyMatchesFallback( + failureKey: string, + fallback: { connectionId?: string; sourceName: string }, +): boolean { + const match = /^sl:([^:]*):(.*)$/.exec(failureKey); + if (!match) { + return false; + } + const [, connectionId, sourceName] = match; + if (fallback.connectionId && connectionId !== fallback.connectionId) { + return false; + } + return normalizeReferenceSegment(sourceName ?? '') === normalizeReferenceSegment(fallback.sourceName); +} + +function finalReferenceSegment(value: string): string { + const normalized = value + .trim() + .replace(/["`]/g, '') + .replace(/[\[\]]/g, ''); + return normalized.split('.').filter(Boolean).at(-1) ?? ''; +} + +function normalizeReferenceSegment(value: string): string { + return finalReferenceSegment(value).toLowerCase(); +} + function recordField(value: unknown, field: string): Record | null { if (!value || typeof value !== 'object' || Array.isArray(value)) { return null;