Surface dbt target SL sources

This commit is contained in:
Luca Martial 2026-05-11 22:28:42 -07:00
parent 626284c78d
commit 52ed9450ba
8 changed files with 115 additions and 7 deletions

View file

@ -48,4 +48,10 @@ describe('DbtSourceAdapter', () => {
it('implements fetch() for git-backed dbt source setup', () => {
expect(adapter.fetch).toBeTypeOf('function');
});
// Configuring the same target id twice must still yield a single reported target.
it('reports mapped warehouse targets for bundle SL discovery', async () => {
  const duplicatedTargets = ['postgres-warehouse', 'postgres-warehouse'];
  adapter = new DbtSourceAdapter({ targetConnectionIds: duplicatedTargets });

  const reportedTargets = adapter.listTargetConnectionIds?.(stagedDir);

  await expect(reportedTargets).resolves.toEqual(['postgres-warehouse']);
});
});

View file

@ -11,6 +11,7 @@ import { parseDbtStagedDir } from './parse.js';
/** Construction options for `DbtSourceAdapter`; every field is optional. */
interface DbtSourceAdapterOptions {
/** NOTE(review): presumably the directory the adapter uses for cached/fetched dbt sources — confirm against `fetch()`. */
homeDir?: string;
/** Connection ids reported by `listTargetConnectionIds`, which de-duplicates and sorts them. */
targetConnectionIds?: string[];
}
export class DbtSourceAdapter implements SourceAdapter {
@ -24,6 +25,10 @@ export class DbtSourceAdapter implements SourceAdapter {
return detectDbtStagedDir(stagedDir);
}
/**
 * Lists the target connection ids configured on this adapter.
 *
 * @param _stagedDir Unused; the result comes solely from the constructor options.
 * @returns The configured ids with duplicates removed, ordered by `localeCompare`;
 *          an empty array when none were configured.
 */
async listTargetConnectionIds(_stagedDir: string): Promise<string[]> {
  const configuredIds = this.options.targetConnectionIds ?? [];
  // Build a fresh array from the Set so the caller-supplied options array is never sorted in place.
  const uniqueIds = Array.from(new Set(configuredIds));
  uniqueIds.sort((a, b) => a.localeCompare(b));
  return uniqueIds;
}
async fetch(pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
const config = pullConfig as DbtPullConfig | undefined;
if (!config?.repoUrl) {

View file

@ -184,7 +184,11 @@ const makeDeps = () => {
.mockImplementation((connectionId: string) =>
Promise.resolve(connectionId === 'warehouse-2' ? ['looker__orders.yaml'] : []),
),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi
.fn()
.mockImplementation((connectionId: string) =>
Promise.resolve(connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : []),
),
};
const slSearchService = {
indexSources: vi.fn().mockResolvedValue(undefined),
@ -1261,8 +1265,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu',
);
expect(deps.adapter.listTargetConnectionIds).toHaveBeenCalledWith('/tmp/stage/upload-x');
expect(deps.semanticLayerService.listFilesForConnection).toHaveBeenCalledWith('looker-run');
expect(deps.semanticLayerService.listFilesForConnection).toHaveBeenCalledWith('warehouse-2');
expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('looker-run');
expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse-2');
expect(workUnitCall?.[0].userPrompt).toContain('looker__orders');
expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['looker-run', 'warehouse-2']);
});
@ -1556,6 +1560,36 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.knowledgeIndex.listPagesForUser).toHaveBeenCalledWith('system');
});
// Sources loaded for an adapter-reported target connection must appear under that
// connection's own heading in the WorkUnit user prompt.
it('includes manifest-backed target sources in WorkUnit prompts', async () => {
  const deps = makeDeps();
  // The adapter reports one target; only that connection has a loadable source.
  deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['postgres-warehouse']);
  deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => {
    const sources = connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : [];
    return Promise.resolve(sources);
  });

  const runner = buildRunner(deps);
  // Replace the staging helpers on the runner with mocks that return fixed values.
  const runnerInternals = runner as any;
  runnerInternals.stageRawFilesStage1 = vi.fn().mockResolvedValue({
    currentHashes: new Map([['models/schema.yml', 'h1']]),
    rawDirInWorktree: 'raw-sources/dbt-main/dbt/s',
  });
  runnerInternals.resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');

  await runner.run({
    jobId: 'j1',
    connectionId: 'dbt-main',
    sourceKey: 'fake',
    trigger: 'upload',
    bundleRef: { kind: 'upload', uploadId: 'upload-x' },
  });

  const isWorkUnitCall = ([params]: any[]): boolean =>
    params.telemetryTags.operationName === 'ingest-bundle-wu';
  const workUnitPrompt = deps.agentRunner.runLoop.mock.calls.find(isWorkUnitCall)?.[0].userPrompt;

  expect(workUnitPrompt).toContain('## postgres-warehouse');
  expect(workUnitPrompt).toContain('stg_accounts');
  expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['dbt-main', 'postgres-warehouse']);
});
it('passes relevant canonical pins into the reconciliation system prompt', async () => {
const deps = makeDeps();
deps.diffSetService.compute.mockResolvedValue({

View file

@ -259,12 +259,22 @@ export class IngestBundleRunner {
const blocks = await Promise.all(
connectionIds.map(async (connectionId) => {
try {
const files = await this.deps.semanticLayerService.listFilesForConnection(connectionId);
const names = files.filter((f) => !f.startsWith('_schema/')).map((f) => f.replace(/\.yaml$/, ''));
const sources = await this.deps.semanticLayerService.loadAllSources(connectionId);
const names = sources.map((source) => source.name).sort((left, right) => left.localeCompare(right));
const body = names.length > 0 ? names.join('\n') : '(no sources yet)';
return `## ${connectionId}\n${body}`;
} catch {
return `## ${connectionId}\n(empty)`;
try {
const files = await this.deps.semanticLayerService.listFilesForConnection(connectionId);
const names = files
.filter((f) => !f.startsWith('_schema/'))
.map((f) => f.replace(/\.yaml$/, ''))
.sort((left, right) => left.localeCompare(right));
const body = names.length > 0 ? names.join('\n') : '(no sources yet)';
return `## ${connectionId}\n${body}`;
} catch {
return `## ${connectionId}\n(empty)`;
}
}
}),
);

View file

@ -466,6 +466,38 @@ describe('local ingest adapters', () => {
});
});
// Verifies that the dbt adapter built by createDefaultLocalIngestAdapters reports the
// connection ids listed in setup.database_connection_ids as its target connections.
it('exposes configured primary warehouses as dbt target connections', async () => {
const dbtProject: KtxLocalProject = {
...projectWithConnections({
warehouse: {
driver: 'postgres',
url: 'postgresql://example/db',
},
analytics_dbt: {
driver: 'dbt',
source_dir: '/repo/dbt',
},
}),
config: {
// NOTE(review): `project` is not declared in this test — presumably a fixture from the enclosing scope; confirm.
...project.config,
// Only 'warehouse' is listed as a primary database connection.
setup: { database_connection_ids: ['warehouse'], completed_steps: [] },
connections: {
warehouse: {
driver: 'postgres',
url: 'postgresql://example/db',
},
analytics_dbt: {
driver: 'dbt',
source_dir: '/repo/dbt',
},
},
},
};
// Pick the adapter whose `source` is 'dbt' from the default adapter list.
const adapter = createDefaultLocalIngestAdapters(dbtProject).find((candidate) => candidate.source === 'dbt');
// The staged-dir argument is an arbitrary path; the expected result is the configured id only.
await expect(adapter?.listTargetConnectionIds?.('/tmp/staged-dbt')).resolves.toEqual(['warehouse']);
});
it('resolves MetricFlow auth_token_ref without writing literal tokens to config', async () => {
const project = projectWithConnections({
metricflow_main: {

View file

@ -76,7 +76,10 @@ export function createDefaultLocalIngestAdapters(
}),
}),
new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
new DbtSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }),
new DbtSourceAdapter({
homeDir: join(project.projectDir, '.ktx/cache'),
targetConnectionIds: primaryWarehouseConnectionIds(project),
}),
createLocalMetabaseSourceAdapter(project),
new LookerSourceAdapter({
clientFactory: {
@ -111,6 +114,21 @@ export function createDefaultLocalIngestAdapters(
return adapters;
}
/**
 * Picks the connection ids to expose as warehouse targets for a local project.
 *
 * Ids listed in `setup.database_connection_ids` win when at least one of them
 * yields a truthy result from `localConnectionToWarehouseDescriptor`; those are
 * returned de-duplicated in their configured order. Otherwise every entry in
 * `config.connections` that yields a descriptor is returned, ordered by
 * `localeCompare`.
 *
 * @param project Local project whose config is inspected.
 * @returns Warehouse connection ids; empty when no connection yields a descriptor.
 */
function primaryWarehouseConnectionIds(project: KtxLocalProject): string[] {
  const { connections } = project.config;

  const explicitIds = (project.config.setup?.database_connection_ids ?? []).filter((id) =>
    Boolean(localConnectionToWarehouseDescriptor(id, connections[id])),
  );
  if (explicitIds.length > 0) {
    return Array.from(new Set(explicitIds));
  }

  // No usable explicit configuration: fall back to scanning every declared connection.
  const discoveredIds: string[] = [];
  for (const [id, connection] of Object.entries(connections)) {
    if (localConnectionToWarehouseDescriptor(id, connection)) {
      discoveredIds.push(id);
    }
  }
  return discoveredIds.sort((a, b) => a.localeCompare(b));
}
/**
 * Type guard for non-null, non-array objects.
 *
 * @param value Any value.
 * @returns `true` when `typeof value` is `'object'` and the value is neither
 *          `null` nor an array; `false` otherwise (including for functions).
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === 'object';
}