diff --git a/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md b/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md index 5fdde49b..e15b65d2 100644 --- a/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md +++ b/packages/context/prompts/memory_agent_bundle_ingest_work_unit.md @@ -24,6 +24,7 @@ All wiki writes go to the GLOBAL scope. Bundle ingests are not personal. The `wi - Do not read peer files; only files listed in `rawFiles` or `dependencyPaths` are accessible. `read_raw_file` will reject everything else. - Do not invent measures/joins/rules not declared in the raw files. - Do not invent physical column names or grain keys. For table-backed SL sources, every `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr` column must come from raw-file column declarations or warehouse-backed discovery (`wiki_sl_search`, `sl_discover`, `sl_describe_table`). If column names are not confirmed, capture the business context in wiki instead of writing a full SL source. +- Do not write context-source overlays into the context source connection just because that is the current WorkUnit connection. Use `sl_discover` across data sources and write the SL artifact to the warehouse/data-source connection that owns the matching manifest. If there is no confirmed target connection, use `emit_unmapped_fallback` and wiki capture. - Do not duplicate an artifact that prior provenance says you already produced; update it. - Do not silently accept a name collision with a prior WU's write when the formula differs. Trigger `ingest_triage`. 
diff --git a/packages/context/skills/dbt_ingest/SKILL.md b/packages/context/skills/dbt_ingest/SKILL.md index 0d189661..d9310873 100644 --- a/packages/context/skills/dbt_ingest/SKILL.md +++ b/packages/context/skills/dbt_ingest/SKILL.md @@ -23,6 +23,8 @@ Use this skill for **uploaded** dbt projects (`dbt_project.yml` at stage root, ` dbt YAML is documentation and test metadata; it is not permission to invent physical columns. Before writing any table-backed SL source, confirm the real warehouse shape with `wiki_sl_search`, `sl_discover`, or `sl_describe_table` and use only confirmed column names in `columns:`, `grain:`, `joins:`, `segments:`, and `measures[].expr`. +For dbt context-source ingest, the dbt connection is usually not the warehouse connection. Call `sl_discover` without `connectionId` first, then write overlays to the connection that owns the matching manifest-backed source (for example `postgres-warehouse`), not to the dbt connection (for example `dbt-main`). If no matching manifest-backed source is visible on any warehouse connection, do not call `sl_write_source`; record `emit_unmapped_fallback` and keep the fact wiki-only. + If a `models:` entry has no `columns:` block, or the available raw files do not confirm the physical column names, do **not** synthesize a full standalone source. Write a wiki note or a description-only overlay for the resolved manifest table instead. If a business metric is described but its referenced column is not confirmed in the warehouse schema, omit the measure and capture the unresolved intent in the wiki. After every `sl_write_source`, call `sl_validate`. A validation error saying a declared column or measure reference is absent from the physical table is a hard stop: re-read the warehouse-backed source and rewrite with confirmed names, or remove the invalid SL fields. 
diff --git a/packages/context/src/ingest/adapters/dbt/dbt.adapter.test.ts b/packages/context/src/ingest/adapters/dbt/dbt.adapter.test.ts index dad64232..2851318e 100644 --- a/packages/context/src/ingest/adapters/dbt/dbt.adapter.test.ts +++ b/packages/context/src/ingest/adapters/dbt/dbt.adapter.test.ts @@ -48,4 +48,10 @@ describe('DbtSourceAdapter', () => { it('implements fetch() for git-backed dbt source setup', () => { expect(adapter.fetch).toBeTypeOf('function'); }); + + it('reports mapped warehouse targets for bundle SL discovery', async () => { + adapter = new DbtSourceAdapter({ targetConnectionIds: ['postgres-warehouse', 'postgres-warehouse'] }); + + await expect(adapter.listTargetConnectionIds?.(stagedDir)).resolves.toEqual(['postgres-warehouse']); + }); }); diff --git a/packages/context/src/ingest/adapters/dbt/dbt.adapter.ts b/packages/context/src/ingest/adapters/dbt/dbt.adapter.ts index 8ad71ac4..ef1c798c 100644 --- a/packages/context/src/ingest/adapters/dbt/dbt.adapter.ts +++ b/packages/context/src/ingest/adapters/dbt/dbt.adapter.ts @@ -11,6 +11,7 @@ import { parseDbtStagedDir } from './parse.js'; interface DbtSourceAdapterOptions { homeDir?: string; + targetConnectionIds?: string[]; } export class DbtSourceAdapter implements SourceAdapter { @@ -24,6 +25,10 @@ export class DbtSourceAdapter implements SourceAdapter { return detectDbtStagedDir(stagedDir); } + async listTargetConnectionIds(_stagedDir: string): Promise<string[]> { + return [...new Set(this.options.targetConnectionIds ?? 
[])].sort((left, right) => left.localeCompare(right)); + } + async fetch(pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise { const config = pullConfig as DbtPullConfig | undefined; if (!config?.repoUrl) { diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index ead6704d..a02f1022 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -184,7 +184,11 @@ const makeDeps = () => { .mockImplementation((connectionId: string) => Promise.resolve(connectionId === 'warehouse-2' ? ['looker__orders.yaml'] : []), ), - loadAllSources: vi.fn().mockResolvedValue([]), + loadAllSources: vi + .fn() + .mockImplementation((connectionId: string) => + Promise.resolve(connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : []), + ), }; const slSearchService = { indexSources: vi.fn().mockResolvedValue(undefined), @@ -1261,8 +1265,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu', ); expect(deps.adapter.listTargetConnectionIds).toHaveBeenCalledWith('/tmp/stage/upload-x'); - expect(deps.semanticLayerService.listFilesForConnection).toHaveBeenCalledWith('looker-run'); - expect(deps.semanticLayerService.listFilesForConnection).toHaveBeenCalledWith('warehouse-2'); + expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('looker-run'); + expect(deps.semanticLayerService.loadAllSources).toHaveBeenCalledWith('warehouse-2'); expect(workUnitCall?.[0].userPrompt).toContain('looker__orders'); expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['looker-run', 'warehouse-2']); }); @@ -1556,6 +1560,36 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.knowledgeIndex.listPagesForUser).toHaveBeenCalledWith('system'); }); + it('includes manifest-backed target sources in WorkUnit prompts', async () => { + 
const deps = makeDeps(); + deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['postgres-warehouse']); + deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => + Promise.resolve(connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : []), + ); + + const runner = buildRunner(deps); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['models/schema.yml', 'h1']]), + rawDirInWorktree: 'raw-sources/dbt-main/dbt/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + + await runner.run({ + jobId: 'j1', + connectionId: 'dbt-main', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }); + + const workUnitCall = deps.agentRunner.runLoop.mock.calls.find( + ([params]: any[]) => params.telemetryTags.operationName === 'ingest-bundle-wu', + ); + expect(workUnitCall?.[0].userPrompt).toContain('## postgres-warehouse'); + expect(workUnitCall?.[0].userPrompt).toContain('stg_accounts'); + expect(deps.canonicalPins.listPins).toHaveBeenCalledWith(['dbt-main', 'postgres-warehouse']); + }); + it('passes relevant canonical pins into the reconciliation system prompt', async () => { const deps = makeDeps(); deps.diffSetService.compute.mockResolvedValue({ diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 5cc209cd..2d79db6c 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -259,12 +259,22 @@ export class IngestBundleRunner { const blocks = await Promise.all( connectionIds.map(async (connectionId) => { try { - const files = await this.deps.semanticLayerService.listFilesForConnection(connectionId); - const names = files.filter((f) => !f.startsWith('_schema/')).map((f) => f.replace(/\.yaml$/, '')); + const sources = await 
this.deps.semanticLayerService.loadAllSources(connectionId); + const names = sources.map((source) => source.name).sort((left, right) => left.localeCompare(right)); const body = names.length > 0 ? names.join('\n') : '(no sources yet)'; return `## ${connectionId}\n${body}`; } catch { - return `## ${connectionId}\n(empty)`; + try { + const files = await this.deps.semanticLayerService.listFilesForConnection(connectionId); + const names = files + .filter((f) => !f.startsWith('_schema/')) + .map((f) => f.replace(/\.yaml$/, '')) + .sort((left, right) => left.localeCompare(right)); + const body = names.length > 0 ? names.join('\n') : '(no sources yet)'; + return `## ${connectionId}\n${body}`; + } catch { + return `## ${connectionId}\n(empty)`; + } } }), ); diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index 48bb2a80..f8dd7da7 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -466,6 +466,38 @@ describe('local ingest adapters', () => { }); }); + it('exposes configured primary warehouses as dbt target connections', async () => { + const dbtProject: KtxLocalProject = { + ...projectWithConnections({ + warehouse: { + driver: 'postgres', + url: 'postgresql://example/db', + }, + analytics_dbt: { + driver: 'dbt', + source_dir: '/repo/dbt', + }, + }), + config: { + ...project.config, + setup: { database_connection_ids: ['warehouse'], completed_steps: [] }, + connections: { + warehouse: { + driver: 'postgres', + url: 'postgresql://example/db', + }, + analytics_dbt: { + driver: 'dbt', + source_dir: '/repo/dbt', + }, + }, + }, + }; + const adapter = createDefaultLocalIngestAdapters(dbtProject).find((candidate) => candidate.source === 'dbt'); + + await expect(adapter?.listTargetConnectionIds?.('/tmp/staged-dbt')).resolves.toEqual(['warehouse']); + }); + it('resolves MetricFlow auth_token_ref without writing literal tokens to config', async () 
=> { const project = projectWithConnections({ metricflow_main: { diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 93d6b063..5ab92eaa 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -76,7 +76,10 @@ export function createDefaultLocalIngestAdapters( }), }), new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), - new DbtSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache') }), + new DbtSourceAdapter({ + homeDir: join(project.projectDir, '.ktx/cache'), + targetConnectionIds: primaryWarehouseConnectionIds(project), + }), createLocalMetabaseSourceAdapter(project), new LookerSourceAdapter({ clientFactory: { @@ -111,6 +114,21 @@ export function createDefaultLocalIngestAdapters( return adapters; } +function primaryWarehouseConnectionIds(project: KtxLocalProject): string[] { + const configuredPrimaryIds = project.config.setup?.database_connection_ids ?? []; + const configured = configuredPrimaryIds.filter((connectionId) => + Boolean(localConnectionToWarehouseDescriptor(connectionId, project.config.connections[connectionId])), + ); + if (configured.length > 0) { + return [...new Set(configured)]; + } + + return Object.entries(project.config.connections) + .filter(([connectionId, connection]) => Boolean(localConnectionToWarehouseDescriptor(connectionId, connection))) + .map(([connectionId]) => connectionId) + .sort((left, right) => left.localeCompare(right)); +} + function isRecord(value: unknown): value is Record<string, unknown> { return typeof value === 'object' && value !== null && !Array.isArray(value); }