diff --git a/AGENTS.md b/AGENTS.md index 1e5480f2..4a235864 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,6 +24,9 @@ database migrations, ORPC contracts, or `python-service/` layout exist here. - **MUST**: Keep package/public API changes intentional. Do not add compatibility wrappers for old KTX names unless the user explicitly asks for a migration bridge. +- **MUST**: Treat KTX as having no public users unless the user says otherwise. + Legacy support is not necessary by default; prefer clean breaking changes over + compatibility shims, migration bridges, or preserved stale behavior. ### Absolute Prohibitions diff --git a/packages/cli/src/commands/connection-mapping.test.ts b/packages/cli/src/commands/connection-mapping.test.ts index 7d76cc9d..825c3c4c 100644 --- a/packages/cli/src/commands/connection-mapping.test.ts +++ b/packages/cli/src/commands/connection-mapping.test.ts @@ -1,8 +1,8 @@ -import { mkdtemp, rm } from 'node:fs/promises'; +import { mkdtemp, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { LocalMetabaseSourceStateReader } from '@ktx/context/ingest'; -import { initKtxProject, loadKtxProject, serializeKtxProjectConfig } from '@ktx/context/project'; +import { LocalMetabaseDiscoveryCache } from '@ktx/context/ingest'; +import { initKtxProject, loadKtxProject, parseKtxProjectConfig, serializeKtxProjectConfig } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { runKtxConnectionMapping } from './connection-mapping.js'; @@ -79,19 +79,24 @@ describe('runKtxConnectionMapping', () => { it('sets, lists, disables, and clears local Metabase mappings', async () => { const io = makeIo(); - await expect( - runKtxConnectionMapping( - { - command: 'set', - projectDir, - connectionId: 'prod-metabase', - field: 'databaseMappings', - key: '1', - value: 'prod-warehouse', - }, - io.io, - ), - ).resolves.toBe(0); + const setCode = await runKtxConnectionMapping( + { + command: 'set', + projectDir, + connectionId: 'prod-metabase', + field: 'databaseMappings', + key: '1', + value: 'prod-warehouse', + }, + io.io, + ); + expect(setCode, io.stderr()).toBe(0); + + let config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toMatchObject({ + databaseMappings: { '1': 'prod-warehouse' }, + syncEnabled: { '1': true }, + }); const listIo = makeIo(); await expect( @@ -113,6 +118,12 @@ describe('runKtxConnectionMapping', () => { ), ).resolves.toBe(0); + config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toMatchObject({ + databaseMappings: { '1': 'prod-warehouse' }, + syncEnabled: { '1': false }, + }); + await expect( runKtxConnectionMapping( { @@ -124,6 +135,9 @@ describe('runKtxConnectionMapping', () => { makeIo().io, ), ).resolves.toBe(0); + + config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toBeUndefined(); }); it('lists Metabase yaml mapping bootstrap rows before any SQLite command writes', async () => { @@ -194,9 +208,11 @@ describe('runKtxConnectionMapping', () => { expect(io.stdout()).toContain('Discovery: 1 database'); expect(client.cleanup).toHaveBeenCalledTimes(1); - const store = new LocalMetabaseSourceStateReader({ dbPath: join(projectDir, '.ktx', 'db.sqlite') }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { metabaseDatabaseId: 1, metabaseDatabaseName: 'Analytics', source: 'refresh' }, + const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['prod-metabase']?.mappings).toBeUndefined(); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(projectDir, '.ktx', 'db.sqlite') }); + await expect(discoveryCache.listDiscoveredDatabases('prod-metabase')).resolves.toMatchObject([ + { id: 1, name: 'Analytics', engine: 'postgres' }, ]); }); diff --git a/packages/cli/src/commands/connection-mapping.ts b/packages/cli/src/commands/connection-mapping.ts index b35bf40f..5bae8e6e 100644 --- a/packages/cli/src/commands/connection-mapping.ts +++ b/packages/cli/src/commands/connection-mapping.ts @@ -4,8 +4,9 @@ import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultLookerConnectionClientFactory, DefaultMetabaseConnectionClientFactory, + KtxYamlMetabaseSourceStateReader, LocalLookerRuntimeStore, - LocalMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, computeLookerMappingDrift, computeMetabaseMappingDrift, discoverLookerConnections, @@ -16,10 +17,18 @@ import { validateLookerMappings, validateMappingPhysicalMatch, type LookerMappingClient, + type LocalMetabaseMappingListRow, type MetabaseRuntimeClient, type MetabaseSyncMode, } from '@ktx/context/ingest'; -import { type KtxLocalProject, ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project'; +import { + type KtxLocalProject, + type KtxProjectConfig, + ktxLocalStateDbPath, + loadKtxProject, + parseMetabaseMappingBootstrap, + serializeKtxProjectConfig, +} from '@ktx/context/project'; import type { KtxCliIo } from '../index.js'; import { profileMark } from '../startup-profile.js'; @@ -84,6 +93,89 @@ function parseId(value: string, label: string): number { return parsed; } +interface MetabaseMappingsBlock { + databaseMappings: Record; + syncEnabled: Record; + syncMode: MetabaseSyncMode; + selections: { collections: number[]; items: number[] }; + defaultTagNames: string[]; +} + +function currentMetabaseMappings(project: KtxLocalProject, connectionId: string): MetabaseMappingsBlock { + const connection = project.config.connections[connectionId]; + if (!connection) { + throw new Error(`Connection "${connectionId}" is not configured in ktx.yaml`); + } + const bootstrap = parseMetabaseMappingBootstrap(connectionId, connection); + return { + databaseMappings: { ...bootstrap.databaseMappings }, + syncEnabled: { ...bootstrap.syncEnabled }, + syncMode: bootstrap.syncMode, + selections: { + collections: [...bootstrap.selections.collections], + items: [...bootstrap.selections.items], + }, + defaultTagNames: [...bootstrap.defaultTagNames], + }; +} + +function hasMetabaseMappings(block: MetabaseMappingsBlock): boolean { + return ( + Object.keys(block.databaseMappings).length > 0 || + Object.keys(block.syncEnabled).length > 0 || + block.syncMode !== 'ALL' || + block.selections.collections.length > 0 || + block.selections.items.length > 0 || + block.defaultTagNames.length > 0 + ); +} + +function serializeMetabaseMappingsBlock(block: MetabaseMappingsBlock): Record | undefined { + if (!hasMetabaseMappings(block)) { + return undefined; + } + return { + databaseMappings: block.databaseMappings, + syncEnabled: block.syncEnabled, + syncMode: block.syncMode, + selections: block.selections, + defaultTagNames: block.defaultTagNames, + }; +} + +async function writeMetabaseMappings( + project: KtxLocalProject, + connectionId: string, + block: MetabaseMappingsBlock, + message: string, +): Promise { + const connection = project.config.connections[connectionId]; + if (!connection) { + throw new Error(`Connection "${connectionId}" is not configured in ktx.yaml`); + } + const mappings = serializeMetabaseMappingsBlock(block); + const nextConnection = { ...connection }; + if (mappings) { + nextConnection.mappings = mappings; + } else { + delete nextConnection.mappings; + } + const nextConfig: KtxProjectConfig = { + ...project.config, + connections: { + ...project.config.connections, + [connectionId]: nextConnection, + }, + }; + await project.fileStore.writeFile( + 'ktx.yaml', + serializeKtxProjectConfig(nextConfig), + 'ktx', + 'ktx@example.com', + message, + ); +} + async function createDefaultMetabaseClient( project: KtxLocalProject, connectionId: string, @@ -149,9 +241,7 @@ function targetPhysicalInfo(project: KtxLocalProject, connectionId: string) { }; } -function renderMapping( - row: Awaited>[number], -): string { +function renderMapping(row: LocalMetabaseMappingListRow): string { const name = row.metabaseDatabaseName ?? 'unhydrated'; const target = row.targetConnectionId ?? '[unmapped]'; return `${row.metabaseDatabaseId} -> ${target} (${name}, sync: ${row.syncEnabled ? 'on' : 'off'}, source: ${ @@ -255,92 +345,78 @@ export async function runKtxConnectionMapping( } assertMetabaseConnection(project, args.connectionId); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + const metabaseStateReader = new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }); if (args.command === 'list') { - const rows = await store.listDatabaseMappings(args.connectionId); + const rows = await metabaseStateReader.listDatabaseMappings(args.connectionId); io.stdout.write(args.json ? `${JSON.stringify(rows, null, 2)}\n` : `${rows.map(renderMapping).join('\n')}\n`); return 0; } if (args.command === 'set') { + if (args.field !== 'databaseMappings') { + throw new Error('Metabase mapping set requires databaseMappings ='); + } assertTargetConnection(project, args.value); - await store.upsertDatabaseMapping({ - connectionId: args.connectionId, - metabaseDatabaseId: parseId(args.key, 'metabaseDatabaseId'), - targetConnectionId: args.value, - syncEnabled: true, - source: 'cli', - }); + const block = currentMetabaseMappings(project, args.connectionId); + const metabaseDatabaseId = String(parseId(args.key, 'metabaseDatabaseId')); + block.databaseMappings[metabaseDatabaseId] = args.value; + block.syncEnabled[metabaseDatabaseId] = true; + await writeMetabaseMappings(project, args.connectionId, block, `Set Metabase mapping ${args.connectionId}.${metabaseDatabaseId}`); io.stdout.write(`Set databaseMappings.${args.key} = ${args.value}\n`); return 0; } if (args.command === 'apply-bulk') { const payload = JSON.parse(await readFile(args.filePath, 'utf8')) as MetabaseBulkMappingPayload; - const existingState = await store.getSourceState(args.connectionId); - const existingRows = await store.listDatabaseMappings(args.connectionId); - const existingById = new Map(existingRows.map((row) => [row.metabaseDatabaseId, row])); + const block = currentMetabaseMappings(project, args.connectionId); const databaseMappings = payload.databaseMappings ?? {}; for (const targetConnectionId of Object.values(databaseMappings)) { if (targetConnectionId) { assertTargetConnection(project, targetConnectionId); } } - const mappingIds = new Set([ - ...existingRows.map((row) => row.metabaseDatabaseId), - ...Object.keys(databaseMappings).map((id) => parseId(id, 'metabaseDatabaseId')), - ...Object.keys(payload.syncEnabled ?? {}).map((id) => parseId(id, 'metabaseDatabaseId')), - ]); - await store.replaceSourceState({ - connectionId: args.connectionId, - syncMode: payload.syncMode ?? existingState.syncMode, - defaultTagNames: payload.defaultTagNames ?? existingState.defaultTagNames, - selections: - payload.selections === undefined - ? existingState.selections - : [ - ...(payload.selections.collections ?? []).map((id) => ({ - selectionType: 'collection' as const, - metabaseObjectId: id, - })), - ...(payload.selections.items ?? []).map((id) => ({ - selectionType: 'item' as const, - metabaseObjectId: id, - })), - ], - mappings: [...mappingIds] - .sort((a, b) => a - b) - .map((id) => { - const existing = existingById.get(id); - return { - metabaseDatabaseId: id, - metabaseDatabaseName: existing?.metabaseDatabaseName ?? null, - metabaseEngine: existing?.metabaseEngine ?? null, - metabaseHost: existing?.metabaseHost ?? null, - metabaseDbName: existing?.metabaseDbName ?? null, - targetConnectionId: databaseMappings[String(id)] ?? existing?.targetConnectionId ?? null, - syncEnabled: payload.syncEnabled?.[String(id)] ?? existing?.syncEnabled ?? false, - source: 'cli', - }; - }), - }); + for (const id of Object.keys(databaseMappings)) { + parseId(id, 'metabaseDatabaseId'); + block.databaseMappings[id] = databaseMappings[id] ?? null; + } + for (const [id, enabled] of Object.entries(payload.syncEnabled ?? {})) { + parseId(id, 'metabaseDatabaseId'); + block.syncEnabled[id] = enabled; + } + if (payload.syncMode !== undefined) { + block.syncMode = payload.syncMode; + } + if (payload.defaultTagNames !== undefined) { + block.defaultTagNames = payload.defaultTagNames; + } + if (payload.selections !== undefined) { + block.selections = { + collections: payload.selections.collections ?? [], + items: payload.selections.items ?? [], + }; + } + await writeMetabaseMappings(project, args.connectionId, block, `Apply Metabase mappings ${args.connectionId}`); io.stdout.write(`Applied bulk mappings for ${args.connectionId}\n`); return 0; } if (args.command === 'set-sync-enabled') { - await store.setMappingSyncEnabled({ - connectionId: args.connectionId, - metabaseDatabaseId: args.metabaseDatabaseId, - syncEnabled: args.enabled, - }); + const block = currentMetabaseMappings(project, args.connectionId); + block.syncEnabled[String(args.metabaseDatabaseId)] = args.enabled; + await writeMetabaseMappings( + project, + args.connectionId, + block, + `Set Metabase sync ${args.connectionId}.${args.metabaseDatabaseId}`, + ); io.stdout.write(`Set syncEnabled.${args.metabaseDatabaseId} = ${args.enabled}\n`); return 0; } if (args.command === 'sync-state-get') { - const state = await store.getSourceState(args.connectionId); + const state = await metabaseStateReader.getSourceState(args.connectionId); const payload = { syncMode: state.syncMode, selections: state.selections, @@ -351,15 +427,11 @@ export async function runKtxConnectionMapping( } if (args.command === 'sync-state-set') { - await store.setSyncState({ - connectionId: args.connectionId, - syncMode: args.syncMode, - defaultTagNames: args.tagNames, - selections: [ - ...args.collectionIds.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), - ...args.itemIds.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), - ], - }); + const block = currentMetabaseMappings(project, args.connectionId); + block.syncMode = args.syncMode; + block.defaultTagNames = args.tagNames; + block.selections = { collections: args.collectionIds, items: args.itemIds }; + await writeMetabaseMappings(project, args.connectionId, block, `Set Metabase sync state ${args.connectionId}`); io.stdout.write(`Set sync state for ${args.connectionId}\n`); return 0; } @@ -368,15 +440,11 @@ export async function runKtxConnectionMapping( const client = await (deps.createMetabaseClient ?? createDefaultMetabaseClient)(project, args.connectionId); try { const discovered = await discoverMetabaseDatabases(client); - const existing = Object.fromEntries( - (await store.listDatabaseMappings(args.connectionId)).map((row) => [ - String(row.metabaseDatabaseId), - row.targetConnectionId, - ]), - ); + const block = currentMetabaseMappings(project, args.connectionId); + const existing = block.databaseMappings; const drift = computeMetabaseMappingDrift({ currentMappings: existing, discovered }); if (args.autoAccept) { - await store.refreshDiscoveredDatabases({ connectionId: args.connectionId, discovered }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: args.connectionId, discovered }); } io.stdout.write(`Discovery: ${discovered.length} ${discovered.length === 1 ? 'database' : 'databases'}\n`); io.stdout.write(`Unmapped discovered: ${drift.unmappedDiscovered.length}\n`); @@ -388,7 +456,9 @@ export async function runKtxConnectionMapping( } if (args.command === 'validate') { - const rows = await store.listDatabaseMappings(args.connectionId); + const rows = (await metabaseStateReader.listDatabaseMappings(args.connectionId)).filter( + (row) => row.source === 'ktx.yaml', + ); const failures = rows.flatMap((row) => { if (!row.targetConnectionId) { return []; @@ -412,7 +482,18 @@ export async function runKtxConnectionMapping( } const metabaseDatabaseId = args.metabaseDatabaseId ?? (args.mappingKey ? parseId(args.mappingKey, 'metabaseDatabaseId') : undefined); - await store.clearDatabaseMappings({ connectionId: args.connectionId, metabaseDatabaseId }); + const block = currentMetabaseMappings(project, args.connectionId); + if (metabaseDatabaseId === undefined) { + block.databaseMappings = {}; + block.syncEnabled = {}; + block.syncMode = 'ALL'; + block.selections = { collections: [], items: [] }; + block.defaultTagNames = []; + } else { + delete block.databaseMappings[String(metabaseDatabaseId)]; + delete block.syncEnabled[String(metabaseDatabaseId)]; + } + await writeMetabaseMappings(project, args.connectionId, block, `Clear Metabase mappings ${args.connectionId}`); io.stdout.write( metabaseDatabaseId ? `Cleared databaseMappings.${metabaseDatabaseId}\n` diff --git a/packages/cli/src/commands/connection-metabase-setup.test.ts b/packages/cli/src/commands/connection-metabase-setup.test.ts index 9d462bbd..7b7b7b84 100644 --- a/packages/cli/src/commands/connection-metabase-setup.test.ts +++ b/packages/cli/src/commands/connection-metabase-setup.test.ts @@ -1,7 +1,7 @@ import { mkdtemp, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { LocalMetabaseSourceStateReader } from '@ktx/context/ingest'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from '@ktx/context/ingest'; import { initKtxProject, ktxLocalStateDbPath, loadKtxProject, serializeKtxProjectConfig } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; @@ -9,6 +9,12 @@ import { runKtxConnectionMetabaseSetup } from './connection-metabase-setup.js'; const CANCEL_PROMPT = Symbol('cancel'); +async function metabaseMappingRows(projectDir: string, connectionId = 'metabase') { + const project = await loadKtxProject({ projectDir }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + return new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }).listDatabaseMappings(connectionId); +} + function createTestMetabaseSetupPromptAdapter(options: { selects?: Array; multiselects?: Array | typeof CANCEL_PROMPT>; @@ -238,10 +244,7 @@ describe('runKtxConnectionMetabaseSetup', () => { expect(config).toContain('driver: metabase'); expect(config).toContain('api_url: http://metabase.example.test:3000'); expect(config).toContain('api_key: mb_example'); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, metabaseDatabaseName: 'Analytics', @@ -294,10 +297,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, ]); }); @@ -369,10 +369,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, ]); }); @@ -659,10 +656,7 @@ describe('runKtxConnectionMetabaseSetup', () => { { createMetabaseClient: async () => metabaseClient as never }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 1, targetConnectionId: 'orbit', syncEnabled: true }, { metabaseDatabaseId: 2, targetConnectionId: null, syncEnabled: false }, ]); @@ -785,10 +779,7 @@ describe('runKtxConnectionMetabaseSetup', () => { const config = await readFile(join(projectDir, 'ktx.yaml'), 'utf-8'); expect(config).toContain('driver: metabase'); expect(io.stderr()).toContain(`ktx ingest run --connection-id metabase --adapter metabase --project-dir ${projectDir}`); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit' }, ]); }); @@ -886,10 +877,7 @@ describe('runKtxConnectionMetabaseSetup', () => { expect(config).toContain('driver: metabase'); expect(config).toContain('api_url: http://metabase.example.test:3000'); expect(config).toContain(`api_key: ${interactiveMetabaseCredential}`); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', @@ -957,10 +945,7 @@ describe('runKtxConnectionMetabaseSetup', () => { }, ), ).resolves.toBe(0); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toMatchObject([ + await expect(metabaseMappingRows(projectDir)).resolves.toMatchObject([ { metabaseDatabaseId: 2, targetConnectionId: 'orbit', syncEnabled: true }, { metabaseDatabaseId: 3, targetConnectionId: 'warehouse2', syncEnabled: false }, ]); @@ -1128,9 +1113,6 @@ describe('runKtxConnectionMetabaseSetup', () => { const afterConfig = await readFile(join(projectDir, 'ktx.yaml'), 'utf-8'); expect(afterConfig).toBe(beforeConfig); - - const updatedProject = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - await expect(store.listDatabaseMappings('metabase')).resolves.toEqual([]); + await expect(metabaseMappingRows(projectDir)).resolves.toEqual([]); }); }); diff --git a/packages/cli/src/commands/connection-metabase-setup.ts b/packages/cli/src/commands/connection-metabase-setup.ts index 2321ea3d..b0980c3b 100644 --- a/packages/cli/src/commands/connection-metabase-setup.ts +++ b/packages/cli/src/commands/connection-metabase-setup.ts @@ -16,7 +16,8 @@ import { localConnectionToWarehouseDescriptor } from '@ktx/context/connections'; import { DEFAULT_METABASE_CLIENT_CONFIG, DefaultMetabaseConnectionClientFactory, - LocalMetabaseSourceStateReader, + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, MetabaseClient, type MetabaseDatabase, type MetabaseRuntimeClient, @@ -29,6 +30,7 @@ import { type KtxProjectConnectionConfig, ktxLocalStateDbPath, loadKtxProject, + parseMetabaseMappingBootstrap, serializeKtxProjectConfig, } from '@ktx/context/project'; @@ -338,6 +340,33 @@ function noteMetabaseSetupSummary(options: { ); } +function metabaseMappingsBlockForSetup(options: { + connectionId: string; + connection: KtxProjectConnectionConfig; + mappings: MetabaseSetupMappingAssignment[]; + syncEnabledDatabaseIds: number[]; + syncMode: MetabaseSetupSyncMode; +}): Record { + const existing = parseMetabaseMappingBootstrap(options.connectionId, options.connection); + const databaseMappings = { ...existing.databaseMappings }; + const syncEnabled = { ...existing.syncEnabled }; + for (const mapping of options.mappings) { + const key = String(mapping.metabaseDatabaseId); + databaseMappings[key] = mapping.targetConnectionId; + syncEnabled[key] = false; + } + for (const metabaseDatabaseId of options.syncEnabledDatabaseIds) { + syncEnabled[String(metabaseDatabaseId)] = true; + } + return { + databaseMappings, + syncEnabled, + syncMode: options.syncMode, + selections: existing.selections, + defaultTagNames: existing.defaultTagNames, + }; +} + export async function runKtxConnectionMetabaseSetup( args: KtxConnectionMetabaseSetupArgs, io: KtxCliIo, @@ -674,54 +703,37 @@ export async function runKtxConnectionMetabaseSetup( } } + const finalConnectionConfig: KtxProjectConnectionConfig = { + ...transientConnectionConfig, + mappings: metabaseMappingsBlockForSetup({ + connectionId, + connection: transientConnectionConfig, + mappings: resolvedMappings, + syncEnabledDatabaseIds: resolvedSyncEnabledDatabaseIds, + syncMode: args.syncMode, + }), + }; + const finalConfig = { + ...configWithTransient, + connections: { + ...configWithTransient.connections, + [connectionId]: finalConnectionConfig, + }, + }; await project.fileStore.writeFile( 'ktx.yaml', - serializeKtxProjectConfig(configWithTransient), + serializeKtxProjectConfig(finalConfig), 'ktx', 'ktx@example.com', `Setup Metabase connection ${connectionId}`, ); const updatedProject = await loadKtxProject({ projectDir: args.projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(updatedProject) }); - - await store.refreshDiscoveredDatabases({ connectionId, discovered }); - - for (const mapping of resolvedMappings) { - await store.upsertDatabaseMapping({ - connectionId, - metabaseDatabaseId: mapping.metabaseDatabaseId, - targetConnectionId: mapping.targetConnectionId, - syncEnabled: false, - source: 'cli', - }); - } - - for (const metabaseDatabaseId of resolvedSyncEnabledDatabaseIds) { - await store.setMappingSyncEnabled({ - connectionId, - metabaseDatabaseId, - syncEnabled: true, - }); - } - - const existingSyncState = await store.getSourceState(connectionId); - await store.setSyncState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(updatedProject) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId, discovered }); + const rows = await new KtxYamlMetabaseSourceStateReader(updatedProject, { discoveryCache }).listDatabaseMappings( connectionId, - syncMode: args.syncMode, - defaultTagNames: existingSyncState.defaultTagNames, - selections: existingSyncState.selections, - }); - - const unhydrated = await store.getUnhydratedSyncEnabledMappingIds(connectionId); - if (unhydrated.length > 0) { - io.stderr.write( - `Sync-enabled mappings are missing discovery metadata; run ktx connection mapping refresh ${connectionId} --auto-accept\n`, - ); - return 1; - } - - const rows = await store.listDatabaseMappings(connectionId); + ); const physicalFailures = rows.flatMap((row) => { if (!row.targetConnectionId) { return []; diff --git a/packages/cli/src/ingest.test-utils.ts b/packages/cli/src/ingest.test-utils.ts index 9241fa34..3596d215 100644 --- a/packages/cli/src/ingest.test-utils.ts +++ b/packages/cli/src/ingest.test-utils.ts @@ -3,7 +3,8 @@ import { mkdir, writeFile } from 'node:fs/promises'; import { join } from 'node:path'; import { AgentRunnerService, type RunLoopParams } from '@ktx/context/agent'; import { - LocalMetabaseSourceStateReader, + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, MetabaseSourceAdapter, getLocalIngestStatus, type ChunkResult, @@ -493,6 +494,23 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync ' driver: metabase', ' api_url: https://metabase.example.test', ' api_key: literal-test-key', + ' mappings:', + ' databaseMappings:', + ' "1": warehouse_a', + ' syncEnabled:', + ' "1": true', + ` syncMode: ${input.syncMode}`, + ' selections:', + ` collections: [${input.selections + .filter((selection) => selection.selectionType === 'collection') + .map((selection) => selection.metabaseObjectId) + .join(', ')}]`, + ` items: [${input.selections + .filter((selection) => selection.selectionType === 'item') + .map((selection) => selection.metabaseObjectId) + .join(', ')}]`, + ' defaultTagNames:', + ' - sync-mode-smoke', ' warehouse_a:', ' driver: postgres', ' url: postgresql://readonly@db.example.test/warehouse_a', @@ -507,29 +525,15 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync ); const project = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await store.replaceSourceState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - syncMode: input.syncMode, - defaultTagNames: ['sync-mode-smoke'], - selections: input.selections, - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - ], + discovered: [{ id: 1, name: 'Warehouse A', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_a' }], }); const adapter = new MetabaseSourceAdapter({ clientFactory: new StaticMetabaseClientFactory(createSyncModeMetabaseClient()), - sourceStateReader: store, + sourceStateReader: new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }), }); const jobId = `metabase-sync-mode-${input.name}-child`; const io = makeIo(); diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index de226bc4..24f8c1ca 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -3,7 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { LocalLookerRuntimeStore, - LocalMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, type LocalIngestResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, @@ -433,6 +433,16 @@ describe('runKtxIngest', () => { ' driver: metabase', ' api_url: https://metabase.example.test', ' api_key: literal-test-key', + ' mappings:', + ' databaseMappings:', + ' "1": warehouse_a', + ' "2": warehouse_b', + ' syncEnabled:', + ' "1": true', + ' "2": true', + ' syncMode: ALL', + ' defaultTagNames:', + ' - ktx', ' warehouse_a:', ' driver: postgres', ' url: postgresql://readonly@db.example.test/warehouse_a', @@ -449,33 +459,12 @@ describe('runKtxIngest', () => { 'utf-8', ); const project = await loadKtxProject({ projectDir }); - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await store.replaceSourceState({ + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Warehouse B', - metabaseEngine: 'postgres', - metabaseHost: 'db.example.test', - metabaseDbName: 'warehouse_b', - targetConnectionId: 'warehouse_b', - syncEnabled: true, - source: 'refresh', - }, + discovered: [ + { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_a' }, + { id: 2, name: 'Warehouse B', engine: 'postgres', host: 'db.example.test', dbName: 'warehouse_b' }, ], }); const adapter = new CliMetabaseSourceAdapter(); diff --git a/packages/cli/src/setup-agents.ts b/packages/cli/src/setup-agents.ts index b4202ed6..3c7829c7 100644 --- a/packages/cli/src/setup-agents.ts +++ b/packages/cli/src/setup-agents.ts @@ -6,7 +6,6 @@ import { loadKtxProject, markKtxSetupStateStepComplete, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { withMenuOptionsSpacing, withMultiselectNavigation } from './prompt-navigation.js'; @@ -364,7 +363,7 @@ async function installTarget(input: { async function markAgentsComplete(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(project.config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'agents'); } diff --git a/packages/cli/src/setup-context.test.ts b/packages/cli/src/setup-context.test.ts index 1ef044ae..9115d7a5 100644 --- a/packages/cli/src/setup-context.test.ts +++ b/packages/cli/src/setup-context.test.ts @@ -1,7 +1,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { readKtxSetupState } from '@ktx/context/project'; +import { readKtxSetupState, writeKtxSetupState } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { @@ -40,12 +40,6 @@ async function writeReadyProject(projectDir: string) { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', - ' - llm', - ' - embeddings', - ' - databases', - ' - sources', 'connections:', ' warehouse:', ' driver: postgres', @@ -71,6 +65,9 @@ async function writeReadyProject(projectDir: string) { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(projectDir, { + completed_steps: ['project', 'llm', 'embeddings', 'databases', 'sources'], + }); } async function writeScanReport( diff --git a/packages/cli/src/setup-context.ts b/packages/cli/src/setup-context.ts index 04a572ac..94589bdc 100644 --- a/packages/cli/src/setup-context.ts +++ b/packages/cli/src/setup-context.ts @@ -5,11 +5,9 @@ import { cancel, isCancel, select } from '@clack/prompts'; import { type KtxLocalProject, loadKtxProject, - ktxSetupCompletedSteps, markKtxSetupStateStepComplete, readKtxSetupState, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { buildPublicIngestPlan } from './public-ingest.js'; @@ -470,7 +468,7 @@ async function defaultVerifyContextReady(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(project.config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'context'); } @@ -704,7 +702,7 @@ export async function runKtxSetupContextStep( try { const project = await loadKtxProject({ projectDir: args.projectDir }); const existingState = await readKtxSetupContextState(args.projectDir); - const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(args.projectDir)); + const completedSteps = (await readKtxSetupState(args.projectDir)).completed_steps; if (completedSteps.includes('context') && existingState.status === 'completed') { return { status: 'ready', projectDir: args.projectDir, runId: existingState.runId ?? 'setup-context-completed' }; } diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 46506ae7..65ee191a 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -1,7 +1,7 @@ import { mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join, resolve } from 'node:path'; -import { initKtxProject, parseKtxProjectConfig, readKtxSetupState } from '@ktx/context/project'; +import { initKtxProject, parseKtxProjectConfig, readKtxSetupState, writeKtxSetupState } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { type KtxSetupDatabaseDriver, @@ -548,12 +548,11 @@ describe('setup databases step', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - databases', '', ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['databases'] }); const prompts = makePromptAdapter({ multiselectValues: [['back']], selectValues: ['continue'] }); const testConnection = vi.fn(async () => 0); const scanConnection = vi.fn(async () => 0); @@ -590,12 +589,11 @@ describe('setup databases step', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - databases', '', ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['databases'] }); const prompts = makePromptAdapter({ selectValues: ['add', 'url', 'continue'], multiselectValues: [['mysql']], @@ -706,12 +704,11 @@ describe('setup databases step', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - databases', '', ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['databases'] }); const io = makeIo(); const prompts = makePromptAdapter({ multiselectValues: [[]], @@ -1124,7 +1121,6 @@ describe('setup databases step', () => { }); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); expect(io.stdout()).toContain('Primary source ready'); @@ -1163,7 +1159,6 @@ describe('setup databases step', () => { }); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); }); @@ -1213,7 +1208,7 @@ describe('setup databases step', () => { expect(scanConnection).toHaveBeenCalledTimes(2); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.setup?.database_connection_ids).toEqual(['warehouse', 'analytics']); - expect(config.setup?.completed_steps).toEqual([]); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); }); @@ -1239,7 +1234,7 @@ describe('setup databases step', () => { expect(result.status).toBe('failed'); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.connections.warehouse).toMatchObject({ driver: 'postgres', url: 'env:DATABASE_URL' }); - expect(config.setup?.completed_steps ?? []).not.toContain('databases'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(io.stderr()).toContain('Structural scan failed for warehouse.'); }); @@ -1544,7 +1539,6 @@ describe('setup databases step', () => { expect(result.status).toBe('skipped'); expect(io.stdout()).toContain('KTX cannot work until you add a primary source.'); - const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps ?? []).not.toContain('databases'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); }); }); diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index f770c5c4..eceaf5bb 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -7,7 +7,6 @@ import { markKtxSetupStateStepComplete, serializeKtxProjectConfig, setKtxSetupDatabaseConnectionIds, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import type { KtxTableListEntry } from '@ktx/context/scan'; import type { KtxCliIo } from './cli-runtime.js'; @@ -1020,7 +1019,7 @@ async function writeConnectionConfig(input: { [input.connectionId]: input.connection, }, }; - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); const historicSql = typeof input.connection.historicSql === 'object' && @@ -1314,7 +1313,7 @@ async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); const config = setKtxSetupDatabaseConnectionIds(project.config, unique(connectionIds)); - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'databases'); } diff --git a/packages/cli/src/setup-embeddings.test.ts b/packages/cli/src/setup-embeddings.test.ts index 67ef83b3..e66aa05a 100644 --- a/packages/cli/src/setup-embeddings.test.ts +++ b/packages/cli/src/setup-embeddings.test.ts @@ -1,7 +1,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { initKtxProject, parseKtxProjectConfig, readKtxSetupState } from '@ktx/context/project'; +import { initKtxProject, parseKtxProjectConfig, readKtxSetupState, writeKtxSetupState } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { type KtxSetupEmbeddingsPromptAdapter, runKtxSetupEmbeddingsStep } from './setup-embeddings.js'; @@ -172,7 +172,7 @@ describe('setup embeddings step', () => { sentenceTransformers: { base_url: 'managed:local-embeddings', pathPrefix: '' }, }); expect(config.scan.enrichment.embeddings).toMatchObject(config.ingest.embeddings); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('embeddings'); expect(spinnerEvents).toContainEqual( 'start:Testing local sentence-transformers embeddings (all-MiniLM-L6-v2, 384 dimensions). First run may take up to 60 seconds.', @@ -251,7 +251,7 @@ describe('setup embeddings step', () => { sentenceTransformers: { base_url: 'managed:local-embeddings', pathPrefix: '' }, }); expect(config.scan.enrichment.embeddings).toMatchObject(config.ingest.embeddings); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('embeddings'); }); @@ -301,7 +301,7 @@ describe('setup embeddings step', () => { expect(result.status).toBe('failed'); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps ?? []).not.toContain('embeddings'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(config.ingest.embeddings.backend).toBe('deterministic'); expect(io.stderr()).toContain('Local embedding health check failed: 401 invalid api key [redacted]'); expect(io.stderr()).toContain('Prepare the runtime with: ktx dev runtime start --feature local-embeddings'); @@ -413,7 +413,7 @@ describe('setup embeddings step', () => { expect(result.status).toBe('skipped'); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps ?? []).not.toContain('embeddings'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(config.ingest.embeddings.backend).toBe('deterministic'); }); @@ -450,10 +450,6 @@ describe('setup embeddings step', () => { 'project: warehouse', 'setup:', ' database_connection_ids: []', - ' completed_steps:', - ' - project', - ' - llm', - ' - embeddings', 'connections: {}', 'ingest:', ' embeddings:', @@ -466,6 +462,7 @@ describe('setup embeddings step', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'llm', 'embeddings'] }); const healthCheck = vi.fn(async () => ({ ok: true as const })); await expect( diff --git a/packages/cli/src/setup-embeddings.ts b/packages/cli/src/setup-embeddings.ts index 1b6a2381..ba3333f1 100644 --- a/packages/cli/src/setup-embeddings.ts +++ b/packages/cli/src/setup-embeddings.ts @@ -4,12 +4,10 @@ import { resolveKtxConfigReference } from '@ktx/context/core'; import { type KtxProjectConfig, type KtxProjectEmbeddingConfig, - ktxSetupCompletedSteps, loadKtxProject, markKtxSetupStateStepComplete, readKtxSetupState, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import { type KtxEmbeddingConfig, type KtxEmbeddingHealthCheckResult, runKtxEmbeddingHealthCheck } from '@ktx/llm'; import type { KtxCliIo } from './cli-runtime.js'; @@ -110,7 +108,7 @@ function createPromptAdapter(): KtxSetupEmbeddingsPromptAdapter { async function hasCompletedEmbeddings(projectDir: string, config: KtxProjectConfig): Promise { return ( - ktxSetupCompletedSteps(config, await readKtxSetupState(projectDir)).includes('embeddings') && + (await readKtxSetupState(projectDir)).completed_steps.includes('embeddings') && config.ingest.embeddings.backend !== 'none' && config.ingest.embeddings.backend !== 'deterministic' && typeof config.ingest.embeddings.model === 'string' && @@ -184,22 +182,20 @@ function embeddingBackendDisplayName(backend: KtxSetupEmbeddingBackend): string async function persistEmbeddingConfig(projectDir: string, embeddings: KtxProjectEmbeddingConfig): Promise { const project = await loadKtxProject({ projectDir }); - const config = stripKtxSetupCompletedSteps( - { - ...project.config, - ingest: { - ...project.config.ingest, + const config = { + ...project.config, + ingest: { + ...project.config.ingest, + embeddings, + }, + scan: { + ...project.config.scan, + enrichment: { + ...project.config.scan.enrichment, embeddings, }, - scan: { - ...project.config.scan, - enrichment: { - ...project.config.scan.enrichment, - embeddings, - }, - }, }, - ); + }; await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'embeddings'); } diff --git a/packages/cli/src/setup-models.test.ts b/packages/cli/src/setup-models.test.ts index 82f82875..fb8acb47 100644 --- a/packages/cli/src/setup-models.test.ts +++ b/packages/cli/src/setup-models.test.ts @@ -1,7 +1,7 @@ import { mkdir, mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { initKtxProject, parseKtxProjectConfig, readKtxSetupState } from '@ktx/context/project'; +import { initKtxProject, parseKtxProjectConfig, readKtxSetupState, writeKtxSetupState } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { BUNDLED_ANTHROPIC_MODELS, @@ -160,7 +160,7 @@ describe('setup Anthropic model step', () => { promptCaching: { enabled: true }, }); expect(config.scan.enrichment.mode).toBe('llm'); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('llm'); expect(io.stdout()).toContain('LLM ready: yes'); expect(io.stdout()).not.toContain('sk-ant-test'); @@ -199,7 +199,7 @@ describe('setup Anthropic model step', () => { }, models: { default: 'claude-sonnet-4-6' }, }); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('llm'); expect(io.stdout()).not.toContain('sk-ant-file'); }); @@ -516,8 +516,7 @@ describe('setup Anthropic model step', () => { ); expect(result.status).toBe('failed'); - const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps ?? []).not.toContain('llm'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(io.stderr()).toContain('Anthropic model health check failed: 401 invalid x-api-key [redacted]'); expect(io.stderr()).not.toContain('sk-ant-test'); }); @@ -553,7 +552,7 @@ describe('setup Anthropic model step', () => { expect(io.stderr()).toContain('Choose a different credential source or model, or Back.'); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.llm.models.default).toBe('claude-sonnet-4-6'); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(tempDir)).completed_steps).toContain('llm'); expect(io.stderr()).not.toContain('sk-ant-test'); }); @@ -565,8 +564,7 @@ describe('setup Anthropic model step', () => { ); expect(result.status).toBe('skipped'); - const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps ?? []).not.toContain('llm'); + expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); }); it('returns back without writing config when Back is selected', async () => { @@ -650,9 +648,6 @@ describe('setup Anthropic model step', () => { 'project: warehouse', 'setup:', ' database_connection_ids: []', - ' completed_steps:', - ' - project', - ' - llm', 'connections: {}', 'llm:', ' provider:', @@ -669,6 +664,7 @@ describe('setup Anthropic model step', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'llm'] }); const healthCheck = vi.fn(async () => ({ ok: true as const })); await expect( @@ -698,9 +694,6 @@ describe('setup Anthropic model step', () => { 'project: warehouse', 'setup:', ' database_connection_ids: []', - ' completed_steps:', - ' - project', - ' - llm', 'connections: {}', 'llm:', ' provider:', @@ -715,6 +708,7 @@ describe('setup Anthropic model step', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'llm'] }); const healthCheck = vi.fn(async () => ({ ok: true as const })); const io = makeIo(); diff --git a/packages/cli/src/setup-models.ts b/packages/cli/src/setup-models.ts index 6d3c6757..221dbd14 100644 --- a/packages/cli/src/setup-models.ts +++ b/packages/cli/src/setup-models.ts @@ -8,7 +8,6 @@ import { loadKtxProject, markKtxSetupStateStepComplete, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import { type KtxLlmConfig, type KtxLlmHealthCheckResult, runKtxLlmHealthCheck } from '@ktx/llm'; import type { KtxCliIo } from './cli-runtime.js'; @@ -362,19 +361,17 @@ async function chooseModel( async function persistLlmConfig(projectDir: string, credentialRef: string, model: string): Promise { const project = await loadKtxProject({ projectDir }); - const config = stripKtxSetupCompletedSteps( - { - ...project.config, - llm: buildProjectLlmConfig(project.config.llm, credentialRef, model), - scan: { - ...project.config.scan, - enrichment: { + const config = { + ...project.config, + llm: buildProjectLlmConfig(project.config.llm, credentialRef, model), + scan: { + ...project.config.scan, + enrichment: { ...project.config.scan.enrichment, - mode: 'llm', + mode: 'llm' as const, }, }, - }, - ); + }; await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'llm'); } diff --git a/packages/cli/src/setup-project.test.ts b/packages/cli/src/setup-project.test.ts index 9c01402c..70591077 100644 --- a/packages/cli/src/setup-project.test.ts +++ b/packages/cli/src/setup-project.test.ts @@ -59,8 +59,7 @@ describe('setup project step', () => { expect(result.status).toBe('ready'); expect(result.projectDir).toBe(projectDir); - const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(await readKtxSetupState(projectDir)).toEqual({ completed_steps: ['project'] }); await expect(stat(join(projectDir, '.git'))).resolves.toBeDefined(); await expect(readFile(join(projectDir, '.ktx/.gitignore'), 'utf-8')).resolves.toContain('secrets/'); @@ -68,7 +67,7 @@ describe('setup project step', () => { expect(testIo.stderr()).toBe(''); }); - it('loads an existing project with --existing and preserves existing setup metadata', async () => { + it('loads an existing project with --existing and drops config setup progress', async () => { const projectDir = join(tempDir, 'warehouse'); await initKtxProject({ projectDir, projectName: 'warehouse' }); await writeFile( @@ -94,9 +93,9 @@ describe('setup project step', () => { const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); expect(config.setup).toEqual({ database_connection_ids: ['warehouse'], - completed_steps: [], }); - expect(await readKtxSetupState(projectDir)).toEqual({ completed_steps: ['llm', 'project'] }); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); + expect(await readKtxSetupState(projectDir)).toEqual({ completed_steps: ['project'] }); }); it('creates a missing auto-mode project only when --yes is present in no-input mode', async () => { @@ -152,8 +151,7 @@ describe('setup project step', () => { }), ); expect(prompts.text).not.toHaveBeenCalled(); - const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); - expect(config.setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(await readKtxSetupState(projectDir)).toEqual({ completed_steps: ['project'] }); }); diff --git a/packages/cli/src/setup-project.ts b/packages/cli/src/setup-project.ts index 18512b03..4b2f71d9 100644 --- a/packages/cli/src/setup-project.ts +++ b/packages/cli/src/setup-project.ts @@ -5,15 +5,11 @@ import { basename, join, resolve } from 'node:path'; import { cancel, isCancel, select, text } from '@clack/prompts'; import { initKtxProject, - ktxSetupCompletedSteps, type KtxLocalProject, loadKtxProject, markKtxSetupStateStepComplete, mergeKtxSetupGitignoreEntries, - readKtxSetupState, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, - writeKtxSetupState, } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { withMenuOptionsSpacing, withTextInputNavigation } from './prompt-navigation.js'; @@ -170,10 +166,7 @@ async function normalizeSetupGitignore(projectDir: string): Promise { } async function persistProjectStep(project: KtxLocalProject): Promise { - const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(project.projectDir)); - const config = stripKtxSetupCompletedSteps(project.config); - await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); - await writeKtxSetupState(project.projectDir, { completed_steps: completedSteps }); + await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8'); await markKtxSetupStateStepComplete(project.projectDir, 'project'); await normalizeSetupGitignore(project.projectDir); return await loadKtxProject({ projectDir: project.projectDir }); diff --git a/packages/cli/src/setup-sources.test.ts b/packages/cli/src/setup-sources.test.ts index 76ba5d0f..27579bb3 100644 --- a/packages/cli/src/setup-sources.test.ts +++ b/packages/cli/src/setup-sources.test.ts @@ -102,7 +102,6 @@ describe('setup sources step', () => { }, setup: { ...config.setup, - completed_steps: config.setup?.completed_steps ?? [], database_connection_ids: ['warehouse'], }, }), @@ -137,7 +136,7 @@ describe('setup sources step', () => { projectDir, }); - expect((await readConfig()).setup?.completed_steps).toEqual(undefined); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(projectDir)).completed_steps).toContain('sources'); expect(io.stdout()).toContain('Context source setup skipped.'); }); @@ -171,7 +170,7 @@ describe('setup sources step', () => { source_dir: '/repo/dbt', project_name: 'analytics', }); - expect(config.setup?.completed_steps).toEqual([]); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect((await readKtxSetupState(projectDir)).completed_steps).toContain('sources'); expect(runInitialIngest).toHaveBeenCalledWith(projectDir, 'analytics_dbt', io.io, { inputMode: 'disabled' }); }); @@ -190,7 +189,7 @@ describe('setup sources step', () => { source: 'metabase', sourceConnectionId: 'prod_metabase', sourceUrl: 'https://metabase.example.com', - sourceApiKeyRef: 'env:METABASE_API_KEY', + sourceApiKeyRef: 'env:METABASE_API_KEY', // pragma: allowlist secret sourceWarehouseConnectionId: 'warehouse', metabaseDatabaseId: 1, runInitialSourceIngest: false, @@ -204,7 +203,7 @@ describe('setup sources step', () => { expect((await readConfig()).connections.prod_metabase).toMatchObject({ driver: 'metabase', api_url: 'https://metabase.example.com', - api_key_ref: 'env:METABASE_API_KEY', + api_key_ref: 'env:METABASE_API_KEY', // pragma: allowlist secret mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, @@ -225,7 +224,7 @@ describe('setup sources step', () => { inputMode: 'disabled', source: 'notion', sourceConnectionId: 'notion-main', - sourceApiKeyRef: 'env:NOTION_TOKEN', + sourceApiKeyRef: 'env:NOTION_TOKEN', // pragma: allowlist secret notionCrawlMode: 'selected_roots', notionRootPageIds: ['page-1'], runInitialSourceIngest: false, @@ -256,7 +255,7 @@ describe('setup sources step', () => { inputMode: 'disabled', source: 'notion', sourceConnectionId: 'notion-main', - sourceApiKeyRef: 'env:NOTION_TOKEN', + sourceApiKeyRef: 'env:NOTION_TOKEN', // pragma: allowlist secret notionCrawlMode: 'all_accessible', notionRootPageIds: ['page-1'], runInitialSourceIngest: false, @@ -480,7 +479,7 @@ describe('setup sources step', () => { ), ).resolves.toEqual({ status: 'failed', projectDir }); - expect((await readConfig()).setup?.completed_steps ?? []).not.toContain('sources'); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); expect(io.stderr()).toContain('No LookML files found'); }); @@ -766,7 +765,7 @@ describe('setup sources step', () => { connection: { driver: 'metabase', api_url: 'https://metabase.example.com', - api_key_ref: 'env:METABASE_API_KEY', + api_key_ref: 'env:METABASE_API_KEY', // pragma: allowlist secret mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, @@ -786,7 +785,7 @@ describe('setup sources step', () => { driver: 'looker', base_url: 'https://looker.example.com', client_id: 'client-id', - client_secret_ref: 'env:LOOKER_CLIENT_SECRET', + client_secret_ref: 'env:LOOKER_CLIENT_SECRET', // pragma: allowlist secret mappings: { connectionMappings: { warehouse: 'warehouse' } }, }, deps: { @@ -1032,7 +1031,7 @@ describe('setup sources step', () => { expect(testPrompts.multiselect).not.toHaveBeenCalled(); expect(io.stdout()).toContain('Connect a primary source before adding context sources.'); - expect((await readConfig()).setup?.completed_steps ?? []).not.toContain('sources'); + expect(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); }); it('auto-detects dbt_project.yml at the root of a local path', async () => { diff --git a/packages/cli/src/setup-sources.ts b/packages/cli/src/setup-sources.ts index 6674ef75..edf83b7b 100644 --- a/packages/cli/src/setup-sources.ts +++ b/packages/cli/src/setup-sources.ts @@ -25,7 +25,6 @@ import { loadKtxProject, markKtxSetupStateStepComplete, serializeKtxProjectConfig, - stripKtxSetupCompletedSteps, } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { runKtxConnectionMapping } from './commands/connection-mapping.js'; @@ -345,7 +344,7 @@ function fileRepoUrl(sourceDir: string): string { async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise { const project = await loadKtxProject({ projectDir }); - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); } async function writeSourceConnection( @@ -372,7 +371,7 @@ async function writeSourceConnection( : [...project.config.ingest.adapters, adapter], }, }; - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); return async () => { const latest = await loadKtxProject({ projectDir }); const connections = { ...latest.config.connections }; @@ -411,7 +410,7 @@ async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSo async function markSourcesComplete(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); - await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(project.config)), 'utf-8'); + await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'sources'); } diff --git a/packages/cli/src/setup.test.ts b/packages/cli/src/setup.test.ts index bf9c381f..0cad3ebc 100644 --- a/packages/cli/src/setup.test.ts +++ b/packages/cli/src/setup.test.ts @@ -3,6 +3,7 @@ import { mkdir, mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { promisify } from 'node:util'; +import { writeKtxSetupState } from '@ktx/context/project'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { localFakeBundleReport, persistLocalBundleReport } from './ingest.test-utils.js'; @@ -133,9 +134,6 @@ describe('setup status', () => { ' database_connection_ids:', ' - warehouse', ' - analytics', - ' completed_steps:', - ' - project', - ' - databases', 'connections:', ' warehouse:', ' driver: postgres', @@ -150,6 +148,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] }); await expect(readKtxSetupStatus(tempDir)).resolves.toMatchObject({ databases: [ @@ -167,8 +166,6 @@ describe('setup status', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', 'connections:', ' warehouse:', ' driver: postgres', @@ -178,6 +175,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project'] }); await expect(readKtxSetupStatus(tempDir)).resolves.toMatchObject({ databases: [{ connectionId: 'warehouse', ready: false }], @@ -190,9 +188,6 @@ describe('setup status', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', - ' - databases', 'connections:', ' warehouse:', ' driver: postgres', @@ -202,6 +197,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] }); await expect(readKtxSetupStatus(tempDir)).resolves.toMatchObject({ databases: [{ connectionId: 'warehouse', ready: true }], @@ -215,9 +211,6 @@ describe('setup status', () => { 'project: revenue', 'setup:', ' database_connection_ids: []', - ' completed_steps:', - ' - project', - ' - sources', 'connections:', ' docs:', ' driver: notion', @@ -230,6 +223,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'sources'] }); await expect(readKtxSetupStatus(tempDir)).resolves.toMatchObject({ sources: [{ connectionId: 'docs', type: 'notion', ready: true }], @@ -268,12 +262,6 @@ describe('setup status', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', - ' - llm', - ' - embeddings', - ' - databases', - ' - sources', 'connections:', ' warehouse:', ' driver: postgres', @@ -292,6 +280,9 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { + completed_steps: ['project', 'llm', 'embeddings', 'databases', 'sources'], + }); await writeKtxSetupContextState(tempDir, { runId: 'setup-context-local-abc123', status: 'running', @@ -324,10 +315,6 @@ describe('setup status', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', - ' - databases', - ' - sources', 'connections:', ' warehouse:', ' driver: postgres', @@ -354,6 +341,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases', 'sources'] }); await persistLocalBundleReport( tempDir, localFakeBundleReport('metabase-job-1', { @@ -1281,9 +1269,6 @@ describe('setup status', () => { 'setup:', ' database_connection_ids:', ' - warehouse', - ' completed_steps:', - ' - project', - ' - databases', 'connections:', ' warehouse:', ' driver: postgres', @@ -1296,6 +1281,7 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] }); await expect( runKtxSetup( @@ -1782,13 +1768,6 @@ describe('setup status', () => { [ 'project: revenue', 'setup:', - ' completed_steps:', - ' - project', - ' - llm', - ' - embeddings', - ' - sources', - ' - context', - ' - agents', ' database_connection_ids: []', 'connections: {}', 'llm:', @@ -1805,6 +1784,9 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { + completed_steps: ['project', 'llm', 'embeddings', 'sources', 'context', 'agents'], + }); await writeFile( join(tempDir, '.ktx/agents/install-manifest.json'), JSON.stringify( @@ -1893,12 +1875,6 @@ describe('setup status', () => { [ 'project: revenue', 'setup:', - ' completed_steps:', - ' - project', - ' - llm', - ' - embeddings', - ' - sources', - ' - context', ' database_connection_ids: []', 'connections: {}', 'llm:', @@ -1915,6 +1891,9 @@ describe('setup status', () => { ].join('\n'), 'utf-8', ); + await writeKtxSetupState(tempDir, { + completed_steps: ['project', 'llm', 'embeddings', 'sources', 'context'], + }); await writeKtxSetupContextState(tempDir, { runId: 'setup-context-local-ready', status: 'completed', diff --git a/packages/cli/src/setup.ts b/packages/cli/src/setup.ts index dec0f4d7..064da729 100644 --- a/packages/cli/src/setup.ts +++ b/packages/cli/src/setup.ts @@ -4,7 +4,6 @@ import { cancel, isCancel, select } from '@clack/prompts'; import { getLatestLocalIngestStatus, savedMemoryCountsForReport } from '@ktx/context/ingest'; import { ktxLocalStateDbPath, - ktxSetupCompletedSteps, loadKtxProject, readKtxSetupState, type KtxLocalProject, @@ -297,7 +296,7 @@ export async function readKtxSetupStatus(projectDir: string): Promise { ).rejects.toThrow(/mapping.*does not point to connection/); }); - it('throws when the matching mapping has a null metabaseDatabaseName (unhydrated)', async () => { + it('hydrates missing mapping metadata from Metabase instead of requiring a prior refresh', async () => { sourceStateReader.getSourceState.mockResolvedValue({ syncMode: 'ALL', selections: [], @@ -268,15 +274,22 @@ describe('fetchMetabaseBundle', () => { ], defaultTagNames: [], }); - await expect( - fetchMetabaseBundle({ - pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, - stagedDir, - ctx: makeFetchContext(), - clientFactory, - sourceStateReader, - }), - ).rejects.toThrow(/unhydrated.*ktx connection mapping refresh/); + await fetchMetabaseBundle({ + pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, + stagedDir, + ctx: makeFetchContext(), + clientFactory, + sourceStateReader, + }); + + expect(clientFactory.__client.getDatabase).toHaveBeenCalledWith(42); + const databaseFile = JSON.parse(await readFile(join(stagedDir, 'databases/42.json'), 'utf-8')); + expect(databaseFile).toMatchObject({ + metabaseDatabaseId: 42, + metabaseDatabaseName: 'Analytics', + metabaseEngine: 'postgres', + targetConnectionId, + }); }); it('skips cards whose getResolvedSql returns null and records them in unresolved-cards.json', async () => { diff --git a/packages/context/src/ingest/adapters/metabase/fetch.ts b/packages/context/src/ingest/adapters/metabase/fetch.ts index 9ccb2be6..d4e8b59b 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.ts @@ -97,15 +97,16 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr `mapping for database ${pullConfig.metabaseDatabaseId} does not point to connection ${params.ctx.connectionId} (points to ${mapping.targetConnectionId})`, ); } - if (mapping.metabaseDatabaseName === null) { - throw new IngestInputError( - `mapping for database ${pullConfig.metabaseDatabaseId} on Metabase connection ${pullConfig.metabaseConnectionId} is unhydrated; run \`ktx connection mapping refresh ${pullConfig.metabaseConnectionId}\` to populate metabaseDatabaseName before ingest.`, - ); - } - const mappingDatabaseName: string = mapping.metabaseDatabaseName; const client = await params.clientFactory.createClient(pullConfig, params.ctx); try { + let mappingDatabaseName = mapping.metabaseDatabaseName; + let mappingEngine = mapping.metabaseEngine; + if (mappingDatabaseName === null) { + const database = await client.getDatabase(pullConfig.metabaseDatabaseId); + mappingDatabaseName = database.name; + mappingEngine = database.engine ?? null; + } const stagedForScope: StagedSyncConfig = { metabaseConnectionId: pullConfig.metabaseConnectionId, metabaseDatabaseId: pullConfig.metabaseDatabaseId, @@ -118,7 +119,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr mapping: { metabaseDatabaseId: mapping.metabaseDatabaseId, metabaseDatabaseName: mappingDatabaseName, - metabaseEngine: mapping.metabaseEngine, + metabaseEngine: mappingEngine, targetConnectionId: mapping.targetConnectionId, }, }; @@ -233,7 +234,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr const databaseFile: StagedDatabaseFile = { metabaseDatabaseId: mapping.metabaseDatabaseId, metabaseDatabaseName: mappingDatabaseName, - metabaseEngine: mapping.metabaseEngine, + metabaseEngine: mappingEngine, targetConnectionId: mapping.targetConnectionId, }; await writeFile( diff --git a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts index ec5e163e..a7ffc5de 100644 --- a/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts +++ b/packages/context/src/ingest/adapters/metabase/local-metabase.adapter.ts @@ -12,7 +12,7 @@ import { type MetabaseClientRuntimeConfig, } from './client-port.js'; import type { MetabaseFetchLogger } from './fetch.js'; -import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './local-source-state-store.js'; import { MetabaseSourceAdapter } from './metabase.adapter.js'; function stringField(value: unknown): string | null { @@ -62,7 +62,8 @@ export function createLocalMetabaseSourceAdapter( project: KtxLocalProject, options: CreateLocalMetabaseSourceAdapterOptions = {}, ): MetabaseSourceAdapter { - const sourceStateReader = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(project) }); + const sourceStateReader = new KtxYamlMetabaseSourceStateReader(project, { discoveryCache }); const connectionFactory = new DefaultMetabaseConnectionClientFactory( (metabaseConnectionId) => metabaseRuntimeConfigFromLocalConnection( diff --git a/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts b/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts index f5aef74c..1139ea4d 100644 --- a/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts +++ b/packages/context/src/ingest/adapters/metabase/local-source-state-store.test.ts @@ -2,313 +2,112 @@ import { mkdtemp, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { LocalMetabaseSourceStateReader } from './local-source-state-store.js'; +import { buildDefaultKtxProjectConfig } from '../../../project/index.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './local-source-state-store.js'; -describe('LocalMetabaseSourceStateReader', () => { +describe('Metabase YAML source state and discovery cache', () => { let tempDir: string; - let store: LocalMetabaseSourceStateReader; + let discoveryCache: LocalMetabaseDiscoveryCache; beforeEach(async () => { - tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-local-state-')); - store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-cache-')); + discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); }); afterEach(async () => { await rm(tempDir, { recursive: true, force: true }); }); - it('round-trips hydrated source state through SQLite', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', + function projectWithMetabaseMappings(mappings: Record) { + return { + config: { + ...buildDefaultKtxProjectConfig('metabase-cache-test'), + connections: { + 'prod-metabase': { + driver: 'metabase', + mappings, + }, + }, + }, + }; + } + + it('reads Metabase mapping intent from ktx.yaml config', async () => { + const reader = new KtxYamlMetabaseSourceStateReader( + projectWithMetabaseMappings({ + databaseMappings: { '2': 'warehouse' }, + syncEnabled: { '2': true }, + syncMode: 'ONLY', + selections: { collections: [12], items: [99] }, + defaultTagNames: ['analytics'], + }), + { discoveryCache }, + ); + + await expect(reader.getSourceState('prod-metabase')).resolves.toEqual({ syncMode: 'ONLY', - defaultTagNames: ['analytics', 'curated'], + defaultTagNames: ['analytics'], selections: [ - { selectionType: 'collection', metabaseObjectId: 10 }, + { selectionType: 'collection', metabaseObjectId: 12 }, { selectionType: 'item', metabaseObjectId: 99 }, ], mappings: [ { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - metabaseHost: 'warehouse.internal', - metabaseDbName: 'analytics', - targetConnectionId: 'warehouse', - syncEnabled: true, - source: 'cli', - }, - ], - }); - - await expect(store.getSourceState('prod-metabase')).resolves.toEqual({ - syncMode: 'ONLY', - defaultTagNames: ['analytics', 'curated'], - selections: [ - { selectionType: 'collection', metabaseObjectId: 10 }, - { selectionType: 'item', metabaseObjectId: 99 }, - ], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - targetConnectionId: 'warehouse', - syncEnabled: true, - }, - ], - }); - }); - - it('excludes unhydrated mappings from getSourceState and exposes them through the side accessor', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: [], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, + metabaseDatabaseId: 2, metabaseDatabaseName: null, metabaseEngine: null, metabaseHost: null, metabaseDbName: null, targetConnectionId: 'warehouse', syncEnabled: true, - source: 'ktx.yaml', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'warehouse.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'warehouse', - syncEnabled: true, - source: 'refresh', }, ], }); - - const state = await store.getSourceState('prod-metabase'); - expect(state.mappings.map((mapping) => mapping.metabaseDatabaseId)).toEqual([2]); - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); }); - it('defaults missing sync config to ALL with no tags or selections', async () => { - await store.replaceSourceState({ + it('enriches YAML mapping rows with recreatable discovery metadata', async () => { + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 3, - metabaseDatabaseName: 'Warehouse', - metabaseEngine: 'postgres', - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: null, - syncEnabled: false, - source: 'refresh', - }, - ], + discovered: [{ id: 2, name: 'Analytics', engine: 'postgres', host: 'pg.internal', dbName: 'analytics' }], }); + const reader = new KtxYamlMetabaseSourceStateReader( + projectWithMetabaseMappings({ + databaseMappings: { '2': 'warehouse' }, + syncEnabled: { '2': true }, + }), + { discoveryCache }, + ); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ALL', - defaultTagNames: [], - selections: [], - }); - }); - - it('supports command-sized mapping writes and reads', async () => { - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 1, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }); - await store.setSyncState({ - connectionId: 'prod-metabase', - syncMode: 'ONLY', - defaultTagNames: ['analytics'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ + await expect(reader.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ { - metabaseDatabaseId: 1, - metabaseDatabaseName: null, - metabaseEngine: null, - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }, - ]); - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ONLY', - defaultTagNames: ['analytics'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [], - }); - }); - - it('refreshes discovered database metadata while preserving user mapping intent', async () => { - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 1, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'cli', - }); - - await store.refreshDiscoveredDatabases({ - connectionId: 'prod-metabase', - discovered: [ - { id: 1, name: 'Analytics', engine: 'postgres', host: 'pg.internal', dbName: 'analytics' }, - { id: 2, name: 'Sandbox', engine: 'postgres', host: 'pg.internal', dbName: 'sandbox' }, - ], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ - { - metabaseDatabaseId: 1, + metabaseDatabaseId: 2, metabaseDatabaseName: 'Analytics', metabaseEngine: 'postgres', metabaseHost: 'pg.internal', metabaseDbName: 'analytics', - targetConnectionId: 'prod-warehouse', + targetConnectionId: 'warehouse', syncEnabled: true, - source: 'cli', + source: 'ktx.yaml', }, + ]); + }); + + it('lists discovered-only rows as refresh cache data without turning them into config state', async () => { + await discoveryCache.refreshDiscoveredDatabases({ + connectionId: 'prod-metabase', + discovered: [{ id: 7, name: 'Unmapped', engine: 'mysql', host: 'mysql.internal', dbName: 'sales' }], + }); + const reader = new KtxYamlMetabaseSourceStateReader(projectWithMetabaseMappings({}), { discoveryCache }); + + await expect(reader.getSourceState('prod-metabase')).resolves.toMatchObject({ mappings: [] }); + await expect(reader.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', + metabaseDatabaseId: 7, + metabaseDatabaseName: 'Unmapped', targetConnectionId: null, syncEnabled: false, source: 'refresh', }, ]); }); - - it('updates sync-enabled, clears scoped rows, and applies bulk state in one call', async () => { - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'analytics', - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'staging-warehouse', - syncEnabled: true, - source: 'refresh', - }, - ], - }); - - await store.setMappingSyncEnabled({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 2, - syncEnabled: false, - }); - await store.clearDatabaseMappings({ connectionId: 'prod-metabase', metabaseDatabaseId: 1 }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toEqual([ - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Sandbox', - metabaseEngine: 'postgres', - metabaseHost: 'pg.internal', - metabaseDbName: 'sandbox', - targetConnectionId: 'staging-warehouse', - syncEnabled: false, - source: 'refresh', - }, - ]); - }); - - it('seeds unhydrated yaml intent without exposing it through getSourceState', async () => { - await store.applyYamlBootstrap({ - connectionId: 'prod-metabase', - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [{ metabaseDatabaseId: 1, targetConnectionId: 'prod-warehouse', syncEnabled: true }], - }); - - await expect(store.getUnhydratedSyncEnabledMappingIds('prod-metabase')).resolves.toEqual([1]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ALL', - defaultTagNames: ['ktx'], - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - mappings: [], - }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: null, - targetConnectionId: 'prod-warehouse', - syncEnabled: true, - source: 'ktx.yaml', - }, - ]); - }); - - it('applies yaml target intent onto refresh metadata but does not overwrite cli rows', async () => { - await store.refreshDiscoveredDatabases({ - connectionId: 'prod-metabase', - discovered: [{ id: 1, name: 'Analytics', engine: 'postgres', host: 'db.test', dbName: 'analytics' }], - }); - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 2, - targetConnectionId: 'cli-warehouse', - syncEnabled: true, - source: 'cli', - }); - - await store.applyYamlBootstrap({ - connectionId: 'prod-metabase', - syncMode: 'EXCEPT', - defaultTagNames: [], - selections: [{ selectionType: 'item', metabaseObjectId: 99 }], - mappings: [ - { metabaseDatabaseId: 1, targetConnectionId: 'yaml-warehouse', syncEnabled: true }, - { metabaseDatabaseId: 2, targetConnectionId: 'yaml-warehouse', syncEnabled: false }, - ], - }); - - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Analytics', - metabaseEngine: 'postgres', - targetConnectionId: 'yaml-warehouse', - syncEnabled: true, - source: 'ktx.yaml', - }, - { - metabaseDatabaseId: 2, - targetConnectionId: 'cli-warehouse', - syncEnabled: true, - source: 'cli', - }, - ]); - }); }); diff --git a/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts b/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts index 246d5f33..f8026db6 100644 --- a/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts +++ b/packages/context/src/ingest/adapters/metabase/local-source-state-store.ts @@ -1,17 +1,31 @@ import { mkdirSync } from 'node:fs'; import { dirname } from 'node:path'; import Database from 'better-sqlite3'; +import { + parseMetabaseMappingBootstrap, + type KtxLocalProject, + type MetabaseMappingBootstrap, +} from '../../../project/index.js'; +import type { DiscoveredMetabaseDatabase } from './mapping.js'; import type { MetabaseSourceState, MetabaseSourceStateReader, MetabaseSourceStateSelection } from './source-state-port.js'; -import type { MetabaseSyncMode } from './types.js'; -export type LocalMetabaseMappingSource = 'ktx.yaml' | 'cli' | 'refresh'; +export type LocalMetabaseMappingSource = 'ktx.yaml' | 'refresh'; -interface LocalMetabaseSourceStateStoreOptions { +interface LocalMetabaseDiscoveryCacheOptions { dbPath: string; now?: () => Date; } -export interface LocalMetabaseSourceStateMappingInput { +export interface RefreshLocalMetabaseDiscoveredDatabasesInput { + connectionId: string; + discovered: DiscoveredMetabaseDatabase[]; +} + +export interface LocalMetabaseDiscoveredDatabaseRow extends DiscoveredMetabaseDatabase { + updatedAt: string; +} + +export interface LocalMetabaseMappingListRow { metabaseDatabaseId: number; metabaseDatabaseName: string | null; metabaseEngine: string | null; @@ -22,443 +36,86 @@ export interface LocalMetabaseSourceStateMappingInput { source: LocalMetabaseMappingSource; } -export interface ReplaceLocalMetabaseSourceStateInput { - connectionId: string; - syncMode?: MetabaseSyncMode; - defaultTagNames?: string[]; - selections?: MetabaseSourceStateSelection[]; - mappings: LocalMetabaseSourceStateMappingInput[]; -} - -interface ApplyLocalMetabaseYamlBootstrapInput { - connectionId: string; - syncMode: MetabaseSyncMode; - defaultTagNames: string[]; - selections: MetabaseSourceStateSelection[]; - mappings: Array<{ - metabaseDatabaseId: number; - targetConnectionId: string | null; - syncEnabled: boolean; - }>; -} - -export interface LocalMetabaseMappingListRow extends LocalMetabaseSourceStateMappingInput {} - -export interface UpsertLocalMetabaseDatabaseMappingInput { - connectionId: string; - metabaseDatabaseId: number; - targetConnectionId: string | null; - syncEnabled: boolean; - source: LocalMetabaseMappingSource; -} - -export interface SetLocalMetabaseMappingSyncEnabledInput { - connectionId: string; - metabaseDatabaseId: number; - syncEnabled: boolean; -} - -export interface SetLocalMetabaseSyncStateInput { - connectionId: string; - syncMode: MetabaseSyncMode; - defaultTagNames: string[]; - selections: MetabaseSourceStateSelection[]; -} - -export interface RefreshLocalMetabaseDiscoveredDatabasesInput { - connectionId: string; - discovered: Array<{ - id: number; - name: string; - engine: string; - host: string | null; - dbName: string | null; - }>; -} - -export interface ClearLocalMetabaseMappingsInput { - connectionId: string; - metabaseDatabaseId?: number; -} - -interface SelectionRow { - selection_type: 'collection' | 'item'; - metabase_object_id: number; -} - -interface MappingRow { +interface DiscoveryRow { metabase_database_id: number; - metabase_database_name: string | null; - metabase_engine: string | null; - target_connection_id: string | null; - sync_enabled: number; + metabase_database_name: string; + metabase_engine: string; + metabase_host: string | null; + metabase_db_name: string | null; + updated_at: string; } -interface SyncConfigRow { - sync_mode: MetabaseSyncMode; - default_tag_names_json: string; +function selectionState(bootstrap: MetabaseMappingBootstrap): MetabaseSourceStateSelection[] { + return [ + ...bootstrap.selections.collections.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), + ...bootstrap.selections.items.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), + ]; } -function parseDefaultTagNames(raw: string): string[] { - const parsed = JSON.parse(raw); - return Array.isArray(parsed) ? parsed.filter((value): value is string => typeof value === 'string') : []; +function configuredMappingIds(bootstrap: MetabaseMappingBootstrap): number[] { + return [...new Set([...Object.keys(bootstrap.databaseMappings), ...Object.keys(bootstrap.syncEnabled)].map(Number))].sort( + (left, right) => left - right, + ); } -export class LocalMetabaseSourceStateReader implements MetabaseSourceStateReader { +function discoveredRowToDatabase(row: DiscoveryRow): LocalMetabaseDiscoveredDatabaseRow { + return { + id: row.metabase_database_id, + name: row.metabase_database_name, + engine: row.metabase_engine, + host: row.metabase_host, + dbName: row.metabase_db_name, + updatedAt: row.updated_at, + }; +} + +function emptyMetabaseSourceState(): MetabaseSourceState { + return { + syncMode: 'ALL', + selections: [], + defaultTagNames: [], + mappings: [], + }; +} + +export class LocalMetabaseDiscoveryCache { private readonly db: Database.Database; private readonly now: () => Date; - constructor(options: LocalMetabaseSourceStateStoreOptions) { + constructor(options: LocalMetabaseDiscoveryCacheOptions) { mkdirSync(dirname(options.dbPath), { recursive: true }); this.db = new Database(options.dbPath); this.db.pragma('journal_mode = WAL'); this.db.pragma('foreign_keys = ON'); this.now = options.now ?? (() => new Date()); this.db.exec(` - CREATE TABLE IF NOT EXISTS local_metabase_sync_config ( - metabase_connection_id TEXT PRIMARY KEY, - sync_mode TEXT NOT NULL, - default_tag_names_json TEXT NOT NULL, - updated_at TEXT NOT NULL - ); - - CREATE TABLE IF NOT EXISTS local_metabase_selections ( - metabase_connection_id TEXT NOT NULL, - selection_type TEXT NOT NULL, - metabase_object_id INTEGER NOT NULL, - PRIMARY KEY (metabase_connection_id, selection_type, metabase_object_id) - ); - - CREATE TABLE IF NOT EXISTS local_metabase_database_mappings ( + CREATE TABLE IF NOT EXISTS local_metabase_discovered_databases ( metabase_connection_id TEXT NOT NULL, metabase_database_id INTEGER NOT NULL, - metabase_database_name TEXT, - metabase_engine TEXT, + metabase_database_name TEXT NOT NULL, + metabase_engine TEXT NOT NULL, metabase_host TEXT, metabase_db_name TEXT, - target_connection_id TEXT, - sync_enabled INTEGER NOT NULL DEFAULT 0, - source TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (metabase_connection_id, metabase_database_id) ); `); } - async applyYamlBootstrap(input: ApplyLocalMetabaseYamlBootstrapInput): Promise { - const timestamp = this.now().toISOString(); - const apply = this.db.transaction(() => { - const syncConfigExists = this.db - .prepare('SELECT 1 FROM local_metabase_sync_config WHERE metabase_connection_id = ?') - .get(input.connectionId); - if (!syncConfigExists) { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - `, - ) - .run(input.connectionId, input.syncMode, JSON.stringify(input.defaultTagNames), timestamp); - - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of input.selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - } - - const existing = this.db.prepare(` - SELECT target_connection_id, source - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? AND metabase_database_id = ? - `); - const insert = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, NULL, NULL, NULL, NULL, ?, ?, 'ktx.yaml', ?) - `); - const updateRefreshRow = this.db.prepare(` - UPDATE local_metabase_database_mappings - SET target_connection_id = ?, - sync_enabled = ?, - source = 'ktx.yaml', - updated_at = ? - WHERE metabase_connection_id = ? - AND metabase_database_id = ? - AND source = 'refresh' - AND target_connection_id IS NULL - `); - - for (const mapping of input.mappings) { - const row = existing.get(input.connectionId, mapping.metabaseDatabaseId) as - | { target_connection_id: string | null; source: LocalMetabaseMappingSource } - | undefined; - if (!row) { - insert.run( - input.connectionId, - mapping.metabaseDatabaseId, - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - timestamp, - ); - continue; - } - if (row.source === 'refresh' && row.target_connection_id === null) { - updateRefreshRow.run( - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - timestamp, - input.connectionId, - mapping.metabaseDatabaseId, - ); - } - } - }); - - apply(); - } - - async replaceSourceState(input: ReplaceLocalMetabaseSourceStateInput): Promise { - const timestamp = this.now().toISOString(); - const syncMode = input.syncMode ?? 'ALL'; - const selections = input.selections ?? []; - const defaultTagNames = input.defaultTagNames ?? []; - - const replace = this.db.transaction(() => { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - ON CONFLICT(metabase_connection_id) DO UPDATE SET - sync_mode = excluded.sync_mode, - default_tag_names_json = excluded.default_tag_names_json, - updated_at = excluded.updated_at - `, - ) - .run(input.connectionId, syncMode, JSON.stringify(defaultTagNames), timestamp); - - this.db.prepare('DELETE FROM local_metabase_selections WHERE metabase_connection_id = ?').run(input.connectionId); - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - - this.db - .prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ?') - .run(input.connectionId); - const insertMapping = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `); - for (const mapping of input.mappings) { - insertMapping.run( - input.connectionId, - mapping.metabaseDatabaseId, - mapping.metabaseDatabaseName, - mapping.metabaseEngine, - mapping.metabaseHost, - mapping.metabaseDbName, - mapping.targetConnectionId, - mapping.syncEnabled ? 1 : 0, - mapping.source, - timestamp, - ); - } - }); - - replace(); - } - - async listDatabaseMappings(connectionId: string): Promise { - const rows = this.db - .prepare( - ` - SELECT - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? - ORDER BY metabase_database_id - `, - ) - .all(connectionId) as Array<{ - metabase_database_id: number; - metabase_database_name: string | null; - metabase_engine: string | null; - metabase_host: string | null; - metabase_db_name: string | null; - target_connection_id: string | null; - sync_enabled: number; - source: LocalMetabaseMappingSource; - }>; - - return rows.map((row) => ({ - metabaseDatabaseId: row.metabase_database_id, - metabaseDatabaseName: row.metabase_database_name, - metabaseEngine: row.metabase_engine, - metabaseHost: row.metabase_host, - metabaseDbName: row.metabase_db_name, - targetConnectionId: row.target_connection_id, - syncEnabled: row.sync_enabled === 1, - source: row.source, - })); - } - - async upsertDatabaseMapping(input: UpsertLocalMetabaseDatabaseMappingInput): Promise { - const timestamp = this.now().toISOString(); - this.db - .prepare( - ` - INSERT INTO local_metabase_database_mappings ( - metabase_connection_id, - metabase_database_id, - metabase_database_name, - metabase_engine, - metabase_host, - metabase_db_name, - target_connection_id, - sync_enabled, - source, - updated_at - ) - VALUES (?, ?, NULL, NULL, NULL, NULL, ?, ?, ?, ?) - ON CONFLICT(metabase_connection_id, metabase_database_id) DO UPDATE SET - target_connection_id = excluded.target_connection_id, - sync_enabled = excluded.sync_enabled, - source = excluded.source, - updated_at = excluded.updated_at - `, - ) - .run( - input.connectionId, - input.metabaseDatabaseId, - input.targetConnectionId, - input.syncEnabled ? 1 : 0, - input.source, - timestamp, - ); - } - - async setMappingSyncEnabled(input: SetLocalMetabaseMappingSyncEnabledInput): Promise { - const timestamp = this.now().toISOString(); - this.db - .prepare( - ` - UPDATE local_metabase_database_mappings - SET sync_enabled = ?, updated_at = ? - WHERE metabase_connection_id = ? AND metabase_database_id = ? - `, - ) - .run(input.syncEnabled ? 1 : 0, timestamp, input.connectionId, input.metabaseDatabaseId); - } - - async setSyncState(input: SetLocalMetabaseSyncStateInput): Promise { - const timestamp = this.now().toISOString(); - const write = this.db.transaction(() => { - this.db - .prepare( - ` - INSERT INTO local_metabase_sync_config ( - metabase_connection_id, - sync_mode, - default_tag_names_json, - updated_at - ) - VALUES (?, ?, ?, ?) - ON CONFLICT(metabase_connection_id) DO UPDATE SET - sync_mode = excluded.sync_mode, - default_tag_names_json = excluded.default_tag_names_json, - updated_at = excluded.updated_at - `, - ) - .run(input.connectionId, input.syncMode, JSON.stringify(input.defaultTagNames), timestamp); - - this.db.prepare('DELETE FROM local_metabase_selections WHERE metabase_connection_id = ?').run(input.connectionId); - const insertSelection = this.db.prepare(` - INSERT INTO local_metabase_selections ( - metabase_connection_id, - selection_type, - metabase_object_id - ) - VALUES (?, ?, ?) - `); - for (const selection of input.selections) { - insertSelection.run(input.connectionId, selection.selectionType, selection.metabaseObjectId); - } - }); - - write(); - } - async refreshDiscoveredDatabases(input: RefreshLocalMetabaseDiscoveredDatabasesInput): Promise { const timestamp = this.now().toISOString(); const refresh = this.db.transaction(() => { const upsert = this.db.prepare(` - INSERT INTO local_metabase_database_mappings ( + INSERT INTO local_metabase_discovered_databases ( metabase_connection_id, metabase_database_id, metabase_database_name, metabase_engine, metabase_host, metabase_db_name, - target_connection_id, - sync_enabled, - source, updated_at ) - VALUES (?, ?, ?, ?, ?, ?, NULL, 0, 'refresh', ?) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(metabase_connection_id, metabase_database_id) DO UPDATE SET metabase_database_name = excluded.metabase_database_name, metabase_engine = excluded.metabase_engine, @@ -483,78 +140,116 @@ export class LocalMetabaseSourceStateReader implements MetabaseSourceStateReader refresh(); } - async clearDatabaseMappings(input: ClearLocalMetabaseMappingsInput): Promise { - if (input.metabaseDatabaseId === undefined) { - this.db.prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ?').run(input.connectionId); - return; - } - this.db - .prepare('DELETE FROM local_metabase_database_mappings WHERE metabase_connection_id = ? AND metabase_database_id = ?') - .run(input.connectionId, input.metabaseDatabaseId); - } - - async getUnhydratedSyncEnabledMappingIds(connectionId: string): Promise { + async listDiscoveredDatabases(connectionId: string): Promise { const rows = this.db - .prepare( - ` - SELECT metabase_database_id - FROM local_metabase_database_mappings - WHERE metabase_connection_id = ? - AND sync_enabled = 1 - AND target_connection_id IS NOT NULL - AND metabase_database_name IS NULL - ORDER BY metabase_database_id - `, - ) - .all(connectionId) as Array<{ metabase_database_id: number }>; - return rows.map((row) => row.metabase_database_id); - } - - async getSourceState(connectionId: string): Promise { - const config = this.db - .prepare('SELECT sync_mode, default_tag_names_json FROM local_metabase_sync_config WHERE metabase_connection_id = ?') - .get(connectionId) as SyncConfigRow | undefined; - const selections = this.db - .prepare( - ` - SELECT selection_type, metabase_object_id - FROM local_metabase_selections - WHERE metabase_connection_id = ? - ORDER BY selection_type, metabase_object_id - `, - ) - .all(connectionId) as SelectionRow[]; - const mappings = this.db .prepare( ` SELECT metabase_database_id, metabase_database_name, metabase_engine, - target_connection_id, - sync_enabled - FROM local_metabase_database_mappings + metabase_host, + metabase_db_name, + updated_at + FROM local_metabase_discovered_databases WHERE metabase_connection_id = ? - AND metabase_database_name IS NOT NULL ORDER BY metabase_database_id `, ) - .all(connectionId) as MappingRow[]; + .all(connectionId) as DiscoveryRow[]; + return rows.map(discoveredRowToDatabase); + } - return { - syncMode: config?.sync_mode ?? 'ALL', - defaultTagNames: config ? parseDefaultTagNames(config.default_tag_names_json) : [], - selections: selections.map((selection) => ({ - selectionType: selection.selection_type, - metabaseObjectId: selection.metabase_object_id, - })), - mappings: mappings.map((mapping) => ({ - metabaseDatabaseId: mapping.metabase_database_id, - metabaseDatabaseName: mapping.metabase_database_name, - metabaseEngine: mapping.metabase_engine, - targetConnectionId: mapping.target_connection_id, - syncEnabled: mapping.sync_enabled === 1, - })), - }; + async getDiscoveredDatabase( + connectionId: string, + metabaseDatabaseId: number, + ): Promise { + const row = this.db + .prepare( + ` + SELECT + metabase_database_id, + metabase_database_name, + metabase_engine, + metabase_host, + metabase_db_name, + updated_at + FROM local_metabase_discovered_databases + WHERE metabase_connection_id = ? AND metabase_database_id = ? + `, + ) + .get(connectionId, metabaseDatabaseId) as DiscoveryRow | undefined; + return row ? discoveredRowToDatabase(row) : null; + } +} + +export class KtxYamlMetabaseSourceStateReader implements MetabaseSourceStateReader { + constructor( + private readonly project: Pick, + private readonly options: { discoveryCache?: LocalMetabaseDiscoveryCache } = {}, + ) {} + + async getSourceState(connectionId: string): Promise { + const connection = this.project.config.connections[connectionId]; + if (!connection || String(connection.driver ?? '').toLowerCase() !== 'metabase') { + return emptyMetabaseSourceState(); + } + + const bootstrap = parseMetabaseMappingBootstrap(connectionId, connection); + const discovered = new Map( + (await this.options.discoveryCache?.listDiscoveredDatabases(connectionId))?.map((database) => [database.id, database]) ?? + [], + ); + + return { + syncMode: bootstrap.syncMode, + selections: selectionState(bootstrap), + defaultTagNames: bootstrap.defaultTagNames, + mappings: configuredMappingIds(bootstrap).map((id) => { + const metadata = discovered.get(id); + return { + metabaseDatabaseId: id, + metabaseDatabaseName: metadata?.name ?? null, + metabaseEngine: metadata?.engine ?? null, + metabaseHost: metadata?.host ?? null, + metabaseDbName: metadata?.dbName ?? null, + targetConnectionId: bootstrap.databaseMappings[String(id)] ?? null, + syncEnabled: bootstrap.syncEnabled[String(id)] ?? false, + }; + }), + }; + } + + async listDatabaseMappings(connectionId: string): Promise { + const state = await this.getSourceState(connectionId); + const configuredRows: LocalMetabaseMappingListRow[] = state.mappings.map((mapping) => ({ + metabaseDatabaseId: mapping.metabaseDatabaseId, + metabaseDatabaseName: mapping.metabaseDatabaseName, + metabaseEngine: mapping.metabaseEngine, + metabaseHost: mapping.metabaseHost ?? null, + metabaseDbName: mapping.metabaseDbName ?? null, + targetConnectionId: mapping.targetConnectionId, + syncEnabled: mapping.syncEnabled, + source: 'ktx.yaml', + })); + + const configuredIds = new Set(configuredRows.map((row) => row.metabaseDatabaseId)); + const discoveredRows = + (await this.options.discoveryCache?.listDiscoveredDatabases(connectionId))?.filter( + (database) => !configuredIds.has(database.id), + ) ?? []; + return [ + ...configuredRows, + ...discoveredRows.map((database) => ({ + metabaseDatabaseId: database.id, + metabaseDatabaseName: database.name, + metabaseEngine: database.engine, + metabaseHost: database.host, + metabaseDbName: database.dbName, + targetConnectionId: null, + syncEnabled: false, + source: 'refresh' as const, + })), + ].sort((left, right) => left.metabaseDatabaseId - right.metabaseDatabaseId); } } diff --git a/packages/context/src/ingest/adapters/metabase/source-state-port.ts b/packages/context/src/ingest/adapters/metabase/source-state-port.ts index 7c872f8d..16de9369 100644 --- a/packages/context/src/ingest/adapters/metabase/source-state-port.ts +++ b/packages/context/src/ingest/adapters/metabase/source-state-port.ts @@ -9,6 +9,8 @@ export interface MetabaseSourceStateMapping { metabaseDatabaseId: number; metabaseDatabaseName: string | null; metabaseEngine: string | null; + metabaseHost?: string | null; + metabaseDbName?: string | null; targetConnectionId: string | null; syncEnabled: boolean; } diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index d2336ae9..3c238d98 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -240,17 +240,15 @@ export { createLocalMetabaseSourceAdapter, metabaseRuntimeConfigFromLocalConnection, } from './adapters/metabase/local-metabase.adapter.js'; -export { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +export { + KtxYamlMetabaseSourceStateReader, + LocalMetabaseDiscoveryCache, +} from './adapters/metabase/local-source-state-store.js'; export type { - ClearLocalMetabaseMappingsInput, + LocalMetabaseDiscoveredDatabaseRow, LocalMetabaseMappingListRow, LocalMetabaseMappingSource, - LocalMetabaseSourceStateMappingInput, - ReplaceLocalMetabaseSourceStateInput, RefreshLocalMetabaseDiscoveredDatabasesInput, - SetLocalMetabaseMappingSyncEnabledInput, - SetLocalMetabaseSyncStateInput, - UpsertLocalMetabaseDatabaseMappingInput, } from './adapters/metabase/local-source-state-store.js'; export { metabaseLocalConnectionIdSchema, metabasePullConfigSchema, parseMetabasePullConfig } from './adapters/metabase/types.js'; export type { MetabasePullConfig, MetabaseSyncMode } from './adapters/metabase/types.js'; diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index 7161743a..ad3b23f4 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -480,7 +480,7 @@ describe('local ingest adapters', () => { }), config: { ...project.config, - setup: { database_connection_ids: ['warehouse'], completed_steps: [] }, + setup: { database_connection_ids: ['warehouse'] }, connections: { warehouse: { driver: 'postgres', diff --git a/packages/context/src/ingest/local-ingest.ts b/packages/context/src/ingest/local-ingest.ts index 2ec13184..6056f6ed 100644 --- a/packages/context/src/ingest/local-ingest.ts +++ b/packages/context/src/ingest/local-ingest.ts @@ -9,10 +9,9 @@ import type { KtxSemanticLayerComputePort } from '../daemon/index.js'; import type { KtxLocalProject } from '../project/index.js'; import { ktxLocalStateDbPath } from '../project/index.js'; import { planMetabaseFanoutChildren } from './adapters/metabase/fanout-planner.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +import { KtxYamlMetabaseSourceStateReader, LocalMetabaseDiscoveryCache } from './adapters/metabase/local-source-state-store.js'; import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } from './local-adapters.js'; import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js'; -import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; import { buildSyncId } from './raw-sources-paths.js'; import type { IngestReportBody, IngestReportSnapshot } from './reports.js'; @@ -364,16 +363,10 @@ export async function runLocalMetabaseIngest( const metabaseConnectionId = safeSegment('metabase connection id', options.metabaseConnectionId); assertConfigured(options.project, 'metabase', metabaseConnectionId); - await seedLocalMappingStateFromKtxYaml(options.project, metabaseConnectionId); const adapter = findAdapter(options.adapters, 'metabase'); - const sourceStateReader = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(options.project) }); - - const unhydrated = await sourceStateReader.getUnhydratedSyncEnabledMappingIds(metabaseConnectionId); - if (unhydrated.length > 0) { - throw new Error( - `Metabase mappings ${unhydrated.join(', ')} are not hydrated; run \`ktx connection mapping refresh ${metabaseConnectionId}\` before local Metabase ingest.`, - ); - } + const sourceStateReader = new KtxYamlMetabaseSourceStateReader(options.project, { + discoveryCache: new LocalMetabaseDiscoveryCache({ dbPath: ktxLocalStateDbPath(options.project) }), + }); const state = await sourceStateReader.getSourceState(metabaseConnectionId); const childPlans = planMetabaseFanoutChildren({ diff --git a/packages/context/src/ingest/local-mapping-reconcile.test.ts b/packages/context/src/ingest/local-mapping-reconcile.test.ts index c0f8dcac..4a5d2740 100644 --- a/packages/context/src/ingest/local-mapping-reconcile.test.ts +++ b/packages/context/src/ingest/local-mapping-reconcile.test.ts @@ -4,7 +4,6 @@ import { join } from 'node:path'; import { afterEach, describe, expect, it } from 'vitest'; import { ktxLocalStateDbPath, type KtxLocalProject } from '../project/index.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; import { seedLocalMappingStateFromKtxYaml } from './local-mapping-reconcile.js'; describe('local mapping yaml reconciliation bridge', () => { @@ -23,7 +22,7 @@ describe('local mapping yaml reconciliation bridge', () => { } as KtxLocalProject; } - it('seeds Metabase local state from ktx.yaml mapping intent', async () => { + it('does not copy Metabase mapping intent into local SQLite state', async () => { tempDir = await mkdtemp(join(tmpdir(), 'ktx-metabase-yaml-seed-')); const project = projectWithConnections({ 'prod-metabase': { @@ -39,17 +38,7 @@ describe('local mapping yaml reconciliation bridge', () => { 'prod-warehouse': { driver: 'postgres', url: 'postgresql://readonly@db.test/analytics' }, }); - await seedLocalMappingStateFromKtxYaml(project, 'prod-metabase'); - - const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) }); - await expect(store.listDatabaseMappings('prod-metabase')).resolves.toMatchObject([ - { metabaseDatabaseId: 1, targetConnectionId: 'prod-warehouse', syncEnabled: true, source: 'ktx.yaml' }, - ]); - await expect(store.getSourceState('prod-metabase')).resolves.toMatchObject({ - syncMode: 'ONLY', - selections: [{ selectionType: 'collection', metabaseObjectId: 12 }], - defaultTagNames: ['ktx'], - }); + await expect(seedLocalMappingStateFromKtxYaml(project, 'prod-metabase')).resolves.toBeUndefined(); }); it('seeds Looker local mappings from ktx.yaml mapping intent', async () => { diff --git a/packages/context/src/ingest/local-mapping-reconcile.ts b/packages/context/src/ingest/local-mapping-reconcile.ts index a1bae2fc..1a58af95 100644 --- a/packages/context/src/ingest/local-mapping-reconcile.ts +++ b/packages/context/src/ingest/local-mapping-reconcile.ts @@ -3,29 +3,8 @@ import { parseConnectionMappingBootstrap, type KtxLocalProject, type LookerMappingBootstrap, - type MetabaseMappingBootstrap, } from '../project/index.js'; import { LocalLookerRuntimeStore } from './adapters/looker/local-runtime-store.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; - -function metabaseSelections(bootstrap: MetabaseMappingBootstrap) { - return [ - ...bootstrap.selections.collections.map((id) => ({ selectionType: 'collection' as const, metabaseObjectId: id })), - ...bootstrap.selections.items.map((id) => ({ selectionType: 'item' as const, metabaseObjectId: id })), - ]; -} - -function metabaseMappings(bootstrap: MetabaseMappingBootstrap) { - const ids = new Set([...Object.keys(bootstrap.databaseMappings), ...Object.keys(bootstrap.syncEnabled)]); - return [...ids] - .map((id) => Number(id)) - .sort((a, b) => a - b) - .map((id) => ({ - metabaseDatabaseId: id, - targetConnectionId: bootstrap.databaseMappings[String(id)] ?? null, - syncEnabled: bootstrap.syncEnabled[String(id)] ?? false, - })); -} function lookerMappings(bootstrap: LookerMappingBootstrap) { return Object.entries(bootstrap.connectionMappings) @@ -44,20 +23,12 @@ export async function seedLocalMappingStateFromKtxYaml(project: KtxLocalProject, return; } - const dbPath = ktxLocalStateDbPath(project); if (bootstrap.adapter === 'metabase') { - await new LocalMetabaseSourceStateReader({ dbPath }).applyYamlBootstrap({ - connectionId, - syncMode: bootstrap.syncMode, - defaultTagNames: bootstrap.defaultTagNames, - selections: metabaseSelections(bootstrap), - mappings: metabaseMappings(bootstrap), - }); return; } if (bootstrap.adapter === 'looker') { - await new LocalLookerRuntimeStore({ dbPath }).applyYamlBootstrap({ + await new LocalLookerRuntimeStore({ dbPath: ktxLocalStateDbPath(project) }).applyYamlBootstrap({ lookerConnectionId: connectionId, mappings: lookerMappings(bootstrap), }); diff --git a/packages/context/src/ingest/local-metabase-ingest.test.ts b/packages/context/src/ingest/local-metabase-ingest.test.ts index da00c7ec..fe3bd80b 100644 --- a/packages/context/src/ingest/local-metabase-ingest.test.ts +++ b/packages/context/src/ingest/local-metabase-ingest.test.ts @@ -4,7 +4,7 @@ import { join } from 'node:path'; import { AgentRunnerService } from '../agent/index.js'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { initKtxProject, type KtxLocalProject } from '../project/index.js'; -import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; +import { LocalMetabaseDiscoveryCache } from './adapters/metabase/local-source-state-store.js'; import { getLocalIngestStatus, runLocalMetabaseIngest } from './local-ingest.js'; import type { ChunkResult, FetchContext, SourceAdapter } from './types.js'; @@ -94,33 +94,19 @@ describe('runLocalMetabaseIngest', () => { }); async function seedMetabaseState(): Promise { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a', '2': 'warehouse_b' }, + syncEnabled: { '1': true, '2': true }, syncMode: 'ALL', defaultTagNames: ['ktx'], - selections: [], - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: 'localhost', - metabaseDbName: 'a', - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'refresh', - }, - { - metabaseDatabaseId: 2, - metabaseDatabaseName: 'Warehouse B', - metabaseEngine: 'postgres', - metabaseHost: 'localhost', - metabaseDbName: 'b', - targetConnectionId: 'warehouse_b', - syncEnabled: true, - source: 'refresh', - }, + selections: { collections: [], items: [] }, + }; + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + await discoveryCache.refreshDiscoveredDatabases({ + connectionId: 'prod-metabase', + discovered: [ + { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'localhost', dbName: 'a' }, + { id: 2, name: 'Warehouse B', engine: 'postgres', host: 'localhost', dbName: 'b' }, ], }); } @@ -151,22 +137,10 @@ describe('runLocalMetabaseIngest', () => { }); it('throws before runner work when there are no sync-enabled mapped rows', async () => { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 1, - metabaseDatabaseName: 'Warehouse A', - metabaseEngine: 'postgres', - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: null, - syncEnabled: true, - source: 'refresh', - }, - ], - }); + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': null }, + syncEnabled: { '1': true }, + }; await expect( runLocalMetabaseIngest({ @@ -178,59 +152,28 @@ describe('runLocalMetabaseIngest', () => { ).rejects.toThrow('no sync-enabled mappings with a target connection'); }); - it('throws with refresh guidance for unhydrated sync-enabled rows', async () => { - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.replaceSourceState({ - connectionId: 'prod-metabase', - mappings: [ - { - metabaseDatabaseId: 7, - metabaseDatabaseName: null, - metabaseEngine: null, - metabaseHost: null, - metabaseDbName: null, - targetConnectionId: 'warehouse_a', - syncEnabled: true, - source: 'ktx.yaml', - }, - ], + it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => { + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a' }, + syncEnabled: { '1': true }, + }; + + const result = await runLocalMetabaseIngest({ + project, + adapters: [new FakeMetabaseSourceAdapter()], + metabaseConnectionId: 'prod-metabase', + agentRunner: new TestAgentRunner(), + jobIdFactory: () => 'metabase-child-1', }); - await expect( - runLocalMetabaseIngest({ - project, - adapters: [new FakeMetabaseSourceAdapter()], + expect(result.status).toBe('all_succeeded'); + expect(result.children).toMatchObject([ + { metabaseConnectionId: 'prod-metabase', - agentRunner: new TestAgentRunner(), - }), - ).rejects.toThrow('run `ktx connection mapping refresh prod-metabase`'); - }); - - it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => { - const project = { - projectDir: tempDir, - config: { - ingest: { adapters: ['metabase'] }, - connections: { - 'prod-metabase': { - driver: 'metabase', - mappings: { - databaseMappings: { '1': 'prod-warehouse' }, - syncEnabled: { '1': true }, - }, - }, - 'prod-warehouse': { driver: 'postgres', url: 'postgresql://readonly@db.test/analytics' }, - }, + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', }, - } as never; - - await expect( - runLocalMetabaseIngest({ - project, - adapters: [new FakeMetabaseSourceAdapter()], - metabaseConnectionId: 'prod-metabase', - }), - ).rejects.toThrow('run `ktx connection mapping refresh prod-metabase`'); + ]); }); it('rejects source-dir uploads through the Metabase fan-out runner', async () => { @@ -266,15 +209,15 @@ describe('runLocalMetabaseIngest', () => { it('captures fetch-time child failures and continues later mappings', async () => { await seedMetabaseState(); project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' }; - const store = new LocalMetabaseSourceStateReader({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); - await store.upsertDatabaseMapping({ - connectionId: 'prod-metabase', - metabaseDatabaseId: 3, - targetConnectionId: 'warehouse_c', - syncEnabled: true, - source: 'cli', - }); - await store.refreshDiscoveredDatabases({ + project.config.connections['prod-metabase'].mappings = { + databaseMappings: { '1': 'warehouse_a', '2': 'warehouse_b', '3': 'warehouse_c' }, + syncEnabled: { '1': true, '2': true, '3': true }, + syncMode: 'ALL', + defaultTagNames: ['ktx'], + selections: { collections: [], items: [] }, + }; + const discoveryCache = new LocalMetabaseDiscoveryCache({ dbPath: join(tempDir, '.ktx', 'db.sqlite') }); + await discoveryCache.refreshDiscoveredDatabases({ connectionId: 'prod-metabase', discovered: [ { id: 1, name: 'Warehouse A', engine: 'postgres', host: 'localhost', dbName: 'a' }, diff --git a/packages/context/src/ingest/tools/verification-ledger.tool.ts b/packages/context/src/ingest/tools/verification-ledger.tool.ts index f99e79be..ac880607 100644 --- a/packages/context/src/ingest/tools/verification-ledger.tool.ts +++ b/packages/context/src/ingest/tools/verification-ledger.tool.ts @@ -8,7 +8,7 @@ const verificationLedgerInputSchema = z.object({ notes: z.string().max(2000).optional(), }); -export interface VerificationLedgerEntry { +interface VerificationLedgerEntry { summary: string; verifiedIdentifiers: string[]; unverifiedIdentifiers: string[]; diff --git a/packages/context/src/ingest/tools/warehouse-verification/index.ts b/packages/context/src/ingest/tools/warehouse-verification/index.ts index 0901eace..e6ac2c1c 100644 --- a/packages/context/src/ingest/tools/warehouse-verification/index.ts +++ b/packages/context/src/ingest/tools/warehouse-verification/index.ts @@ -6,12 +6,6 @@ import { EntityDetailsTool } from './entity-details.tool.js'; import { SqlExecutionTool } from './sql-execution.tool.js'; import { WarehouseCatalogService } from './warehouse-catalog.service.js'; -export { DiscoverDataTool } from './discover-data.tool.js'; -export { EntityDetailsTool } from './entity-details.tool.js'; -export { SqlExecutionTool } from './sql-execution.tool.js'; -export { WarehouseCatalogService } from './warehouse-catalog.service.js'; -export type { RawSchemaHit, TableDetail, WarehouseColumnDetail } from './warehouse-catalog.service.js'; - export function createWarehouseVerificationTools(deps: { connections: SlConnectionCatalogPort; fallbackFileStore: KtxFileStorePort; diff --git a/packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts b/packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts index 691f88e9..a1edf807 100644 --- a/packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts +++ b/packages/context/src/ingest/tools/warehouse-verification/warehouse-catalog.service.ts @@ -14,7 +14,7 @@ export interface WarehouseCatalogServiceDeps { fileStore: KtxFileStorePort; } -export interface WarehouseColumnDetail extends KtxSchemaColumn { +interface WarehouseColumnDetail extends KtxSchemaColumn { descriptions: Record; rowCount: number | null; nullCount: number | null; diff --git a/packages/context/src/package-exports.test.ts b/packages/context/src/package-exports.test.ts index 4fd7e502..9689744e 100644 --- a/packages/context/src/package-exports.test.ts +++ b/packages/context/src/package-exports.test.ts @@ -199,7 +199,9 @@ describe('@ktx/context package exports', () => { expect(ingest.stagedSyncConfigSchema).toBeDefined(); expect(ingest.stagedLookerScopeFileSchema).toBeDefined(); expect(ingest.stagedLookerFetchReportSchema).toBeDefined(); - expect(ingest.LocalMetabaseSourceStateReader).toBeTypeOf('function'); + expect('LocalMetabaseSourceStateReader' in ingest).toBe(false); + expect(ingest.KtxYamlMetabaseSourceStateReader).toBeTypeOf('function'); + expect(ingest.LocalMetabaseDiscoveryCache).toBeTypeOf('function'); expect(ingest.createLocalMetabaseSourceAdapter).toBeTypeOf('function'); expect(ingest.metabaseRuntimeConfigFromLocalConnection).toBeTypeOf('function'); expect(ingest.IngestMetabaseClientFactory).toBeTypeOf('function'); diff --git a/packages/context/src/project/config.test.ts b/packages/context/src/project/config.test.ts index 1be70322..cad7945c 100644 --- a/packages/context/src/project/config.test.ts +++ b/packages/context/src/project/config.test.ts @@ -81,16 +81,13 @@ describe('KTX project config', () => { }); }); - it('parses and serializes setup wizard metadata', () => { + it('parses and serializes setup warehouse metadata without setup progress', () => { const config = parseKtxProjectConfig(` project: revenue setup: database_connection_ids: - warehouse - analytics - completed_steps: - - project - - llm connections: warehouse: driver: postgres @@ -99,13 +96,12 @@ connections: expect(config.setup).toEqual({ database_connection_ids: ['warehouse', 'analytics'], - completed_steps: ['project', 'llm'], }); const serialized = serializeKtxProjectConfig(config); expect(serialized).toContain('setup:'); expect(serialized).toContain('database_connection_ids:'); - expect(serialized).toContain('completed_steps:'); + expect(serialized).not.toContain('completed_steps:'); }); it('parses global direct Anthropic LLM config', () => { diff --git a/packages/context/src/project/config.ts b/packages/context/src/project/config.ts index f1aa9d71..5da193f2 100644 --- a/packages/context/src/project/config.ts +++ b/packages/context/src/project/config.ts @@ -75,7 +75,6 @@ export interface KtxProjectConnectionConfig { export interface KtxProjectSetupConfig { database_connection_ids: string[]; - completed_steps?: string[]; } export interface KtxProjectConfig { @@ -508,7 +507,6 @@ export function parseKtxProjectConfig(raw: string): KtxProjectConfig { ? { setup: { database_connection_ids: stringArray(setup.database_connection_ids, []), - completed_steps: stringArray(setup.completed_steps, []), }, } : {}), diff --git a/packages/context/src/project/index.ts b/packages/context/src/project/index.ts index 8fd171d4..8ea92bf6 100644 --- a/packages/context/src/project/index.ts +++ b/packages/context/src/project/index.ts @@ -27,12 +27,10 @@ export { initKtxProject, loadKtxProject } from './project.js'; export type { KtxSetupStep } from './setup-config.js'; export { KTX_SETUP_STEPS, - ktxSetupCompletedSteps, ktxSetupStatePath, markKtxSetupStateStepComplete, mergeKtxSetupGitignoreEntries, readKtxSetupState, setKtxSetupDatabaseConnectionIds, - stripKtxSetupCompletedSteps, writeKtxSetupState, } from './setup-config.js'; diff --git a/packages/context/src/project/setup-config.test.ts b/packages/context/src/project/setup-config.test.ts index 46912d43..92c02707 100644 --- a/packages/context/src/project/setup-config.test.ts +++ b/packages/context/src/project/setup-config.test.ts @@ -4,12 +4,10 @@ import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { buildDefaultKtxProjectConfig } from './config.js'; import { - ktxSetupCompletedSteps, markKtxSetupStateStepComplete, mergeKtxSetupGitignoreEntries, readKtxSetupState, setKtxSetupDatabaseConnectionIds, - stripKtxSetupCompletedSteps, } from './setup-config.js'; describe('KTX setup config helpers', () => { @@ -48,36 +46,6 @@ describe('KTX setup config helpers', () => { expect(config.setup).toBeUndefined(); }); - it('strips setup completed steps while preserving database connection ids', () => { - const config = { - ...buildDefaultKtxProjectConfig('warehouse'), - setup: { - database_connection_ids: ['warehouse'], - completed_steps: ['project', 'databases'], - }, - }; - - expect(stripKtxSetupCompletedSteps(config).setup).toEqual({ - database_connection_ids: ['warehouse'], - }); - }); - - it('combines legacy config setup steps with local state for reads', () => { - const config = { - ...buildDefaultKtxProjectConfig('warehouse'), - setup: { - database_connection_ids: ['warehouse'], - completed_steps: ['project', 'databases'], - }, - }; - - expect(ktxSetupCompletedSteps(config, { completed_steps: ['databases', 'sources'] })).toEqual([ - 'project', - 'databases', - 'sources', - ]); - }); - it('merges setup-local gitignore entries without removing existing lines', () => { expect(mergeKtxSetupGitignoreEntries('cache/\ndb.sqlite\n')).toBe( ['cache/', 'db.sqlite', 'db.sqlite-*', 'ingest-transcripts/', 'secrets/', 'setup/', 'agents/', ''].join('\n'), diff --git a/packages/context/src/project/setup-config.ts b/packages/context/src/project/setup-config.ts index a426caf7..b2c8e161 100644 --- a/packages/context/src/project/setup-config.ts +++ b/packages/context/src/project/setup-config.ts @@ -64,27 +64,6 @@ export async function markKtxSetupStateStepComplete(projectDir: string, step: Kt return nextState; } -export function ktxSetupCompletedSteps(config: KtxProjectConfig, state: KtxSetupState): KtxSetupStep[] { - return uniqueSetupSteps([...(config.setup?.completed_steps ?? []), ...state.completed_steps]); -} - -export function stripKtxSetupCompletedSteps(config: KtxProjectConfig): KtxProjectConfig { - if (!config.setup) { - return config; - } - const databaseConnectionIds = config.setup.database_connection_ids ?? []; - if (databaseConnectionIds.length === 0) { - const { setup: _setup, ...withoutSetup } = config; - return withoutSetup; - } - return { - ...config, - setup: { - database_connection_ids: [...databaseConnectionIds], - }, - }; -} - export function setKtxSetupDatabaseConnectionIds( config: KtxProjectConfig, connectionIds: string[], @@ -95,7 +74,6 @@ export function setKtxSetupDatabaseConnectionIds( ...config, setup: { database_connection_ids: uniqueConnectionIds, - ...(config.setup?.completed_steps ? { completed_steps: [...config.setup.completed_steps] } : {}), }, }; }