import { localConnectionToWarehouseDescriptor } from '@ktx/context/connections'; import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultLookerConnectionClientFactory, DefaultMetabaseConnectionClientFactory, KtxYamlMetabaseSourceStateReader, LocalLookerRuntimeStore, LocalMetabaseDiscoveryCache, computeLookerMappingDrift, computeMetabaseMappingDrift, discoverLookerConnections, discoverMetabaseDatabases, lookerCredentialsFromLocalConnection, metabaseRuntimeConfigFromLocalConnection, planMetabaseFanoutChildren, seedLocalMappingStateFromKtxYaml, validateLookerMappings, validateMappingPhysicalMatch, type LookerMappingClient, type LocalMetabaseMappingListRow, type MetabaseRuntimeClient, } from '@ktx/context/ingest'; import { type KtxLocalProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { profileMark } from './startup-profile.js'; profileMark('module:source-mapping'); export type KtxSourceMappingArgs = | { command: 'list'; projectDir: string; connectionId: string; json: boolean } | { command: 'refresh'; projectDir: string; connectionId: string; autoAccept: boolean } | { command: 'validate'; projectDir: string; connectionId: string }; interface KtxSourceMappingDeps { createMetabaseClient?: ( project: KtxLocalProject, connectionId: string, ) => Promise>; createLookerClient?: ( project: KtxLocalProject, connectionId: string, ) => Promise & { cleanup?(): Promise }>; } async function createDefaultMetabaseClient( project: KtxLocalProject, connectionId: string, ): Promise> { const factory = new DefaultMetabaseConnectionClientFactory( (metabaseConnectionId) => metabaseRuntimeConfigFromLocalConnection(metabaseConnectionId, project.config.connections[metabaseConnectionId]), DEFAULT_METABASE_CLIENT_CONFIG, ); return factory.createClient(connectionId); } async function createDefaultLookerClient( project: KtxLocalProject, connectionId: string, ): Promise & { cleanup?(): Promise }> { const factory = new DefaultLookerConnectionClientFactory({ async resolve(lookerConnectionId) { return lookerCredentialsFromLocalConnection(lookerConnectionId, project.config.connections[lookerConnectionId]); }, }); return factory.createClient(connectionId) as unknown as Pick & { cleanup?(): Promise; }; } function isLookerConnection(project: KtxLocalProject, connectionId: string): boolean { return String(project.config.connections[connectionId]?.driver ?? '').toLowerCase() === 'looker'; } function assertMetabaseConnection(project: KtxLocalProject, connectionId: string): void { const connection = project.config.connections[connectionId]; if (!connection || String(connection.driver).toLowerCase() !== 'metabase') { throw new Error(`Connection "${connectionId}" is not a Metabase connection`); } } function targetPhysicalInfo(project: KtxLocalProject, connectionId: string) { const descriptor = localConnectionToWarehouseDescriptor(connectionId, project.config.connections[connectionId]); if (!descriptor) { return { connection_type: 'UNKNOWN' }; } return { connection_type: descriptor.connection_type, host: descriptor.host ?? null, database: descriptor.database ?? null, account: descriptor.account ?? null, project_id: descriptor.project_id ?? null, dataset_id: descriptor.dataset_id ?? null, ...descriptor.connection_params, }; } function renderMapping(row: LocalMetabaseMappingListRow): string { const name = row.metabaseDatabaseName ?? 'unhydrated'; const target = row.targetConnectionId ?? '[unmapped]'; return `${row.metabaseDatabaseId} -> ${target} (${name}, sync: ${row.syncEnabled ? 'on' : 'off'}, source: ${ row.source })`; } function renderLookerMapping(row: Awaited>[number]): string { const target = row.ktxConnectionId ?? '[unmapped]'; const metadata = [row.lookerDialect, row.lookerHost, row.lookerDatabase].filter(Boolean).join(', '); return `${row.lookerConnectionName} -> ${target}${metadata ? ` (${metadata}, source: ${row.source})` : ` (source: ${row.source})`}`; } export async function runKtxSourceMapping( args: KtxSourceMappingArgs, io: KtxCliIo = process, deps: KtxSourceMappingDeps = {}, ): Promise { try { const project = await loadKtxProject({ projectDir: args.projectDir }); await seedLocalMappingStateFromKtxYaml(project, args.connectionId); if (isLookerConnection(project, args.connectionId)) { const store = new LocalLookerRuntimeStore({ dbPath: ktxLocalStateDbPath(project) }); if (args.command === 'list') { const rows = await store.listConnectionMappings(args.connectionId); io.stdout.write(args.json ? `${JSON.stringify(rows, null, 2)}\n` : `${rows.map(renderLookerMapping).join('\n')}\n`); return 0; } if (args.command === 'refresh') { const client = await (deps.createLookerClient ?? createDefaultLookerClient)(project, args.connectionId); try { const discovered = await discoverLookerConnections(client); const drift = computeLookerMappingDrift({ storedMappings: await store.readMappings(args.connectionId), discovered, }); if (args.autoAccept) { await store.refreshDiscoveredConnections({ lookerConnectionId: args.connectionId, discovered }); } io.stdout.write(`Discovery: ${discovered.length} ${discovered.length === 1 ? 'connection' : 'connections'}\n`); io.stdout.write(`Unmapped discovered: ${drift.unmappedDiscovered.length}\n`); io.stdout.write(`Stale mappings: ${drift.staleMappings.length}\n`); return 0; } finally { await client.cleanup?.(); } } const knownKtxConnectionIds = new Set(Object.keys(project.config.connections)); const knownConnectionTypes = new Map( Object.entries(project.config.connections).map(([id]) => [id, targetPhysicalInfo(project, id).connection_type]), ); const validation = validateLookerMappings({ mappings: await store.readMappings(args.connectionId), knownKtxConnectionIds, knownConnectionTypes, }); if (!validation.ok) { for (const error of validation.errors) { io.stderr.write(`${error.key}: ${error.reason}\n`); } return 1; } io.stdout.write(`Mapping validation passed: ${args.connectionId}\n`); return 0; } assertMetabaseConnection(project, args.connectionId); const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); const store = new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }); if (args.command === 'list') { const rows = await store.listDatabaseMappings(args.connectionId); io.stdout.write(args.json ? `${JSON.stringify(rows, null, 2)}\n` : `${rows.map(renderMapping).join('\n')}\n`); return 0; } if (args.command === 'refresh') { const client = await (deps.createMetabaseClient ?? createDefaultMetabaseClient)(project, args.connectionId); try { const discovered = await discoverMetabaseDatabases(client); const existing = Object.fromEntries( (await store.listDatabaseMappings(args.connectionId)).map((row) => [ String(row.metabaseDatabaseId), row.targetConnectionId, ]), ); const drift = computeMetabaseMappingDrift({ currentMappings: existing, discovered }); if (args.autoAccept) { await discoveryCache.refreshDiscoveredDatabases({ connectionId: args.connectionId, discovered }); } io.stdout.write(`Discovery: ${discovered.length} ${discovered.length === 1 ? 'database' : 'databases'}\n`); io.stdout.write(`Unmapped discovered: ${drift.unmappedDiscovered.length}\n`); io.stdout.write(`Stale mappings: ${drift.staleMappings.length}\n`); return 0; } finally { await client.cleanup(); } } const rows = await store.listDatabaseMappings(args.connectionId); planMetabaseFanoutChildren({ metabaseConnectionId: args.connectionId, mappings: rows.map((row) => ({ metabaseDatabaseId: row.metabaseDatabaseId, targetConnectionId: row.targetConnectionId, syncEnabled: row.syncEnabled, })), }); const failures = rows.flatMap((row) => { if (!row.targetConnectionId) { return []; } const reason = validateMappingPhysicalMatch( { metabaseEngine: row.metabaseEngine, metabaseDbName: row.metabaseDbName, metabaseHost: row.metabaseHost }, project.config.connections[row.targetConnectionId] ? targetPhysicalInfo(project, row.targetConnectionId) : { connection_type: 'UNKNOWN' }, ); return reason ? [`${row.metabaseDatabaseId}: ${reason}`] : []; }); if (failures.length > 0) { for (const failure of failures) { io.stderr.write(`${failure}\n`); } return 1; } io.stdout.write(`Mapping validation passed: ${args.connectionId}\n`); return 0; } catch (error) { io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); return 1; } }