diff --git a/packages/cli/src/commands/connection-mapping.test.ts b/packages/cli/src/commands/connection-mapping.test.ts index 7d76cc9d..825c3c4c 100644 --- a/packages/cli/src/commands/connection-mapping.test.ts +++ b/packages/cli/src/commands/connection-mapping.test.ts @@ -1,8 +1,8 @@ -import { mkdtemp, rm } from 'node:fs/promises'; +import { mkdtemp, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { LocalMetabaseSourceStateReader } from '@ktx/context/ingest'; -import { initKtxProject, loadKtxProject, serializeKtxProjectConfig } from '@ktx/context/project'; +import { LocalMetabaseDiscoveryCache } from '@ktx/context/ingest'; +import { initKtxProject, loadKtxProject, parseKtxProjectConfig, serializeKtxProjectConfig } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { runKtxConnectionMapping } from './connection-mapping.js'; @@ -79,19 +79,24 @@ describe('runKtxConnectionMapping', () => { it('sets, lists, disables, and clears local Metabase mappings', async () => { const io = makeIo(); - await expect( - runKtxConnectionMapping( - { - command: 'set', - projectDir, - connectionId: 'prod-metabase', - field: 'databaseMappings', - key: '1', - value: 'prod-warehouse', - }, - io.io, - ), - ).resolves.toBe(0); + const setCode = await runKtxConnectionMapping( + { + command: 'set', + projectDir, + connectionId: 'prod-metabase', + field: 'databaseMappings', + key: '1', + value: 'prod-warehouse', + }, + io.io, + ); + expect(setCode, io.stderr()).toBe(0); + + let config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toMatchObject({ + databaseMappings: { '1': 'prod-warehouse' }, + syncEnabled: { '1': true }, + }); const listIo = makeIo(); await expect( @@ -113,6 +118,12 @@ describe('runKtxConnectionMapping', () => { ), ).resolves.toBe(0); + config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toMatchObject({ + databaseMappings: { '1': 'prod-warehouse' }, + syncEnabled: { '1': false }, + }); + await expect( runKtxConnectionMapping( { @@ -124,6 +135,9 @@ describe('runKtxConnectionMapping', () => { makeIo().io, ), ).resolves.toBe(0); + + config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toBeUndefined(); }); it('lists Metabase yaml mapping bootstrap rows before any SQLite command writes', async () => { @@ -194,9 +208,11 @@ describe('runKtxConnectionMapping', () => { expect(io.stdout()).toContain('Discovery: 1 database'); expect(client.cleanup).toHaveBeenCalledTimes(1); - const store = new LocalMetabaseSourceStateReader({ dbPath: join(projectDir, '.ktx', 'db.sqlite') }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { metabaseDatabaseId: 1, metabaseDatabaseName: 'Analytics', source: 'refresh' }, + const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toBeUndefined(); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(projectDir, '.ktx', 'db.sqlite') }); + await expect(discoveryCache.listDiscoveredDatabases('prod-metabase')).resolves.toMatchObject([ + { id: 1, name: 'Analytics', engine: 'postgres' }, ]); }); diff --git a/packages/cli/src/commands/connection-mapping.ts b/packages/cli/src/commands/connection-mapping.ts index b35bf40f..9389ccd1 100644 --- a/packages/cli/src/commands/connection-mapping.ts +++ b/packages/cli/src/commands/connection-mapping.ts @@ -4,8 +4,9 @@ import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultLookerConnectionClientFactory, DefaultMetabaseConnectionClientFactory, + KtxYamlMetabaseSourceStateReader, LocalLookerRuntimeStore, - LocalMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, computeLookerMappingDrift, computeMetabaseMappingDrift, discoverLookerConnections, @@ -16,10 +17,19 @@ import { validateLookerMappings, validateMappingPhysicalMatch, type LookerMappingClient, + type LocalMetabaseMappingListRow, type MetabaseRuntimeClient, type MetabaseSyncMode, } from '@ktx/context/ingest'; -import { type KtxLocalProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project'; +import { + type KtxLocalProject, + type KtxProjectConfig, + ktxLocalStateDbPath, + loadKtxProject, + parseMetabaseMappingBootstrap, + serializeKtxProjectConfig, + stripKtxSetupCompletedSteps, +} from '@ktx/context/project'; import type { KtxCliIo } from '../index.js'; import { profileMark } from '../startup-profile.js'; @@ -84,6 +94,89 @@ function parseId(value: string, label: string): number { return parsed; } +interface MetabaseMappingsBlock { + databaseMappings: Record; + syncEnabled: Record; + syncMode: MetabaseSyncMode; + selections: { collections: number[]; items: number[] }; + defaultTagNames: string[]; +} + +function currentMetabaseMappings(project: KtxLocalProject, connectionId: string): MetabaseMappingsBlock { + const connection = project.config.connections[connectionId]; + if (!connection) { + throw new Error(`Connection "${connectionId}" is not configured in ktx.yaml`); + } + const bootstrap = parseMetabaseMappingBootstrap(connectionId, connection); + return { + databaseMappings: { ...bootstrap.databaseMappings }, + syncEnabled: { ...bootstrap.syncEnabled }, + syncMode: bootstrap.syncMode, + selections: { + collections: [...bootstrap.selections.collections], + items: [...bootstrap.selections.items], + }, + defaultTagNames: [...bootstrap.defaultTagNames], + }; +} + +function hasMetabaseMappings(block: MetabaseMappingsBlock): boolean { + return ( + Object.keys(block.databaseMappings).length > 0 || + Object.keys(block.syncEnabled).length > 0 || + block.syncMode !== 'ALL' || + block.selections.collections.length > 0 || + block.selections.items.length > 0 || + block.defaultTagNames.length > 0 + ); +} + +function serializeMetabaseMappingsBlock(block: MetabaseMappingsBlock): Record | undefined { + if (!hasMetabaseMappings(block)) { + return undefined; + } + return { + databaseMappings: block.databaseMappings, + syncEnabled: block.syncEnabled, + syncMode: block.syncMode, + selections: block.selections, + defaultTagNames: block.defaultTagNames, + }; +} + +async function writeMetabaseMappings( + project: KtxLocalProject, + connectionId: string, + block: MetabaseMappingsBlock, + message: string, +): Promise { + const connection = project.config.connections[connectionId]; + if (!connection) { + throw new Error(`Connection "${connectionId}" is not configured in ktx.yaml`); + } + const mappings = serializeMetabaseMappingsBlock(block); + const nextConnection = { ...connection }; + if (mappings) { + nextConnection.mappings = mappings; + } else { + delete nextConnection.mappings; + } + const nextConfig: KtxProjectConfig = { + ...project.config, + connections: { + ...project.config.connections, + [connectionId]: nextConnection, + }, + }; + await project.fileStore.writeFile( + 'ktx.yaml', + serializeKtxProjectConfig(stripKtxSetupCompletedSteps(nextConfig)), + 'ktx', + 'ktx@example.com', + message, + ); +} + async function createDefaultMetabaseClient( project: KtxLocalProject, connectionId: string, @@ -149,9 +242,7 @@ function targetPhysicalInfo(project: KtxLocalProject, connectionId: string) { }; } -function renderMapping( - row: Awaited>[number], -): string { +function renderMapping(row: LocalMetabaseMappingListRow): string { const name = row.metabaseDatabaseName ?? 'unhydrated'; const target = row.targetConnectionId ?? '[unmapped]'; return `${row.metabaseDatabaseId} -> ${target} (${name}, sync: ${row.syncEnabled ? 'on' : 'off'}, source: ${ @@ -255,92 +346,78 @@ export async function runKtxConnectionMapping( } assertMetabaseConnection(project, args.connectionId); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + const metabaseStateReader = new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }); if (args.command === 'list') { - const rows = await store.listDatabaseMappings(args.connectionId); + const rows = await metabaseStateReader.listDatabaseMappings(args.connectionId); io.stdout.write(args.json ? `${JSON.stringify(rows, null, 2)}\n` : `${rows.map(renderMapping).join('\n')}\n`); return 0; } if (args.command === 'set') { + if (args.field !== 'databaseMappings') { + throw new Error('Metabase mapping set requires databaseMappings ='); + } assertTargetConnection(project, args.value); - await store.upsertDatabaseMapping({ - connectionId: args.connectionId, - metabaseDatabaseId: parseId(args.key, 'metabaseDatabaseId'), - targetConnectionId: args.value, - syncEnabled: true, - source: 'cli', - }); + const block = currentMetabaseMappings(project, args.connectionId); + const metabaseDatabaseId = String(parseId(args.key, 'metabaseDatabaseId')); + block.databaseMappings[metabaseDatabaseId] = args.value; + block.syncEnabled[metabaseDatabaseId] = true; + await writeMetabaseMappings(project, args.connectionId, block, `Set Metabase mapping ${args.connectionId}.${metabaseDatabaseId}`); io.stdout.write(`Set databaseMappings.${args.key} = ${args.value}\n`); return 0; } if (args.command === 'apply-bulk') { const payload = JSON.parse(await readFile(args.filePath, 'utf8')) as MetabaseBulkMappingPayload; - const existingState = await store.getSourceState(args.connectionId); - const existingRows = await store.listDatabaseMappings(args.connectionId); - const existingById = new Map(existingRows.map((row) => [row.metabaseDatabaseId, row])); + const block = currentMetabaseMappings(project, args.connectionId); const databaseMappings = payload.databaseMappings ?? {}; for (const targetConnectionId of Object.values(databaseMappings)) { if (targetConnectionId) { assertTargetConnection(project, targetConnectionId); } } - const mappingIds = new Set([ - ...existingRows.map((row) => row.metabaseDatabaseId), - ...Object.keys(databaseMappings).map((id) => parseId(id, 'metabaseDatabaseId')), - ...Object.keys(payload.syncEnabled ?? {}).map((id) => parseId(id, 'metabaseDatabaseId')), - ]); - await store.replaceSourceState({ - connectionId: args.connectionId, - syncMode: payload.syncMode ?? existingState.syncMode, - defaultTagNames: payload.defaultTagNames ?? existingState.defaultTagNames, - selections: - payload.selections === undefined - ? existingState.selections - : [ - ...(payload.selections.collections ?? []).map((id) => ({ - selectionType: 'collection' as const, - metabaseObjectId: id, - })), - ...(payload.selections.items ?? []).map((id) => ({ - selectionType: 'item' as const, - metabaseObjectId: id, - })), - ], - mappings: [...mappingIds] - .sort((a, b) => a - b) - .map((id) => { - const existing = existingById.get(id); - return { - metabaseDatabaseId: id, - metabaseDatabaseName: existing?.metabaseDatabaseName ?? null, - metabaseEngine: existing?.metabaseEngine ?? null, - metabaseHost: existing?.metabaseHost ?? null, - metabaseDbName: existing?.metabaseDbName ?? null, - targetConnectionId: databaseMappings[String(id)] ?? existing?.targetConnectionId ?? null, - syncEnabled: payload.syncEnabled?.[String(id)] ?? existing?.syncEnabled ?? false, - source: 'cli', - }; - }), - }); + for (const id of Object.keys(databaseMappings)) { + parseId(id, 'metabaseDatabaseId'); + block.databaseMappings[id] = databaseMappings[id] ?? null; + } + for (const [id, enabled] of Object.entries(payload.syncEnabled ?? {})) { + parseId(id, 'metabaseDatabaseId'); + block.syncEnabled[id] = enabled; + } + if (payload.syncMode !== undefined) { + block.syncMode = payload.syncMode; + } + if (payload.defaultTagNames !== undefined) { + block.defaultTagNames = payload.defaultTagNames; + } + if (payload.selections !== undefined) { + block.selections = { + collections: payload.selections.collections ?? [], + items: payload.selections.items ?? [], + }; + } + await writeMetabaseMappings(project, args.connectionId, block, `Apply Metabase mappings ${args.connectionId}`); io.stdout.write(`Applied bulk mappings for ${args.connectionId}\n`); return 0; } if (args.command === 'set-sync-enabled') { - await store.setMappingSyncEnabled({ - connectionId: args.connectionId, - metabaseDatabaseId: args.metabaseDatabaseId, - syncEnabled: args.enabled, - }); + const block = currentMetabaseMappings(project, args.connectionId); + block.syncEnabled[String(args.metabaseDatabaseId)] = args.enabled; + await writeMetabaseMappings( + project, + args.connectionId, + block, + `Set Metabase sync ${args.connectionId}.${args.metabaseDatabaseId}`, + ); io.stdout.write(`Set syncEnabled.${args.metabaseDatabaseId} = ${args.enabled}\n`); return 0; } if (args.command === 'sync-state-get') { - const state = await store.getSourceState(args.connectionId); + const state = await metabaseStateReader.getSourceState(args.connectionId); const payload = { syncMode: state.syncMode, selections: state.selections, @@ -351,15 +428,11 @@ export async function runKtxConnectionMapping( } if (args.command === 'sync-state-set') { - await store.setSyncState({ - connectionId: args.connectionId, - syncMode: args.syncMode, - defaultTagNames: args.tagNames, - selections: [ - ...args.collectionIds.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), - ...args.itemIds.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), - ], - }); + const block = currentMetabaseMappings(project, args.connectionId); + block.syncMode = args.syncMode; + block.defaultTagNames = args.tagNames; + block.selections = { collections: args.collectionIds, items: args.itemIds }; + await writeMetabaseMappings(project, args.connectionId, block, `Set Metabase sync state ${args.connectionId}`); io.stdout.write(`Set sync state for ${args.connectionId}\n`); return 0; } @@ -368,15 +441,11 @@ export async function runKtxConnectionMapping( const client = await (deps.createMetabaseClient ?? createDefaultMetabaseClient)(project, args.connectionId); try { const discovered = await discoverMetabaseDatabases(client); - const existing = Object.fromEntries( - (await store.listDatabaseMappings(args.connectionId)).map((row) => [ - String(row.metabaseDatabaseId), - row.targetConnectionId, - ]), - ); + const block = currentMetabaseMappings(project, args.connectionId); + const existing = block.databaseMappings; const drift = computeMetabaseMappingDrift({ currentMappings: existing, discovered }); if (args.autoAccept) { - await store.refreshDiscoveredDatabases({ connectionId: args.connectionId, discovered }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: args.connectionId, discovered }); } io.stdout.write(`Discovery: ${discovered.length} ${discovered.length === 1 ? 'database' : 'databases'}\n`); io.stdout.write(`Unmapped discovered: ${drift.unmappedDiscovered.length}\n`); @@ -388,7 +457,9 @@ export async function runKtxConnectionMapping( } if (args.command === 'validate') { - const rows = await store.listDatabaseMappings(args.connectionId); + const rows = (await metabaseStateReader.listDatabaseMappings(args.connectionId)).filter( + (row) => row.source === 'ktx.yaml', + ); const failures = rows.flatMap((row) => { if (!row.targetConnectionId) { return []; @@ -412,7 +483,18 @@ export async function runKtxConnectionMapping( } const metabaseDatabaseId = args.metabaseDatabaseId ?? (args.mappingKey ? parseId(args.mappingKey, 'metabaseDatabaseId') : undefined); - await store.clearDatabaseMappings({ connectionId: args.connectionId, metabaseDatabaseId }); + const block = currentMetabaseMappings(project, args.connectionId); + if (metabaseDatabaseId === undefined) { + block.databaseMappings = {}; + block.syncEnabled = {}; + block.syncMode = 'ALL'; + block.selections = { collections: [], items: [] }; + block.defaultTagNames = []; + } else { + delete block.databaseMappings[String(metabaseDatabaseId)]; + delete block.syncEnabled[String(metabaseDatabaseId)]; + } + await writeMetabaseMappings(project, args.connectionId, block, `Clear Metabase mappings ${args.connectionId}`); io.stdout.write( metabaseDatabaseId ? `Cleared databaseMappings.${metabaseDatabaseId}\n` diff --git a/packages/cli/src/commands/connection-metabase-setup.test.ts b/packages/cli/src/commands/connection-metabase-setup.test.ts index 9d462bbd..7b7b7b84 100644 --- a/packages/cli/src/commands/connection-metabase-setup.test.ts +++ b/packages/cli/src/commands/connection-metabase-setup.test.ts @@ -1,7 +1,7 @@ import { mkdtemp, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { LocalMetabaseSourceStateReader } from '@ktx/context/ingest'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from '@ktx/context/ingest'; import { initKtxProject, ktxLocalStateDbPath, loadKtxProject, serializeKtxProjectConfig } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; @@ -9,6 +9,12 @@ import { runKtxConnectionMetabaseSetup } from './connection-metabase-setup.js'; const CANCEL_PROMPT = Symbol('cancel'); +async function metabaseMappingRows(projectDir: string, connectionId = 'metabase') { + const project = await loadKtxProject({ projectDir }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + return new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }).listDatabaseMappings(connectionId); +} + function createTestMetabaseSetupPromptAdapter(options: { selects?: Array; multiselects?: Array | typeof CANCEL_PROMPT>; @@ -238,10 +244,7 @@ describe('runKtxConnectionMetabaseSetup', () => { expect(config).toContain('driver: metabase'); expect(config).toContain('api_url: http://metabase.example.test:3000'); expect(config).toContain('api_key: mb_example'); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, metabaseDatabaseName: 'Analytics', @@ -294,10 +297,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, ]); }); @@ -369,10 +369,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, ]); }); @@ -659,10 +656,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 1, targetConnectionId: 'orbit', syncEnabled: true }, { metabaseDatabaseId: 2, targetConnectionId: null, syncEnabled: false }, ]); @@ -785,10 +779,7 @@ describe('runKtxConnectionMetabaseSetup', () => { const config = await readFile(join(projectDir, 'ktx.yaml'), 'utf-8'); expect(config).toContain('driver: metabase'); expect(io.stderr()).toContain(`ktx ingest run --connection-id metabase --adapter metabase --project-dir ${projectDir}`); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit' }, ]); }); @@ -886,10 +877,7 @@ describe('runKtxConnectionMetabaseSetup', () => { expect(config).toContain('driver: metabase'); expect(config).toContain('api_url: http://metabase.example.test:3000'); expect(config).toContain(`api_key: ${interactiveMetabaseCredential}`); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', @@ -957,10 +945,7 @@ describe('runKtxConnectionMetabaseSetup', () => { }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, { metabaseDatabaseId: 3, targetConnectionId: 'warehouse2', syncEnabled: false }, ]); @@ -1128,9 +1113,6 @@ describe('runKtxConnectionMetabaseSetup', () => { const afterConfig = await readFile(join(projectDir, 'ktx.yaml'), 'utf-8'); expect(afterConfig).toBe(beforeConfig); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toEqual([]); + await expect(metabaseMappingRows(projectDir)).resolves.toEqual([]); }); }); diff --git a/packages/cli/src/commands/connection-metabase-setup.ts b/packages/cli/src/commands/connection-metabase-setup.ts index 2321ea3d..6e80200c 100644 --- a/packages/cli/src/commands/connection-metabase-setup.ts +++ b/packages/cli/src/commands/connection-metabase-setup.ts @@ -16,7 +16,8 @@ import { localConnectionToWarehouseDescriptor } from '@ktx/context/connections'; import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory, - LocalMetabaseSourceStateReader, + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, MetabaseClient, type MetabaseDatabase, type MetabaseRuntimeClient, @@ -29,7 +30,9 @@ import { type KtxProjectConnectionConfig, ktxLocalStateDbPath, loadKtxProject, + parseMetabaseMappingBootstrap, serializeKtxProjectConfig, + stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import { createClackSpinner, type KtxCliSpinner } from '../clack.js'; @@ -338,6 +341,33 @@ function noteMetabaseSetupSummary(options: { ); } +function metabaseMappingsBlockForSetup(options: { + connectionId: string; + connection: KtxProjectConnectionConfig; + mappings: MetabaseSetupMappingAssignment[]; + syncEnabledDatabaseIds: number[]; + syncMode: MetabaseSetupSyncMode; +}): Record { + const existing = parseMetabaseMappingBootstrap(options.connectionId, options.connection); + const databaseMappings = { ...existing.databaseMappings }; + const syncEnabled = { ...existing.syncEnabled }; + for (const mapping of options.mappings) { + const key = String(mapping.metabaseDatabaseId); + databaseMappings[key] = mapping.targetConnectionId; + syncEnabled[key] = false; + } + for (const metabaseDatabaseId of options.syncEnabledDatabaseIds) { + syncEnabled[String(metabaseDatabaseId)] = true; + } + return { + databaseMappings, + syncEnabled, + syncMode: options.syncMode, + selections: existing.selections, + defaultTagNames: existing.defaultTagNames, + }; +} + export async function runKtxConnectionMetabaseSetup( args: KtxConnectionMetabaseSetupArgs, io: KtxCliIo, @@ -674,54 +704,37 @@ export async function runKtxConnectionMetabaseSetup( } } + const finalConnectionConfig: KtxProjectConnectionConfig = { + ...transientConnectionConfig, + mappings: metabaseMappingsBlockForSetup({ + connectionId, + connection: transientConnectionConfig, + mappings: resolvedMappings, + syncEnabledDatabaseIds: resolvedSyncEnabledDatabaseIds, + syncMode: args.syncMode, + }), + }; + const finalConfig = { + ...configWithTransient, + connections: { + ...configWithTransient.connections, + [connectionId]: finalConnectionConfig, + }, + }; await project.fileStore.writeFile( 'ktx.yaml', - serializeKtxProjectConfig(configWithTransient), + serializeKtxProjectConfig(stripKtxSetupCompletedSteps(finalConfig)), 'ktx', 'ktx@example.com', `Setup Metabase connection ${connectionId}`, ); const updatedProject = await loadKtxProject({ projectDir: args.projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - - await store.refreshDiscoveredDatabases({ connectionId, discovered }); - - for (const mapping of resolvedMappings) { - await store.upsertDatabaseMapping({ - connectionId, - metabaseDatabaseId: mapping.metabaseDatabaseId, - targetConnectionId: mapping.targetConnectionId, - syncEnabled: false, - source: 'cli', - }); - } - - for (const metabaseDatabaseId of resolvedSyncEnabledDatabaseIds) { - await store.setMappingSyncEnabled({ - connectionId, - metabaseDatabaseId, - syncEnabled: true, - }); - } - - const existingSyncState = await store.getSourceState(connectionId); - await store.setSyncState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(updatedProject) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId, discovered }); + const rows = await new KtxYamlMetabaseSourceStateReader(updatedProject, { discoveryCache }).listDatabaseMappings( connectionId, - syncMode: args.syncMode, - defaultTagNames: existingSyncState.defaultTagNames, - selections: existingSyncState.selections, - }); - - const unhydrated = await store.getUnhydratedSyncEnabledMappingIds(connectionId); - if (unhydrated.length > 0) { - io.stderr.write( - `Sync-enabled mappings are missing discovery metadata; run ktx connection mapping refresh ${connectionId} --auto-accept\n`, - ); - return 1; - } - - const rows = await store.listDatabaseMappings(connectionId); + ); const physicalFailures = rows.flatMap((row) => { if (!row.targetConnectionId) { return []; diff --git a/packages/cli/src/ingest.test-utils.ts b/packages/cli/src/ingest.test-utils.ts index 71d85c6c..09a1fe17 100644 --- a/packages/cli/src/ingest.test-utils.ts +++ b/packages/cli/src/ingest.test-utils.ts @@ -5,7 +5,8 @@ import { join } from 'node:path'; import { AgentRunnerService, type RunLoopParams } from '@ktx/context/agent'; import { LocalLookerRuntimeStore, - LocalMetabaseSourceStateReader, + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, MetabaseSourceAdapter, getLocalIngestStatus, type ChunkResult, @@ -485,6 +486,23 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync ' driver: metabase', ' api_url: https://metabase.example.test', ' api_key: literal-test-key', + ' mappings:', + ' databaseMappings:', + ' "1": warehouse_a', + ' syncEnabled:', + ' "1": true', + ` syncMode: ${input.syncMode}`, + ' selections:', + ` collections: [${input.selections + .filter((selection) => selection.selectionType === 'collection') + .map((selection) => selection.metabaseObjectId) + .join(', ')}]`, + ` items: [${input.selections + .filter((selection) => selection.selectionType === 'item') + .map((selection) => selection.metabaseObjectId) + .join(', ')}]`, + ' defaultTagNames:', + ' - sync-mode-smoke', ' warehouse_a:', ' driver: postgres', ' url: postgresql://readonly@db.example.test/warehouse_a', @@ -499,29 +517,15 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync ); const project = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await store.replaceSourceState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - syncMode: input.syncMode, - defaultTagNames: ['sync-mode-smoke'], - selections: input.selections, - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - ], + discovered: [{ id: 1, name: 'Warehouse A', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_a' }], }); const adapter = new MetabaseSourceAdapter({ clientFactory: new StaticMetabaseClientFactory(createSyncModeMetabaseClient()), - sourceStateReader: store, + sourceStateReader: new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }), }); const jobId = `metabase-sync-mode-${input.name}-child`; const io = makeIo(); diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 9fffdf0c..ab126991 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -3,7 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { LocalLookerRuntimeStore, - LocalMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, getLocalIngestStatus, type LocalIngestResult, type LocalMetabaseFanoutProgress, @@ -437,6 +437,16 @@ describe('runKtxIngest', () => { ' driver: metabase', ' api_url: https://metabase.example.test', ' api_key: literal-test-key', + ' mappings:', + ' databaseMappings:', + ' "1": warehouse_a', + ' "2": warehouse_b', + ' syncEnabled:', + ' "1": true', + ' "2": true', + ' syncMode: ALL', + ' defaultTagNames:', + ' - ktx', ' warehouse_a:', ' driver: postgres', ' url: postgresql://readonly@db.example.test/warehouse_a', @@ -453,33 +463,12 @@ describe('runKtxIngest', () => { 'utf-8', ); const project = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await store.replaceSourceState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Warehouse B', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_b', - targetConnectionId: 'warehouse_b', - syncEnabled: true, - source: 'refresh', - }, + discovered: [ + { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_a' }, + { id: 2, name: 'Warehouse B', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_b' }, ], }); const adapter = new CliMetabaseSourceAdapter(); diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 46506ae7..3b6d013e 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -1124,7 +1124,6 @@ describe('setup databases step', () => { }); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); expect(io.stdout()).toContain('Primary source ready'); @@ -1163,7 +1162,6 @@ describe('setup databases step', () => { }); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); }); @@ -1213,7 +1211,7 @@ describe('setup databases step', () => { expect(scanConnection).toHaveBeenCalledTimes(2); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.setup?.database_connection_ids).toEqual(['warehouse', 'analytics']); - expect(config.setup?.completed_steps).toEqual([]); + expect(config.setup?.completed_steps).toBeUndefined(); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); }); diff --git a/packages/cli/src/setup-project.test.ts b/packages/cli/src/setup-project.test.ts index 9c01402c..f4d3a1d8 100644 --- a/packages/cli/src/setup-project.test.ts +++ b/packages/cli/src/setup-project.test.ts @@ -94,7 +94,6 @@ describe('setup project step', () => { const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); expect(await readKtxSetupState(projectDir)).toEqual({ completed_steps: ['llm', 'project'] }); }); diff --git a/packages/cli/src/setup-sources.test.ts b/packages/cli/src/setup-sources.test.ts index 76ba5d0f..bc65c95e 100644 --- a/packages/cli/src/setup-sources.test.ts +++ b/packages/cli/src/setup-sources.test.ts @@ -171,7 +171,7 @@ describe('setup sources step', () => { source_dir: '/repo/dbt', project_name: 'analytics', }); - expect(config.setup?.completed_steps).toEqual([]); + expect(config.setup?.completed_steps).toBeUndefined(); expect((await readKtxSetupState(projectDir)).completed_steps).toContain('sources'); expect(runInitialIngest).toHaveBeenCalledWith(projectDir, 'analytics_dbt', io.io, { inputMode: 'disabled' }); }); @@ -190,7 +190,7 @@ describe('setup sources step', () => { source: 'metabase', sourceConnectionId: 'prod_metabase', sourceUrl: 'https://metabase.example.com', - sourceApiKeyRef: 'env:METABASE_API_KEY', + sourceApiKeyRef: 'env:METABASE_API_KEY', // pragma: allowlist secret sourceWarehouseConnectionId: 'warehouse', metabaseDatabaseId: 1, runInitialSourceIngest: false, @@ -204,7 +204,7 @@ describe('setup sources step', () => { expect((await readConfig()).connections.prod_metabase).toMatchObject({ driver: 'metabase', api_url: 'https://metabase.example.com', - api_key_ref: 'env:METABASE_API_KEY', + api_key_ref: 'env:METABASE_API_KEY', // pragma: allowlist secret mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, @@ -225,7 +225,7 @@ describe('setup sources step', () => { inputMode: 'disabled', source: 'notion', sourceConnectionId: 'notion-main', - sourceApiKeyRef: 'env:NOTION_TOKEN', + sourceApiKeyRef: 'env:NOTION_TOKEN', // pragma: allowlist secret notionCrawlMode: 'selected_roots', notionRootPageIds: ['page-1'], runInitialSourceIngest: false, @@ -256,7 +256,7 @@ describe('setup sources step', () => { inputMode: 'disabled', source: 'notion', sourceConnectionId: 'notion-main', - sourceApiKeyRef: 'env:NOTION_TOKEN', + sourceApiKeyRef: 'env:NOTION_TOKEN', // pragma: allowlist secret notionCrawlMode: 'all_accessible', notionRootPageIds: ['page-1'], runInitialSourceIngest: false, @@ -766,7 +766,7 @@ describe('setup sources step', () => { connection: { driver: 'metabase', api_url: 'https://metabase.example.com', - api_key_ref: 'env:METABASE_API_KEY', + api_key_ref: 'env:METABASE_API_KEY', // pragma: allowlist secret mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, @@ -786,7 +786,7 @@ describe('setup sources step', () => { driver: 'looker', base_url: 'https://looker.example.com', client_id: 'client-id', - client_secret_ref: 'env:LOOKER_CLIENT_SECRET', + client_secret_ref: 'env:LOOKER_CLIENT_SECRET', // pragma: allowlist secret mappings: { connectionMappings: { warehouse: 'warehouse' } }, }, deps: { diff --git a/packages/context/src/ingest/adapters/metabase/fetch.test.ts b/packages/context/src/ingest/adapters/metabase/fetch.test.ts index c8d4f4fb..7e7e4e4a 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.test.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.test.ts @@ -36,6 +36,12 @@ function makeMockClient() { ), getCollectionTree: vi.fn().mockResolvedValue([{ id: 5, name: 'Orders Team', parent_id: null, children: [] }]), getCollectionItems: vi.fn().mockResolvedValue([]), + getDatabase: vi.fn().mockResolvedValue({ + id: 42, + name: 'Analytics', + engine: 'postgres', + details: { host: 'db.example.test', dbname: 'analytics' }, + }), cleanup: vi.fn().mockResolvedValue(undefined), }; } @@ -253,7 +259,7 @@ describe('fetchMetabaseBundle', () => { ).rejects.toThrow(/mapping.*does not point to connection/); }); - it('throws when the matching mapping has a null metabaseDatabaseName (unhydrated)', async () => { + it('hydrates missing mapping metadata from Metabase instead of requiring a prior refresh', async () => { sourceStateReader.getSourceState.mockResolvedValue({ syncMode: 'ALL', selections: [], @@ -268,15 +274,22 @@ describe('fetchMetabaseBundle', () => { ], defaultTagNames: [], }); - await expect( - fetchMetabaseBundle({ - pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, - stagedDir, - ctx: makeFetchContext(), - clientFactory, - sourceStateReader, - }), - ).rejects.toThrow(/unhydrated.*ktx connection mapping refresh/); + await fetchMetabaseBundle({ + pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, + stagedDir, + ctx: makeFetchContext(), + clientFactory, + sourceStateReader, + }); + + expect(clientFactory.__client.getDatabase).toHaveBeenCalledWith(42); + const databaseFile = JSON.parse(await readFile(join(stagedDir, 'databases/42.json'), 'utf-8')); + expect(databaseFile).toMatchObject({ + metabaseDatabaseId: 42, + metabaseDatabaseName: 'Analytics', + metabaseEngine: 'postgres', + targetConnectionId, + }); }); it('skips cards whose getResolvedSql returns null and records them in unresolved-cards.json', async () => { diff --git a/packages/context/src/ingest/adapters/metabase/fetch.ts b/packages/context/src/ingest/adapters/metabase/fetch.ts index 9ccb2be6..d4e8b59b 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.ts @@ -97,15 +97,16 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr `mapping for database ${pullConfig.metabaseDatabaseId} does not point to connection ${params.ctx.connectionId} (points to ${mapping.targetConnectionId})`, ); } - if (mapping.metabaseDatabaseName === null) { - throw new IngestInputError( - `mapping for database ${pullConfig.metabaseDatabaseId} on Metabase connection ${pullConfig.metabaseConnectionId} is unhydrated; run \`ktx connection mapping refresh ${pullConfig.metabaseConnectionId}\` to populate metabaseDatabaseName before ingest.`, - ); - } - const mappingDatabaseName: string = mapping.metabaseDatabaseName; const client = await params.clientFactory.createClient(pullConfig, params.ctx); try { + let mappingDatabaseName = mapping.metabaseDatabaseName; + let mappingEngine = mapping.metabaseEngine; + if (mappingDatabaseName === null) { + const database = await client.getDatabase(pullConfig.metabaseDatabaseId); + mappingDatabaseName = database.name; + mappingEngine = database.engine ?? null; + } const stagedForScope: StagedSyncConfig = { metabaseConnectionId: pullConfig.metabaseConnectionId, metabaseDatabaseId: pullConfig.metabaseDatabaseId, @@ -118,7 +119,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr mapping: { metabaseDatabaseId: mapping.metabaseDatabaseId, metabaseDatabaseName: mappingDatabaseName, - metabaseEngine: mapping.metabaseEngine, + metabaseEngine: mappingEngine, targetConnectionId: mapping.targetConnectionId, }, }; @@ -233,7 +234,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr const databaseFile: StagedDatabaseFile = { metabaseDatabaseId: mapping.metabaseDatabaseId, metabaseDatabaseName: mappingDatabaseName, - metabaseEngine: mapping.metabaseEngine, + metabaseEngine: mappingEngine, targetConnectionId: mapping.targetConnectionId, }; await writeFile( diff --git a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts index ec5e163e..a7ffc5de 100644 --- a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts @@ -12,7 +12,7 @@ import { type MetabaseClientRuntimeConfig, } from './client-port.js'; import type { MetabaseFetchLogger } from './fetch.js'; -import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './local-source-state-store.js'; import { MetabaseSourceAdapter } from './metabase.adapter.js'; function stringField(value: unknown): string | null { @@ -62,7 +62,8 @@ export function createLocalMetabaseSourceAdapter( project: KtxLocalProject, options: CreateLocalMetabaseSourceAdapterOptions = {}, ): MetabaseSourceAdapter { - const sourceStateReader = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + const sourceStateReader = new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }); const connectionFactory = new DefaultMetabaseConnectionClientFactory( (metabaseConnectionId) => metabaseRuntimeConfigFromLocalConnection( diff --git a/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts b/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts index f5aef74c..1139ea4d 100644 --- a/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts +++ b/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts @@ -2,313 +2,112 @@ import { mkdtemp, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; +import { buildDefaultKtxProjectConfig } from '../../../project/index.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './local-source-state-store.js'; -describe('LocalMetabaseSourceStateReader', () => { +describe('Metabase YAML source state and discovery cache', () => { let tempDir: string; - let store: LocalMetabaseSourceStateReader; + let discoveryCache: LocalMetabaseDiscoveryCache; beforeEach(async () => { - tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-local-state-')); - store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-cache-')); + discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); }); afterEach(async () => { await rm(tempDir, { recursive: true, force: true }); }); - it('round-trips hydrated source state through SQLite', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', + function projectWithMetabaseMappings(mappings: Record) { + return { + config: { + ...buildDefaultKtxProjectConfig('metabase-cache-test'), + connections: { + 'prod-metabase': { + driver: 'metabase', + mappings, + }, + }, + }, + }; + } + + it('reads Metabase mapping intent from ktx.yaml config', async () => { + const reader = new KtxYamlMetabaseSourceStateReader( + projectWithMetabaseMappings({ + databaseMappings: { '2': 'warehouse' }, + syncEnabled: { '2': true }, + syncMode: 'ONLY', + selections: { collections: [12], items: [99] }, + defaultTagNames: ['analytics'], + }), + { discoveryCache }, + ); + + await expect(reader.getSourceState('prod-metabase')).resolves.toEqual({ syncMode: 'ONLY', - defaultTagNames: ['analytics', 'curated'], + defaultTagNames: ['analytics'], selections: [ - { selectionType: 'collection', metabaseObjectId: 10 }, + { selectionType: 'collection', metabaseObjectId: 12 }, { selectionType: 'item', metabaseObjectId: 99 }, ], mappings: [ { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - metabaseHost: 'warehouse.internal', - metabaseDbName: 'analytics', - targetConnectionId: 'warehouse', - syncEnabled: true, - source: 'cli', - }, - ], - }); - - await expect(store.getSourceState('prod-metabase')).resolves.toEqual({ - syncMode: 'ONLY', - defaultTagNames: ['analytics', 'curated'], - selections: [ - { selectionType: 'collection', metabaseObjectId: 10 }, - { selectionType: 'item', metabaseObjectId: 99 }, - ], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - targetConnectionId: 'warehouse', - syncEnabled: true, - }, - ], - }); - }); - - it('excludes unhydrated mappings from getSourceState and exposes them through the side accessor', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: [], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, + metabaseDatabaseId: 2, metabaseDatabaseName: null, metabaseEngine: null, metabaseHost: null, metabaseDbName: null, targetConnectionId: 'warehouse', syncEnabled: true, - source: 'ktx.yaml', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'warehouse.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'warehouse', - syncEnabled: true, - source: 'refresh', }, ], }); - - const state = await store.getSourceState('prod-metabase'); - expect(state.mappings.map((mapping) => mapping.metabaseDatabaseId)).toEqual([2]); - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); }); - it('defaults missing sync config to ALL with no tags or selections', async () => { - await store.replaceSourceState({ + it('enriches YAML mapping rows with recreatable discovery metadata', async () => { + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 3, - metabaseDatabaseName: 'Warehouse', - metabaseEngine: 'postgres', - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: null, - syncEnabled: false, - source: 'refresh', - }, - ], + discovered: [{ id: 2, name: 'Analytics', engine: 'postgres', host: 'pg.internal', dbName: 'analytics' }], }); + const reader = new KtxYamlMetabaseSourceStateReader( + projectWithMetabaseMappings({ + databaseMappings: { '2': 'warehouse' }, + syncEnabled: { '2': true }, + }), + { discoveryCache }, + ); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ALL', - defaultTagNames: [], - selections: [], - }); - }); - - it('supports command-sized mapping writes and reads', async () => { - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 1, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }); - await store.setSyncState({ - connectionId: 'prod-metabase', - syncMode: 'ONLY', - defaultTagNames: ['analytics'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ + await expect(reader.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ { - metabaseDatabaseId: 1, - metabaseDatabaseName: null, - metabaseEngine: null, - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }, - ]); - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ONLY', - defaultTagNames: ['analytics'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [], - }); - }); - - it('refreshes discovered database metadata while preserving user mapping intent', async () => { - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 1, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }); - - await store.refreshDiscoveredDatabases({ - connectionId: 'prod-metabase', - discovered: [ - { id: 1, name: 'Analytics', engine: 'postgres', host: 'pg.internal', dbName: 'analytics' }, - { id: 2, name: 'Sandbox', engine: 'postgres', host: 'pg.internal', dbName: 'sandbox' }, - ], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ - { - metabaseDatabaseId: 1, + metabaseDatabaseId: 2, metabaseDatabaseName: 'Analytics', metabaseEngine: 'postgres', metabaseHost: 'pg.internal', metabaseDbName: 'analytics', - targetConnectionId: 'prod-warehouse', + targetConnectionId: 'warehouse', syncEnabled: true, - source: 'cli', + source: 'ktx.yaml', }, + ]); + }); + + it('lists discovered-only rows as refresh cache data without turning them into config state', async () => { + await discoveryCache.refreshDiscoveredDatabases({ + connectionId: 'prod-metabase', + discovered: [{ id: 7, name: 'Unmapped', engine: 'mysql', host: 'mysql.internal', dbName: 'sales' }], + }); + const reader = new KtxYamlMetabaseSourceStateReader(projectWithMetabaseMappings({}), { discoveryCache }); + + await expect(reader.getSourceState('prod-metabase')).resolves.toMatchObject({ mappings: [] }); + await expect(reader.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', + metabaseDatabaseId: 7, + metabaseDatabaseName: 'Unmapped', targetConnectionId: null, syncEnabled: false, source: 'refresh', }, ]); }); - - it('updates sync-enabled, clears scoped rows, and applies bulk state in one call', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'analytics', - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'staging-warehouse', - syncEnabled: true, - source: 'refresh', - }, - ], - }); - - await store.setMappingSyncEnabled({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 2, - syncEnabled: false, - }); - await store.clearDatabaseMappings({ connectionId: 'prod-metabase', metabaseDatabaseId: 1 }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'staging-warehouse', - syncEnabled: false, - source: 'refresh', - }, - ]); - }); - - it('seeds unhydrated yaml intent without exposing it through getSourceState', async () => { - await store.applyYamlBootstrap({ - connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [{ metabaseDatabaseId: 1, targetConnectionId: 'prod-warehouse', syncEnabled: true }], - }); - - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [], - }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: null, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'ktx.yaml', - }, - ]); - }); - - it('applies yaml target intent onto refresh metadata but does not overwrite cli rows', async () => { - await store.refreshDiscoveredDatabases({ - connectionId: 'prod-metabase', - discovered: [{ id: 1, name: 'Analytics', engine: 'postgres', host: 'db.test', dbName: 'analytics' }], - }); - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 2, - targetConnectionId: 'cli-warehouse', - syncEnabled: true, - source: 'cli', - }); - - await store.applyYamlBootstrap({ - connectionId: 'prod-metabase', - syncMode: 'EXCEPT', - defaultTagNames: [], - selections: [{ selectionType: 'item', metabaseObjectId: 99 }], - mappings: [ - { metabaseDatabaseId: 1, targetConnectionId: 'yaml-warehouse', syncEnabled: true }, - { metabaseDatabaseId: 2, targetConnectionId: 'yaml-warehouse', syncEnabled: false }, - ], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - targetConnectionId: 'yaml-warehouse', - syncEnabled: true, - source: 'ktx.yaml', - }, - { - metabaseDatabaseId: 2, - targetConnectionId: 'cli-warehouse', - syncEnabled: true, - source: 'cli', - }, - ]); - }); }); diff --git a/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts b/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts index 246d5f33..f8026db6 100644 --- a/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts +++ b/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts @@ -1,17 +1,31 @@ import { mkdirSync } from 'node:fs'; import { dirname } from 'node:path'; import Database from 'better-sqlite3'; +import { + parseMetabaseMappingBootstrap, + type KtxLocalProject, + type MetabaseMappingBootstrap, +} from '../../../project/index.js'; +import type { DiscoveredMetabaseDatabase } from './mapping.js'; import type { MetabaseSourceState, MetabaseSourceStateReader, MetabaseSourceStateSelection } from './source-state-port.js'; -import type { MetabaseSyncMode } from './types.js'; -export type LocalMetabaseMappingSource = 'ktx.yaml' | 'cli' | 'refresh'; +export type LocalMetabaseMappingSource = 'ktx.yaml' | 'refresh'; -interface LocalMetabaseSourceStateStoreOptions { +interface LocalMetabaseDiscoveryCacheOptions { dbPath: string; now?: () => Date; } -export interface LocalMetabaseSourceStateMappingInput { +export interface RefreshLocalMetabaseDiscoveredDatabasesInput { + connectionId: string; + discovered: DiscoveredMetabaseDatabase[]; +} + +export interface LocalMetabaseDiscoveredDatabaseRow extends DiscoveredMetabaseDatabase { + updatedAt: string; +} + +export interface LocalMetabaseMappingListRow { metabaseDatabaseId: number; metabaseDatabaseName: string | null; metabaseEngine: string | null; @@ -22,443 +36,86 @@ export interface LocalMetabaseSourceStateMappingInput { source: LocalMetabaseMappingSource; } -export interface ReplaceLocalMetabaseSourceStateInput { - connectionId: string; - syncMode?: MetabaseSyncMode; - defaultTagNames?: string[]; - selections?: MetabaseSourceStateSelection[]; - mappings: LocalMetabaseSourceStateMappingInput[]; -} - -interface ApplyLocalMetabaseYamlBootstrapInput { - connectionId: string; - syncMode: MetabaseSyncMode; - defaultTagNames: string[]; - selections: MetabaseSourceStateSelection[]; - mappings: Array<{ - metabaseDatabaseId: number; - targetConnectionId: string | null; - syncEnabled: boolean; - }>; -} - -export interface LocalMetabaseMappingListRow extends LocalMetabaseSourceStateMappingInput {} - -export interface UpsertLocalMetabaseDatabaseMappingInput { - connectionId: string; - metabaseDatabaseId: number; - targetConnectionId: string | null; - syncEnabled: boolean; - source: LocalMetabaseMappingSource; -} - -export interface SetLocalMetabaseMappingSyncEnabledInput { - connectionId: string; - metabaseDatabaseId: number; - syncEnabled: boolean; -} - -export interface SetLocalMetabaseSyncStateInput { - connectionId: string; - syncMode: MetabaseSyncMode; - defaultTagNames: string[]; - selections: MetabaseSourceStateSelection[]; -} - -export interface RefreshLocalMetabaseDiscoveredDatabasesInput { - connectionId: string; - discovered: Array<{ - id: number; - name: string; - engine: string; - host: string | null; - dbName: string | null; - }>; -} - -export interface ClearLocalMetabaseMappingsInput { - connectionId: string; - metabaseDatabaseId?: number; -} - -interface SelectionRow { - selection_type: 'collection' | 'item'; - metabase_object_id: number; -} - -interface MappingRow { +interface DiscoveryRow { metabase_database_id: number; - metabase_database_name: string | null; - metabase_engine: string | null; - target_connection_id: string | null; - sync_enabled: number; + metabase_database_name: string; + metabase_engine: string; + metabase_host: string | null; + metabase_db_name: string | null; + updated_at: string; } -interface SyncConfigRow { - sync_mode: MetabaseSyncMode; - default_tag_names_json: string; +function selectionState(bootstrap: MetabaseMappingBootstrap): MetabaseSourceStateSelection[] { + return [ + ...bootstrap.selections.collections.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), + ...bootstrap.selections.items.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), + ]; } -function parseDefaultTagNames(raw: string): string[] { - const parsed = JSON.parse(raw); - return Array.isArray(parsed) ? parsed.filter((value): value is string => typeof value === 'string') : []; +function configuredMappingIds(bootstrap: MetabaseMappingBootstrap): number[] { + return [...new Set([...Object.keys(bootstrap.databaseMappings), ...Object.keys(bootstrap.syncEnabled)].map(Number))].sort( + (left, right) => left - right, + ); } -export class LocalMetabaseSourceStateReader implements MetabaseSourceStateReader { +function discoveredRowToDatabase(row: DiscoveryRow): LocalMetabaseDiscoveredDatabaseRow { + return { + id: row.metabase_database_id, + name: row.metabase_database_name, + engine: row.metabase_engine, + host: row.metabase_host, + dbName: row.metabase_db_name, + updatedAt: row.updated_at, + }; +} + +function emptyMetabaseSourceState(): MetabaseSourceState { + return { + syncMode: 'ALL', + selections: [], + defaultTagNames: [], + mappings: [], + }; +} + +export class LocalMetabaseDiscoveryCache { private readonly db: Database.Database; private readonly now: () => Date; - constructor(options: LocalMetabaseSourceStateStoreOptions) { + constructor(options: LocalMetabaseDiscoveryCacheOptions) { mkdirSync(dirname(options.dbPath), { recursive: true }); this.db = new Database(options.dbPath); this.db.pragma('journal_mode = WAL'); this.db.pragma('foreign_keys = ON'); this.now = options.now ?? (() => new Date()); this.db.exec(` - CREATE TABLE IF NOT EXISTS local_metabase_sync_config ( - metabase_connection_id TEXT PRIMARY KEY, - sync_mode TEXT NOT NULL, - default_tag_names_json TEXT NOT NULL, - updated_at TEXT NOT NULL - ); - - CREATE TABLE IF NOT EXISTS local_metabase_selections ( - metabase_connection_id TEXT NOT NULL, - selection_type TEXT NOT NULL, - metabase_object_id INTEGER NOT NULL, - PRIMARY KEY (metabase_connection_id, selection_type, metabase_object_id) - ); - - CREATE TABLE IF NOT EXISTS local_metabase_database_mappings ( + CREATE TABLE IF NOT EXISTS local_metabase_discovered_databases ( metabase_connection_id TEXT NOT NULL, metabase_database_id INTEGER NOT NULL, - metabase_database_name TEXT, - metabase_engine TEXT, + metabase_database_name TEXT NOT NULL, + metabase_engine TEXT NOT NULL, metabase_host TEXT, metabase_db_name TEXT, - target_connection_id TEXT, - sync_enabled INTEGER NOT NULL DEFAULT 0, - source TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (metabase_connection_id, metabase_database_id) ); `); } - async applyYamlBootstrap(input: ApplyLocalMetabaseYamlBootstrapInput): Promise { - const timestamp = this.now().toISOString(); - const apply = this.db.transaction(() => { - const syncConfigExists = this.db - .prepare('SELECT 1 FROM local_metabase_sync_config WHERE metabase_connection_id = ?') - .get(input.connectionId); - if (!syncConfigExists) { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - `, - ) - .run(input.connectionId, input.syncMode, JSON.stringify(input.defaultTagNames), timestamp); - - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of input.selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - } - - const existing = this.db.prepare(` - SELECT target_connection_id, source - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? AND metabase_database_id = ? - `); - const insert = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, NULL, NULL, NULL, NULL, ?, ?, 'ktx.yaml', ?) - `); - const updateRefreshRow = this.db.prepare(` - UPDATE local_metabase_database_mappings - SET target_connection_id = ?, - sync_enabled = ?, - source = 'ktx.yaml', - updated_at = ? - WHERE metabase_connection_id = ? - AND metabase_database_id = ? - AND source = 'refresh' - AND target_connection_id IS NULL - `); - - for (const mapping of input.mappings) { - const row = existing.get(input.connectionId, mapping.metabaseDatabaseId) as - | { target_connection_id: string | null; source: LocalMetabaseMappingSource } - | undefined; - if (!row) { - insert.run( - input.connectionId, - mapping.metabaseDatabaseId, - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - timestamp, - ); - continue; - } - if (row.source === 'refresh' && row.target_connection_id === null) { - updateRefreshRow.run( - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - timestamp, - input.connectionId, - mapping.metabaseDatabaseId, - ); - } - } - }); - - apply(); - } - - async replaceSourceState(input: ReplaceLocalMetabaseSourceStateInput): Promise { - const timestamp = this.now().toISOString(); - const syncMode = input.syncMode ?? 'ALL'; - const selections = input.selections ?? []; - const defaultTagNames = input.defaultTagNames ?? []; - - const replace = this.db.transaction(() => { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - ON CONFLICT(metabase_connection_id) DO UPDATE SET - sync_mode = excluded.sync_mode, - default_tag_names_json = excluded.default_tag_names_json, - updated_at = excluded.updated_at - `, - ) - .run(input.connectionId, syncMode, JSON.stringify(defaultTagNames), timestamp); - - this.db.prepare('DELETE FROM local_metabase_selections WHERE metabase_connection_id = ?').run(input.connectionId); - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - - this.db - .prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ?') - .run(input.connectionId); - const insertMapping = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `); - for (const mapping of input.mappings) { - insertMapping.run( - input.connectionId, - mapping.metabaseDatabaseId, - mapping.metabaseDatabaseName, - mapping.metabaseEngine, - mapping.metabaseHost, - mapping.metabaseDbName, - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - mapping.source, - timestamp, - ); - } - }); - - replace(); - } - - async listDatabaseMappings(connectionId: string): Promise { - const rows = this.db - .prepare( - ` - SELECT - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? - ORDER BY metabase_database_id - `, - ) - .all(connectionId) as Array<{ - metabase_database_id: number; - metabase_database_name: string | null; - metabase_engine: string | null; - metabase_host: string | null; - metabase_db_name: string | null; - target_connection_id: string | null; - sync_enabled: number; - source: LocalMetabaseMappingSource; - }>; - - return rows.map((row) => ({ - metabaseDatabaseId: row.metabase_database_id, - metabaseDatabaseName: row.metabase_database_name, - metabaseEngine: row.metabase_engine, - metabaseHost: row.metabase_host, - metabaseDbName: row.metabase_db_name, - targetConnectionId: row.target_connection_id, - syncEnabled: row.sync_enabled === 1, - source: row.source, - })); - } - - async upsertDatabaseMapping(input: UpsertLocalMetabaseDatabaseMappingInput): Promise { - const timestamp = this.now().toISOString(); - this.db - .prepare( - ` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, NULL, NULL, NULL, NULL, ?, ?, ?, ?) - ON CONFLICT(metabase_connection_id, metabase_database_id) DO UPDATE SET - target_connection_id = excluded.target_connection_id, - sync_enabled = excluded.sync_enabled, - source = excluded.source, - updated_at = excluded.updated_at - `, - ) - .run( - input.connectionId, - input.metabaseDatabaseId, - input.targetConnectionId, - input.syncEnabled ? 1 : 0, - input.source, - timestamp, - ); - } - - async setMappingSyncEnabled(input: SetLocalMetabaseMappingSyncEnabledInput): Promise { - const timestamp = this.now().toISOString(); - this.db - .prepare( - ` - UPDATE local_metabase_database_mappings - SET sync_enabled = ?, updated_at = ? - WHERE metabase_connection_id = ? AND metabase_database_id = ? - `, - ) - .run(input.syncEnabled ? 1 : 0, timestamp, input.connectionId, input.metabaseDatabaseId); - } - - async setSyncState(input: SetLocalMetabaseSyncStateInput): Promise { - const timestamp = this.now().toISOString(); - const write = this.db.transaction(() => { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - ON CONFLICT(metabase_connection_id) DO UPDATE SET - sync_mode = excluded.sync_mode, - default_tag_names_json = excluded.default_tag_names_json, - updated_at = excluded.updated_at - `, - ) - .run(input.connectionId, input.syncMode, JSON.stringify(input.defaultTagNames), timestamp); - - this.db.prepare('DELETE FROM local_metabase_selections WHERE metabase_connection_id = ?').run(input.connectionId); - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of input.selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - }); - - write(); - } - async refreshDiscoveredDatabases(input: RefreshLocalMetabaseDiscoveredDatabasesInput): Promise { const timestamp = this.now().toISOString(); const refresh = this.db.transaction(() => { const upsert = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( + INSERT INTO local_metabase_discovered_databases ( metabase_connection_id, metabase_database_id, metabase_database_name, metabase_engine, metabase_host, metabase_db_name, - target_connection_id, - sync_enabled, - source, updated_at ) - VALUES (?, ?, ?, ?, ?, ?, NULL, 0, 'refresh', ?) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(metabase_connection_id, metabase_database_id) DO UPDATE SET metabase_database_name = excluded.metabase_database_name, metabase_engine = excluded.metabase_engine, @@ -483,78 +140,116 @@ export class LocalMetabaseSourceStateReader implements MetabaseSourceStateReader refresh(); } - async clearDatabaseMappings(input: ClearLocalMetabaseMappingsInput): Promise { - if (input.metabaseDatabaseId === undefined) { - this.db.prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ?').run(input.connectionId); - return; - } - this.db - .prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ? AND metabase_database_id = ?') - .run(input.connectionId, input.metabaseDatabaseId); - } - - async getUnhydratedSyncEnabledMappingIds(connectionId: string): Promise { + async listDiscoveredDatabases(connectionId: string): Promise { const rows = this.db - .prepare( - ` - SELECT metabase_database_id - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? - AND sync_enabled = 1 - AND target_connection_id IS NOT NULL - AND metabase_database_name IS NULL - ORDER BY metabase_database_id - `, - ) - .all(connectionId) as Array<{ metabase_database_id: number }>; - return rows.map((row) => row.metabase_database_id); - } - - async getSourceState(connectionId: string): Promise { - const config = this.db - .prepare('SELECT sync_mode, default_tag_names_json FROM local_metabase_sync_config WHERE metabase_connection_id = ?') - .get(connectionId) as SyncConfigRow | undefined; - const selections = this.db - .prepare( - ` - SELECT selection_type, metabase_object_id - FROM local_metabase_selections - WHERE metabase_connection_id = ? - ORDER BY selection_type, metabase_object_id - `, - ) - .all(connectionId) as SelectionRow[]; - const mappings = this.db .prepare( ` SELECT metabase_database_id, metabase_database_name, metabase_engine, - target_connection_id, - sync_enabled - FROM local_metabase_database_mappings + metabase_host, + metabase_db_name, + updated_at + FROM local_metabase_discovered_databases WHERE metabase_connection_id = ? - AND metabase_database_name IS NOT NULL ORDER BY metabase_database_id `, ) - .all(connectionId) as MappingRow[]; + .all(connectionId) as DiscoveryRow[]; + return rows.map(discoveredRowToDatabase); + } - return { - syncMode: config?.sync_mode ?? 'ALL', - defaultTagNames: config ? parseDefaultTagNames(config.default_tag_names_json) : [], - selections: selections.map((selection) => ({ - selectionType: selection.selection_type, - metabaseObjectId: selection.metabase_object_id, - })), - mappings: mappings.map((mapping) => ({ - metabaseDatabaseId: mapping.metabase_database_id, - metabaseDatabaseName: mapping.metabase_database_name, - metabaseEngine: mapping.metabase_engine, - targetConnectionId: mapping.target_connection_id, - syncEnabled: mapping.sync_enabled === 1, - })), - }; + async getDiscoveredDatabase( + connectionId: string, + metabaseDatabaseId: number, + ): Promise { + const row = this.db + .prepare( + ` + SELECT + metabase_database_id, + metabase_database_name, + metabase_engine, + metabase_host, + metabase_db_name, + updated_at + FROM local_metabase_discovered_databases + WHERE metabase_connection_id = ? AND metabase_database_id = ? + `, + ) + .get(connectionId, metabaseDatabaseId) as DiscoveryRow | undefined; + return row ? discoveredRowToDatabase(row) : null; + } +} + +export class KtxYamlMetabaseSourceStateReader implements MetabaseSourceStateReader { + constructor( + private readonly project: Pick, + private readonly options: { discoveryCache?: LocalMetabaseDiscoveryCache } = {}, + ) {} + + async getSourceState(connectionId: string): Promise { + const connection = this.project.config.connections[connectionId]; + if (!connection || String(connection.driver ?? '').toLowerCase() !== 'metabase') { + return emptyMetabaseSourceState(); + } + + const bootstrap = parseMetabaseMappingBootstrap(connectionId, connection); + const discovered = new Map( + (await this.options.discoveryCache?.listDiscoveredDatabases(connectionId))?.map((database) => [database.id, database]) ?? + [], + ); + + return { + syncMode: bootstrap.syncMode, + selections: selectionState(bootstrap), + defaultTagNames: bootstrap.defaultTagNames, + mappings: configuredMappingIds(bootstrap).map((id) => { + const metadata = discovered.get(id); + return { + metabaseDatabaseId: id, + metabaseDatabaseName: metadata?.name ?? null, + metabaseEngine: metadata?.engine ?? null, + metabaseHost: metadata?.host ?? null, + metabaseDbName: metadata?.dbName ?? null, + targetConnectionId: bootstrap.databaseMappings[String(id)] ?? null, + syncEnabled: bootstrap.syncEnabled[String(id)] ?? false, + }; + }), + }; + } + + async listDatabaseMappings(connectionId: string): Promise { + const state = await this.getSourceState(connectionId); + const configuredRows: LocalMetabaseMappingListRow[] = state.mappings.map((mapping) => ({ + metabaseDatabaseId: mapping.metabaseDatabaseId, + metabaseDatabaseName: mapping.metabaseDatabaseName, + metabaseEngine: mapping.metabaseEngine, + metabaseHost: mapping.metabaseHost ?? null, + metabaseDbName: mapping.metabaseDbName ?? null, + targetConnectionId: mapping.targetConnectionId, + syncEnabled: mapping.syncEnabled, + source: 'ktx.yaml', + })); + + const configuredIds = new Set(configuredRows.map((row) => row.metabaseDatabaseId)); + const discoveredRows = + (await this.options.discoveryCache?.listDiscoveredDatabases(connectionId))?.filter( + (database) => !configuredIds.has(database.id), + ) ?? []; + return [ + ...configuredRows, + ...discoveredRows.map((database) => ({ + metabaseDatabaseId: database.id, + metabaseDatabaseName: database.name, + metabaseEngine: database.engine, + metabaseHost: database.host, + metabaseDbName: database.dbName, + targetConnectionId: null, + syncEnabled: false, + source: 'refresh' as const, + })), + ].sort((left, right) => left.metabaseDatabaseId - right.metabaseDatabaseId); } } diff --git a/packages/context/src/ingest/adapters/metabase/source-state-port.ts b/packages/context/src/ingest/adapters/metabase/source-state-port.ts index 7c872f8d..16de9369 100644 --- a/packages/context/src/ingest/adapters/metabase/source-state-port.ts +++ b/packages/context/src/ingest/adapters/metabase/source-state-port.ts @@ -9,6 +9,8 @@ export interface MetabaseSourceStateMapping { metabaseDatabaseId: number; metabaseDatabaseName: string | null; metabaseEngine: string | null; + metabaseHost?: string | null; + metabaseDbName?: string | null; targetConnectionId: string | null; syncEnabled: boolean; } diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index d2336ae9..3c238d98 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -240,17 +240,15 @@ export { createLocalMetabaseSourceAdapter, metabaseRuntimeConfigFromLocalConnection, } from './adapters/metabase/local-metabase.adapter.js'; -export { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +export { + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, +} from './adapters/metabase/local-source-state-store.js'; export type { - ClearLocalMetabaseMappingsInput, + LocalMetabaseDiscoveredDatabaseRow, LocalMetabaseMappingListRow, LocalMetabaseMappingSource, - LocalMetabaseSourceStateMappingInput, - ReplaceLocalMetabaseSourceStateInput, RefreshLocalMetabaseDiscoveredDatabasesInput, - SetLocalMetabaseMappingSyncEnabledInput, - SetLocalMetabaseSyncStateInput, - UpsertLocalMetabaseDatabaseMappingInput, } from './adapters/metabase/local-source-state-store.js'; export { metabaseLocalConnectionIdSchema, metabasePullConfigSchema, parseMetabasePullConfig } from './adapters/metabase/types.js'; export type { MetabasePullConfig, MetabaseSyncMode } from './adapters/metabase/types.js'; diff --git a/packages/context/src/ingest/local-ingest.ts b/packages/context/src/ingest/local-ingest.ts index bc6294c4..2ebb3e7b 100644 --- a/packages/context/src/ingest/local-ingest.ts +++ b/packages/context/src/ingest/local-ingest.ts @@ -9,10 +9,9 @@ import type { KtxLocalProject } from '../project/index.js'; import { ktxLocalStateDbPath } from '../project/index.js'; import type { KtxQueryResult } from '../sl/index.js'; import { planMetabaseFanoutChildren } from './adapters/metabase/fanout-planner.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './adapters/metabase/local-source-state-store.js'; import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } from './local-adapters.js'; import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js'; -import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; import { buildSyncId } from './raw-sources-paths.js'; import type { IngestReportBody, IngestReportSnapshot } from './reports.js'; @@ -364,16 +363,10 @@ export async function runLocalMetabaseIngest( const metabaseConnectionId = safeSegment('metabase connection id', options.metabaseConnectionId); assertConfigured(options.project, 'metabase', metabaseConnectionId); - await seedLocalMappingStateFromKtxYaml(options.project, metabaseConnectionId); const adapter = findAdapter(options.adapters, 'metabase'); - const sourceStateReader = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(options.project) }); - - const unhydrated = await sourceStateReader.getUnhydratedSyncEnabledMappingIds(metabaseConnectionId); - if (unhydrated.length > 0) { - throw new Error( - `Metabase mappings ${unhydrated.join(', ')} are not hydrated; run \`ktx connection mapping refresh ${metabaseConnectionId}\` before local Metabase ingest.`, - ); - } + const sourceStateReader = new KtxYamlMetabaseSourceStateReader(options.project, { + discoveryCache: new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(options.project) }), + }); const state = await sourceStateReader.getSourceState(metabaseConnectionId); const childPlans = planMetabaseFanoutChildren({ diff --git a/packages/context/src/ingest/local-mapping-reconcile.test.ts b/packages/context/src/ingest/local-mapping-reconcile.test.ts index c0f8dcac..4a5d2740 100644 --- a/packages/context/src/ingest/local-mapping-reconcile.test.ts +++ b/packages/context/src/ingest/local-mapping-reconcile.test.ts @@ -4,7 +4,6 @@ import { join } from 'node:path'; import { afterEach, describe, expect, it } from 'vitest'; import { ktxLocalStateDbPath, type KtxLocalProject } from '../project/index.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; describe('local mapping yaml reconciliation bridge', () => { @@ -23,7 +22,7 @@ describe('local mapping yaml reconciliation bridge', () => { } as KtxLocalProject; } - it('seeds Metabase local state from ktx.yaml mapping intent', async () => { + it('does not copy Metabase mapping intent into local SQLite state', async () => { tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-yaml-seed-')); const project = projectWithConnections({ 'prod-metabase': { @@ -39,17 +38,7 @@ describe('local mapping yaml reconciliation bridge', () => { 'prod-warehouse': { driver: 'postgres', url: 'postgresql://readonly@db.test/analytics' }, }); - await seedLocalMappingStateFromKtxYaml(project, 'prod-metabase'); - - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { metabaseDatabaseId: 1, targetConnectionId: 'prod-warehouse', syncEnabled: true, source: 'ktx.yaml' }, - ]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ONLY', - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - defaultTagNames: ['ktx'], - }); + await expect(seedLocalMappingStateFromKtxYaml(project, 'prod-metabase')).resolves.toBeUndefined(); }); it('seeds Looker local mappings from ktx.yaml mapping intent', async () => { diff --git a/packages/context/src/ingest/local-mapping-reconcile.ts b/packages/context/src/ingest/local-mapping-reconcile.ts index a1bae2fc..1a58af95 100644 --- a/packages/context/src/ingest/local-mapping-reconcile.ts +++ b/packages/context/src/ingest/local-mapping-reconcile.ts @@ -3,29 +3,8 @@ import { parseConnectionMappingBootstrap, type KtxLocalProject, type LookerMappingBootstrap, - type MetabaseMappingBootstrap, } from '../project/index.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; - -function metabaseSelections(bootstrap: MetabaseMappingBootstrap) { - return [ - ...bootstrap.selections.collections.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), - ...bootstrap.selections.items.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), - ]; -} - -function metabaseMappings(bootstrap: MetabaseMappingBootstrap) { - const ids = new Set([...Object.keys(bootstrap.databaseMappings), ...Object.keys(bootstrap.syncEnabled)]); - return [...ids] - .map((id) => Number(id)) - .sort((a, b) => a - b) - .map((id) => ({ - metabaseDatabaseId: id, - targetConnectionId: bootstrap.databaseMappings[String(id)] ?? null, - syncEnabled: bootstrap.syncEnabled[String(id)] ?? false, - })); -} function lookerMappings(bootstrap: LookerMappingBootstrap) { return Object.entries(bootstrap.connectionMappings) @@ -44,20 +23,12 @@ export async function seedLocalMappingStateFromKtxYaml(project: KtxLocalProject, return; } - const dbPath = ktxLocalStateDbPath(project); if (bootstrap.adapter === 'metabase') { - await new LocalMetabaseSourceStateReader({ dbPath }).applyYamlBootstrap({ - connectionId, - syncMode: bootstrap.syncMode, - defaultTagNames: bootstrap.defaultTagNames, - selections: metabaseSelections(bootstrap), - mappings: metabaseMappings(bootstrap), - }); return; } if (bootstrap.adapter === 'looker') { - await new LocalLookerRuntimeStore({ dbPath }).applyYamlBootstrap({ + await new LocalLookerRuntimeStore({ dbPath: ktxLocalStateDbPath(project) }).applyYamlBootstrap({ lookerConnectionId: connectionId, mappings: lookerMappings(bootstrap), }); diff --git a/packages/context/src/ingest/local-metabase-ingest.test.ts b/packages/context/src/ingest/local-metabase-ingest.test.ts index da00c7ec..fe3bd80b 100644 --- a/packages/context/src/ingest/local-metabase-ingest.test.ts +++ b/packages/context/src/ingest/local-metabase-ingest.test.ts @@ -4,7 +4,7 @@ import { join } from 'node:path'; import { AgentRunnerService } from '../agent/index.js'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { initKtxProject, type KtxLocalProject } from '../project/index.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +import { LocalMetabaseDiscoveryCache } from './adapters/metabase/local-source-state-store.js'; import { getLocalIngestStatus, runLocalMetabaseIngest } from './local-ingest.js'; import type { ChunkResult, FetchContext, SourceAdapter } from './types.js'; @@ -94,33 +94,19 @@ describe('runLocalMetabaseIngest', () => { }); async function seedMetabaseState(): Promise { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a', '2': 'warehouse_b' }, + syncEnabled: { '1': true, '2': true }, syncMode: 'ALL', defaultTagNames: ['ktx'], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'localhost', - metabaseDbName: 'a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Warehouse B', - metabaseEngine: 'postgres', - metabaseHost: 'localhost', - metabaseDbName: 'b', - targetConnectionId: 'warehouse_b', - syncEnabled: true, - source: 'refresh', - }, + selections: { collections: [], items: [] }, + }; + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + await discoveryCache.refreshDiscoveredDatabases({ + connectionId: 'prod-metabase', + discovered: [ + { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'localhost', dbName: 'a' }, + { id: 2, name: 'Warehouse B', engine: 'postgres', host: 'localhost', dbName: 'b' }, ], }); } @@ -151,22 +137,10 @@ describe('runLocalMetabaseIngest', () => { }); it('throws before runner work when there are no sync-enabled mapped rows', async () => { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: null, - syncEnabled: true, - source: 'refresh', - }, - ], - }); + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': null }, + syncEnabled: { '1': true }, + }; await expect( runLocalMetabaseIngest({ @@ -178,59 +152,28 @@ describe('runLocalMetabaseIngest', () => { ).rejects.toThrow('no sync-enabled mappings with a target connection'); }); - it('throws with refresh guidance for unhydrated sync-enabled rows', async () => { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 7, - metabaseDatabaseName: null, - metabaseEngine: null, - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'ktx.yaml', - }, - ], + it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => { + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a' }, + syncEnabled: { '1': true }, + }; + + const result = await runLocalMetabaseIngest({ + project, + adapters: [new FakeMetabaseSourceAdapter()], + metabaseConnectionId: 'prod-metabase', + agentRunner: new TestAgentRunner(), + jobIdFactory: () => 'metabase-child-1', }); - await expect( - runLocalMetabaseIngest({ - project, - adapters: [new FakeMetabaseSourceAdapter()], + expect(result.status).toBe('all_succeeded'); + expect(result.children).toMatchObject([ + { metabaseConnectionId: 'prod-metabase', - agentRunner: new TestAgentRunner(), - }), - ).rejects.toThrow('run `ktx connection mapping refresh prod-metabase`'); - }); - - it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => { - const project = { - projectDir: tempDir, - config: { - ingest: { adapters: ['metabase'] }, - connections: { - 'prod-metabase': { - driver: 'metabase', - mappings: { - databaseMappings: { '1': 'prod-warehouse' }, - syncEnabled: { '1': true }, - }, - }, - 'prod-warehouse': { driver: 'postgres', url: 'postgresql://readonly@db.test/analytics' }, - }, + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', }, - } as never; - - await expect( - runLocalMetabaseIngest({ - project, - adapters: [new FakeMetabaseSourceAdapter()], - metabaseConnectionId: 'prod-metabase', - }), - ).rejects.toThrow('run `ktx connection mapping refresh prod-metabase`'); + ]); }); it('rejects source-dir uploads through the Metabase fan-out runner', async () => { @@ -266,15 +209,15 @@ describe('runLocalMetabaseIngest', () => { it('captures fetch-time child failures and continues later mappings', async () => { await seedMetabaseState(); project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' }; - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 3, - targetConnectionId: 'warehouse_c', - syncEnabled: true, - source: 'cli', - }); - await store.refreshDiscoveredDatabases({ + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a', '2': 'warehouse_b', '3': 'warehouse_c' }, + syncEnabled: { '1': true, '2': true, '3': true }, + syncMode: 'ALL', + defaultTagNames: ['ktx'], + selections: { collections: [], items: [] }, + }; + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', discovered: [ { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'localhost', dbName: 'a' }, diff --git a/packages/context/src/package-exports.test.ts b/packages/context/src/package-exports.test.ts index 4fd7e502..9689744e 100644 --- a/packages/context/src/package-exports.test.ts +++ b/packages/context/src/package-exports.test.ts @@ -199,7 +199,9 @@ describe('@ktx/context package exports', () => { expect(ingest.stagedSyncConfigSchema).toBeDefined(); expect(ingest.stagedLookerScopeFileSchema).toBeDefined(); expect(ingest.stagedLookerFetchReportSchema).toBeDefined(); - expect(ingest.LocalMetabaseSourceStateReader).toBeTypeOf('function'); + expect('LocalMetabaseSourceStateReader' in ingest).toBe(false); + expect(ingest.KtxYamlMetabaseSourceStateReader).toBeTypeOf('function'); + expect(ingest.LocalMetabaseDiscoveryCache).toBeTypeOf('function'); expect(ingest.createLocalMetabaseSourceAdapter).toBeTypeOf('function'); expect(ingest.metabaseRuntimeConfigFromLocalConnection).toBeTypeOf('function'); expect(ingest.IngestMetabaseClientFactory).toBeTypeOf('function'); diff --git a/packages/context/src/project/config.ts b/packages/context/src/project/config.ts index f1aa9d71..2412683f 100644 --- a/packages/context/src/project/config.ts +++ b/packages/context/src/project/config.ts @@ -505,13 +505,15 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig { return { project: project.trim(), ...(setup - ? { - setup: { - database_connection_ids: stringArray(setup.database_connection_ids, []), - completed_steps: stringArray(setup.completed_steps, []), - }, - } - : {}), + ? { + setup: { + database_connection_ids: stringArray(setup.database_connection_ids, []), + ...(setup.completed_steps !== undefined + ? { completed_steps: stringArray(setup.completed_steps, []) } + : {}), + }, + } + : {}), connections: isRecord(parsed.connections) ? (parsed.connections as Record) : defaults.connections, diff --git a/packages/context/src/project/setup-config.ts b/packages/context/src/project/setup-config.ts index a426caf7..be1e8817 100644 --- a/packages/context/src/project/setup-config.ts +++ b/packages/context/src/project/setup-config.ts @@ -95,7 +95,6 @@ export function setKtxSetupDatabaseConnectionIds( ...config, setup: { database_connection_ids: uniqueConnectionIds, - ...(config.setup?.completed_steps ? { completed_steps: [...config.setup.completed_steps] } : {}), }, }; }