fix(ingest): scope fallback recovery checks

This commit is contained in:
Luca Martial 2026-05-11 23:31:01 -07:00
parent da5e826691
commit aaa928e768
4 changed files with 122 additions and 6 deletions

View file

@ -1590,6 +1590,19 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['dbt-main', 'postgres-warehouse']);
});
it('does not resolve qualified fallback table refs by source name alone', async () => {
const deps = makeDeps();
deps.semanticLayerService.loadAllSources.mockResolvedValue([{ name: 'orders', table: 'sales.orders' }]);
const runner = buildRunner(deps);
await expect(
(runner as any).tableRefExistsInSemanticLayer(deps.semanticLayerService, ['warehouse'], 'finance.orders'),
).resolves.toBe(false);
await expect(
(runner as any).tableRefExistsInSemanticLayer(deps.semanticLayerService, ['warehouse'], 'sales.orders'),
).resolves.toBe(true);
});
it('passes relevant canonical pins into the reconciliation system prompt', async () => {
const deps = makeDeps();
deps.diffSetService.compute.mockResolvedValue({

View file

@ -105,18 +105,17 @@ function semanticSourceMatchesTableRef(source: SemanticLayerSource, tableRef: st
return false;
}
const refIsQualified = normalizedRef.includes('.');
const normalizedSourceName = normalizeTableReference(source.name);
if (normalizedSourceName === normalizedRef) {
return true;
}
const table = typeof source.table === 'string' ? normalizeTableReference(source.table) : '';
if (table && table === normalizedRef) {
if (table && (table === normalizedRef || table.endsWith(`.${normalizedRef}`))) {
return true;
}
const refIsQualified = normalizedRef.includes('.');
if (refIsQualified && normalizedSourceName === finalReferenceSegment(normalizedRef)) {
if (!refIsQualified && table && finalReferenceSegment(table) === normalizedRef) {
return true;
}

View file

@ -95,6 +95,30 @@ describe('tool transcript summaries', () => {
it('treats explicit unmapped fallback as recovery for guarded SL write failures', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
toolName: 'sl_write_source',
input: { connectionId: 'dbt-main', sourceName: 'stg_accounts' },
output: { structured: { success: false, sourceName: 'stg_accounts' } },
}),
);
recordToolTranscriptEntry(
summary,
entry({
toolName: 'emit_unmapped_fallback',
input: { rawPath: 'models/schema.yml', reason: 'no_physical_table', tableRef: 'stg_accounts', fallback: 'wiki_only' },
output: 'recorded unmapped fallback for models/schema.yml (wiki_only)',
}),
);
expect(summary.errorCount).toBe(1);
expect(summary.fatalErrorCount).toBe(0);
});
it('treats an untargeted unmapped fallback as recovery when there is only one pending SL failure', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
@ -116,6 +140,38 @@ describe('tool transcript summaries', () => {
expect(summary.fatalErrorCount).toBe(0);
});
it('keeps unrelated SL write failures fatal when one source gets an unmapped fallback', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');
recordToolTranscriptEntry(
summary,
entry({
toolName: 'sl_write_source',
input: { connectionId: 'dbt-main', sourceName: 'stg_accounts' },
output: { structured: { success: false, sourceName: 'stg_accounts' } },
}),
);
recordToolTranscriptEntry(
summary,
entry({
toolName: 'sl_write_source',
input: { connectionId: 'dbt-main', sourceName: 'stg_orders' },
output: { structured: { success: false, sourceName: 'stg_orders' } },
}),
);
recordToolTranscriptEntry(
summary,
entry({
toolName: 'emit_unmapped_fallback',
input: { rawPath: 'models/schema.yml', reason: 'no_physical_table', tableRef: 'stg_accounts', fallback: 'wiki_only' },
output: 'recorded unmapped fallback for models/schema.yml (wiki_only)',
}),
);
expect(summary.errorCount).toBe(2);
expect(summary.fatalErrorCount).toBe(1);
});
it('keeps thrown tool errors fatal even after a successful write', () => {
const summary = createMutableToolTranscriptSummary('wu-1', '/tmp/wu-1.jsonl');

View file

@ -51,8 +51,13 @@ export function recordToolTranscriptEntry(summary: MutableToolTranscriptSummary,
summary.recoverableFailureCounts.delete(recoveryKey);
}
if (entry.toolName === 'emit_unmapped_fallback') {
for (const key of [...summary.recoverableFailureCounts.keys()]) {
if (key.startsWith('sl:')) {
const fallbackTarget = fallbackSlTargetKey(entry);
const pendingSlKeys = [...summary.recoverableFailureCounts.keys()].filter((key) => key.startsWith('sl:'));
for (const key of pendingSlKeys) {
if (
(fallbackTarget && slFailureKeyMatchesFallback(key, fallbackTarget)) ||
(!fallbackTarget && pendingSlKeys.length === 1)
) {
summary.recoverableFailureCounts.delete(key);
}
}
@ -120,6 +125,49 @@ function slTargetKey(entry: ToolCallLogEntry): string | null {
return `sl:${connectionId}:${sourceName}`;
}
function fallbackSlTargetKey(entry: ToolCallLogEntry): { connectionId?: string; sourceName: string } | null {
const tableRef = stringField(entry.input, 'tableRef');
if (!tableRef) {
return null;
}
const sourceName = finalReferenceSegment(tableRef);
if (!sourceName) {
return null;
}
const connectionId = stringField(entry.input, 'connectionId');
return {
sourceName,
...(connectionId ? { connectionId } : {}),
};
}
function slFailureKeyMatchesFallback(
failureKey: string,
fallback: { connectionId?: string; sourceName: string },
): boolean {
const match = /^sl:([^:]*):(.*)$/.exec(failureKey);
if (!match) {
return false;
}
const [, connectionId, sourceName] = match;
if (fallback.connectionId && connectionId !== fallback.connectionId) {
return false;
}
return normalizeReferenceSegment(sourceName ?? '') === normalizeReferenceSegment(fallback.sourceName);
}
function finalReferenceSegment(value: string): string {
const normalized = value
.trim()
.replace(/["`]/g, '')
.replace(/[\[\]]/g, '');
return normalized.split('.').filter(Boolean).at(-1) ?? '';
}
function normalizeReferenceSegment(value: string): string {
return finalReferenceSegment(value).toLowerCase();
}
function recordField(value: unknown, field: string): Record<string, unknown> | null {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return null;