import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join, relative, resolve } from 'node:path'; import { fileURLToPath, pathToFileURL } from 'node:url'; import { localConnectionTypeForConfig, resolveNotionConnectionAuthToken, } from '@ktx/context/connections'; import { resolveKtxConfigReference } from '@ktx/context/core'; import { cloneOrPull, DEFAULT_METABASE_CLIENT_CONFIG, discoverMetabaseDatabases, type DiscoveredMetabaseDatabase, loadDbtSchemaFiles, loadProjectInfo, MetabaseClient, type NotionApi, NotionClient, parseLookmlStagedDir, parseMetricflowFiles, testRepoConnection, } from '@ktx/context/ingest'; import { type KtxProjectConfig, type KtxProjectConnectionConfig, loadKtxProject, markKtxSetupStateStepComplete, serializeKtxProjectConfig, } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; import { pickNotionRootPages } from './notion-page-picker.js'; import { runKtxSourceMapping } from './source-mapping.js'; import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js'; import { runKtxPublicIngest } from './public-ingest.js'; import { writeProjectLocalSecretReference } from './setup-secrets.js'; import { createKtxSetupPromptAdapter, type KtxSetupPromptOption, } from './setup-prompts.js'; export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion'; const DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN = 25; export interface KtxSetupSourcesArgs { projectDir: string; inputMode: 'auto' | 'disabled'; source?: KtxSetupSourceType; sourceConnectionId?: string; sourcePath?: string; sourceGitUrl?: string; sourceBranch?: string; sourceSubpath?: string; sourceAuthTokenRef?: string; sourceUrl?: string; sourceApiKeyRef?: string; sourceClientId?: string; sourceClientSecretRef?: string; sourceWarehouseConnectionId?: string; sourceProjectName?: string; sourceProfilesPath?: string; sourceTarget?: string; metabaseDatabaseId?: number; notionCrawlMode?: 'all_accessible' | 'selected_roots'; notionRootPageIds?: string[]; runInitialSourceIngest: boolean; skipSources: boolean; } export type KtxSetupSourcesResult = | { status: 'ready'; projectDir: string; connectionIds: string[] } | { status: 'skipped'; projectDir: string } | { status: 'back'; projectDir: string } | { status: 'missing-input'; projectDir: string } | { status: 'failed'; projectDir: string }; export interface KtxSetupSourcesPromptAdapter { multiselect(options: { message: string; options: KtxSetupPromptOption[]; initialValues?: string[]; required?: boolean; }): Promise; select(options: { message: string; options: KtxSetupPromptOption[] }): Promise; text(options: { message: string; placeholder?: string; initialValue?: string }): Promise; password(options: { message: string }): Promise; cancel(message: string): void; log?(message: string): void; } export type SourceValidationResult = { ok: true; detail?: string } | { ok: false; message: string }; export interface KtxSetupSourcesDeps { prompts?: KtxSetupSourcesPromptAdapter; testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>; validateDbt?: (connection: KtxProjectConnectionConfig) => Promise; validateMetricflow?: (connection: KtxProjectConnectionConfig) => Promise; validateMetabase?: (projectDir: string, connectionId: string) => Promise; validateLooker?: (projectDir: string, connectionId: string) => Promise; validateLookml?: (connection: KtxProjectConnectionConfig) => Promise; validateNotion?: (connection: KtxProjectConnectionConfig) => Promise; pickNotionRootPages?: typeof pickNotionRootPages; discoverMetabaseDatabases?: (args: { sourceUrl: string; sourceApiKeyRef: string; sourceConnectionId: string; }) => Promise; runMapping?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise; runInitialIngest?: ( projectDir: string, connectionId: string, io: KtxCliIo, options: { inputMode: KtxSetupSourcesArgs['inputMode'] }, ) => Promise; } const SOURCE_OPTIONS: Array<{ value: KtxSetupSourceType; label: string }> = [ { value: 'dbt', label: 'dbt' }, { value: 'metricflow', label: 'MetricFlow' }, { value: 'metabase', label: 'Metabase' }, { value: 'looker', label: 'Looker' }, { value: 'lookml', label: 'LookML' }, { value: 'notion', label: 'Notion' }, ]; const SOURCE_LABELS = Object.fromEntries(SOURCE_OPTIONS.map((option) => [option.value, option.label])) as Record< KtxSetupSourceType, string >; const PRIMARY_SOURCE_DRIVERS = new Set([ 'sqlite', 'postgres', 'mysql', 'clickhouse', 'sqlserver', 'bigquery', 'snowflake', ]); function createPromptAdapter(): KtxSetupSourcesPromptAdapter { return createKtxSetupPromptAdapter({ selectCancelValue: 'back', multiselectCancelValue: 'back', confirmEmptyOptionalMultiselect: true, }); } function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null && !Array.isArray(value); } function stringField(value: unknown): string | undefined { return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined; } function sourceLabel(source: KtxSetupSourceType): string { return SOURCE_LABELS[source]; } function sourceAdapter(source: KtxSetupSourceType): string { return source; } function connectionNamePrompt(label: string): string { return `Name this ${label} connection\nKTX will use this short name in commands and config. You can rename it now.`; } function sourceSubpathPrompt(source: KtxSetupSourceType): string { if (source === 'dbt') { return [ 'Folder containing dbt_project.yml (optional)', 'Press Enter when dbt_project.yml is at the repo root.', 'For monorepos, enter a relative path like analytics/dbt.', ].join('\n'); } return [ `${sourceLabel(source)} project folder (optional)`, 'If the project files are inside a subfolder, enter that path.', 'Press Enter if the path or repo already points at the project.', ].join('\n'); } const SCAN_SKIP_DIRS = new Set(['.git', 'node_modules', '.venv', 'target', 'dbt_packages', 'dbt_modules', '__pycache__']); async function findDbtProjectSubpaths(rootDir: string): Promise { const entries = await readdir(rootDir, { withFileTypes: true, recursive: true }); const subpaths: string[] = []; for (const entry of entries) { if (!entry.isFile()) continue; if (entry.name !== 'dbt_project.yml' && entry.name !== 'dbt_project.yaml') continue; const relDir = relative(rootDir, entry.parentPath); if (relDir.split('/').some((part) => SCAN_SKIP_DIRS.has(part))) continue; subpaths.push(relDir); } return subpaths; } async function promptText( prompts: KtxSetupSourcesPromptAdapter, options: { message: string; placeholder?: string; initialValue?: string }, ): Promise { return await prompts.text({ ...options, message: withTextInputNavigation(options.message) }); } function assertSafeConnectionId(connectionId: string): void { if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) { throw new Error(`Unsafe connection id: ${connectionId}`); } } function credentialRef(value: string | undefined, label: string): string { const ref = value?.trim(); if (!ref) { throw new Error(`Missing ${label}; use env:NAME or file:/absolute/path`); } if (!ref.startsWith('env:') && !ref.startsWith('file:')) { throw new Error(`${label} must use env:NAME or file:/absolute/path`); } return ref; } async function chooseSourceCredentialRef(input: { prompts: KtxSetupSourcesPromptAdapter; projectDir: string; label: string; envName: string; secretFileName: string; existingRef?: string; }): Promise { while (true) { const choice = await input.prompts.select({ message: `How should KTX find your ${input.label}?`, options: [ ...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []), { value: 'env', label: `Use ${input.envName} from the environment` }, { value: 'paste', label: 'Paste a key and save it as a local secret file' }, { value: 'back', label: 'Back' }, ], }); if (choice === 'back') return 'back'; if (choice === 'keep' && input.existingRef) return input.existingRef; if (choice === 'paste') { const value = await input.prompts.password({ message: input.label }); if (value === undefined) continue; if (!value.trim()) continue; const ref = await writeProjectLocalSecretReference({ projectDir: input.projectDir, fileName: input.secretFileName, value, }); input.prompts.log?.(`Saved to .ktx/secrets/${input.secretFileName}`); return ref; } return `env:${input.envName}`; } } async function chooseGitAuthCredentialRef(input: { prompts: KtxSetupSourcesPromptAdapter; projectDir: string; source: KtxSetupSourceType; connectionId: string; existingRef?: string; repoUrl?: string; testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>; }): Promise { const label = input.source === 'dbt' ? 'This' : `This ${sourceLabel(input.source)}`; while (true) { const choice = await input.prompts.select({ message: `${label} repo requires authentication.`, options: [ ...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []), { value: 'env', label: 'Use GITHUB_TOKEN from the environment' }, { value: 'paste', label: 'Paste a token and save it as a local secret file' }, { value: 'skip', label: 'Skip — try without authentication' }, { value: 'back', label: 'Back' }, ], }); if (choice === 'back') return 'back'; if (choice === 'keep' && input.existingRef) return input.existingRef; if (choice === 'skip') return undefined; if (choice === 'paste') { const value = await input.prompts.password({ message: 'Git access token' }); if (value === undefined) continue; if (!value.trim()) continue; if (input.testGitRepo && input.repoUrl) { const result = await input.testGitRepo({ repoUrl: input.repoUrl, authToken: value }); if (!result.ok) { input.prompts.log?.(`Authentication failed: ${result.error}`); continue; } } const fileName = `${input.connectionId}-auth-token`; const ref = await writeProjectLocalSecretReference({ projectDir: input.projectDir, fileName, value, }); input.prompts.log?.(`Saved to .ktx/secrets/${fileName}`); return ref; } return 'env:GITHUB_TOKEN'; } } function repoOrLocalSource(args: KtxSetupSourcesArgs): { sourceDir?: string; repoUrl?: string } { if (args.sourcePath && args.sourceGitUrl) { throw new Error('Choose only one source location: --source-path or --source-git-url.'); } if (args.sourcePath) { return { sourceDir: resolve(args.sourcePath) }; } if (args.sourceGitUrl) { return { repoUrl: args.sourceGitUrl }; } throw new Error('Missing source location: pass --source-path or --source-git-url.'); } function fileRepoUrl(sourceDir: string): string { return pathToFileURL(sourceDir).toString(); } async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise { const project = await loadKtxProject({ projectDir }); await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); } async function writeSourceConnection( projectDir: string, connectionId: string, connection: KtxProjectConnectionConfig, adapter: string, ): Promise<() => Promise> { assertSafeConnectionId(connectionId); const project = await loadKtxProject({ projectDir }); const previousConnection = project.config.connections[connectionId]; const hadPreviousConnection = previousConnection !== undefined; const shouldRemoveAdapterOnRollback = !project.config.ingest.adapters.includes(adapter); const config = { ...project.config, connections: { ...project.config.connections, [connectionId]: connection, }, ingest: { ...project.config.ingest, adapters: project.config.ingest.adapters.includes(adapter) ? [...project.config.ingest.adapters] : [...project.config.ingest.adapters, adapter], }, }; await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); return async () => { const latest = await loadKtxProject({ projectDir }); const connections = { ...latest.config.connections }; if (hadPreviousConnection) { connections[connectionId] = previousConnection; } else { delete connections[connectionId]; } await writeProjectConfig(projectDir, { ...latest.config, connections, ingest: { ...latest.config.ingest, adapters: shouldRemoveAdapterOnRollback ? latest.config.ingest.adapters.filter((candidate) => candidate !== adapter) : latest.config.ingest.adapters, }, }); }; } async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSourceType): Promise { const adapter = sourceAdapter(source); const project = await loadKtxProject({ projectDir }); if (project.config.ingest.adapters.includes(adapter)) { return; } await writeProjectConfig(projectDir, { ...project.config, ingest: { ...project.config.ingest, adapters: [...project.config.ingest.adapters, adapter], }, }); } async function markSourcesComplete(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'sources'); } function hasPrimarySource(config: KtxProjectConfig): boolean { const setupPrimaryIds = config.setup?.database_connection_ids ?? []; if (setupPrimaryIds.some((connectionId) => Object.hasOwn(config.connections, connectionId))) { return true; } return Object.values(config.connections).some((connection) => PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()), ); } function buildDbtConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { const source = repoOrLocalSource(args); return { driver: 'dbt', ...(source.sourceDir ? { source_dir: source.sourceDir } : {}), ...(source.repoUrl ? { repo_url: source.repoUrl } : {}), ...(args.sourceBranch ? { branch: args.sourceBranch } : {}), ...(args.sourceSubpath ? { path: args.sourceSubpath } : {}), ...(args.sourceAuthTokenRef ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'dbt private repo access token') } : {}), ...(args.sourceProfilesPath ? { profiles_path: resolve(args.sourceProfilesPath) } : {}), ...(args.sourceTarget ? { target: args.sourceTarget } : {}), ...(args.sourceProjectName ? { project_name: args.sourceProjectName } : {}), }; } function buildMetricflowConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { const source = repoOrLocalSource(args); return { driver: 'metricflow', metricflow: { repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''), ...(args.sourceBranch ? { branch: args.sourceBranch } : {}), ...(args.sourceSubpath ? { path: args.sourceSubpath } : {}), ...(args.sourceAuthTokenRef ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'MetricFlow auth token ref') } : {}), }, }; } function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { if (!args.sourceUrl) { throw new Error('Missing Metabase URL: pass --source-url.'); } if (!args.sourceWarehouseConnectionId) { throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.'); } if (!args.metabaseDatabaseId) { throw new Error('Missing Metabase database id: pass --metabase-database-id.'); } return { driver: 'metabase', api_url: args.sourceUrl, api_key_ref: credentialRef(args.sourceApiKeyRef, 'Metabase API key ref'), mappings: { databaseMappings: { [String(args.metabaseDatabaseId)]: args.sourceWarehouseConnectionId }, syncEnabled: { [String(args.metabaseDatabaseId)]: true }, syncMode: 'ALL', selections: { collections: [], items: [] }, defaultTagNames: [], }, }; } function buildLookerConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { if (!args.sourceUrl) { throw new Error('Missing Looker base URL: pass --source-url.'); } if (!args.sourceClientId) { throw new Error('Missing Looker client id: pass --source-client-id.'); } if (!args.sourceWarehouseConnectionId) { throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.'); } return { driver: 'looker', base_url: args.sourceUrl, client_id: args.sourceClientId, client_secret_ref: credentialRef(args.sourceClientSecretRef, 'Looker client secret ref'), mappings: { connectionMappings: { [args.sourceTarget ?? args.sourceWarehouseConnectionId]: args.sourceWarehouseConnectionId, }, }, }; } function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { const source = repoOrLocalSource(args); return { driver: 'lookml', repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''), ...(args.sourceBranch ? { branch: args.sourceBranch } : {}), ...(args.sourceSubpath ? { path: args.sourceSubpath } : {}), ...(args.sourceAuthTokenRef ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'LookML auth token ref') } : {}), mappings: { expectedLookerConnectionName: args.sourceTarget ?? args.sourceWarehouseConnectionId ?? null, }, }; } function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { const rootPageIds = args.notionRootPageIds ?? []; const crawlMode = rootPageIds.length > 0 ? 'selected_roots' : (args.notionCrawlMode ?? 'selected_roots'); if (crawlMode === 'selected_roots' && rootPageIds.length === 0) { throw new Error('Notion selected_roots requires --notion-root-page-id.'); } return { driver: 'notion', auth_token_ref: credentialRef(args.sourceApiKeyRef, 'Notion token ref'), crawl_mode: crawlMode, ...(rootPageIds.length > 0 ? { root_page_ids: rootPageIds } : {}), root_database_ids: [], root_data_source_ids: [], max_pages_per_run: 1000, max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN, max_knowledge_updates_per_run: 20, }; } function sourcePathFromFileRepoUrl(repoUrl: string, subpath?: string): string { const root = fileURLToPath(repoUrl); return subpath ? join(root, subpath) : root; } function repoAuthToken(connection: KtxProjectConnectionConfig | Record): string | null { const ref = stringField(connection.auth_token_ref); const literal = stringField(connection.auth_token); return literal ?? resolveKtxConfigReference(ref, process.env) ?? null; } async function collectYamlFilesRecursive(sourceRoot: string): Promise> { const entries = await readdir(sourceRoot, { withFileTypes: true, recursive: true }); const files: Array<{ content: string; path: string }> = []; for (const entry of entries) { if (!entry.isFile() || !/\.ya?ml$/i.test(entry.name)) { continue; } const path = join(entry.parentPath, entry.name); files.push({ path, content: await readFile(path, 'utf-8') }); } return files; } async function defaultValidateDbt(connection: KtxProjectConnectionConfig): Promise { let sourceDir = stringField(connection.source_dir); const repoUrl = stringField(connection.repo_url); if (!sourceDir && repoUrl?.startsWith('file:')) { sourceDir = sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path)); } if (!sourceDir && repoUrl) { const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-')); try { await cloneOrPull({ repoUrl, authToken: repoAuthToken(connection), cacheDir, branch: stringField(connection.branch) ?? 'main', }); } catch (error) { const reason = error instanceof Error ? error.message : String(error); return { ok: false, message: `Failed to clone ${repoUrl}: ${reason}` }; } sourceDir = stringField(connection.path) ? join(cacheDir, String(connection.path)) : cacheDir; } if (!sourceDir) { return { ok: false, message: 'dbt setup requires --source-path or --source-git-url.' }; } const info = await loadProjectInfo(sourceDir); const schemaFiles = await loadDbtSchemaFiles(sourceDir); if (!info.projectName && typeof connection.project_name !== 'string') { return { ok: false, message: 'dbt project metadata is missing project name.' }; } return { ok: true, detail: `project=${info.projectName ?? connection.project_name} schemas=${schemaFiles.length}` }; } async function defaultValidateMetricflow(connection: KtxProjectConnectionConfig): Promise { const metricflow = isRecord(connection.metricflow) ? connection.metricflow : undefined; const repoUrl = stringField(metricflow?.repoUrl); if (!repoUrl) { return { ok: false, message: 'MetricFlow setup requires repoUrl.' }; } if (!repoUrl.startsWith('file:')) { const result = await testRepoConnection({ repoUrl, authToken: metricflow ? repoAuthToken(metricflow) : null, }); if (!result.ok) { return { ok: false, message: result.error }; } return { ok: true, detail: 'repository reachable' }; } const path = sourcePathFromFileRepoUrl(repoUrl, stringField(metricflow?.path)); const parsed = parseMetricflowFiles(await collectYamlFilesRecursive(path)); return { ok: true, detail: `semanticModels=${parsed.semanticModels.length} metrics=${parsed.crossModelMetrics.length}`, }; } async function defaultValidateLooker(projectDir: string, connectionId: string): Promise { const code = await runKtxSourceMapping( { command: 'refresh', projectDir, connectionId, autoAccept: true }, { stdout: { write() {} }, stderr: { write() {} } }, ); return code === 0 ? { ok: true, detail: 'Looker mapping refreshed' } : { ok: false, message: 'Looker validation failed' }; } async function defaultValidateLookml(connection: KtxProjectConnectionConfig): Promise { const repoUrl = stringField(connection.repoUrl); if (!repoUrl) { return { ok: false, message: 'LookML setup requires repoUrl.' }; } if (!repoUrl.startsWith('file:')) { const result = await testRepoConnection({ repoUrl, authToken: repoAuthToken(connection) }); return result.ok ? { ok: true, detail: 'repository reachable' } : { ok: false, message: result.error }; } const parsed = await parseLookmlStagedDir(sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path))); const count = parsed.models.length + parsed.views.length + parsed.dashboards.length; return count > 0 ? { ok: true, detail: `lookmlFiles=${count}` } : { ok: false, message: 'No LookML files found' }; } async function defaultValidateNotion(connection: KtxProjectConnectionConfig): Promise { const token = await resolveNotionConnectionAuthToken({ auth_token: stringField(connection.auth_token) ?? null, auth_token_ref: stringField(connection.auth_token_ref) ?? null, }); const client: NotionApi = new NotionClient(token); await client.retrieveBotUser(); const roots = Array.isArray(connection.root_page_ids) ? connection.root_page_ids.filter((id): id is string => typeof id === 'string') : []; for (const root of roots) { await client.retrievePage(root); } return { ok: true, detail: `roots=${roots.length}` }; } interface MappingJsonOutput { connectionId: string; refresh: { ok: boolean; output: string[] }; validation: { ok: boolean; output: string[] }; mappings: unknown[]; } function splitOutputLines(output: string): string[] { return output .split('\n') .map((line) => line.trim()) .filter(Boolean); } function writeSetupPrefixedLines(write: (chunk: string) => void, output: string): void { for (const line of output.split(/\r?\n/)) { if (line.length > 0) { write(`│ ${line}\n`); } } } function createSetupPrefixedIo(io: KtxCliIo): KtxCliIo { return { stdout: { isTTY: io.stdout.isTTY, columns: io.stdout.columns, write(chunk: string) { writeSetupPrefixedLines((line) => io.stdout.write(line), chunk); }, }, stderr: { write(chunk: string) { writeSetupPrefixedLines((line) => io.stderr.write(line), chunk); }, }, }; } function parseMappingListJson(output: string): unknown[] { const trimmed = output.trim(); if (!trimmed) { return []; } const parsed = JSON.parse(trimmed) as unknown; return Array.isArray(parsed) ? parsed : []; } function summarizeMappingResult(parsed: MappingJsonOutput): string { const mappingCount = parsed.mappings.length; const mappingNoun = mappingCount === 1 ? 'mapping' : 'mappings'; return `Mapping validated — ${mappingCount} ${mappingNoun} configured`; } async function defaultRunMapping(projectDir: string, connectionId: string, io: KtxCliIo): Promise { const outputs = { refresh: '', validation: '', list: '', }; const refreshCode = await runKtxSourceMapping( { command: 'refresh', projectDir, connectionId, autoAccept: true }, { stdout: { write(chunk: string) { outputs.refresh += chunk; } }, stderr: io.stderr, }, ); if (refreshCode !== 0) { return refreshCode; } const validationCode = await runKtxSourceMapping( { command: 'validate', projectDir, connectionId }, { stdout: { write(chunk: string) { outputs.validation += chunk; } }, stderr: io.stderr, }, ); if (validationCode !== 0) { return validationCode; } const listCode = await runKtxSourceMapping( { command: 'list', projectDir, connectionId, json: true }, { stdout: { write(chunk: string) { outputs.list += chunk; } }, stderr: io.stderr, }, ); if (listCode !== 0) { return listCode; } const parsed: MappingJsonOutput = { connectionId, refresh: { ok: true, output: splitOutputLines(outputs.refresh) }, validation: { ok: true, output: splitOutputLines(outputs.validation) }, mappings: parseMappingListJson(outputs.list), }; io.stdout.write(`${summarizeMappingResult(parsed)}\n`); return 0; } async function defaultRunInitialIngest( projectDir: string, connectionId: string, io: KtxCliIo, options: { inputMode: KtxSetupSourcesArgs['inputMode'] }, ): Promise { return await runKtxPublicIngest( { command: 'run', projectDir, targetConnectionId: connectionId, all: false, json: false, inputMode: options.inputMode, }, io, ); } async function runInitialSourceIngestWithRecovery(input: { args: KtxSetupSourcesArgs; connectionId: string; io: KtxCliIo; prompts: KtxSetupSourcesPromptAdapter; deps: KtxSetupSourcesDeps; }): Promise<'ready' | 'continue' | 'back' | 'failed'> { while (true) { input.io.stdout.write(`│ Building context from ${input.connectionId}. Large sources can take a while.\n`); const ingestCode = await (input.deps.runInitialIngest ?? defaultRunInitialIngest)( input.args.projectDir, input.connectionId, input.io, { inputMode: input.args.inputMode, }, ); if (ingestCode === 0) { return 'ready'; } if (input.args.inputMode === 'disabled') { return 'failed'; } const action = await input.prompts.select({ message: `Context build failed for ${input.connectionId}\nRetry now, continue setup and build this source later, or go back.`, options: [ { value: 'retry', label: 'Retry context build' }, { value: 'continue', label: 'Continue setup and build this source later' }, { value: 'back', label: 'Back' }, ], }); if (action === 'retry') { continue; } if (action === 'continue') { input.io.stdout.write(`│ Context source saved without a completed context build for ${input.connectionId}.\n`); input.io.stdout.write(`│ Run later: ktx ingest ${input.connectionId}\n`); return 'continue'; } return 'back'; } } type SourceLocationChoice = 'path' | 'git'; type SourcePromptState = KtxSetupSourcesArgs & { sourceLocation?: SourceLocationChoice; }; type SourcePromptStep = (state: SourcePromptState) => Promise<'next' | 'back'>; interface WarehouseConnectionChoice { id: string; connectionType: string; } type InteractiveSourceConnectionChoice = | { kind: 'existing'; connectionId: string; connection: KtxProjectConnectionConfig } | { kind: 'new'; args: KtxSetupSourcesArgs } | { kind: 'edited'; connectionId: string; args: KtxSetupSourcesArgs } | 'back'; type SourceSetupChoiceResult = | { status: 'ready'; connectionId: string } | { status: 'back' } | { status: 'failed' }; async function runSourcePromptSteps( initialState: SourcePromptState, stepsForState: (state: SourcePromptState) => SourcePromptStep[], ): Promise { let stepIndex = 0; while (true) { const steps = stepsForState(initialState); if (stepIndex >= steps.length) { const { sourceLocation: _sourceLocation, ...sourceArgs } = initialState; return sourceArgs; } const result = await steps[stepIndex]?.(initialState); if (result === 'back') { if (stepIndex === 0) { return 'back'; } stepIndex -= 1; continue; } stepIndex += 1; } } function resetRepoLocationFields(state: SourcePromptState): void { delete state.sourcePath; delete state.sourceGitUrl; delete state.sourceBranch; delete state.sourceAuthTokenRef; delete state.sourceSubpath; delete state.sourceProjectName; } function sourceLocationFromArgs(args: KtxSetupSourcesArgs): SourceLocationChoice | undefined { if (args.sourcePath) return 'path'; if (args.sourceGitUrl) return 'git'; return undefined; } function warehouseConnectionChoices(config: KtxProjectConfig): WarehouseConnectionChoice[] { return Object.entries(config.connections) .filter(([, connection]) => PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase())) .map(([id, connection]) => ({ id, connectionType: localConnectionTypeForConfig(id, connection) })) .sort((left, right) => left.id.localeCompare(right.id)); } async function chooseMappedWarehouseConnectionId(input: { projectDir: string; prompts: KtxSetupSourcesPromptAdapter; }): Promise { const project = await loadKtxProject({ projectDir: input.projectDir }); const choices = warehouseConnectionChoices(project.config); if (choices.length === 1) { return choices[0].id; } if (choices.length === 0) { const entered = await promptText(input.prompts, { message: 'Mapped warehouse connection id' }); return entered === undefined ? 'back' : entered; } const selected = await input.prompts.select({ message: 'Mapped warehouse connection', options: [ ...choices.map((choice) => ({ value: choice.id, label: `${choice.id} (${choice.connectionType})`, })), { value: 'back', label: 'Back' }, ], }); return selected === 'back' ? 'back' : selected; } async function defaultDiscoverMetabaseDatabases(input: { sourceUrl: string; sourceApiKeyRef: string; }): Promise { const apiKey = resolveKtxConfigReference(input.sourceApiKeyRef, process.env); if (!apiKey) { throw new Error('Metabase API key ref could not be resolved'); } const client = new MetabaseClient( { apiUrl: input.sourceUrl, apiKey }, DEFAULT_METABASE_CLIENT_CONFIG, ); try { return await discoverMetabaseDatabases(client); } finally { await client.cleanup(); } } function metabaseDatabaseLabel(database: DiscoveredMetabaseDatabase): string { const detail = [database.engine].filter(Boolean).join(', '); return detail ? `${database.id}: ${database.name} (${detail})` : `${database.id}: ${database.name}`; } async function chooseMetabaseDatabaseId(input: { state: SourcePromptState; prompts: KtxSetupSourcesPromptAdapter; deps: KtxSetupSourcesDeps; }): Promise { const sourceUrl = input.state.sourceUrl; const sourceApiKeyRef = input.state.sourceApiKeyRef; if (sourceUrl && sourceApiKeyRef) { try { const discovered = await (input.deps.discoverMetabaseDatabases ?? defaultDiscoverMetabaseDatabases)({ sourceUrl, sourceApiKeyRef, sourceConnectionId: input.state.sourceConnectionId ?? 'metabase-main', }); if (discovered.length === 1) { return discovered[0].id; } if (discovered.length > 1) { const selected = await input.prompts.select({ message: 'Metabase database', options: [ ...discovered .slice() .sort((left, right) => left.id - right.id) .map((database) => ({ value: String(database.id), label: metabaseDatabaseLabel(database), })), { value: 'back', label: 'Back' }, ], }); return selected === 'back' ? 'back' : Number.parseInt(selected, 10); } } catch { // Discovery is a convenience. Fall back to the raw id prompt when credentials // are unavailable locally or the Metabase API cannot be reached yet. } } const databaseId = await promptText(input.prompts, { message: 'Metabase database id' }); return databaseId === undefined ? 'back' : Number.parseInt(databaseId, 10); } function connectionIdPromptSteps( args: KtxSetupSourcesArgs, source: KtxSetupSourceType, prompts: KtxSetupSourcesPromptAdapter, defaultConnectionId: string, ): SourcePromptStep[] { if (args.sourceConnectionId) { return []; } return [ async (state) => { const enteredConnectionId = await promptText(prompts, { message: connectionNamePrompt(sourceLabel(source)), placeholder: defaultConnectionId, initialValue: defaultConnectionId, }); if (enteredConnectionId === undefined) { return 'back'; } state.sourceConnectionId = enteredConnectionId.trim() || defaultConnectionId; return 'next'; }, ]; } async function promptForInteractiveSource( args: KtxSetupSourcesArgs, source: KtxSetupSourceType, prompts: KtxSetupSourcesPromptAdapter, io: KtxCliIo, deps: KtxSetupSourcesDeps, defaultConnectionId = `${source}-main`, testGitRepo: KtxSetupSourcesDeps['testGitRepo'] = testRepoConnection, discoverMetabaseDatabaseList?: KtxSetupSourcesDeps['discoverMetabaseDatabases'], ): Promise { const initialState: SourcePromptState = { ...args, source, sourceLocation: sourceLocationFromArgs(args) }; if (args.sourceConnectionId) { initialState.sourceConnectionId = args.sourceConnectionId; } const connectionSteps = connectionIdPromptSteps(args, source, prompts, defaultConnectionId); if (source === 'dbt' || source === 'metricflow' || source === 'lookml') { return await runSourcePromptSteps(initialState, (state) => [ ...connectionSteps, async () => { const selectedLocation = await prompts.select({ message: `${source} source location`, options: [ { value: 'path', label: 'Local path' }, { value: 'git', label: 'Git URL' }, { value: 'back', label: 'Back' }, ], }); if (selectedLocation !== 'path' && selectedLocation !== 'git') { return 'back'; } if (state.sourceLocation !== selectedLocation) { resetRepoLocationFields(state); } state.sourceLocation = selectedLocation; return 'next'; }, ...(state.sourceLocation === 'path' ? [ async (currentState: SourcePromptState) => { const sourcePath = await promptText(prompts, { message: `${source} local path`, ...(currentState.sourcePath ? { initialValue: currentState.sourcePath } : {}), }); if (sourcePath === undefined) return 'back'; currentState.sourcePath = sourcePath; return 'next'; }, ] : []), ...(state.sourceLocation === 'git' ? [ async (currentState: SourcePromptState) => { const sourceGitUrl = await promptText(prompts, { message: `${source} git URL`, ...(currentState.sourceGitUrl ? { initialValue: currentState.sourceGitUrl } : {}), }); if (sourceGitUrl === undefined) return 'back'; currentState.sourceGitUrl = sourceGitUrl; return 'next'; }, async (currentState: SourcePromptState) => { const branch = await promptText(prompts, { message: `${source} git branch`, initialValue: currentState.sourceBranch ?? 'main', }); if (branch === undefined) return 'back'; currentState.sourceBranch = branch || 'main'; return 'next'; }, ] : []), ...(state.sourceLocation === 'git' ? [ async (currentState: SourcePromptState) => { const result = await testGitRepo!({ repoUrl: currentState.sourceGitUrl! }); if (result.ok) { delete currentState.sourceAuthTokenRef; prompts.log?.('Repository connected.'); return 'next'; } const authRef = await chooseGitAuthCredentialRef({ prompts, projectDir: args.projectDir, source, connectionId: currentState.sourceConnectionId ?? `${source}-main`, existingRef: currentState.sourceAuthTokenRef, repoUrl: currentState.sourceGitUrl, testGitRepo, }); if (authRef === 'back') return 'back'; if (authRef) { currentState.sourceAuthTokenRef = authRef; } else { delete currentState.sourceAuthTokenRef; } return 'next'; }, ] : []), ...(state.sourceLocation ? [ async (currentState: SourcePromptState) => { if (source === 'dbt') { let scanDir: string | undefined; if (currentState.sourceLocation === 'path' && currentState.sourcePath) { scanDir = currentState.sourcePath; } else if (currentState.sourceLocation === 'git' && currentState.sourceGitUrl) { try { const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-scan-')); const authToken = currentState.sourceAuthTokenRef ? resolveKtxConfigReference(currentState.sourceAuthTokenRef, process.env) : null; await cloneOrPull({ repoUrl: currentState.sourceGitUrl, authToken, cacheDir, branch: currentState.sourceBranch ?? 'main', }); scanDir = cacheDir; } catch { // Clone failed — fall through to manual prompt } } if (scanDir) { try { const subpaths = await findDbtProjectSubpaths(scanDir); if (subpaths.length === 1) { const found = subpaths[0]!; if (found) { currentState.sourceSubpath = found; prompts.log?.(`Found dbt_project.yml in ${found}/`); } else { delete currentState.sourceSubpath; } return 'next'; } if (subpaths.length > 1) { const selected = await prompts.select({ message: 'Multiple dbt projects found — which one should KTX use?', options: [ ...subpaths.map((p) => ({ value: p || '.', label: p || '(project root)' })), { value: 'back', label: 'Back' }, ], }); if (selected === 'back') return 'back'; const subpath = selected === '.' ? '' : selected; if (subpath) { currentState.sourceSubpath = subpath; } else { delete currentState.sourceSubpath; } return 'next'; } } catch { // Directory unreadable — fall through to manual prompt } } } const subpath = await promptText(prompts, { message: sourceSubpathPrompt(source), placeholder: 'optional', ...(currentState.sourceSubpath ? { initialValue: currentState.sourceSubpath } : {}), }); if (subpath === undefined) return 'back'; if (subpath) { currentState.sourceSubpath = subpath; } else { delete currentState.sourceSubpath; } return 'next'; }, ] : []), ]); } if (source === 'metabase') { return await runSourcePromptSteps(initialState, () => [ ...connectionSteps, async (state) => { const sourceUrl = await promptText(prompts, { message: 'Metabase URL', ...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}), }); if (sourceUrl === undefined) return 'back'; state.sourceUrl = sourceUrl; return 'next'; }, async (state) => { const ref = await chooseSourceCredentialRef({ prompts, projectDir: args.projectDir, label: 'Metabase API key', envName: 'METABASE_API_KEY', secretFileName: `${state.sourceConnectionId ?? 'metabase-main'}-api-key`, existingRef: state.sourceApiKeyRef, }); if (ref === 'back') return 'back'; state.sourceApiKeyRef = ref; return 'next'; }, async (state) => { const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({ projectDir: args.projectDir, prompts, }); if (sourceWarehouseConnectionId === 'back') return 'back'; state.sourceWarehouseConnectionId = sourceWarehouseConnectionId; return 'next'; }, async (state) => { const databaseId = await chooseMetabaseDatabaseId({ state, prompts, deps: { discoverMetabaseDatabases: discoverMetabaseDatabaseList }, }); if (databaseId === 'back') return 'back'; state.metabaseDatabaseId = databaseId; return 'next'; }, ]); } if (source === 'looker') { return await runSourcePromptSteps(initialState, () => [ ...connectionSteps, async (state) => { const sourceUrl = await promptText(prompts, { message: 'Looker base URL', ...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}), }); if (sourceUrl === undefined) return 'back'; state.sourceUrl = sourceUrl; return 'next'; }, async (state) => { const sourceClientId = await promptText(prompts, { message: 'Looker client id', ...(state.sourceClientId ? { initialValue: state.sourceClientId } : {}), }); if (sourceClientId === undefined) return 'back'; state.sourceClientId = sourceClientId; return 'next'; }, async (state) => { const ref = await chooseSourceCredentialRef({ prompts, projectDir: args.projectDir, label: 'Looker client secret', envName: 'LOOKER_CLIENT_SECRET', secretFileName: `${state.sourceConnectionId ?? 'looker-main'}-client-secret`, existingRef: state.sourceClientSecretRef, }); if (ref === 'back') return 'back'; state.sourceClientSecretRef = ref; return 'next'; }, async (state) => { const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({ projectDir: args.projectDir, prompts, }); if (sourceWarehouseConnectionId === 'back') return 'back'; state.sourceWarehouseConnectionId = sourceWarehouseConnectionId; return 'next'; }, async (state) => { const lookerConnectionName = await promptText(prompts, { message: 'Looker connection name', placeholder: 'optional', ...(state.sourceTarget ? { initialValue: state.sourceTarget } : {}), }); if (lookerConnectionName === undefined) return 'back'; if (lookerConnectionName) { state.sourceTarget = lookerConnectionName; } else { delete state.sourceTarget; } return 'next'; }, ]); } return await runSourcePromptSteps(initialState, (state) => [ ...connectionSteps, async (currentState) => { const ref = await chooseSourceCredentialRef({ prompts, projectDir: args.projectDir, label: 'Notion integration token', envName: 'NOTION_TOKEN', secretFileName: `${currentState.sourceConnectionId ?? 'notion-main'}-token`, existingRef: currentState.sourceApiKeyRef, }); if (ref === 'back') return 'back'; currentState.sourceApiKeyRef = ref; return 'next'; }, async (currentState) => { const crawlMode = await prompts.select({ message: 'Which Notion pages should KTX ingest?', options: [ { value: 'selected_roots', label: 'Specific pages and their subpages (choose them in a picker)' }, { value: 'all_accessible', label: 'All pages the integration can access' }, { value: 'back', label: 'Back' }, ], }); if (crawlMode === 'back') return 'back'; currentState.notionCrawlMode = crawlMode === 'all_accessible' ? 'all_accessible' : 'selected_roots'; if (currentState.notionCrawlMode === 'all_accessible') { delete currentState.notionRootPageIds; } return 'next'; }, ...(state.notionCrawlMode === 'selected_roots' ? [ async (currentState: SourcePromptState) => { const connectionId = currentState.sourceConnectionId ?? 'notion-main'; const result = await (deps.pickNotionRootPages ?? pickNotionRootPages)( { connectionId, connection: { driver: 'notion', auth_token_ref: credentialRef(currentState.sourceApiKeyRef, 'Notion token ref'), crawl_mode: 'selected_roots', root_page_ids: currentState.notionRootPageIds ?? [], root_database_ids: [], root_data_source_ids: [], }, }, io, ); if (result.kind === 'back') { return 'back'; } if (result.kind === 'unavailable') { io.stderr.write(`${result.message}\n`); return 'back'; } currentState.notionRootPageIds = result.rootPageIds; return 'next'; }, ] : []), ]); } function existingConnectionIdsBySource( connections: Record, source: KtxSetupSourceType, ): string[] { return Object.entries(connections) .filter(([, connection]) => String(connection.driver ?? '').toLowerCase() === source) .map(([connectionId]) => connectionId) .sort((left, right) => left.localeCompare(right)); } function sourceTypeForConnection(connection: KtxProjectConnectionConfig): KtxSetupSourceType | null { const driver = String(connection.driver ?? '').toLowerCase(); return SOURCE_OPTIONS.some((option) => option.value === driver) ? (driver as KtxSetupSourceType) : null; } function contextSourceEditTargets(connections: Record): Array<{ connectionId: string; source: KtxSetupSourceType; }> { return Object.entries(connections) .map(([connectionId, connection]) => { const source = sourceTypeForConnection(connection); return source ? { connectionId, source } : null; }) .filter((target): target is { connectionId: string; source: KtxSetupSourceType } => target !== null) .sort((left, right) => left.connectionId.localeCompare(right.connectionId)); } function sourceChecklistForConnections(connections: Record): { options: Array<{ value: KtxSetupSourceType; label: string; hint?: string }>; initialValues: KtxSetupSourceType[]; } { const initialValues: KtxSetupSourceType[] = []; const options = SOURCE_OPTIONS.map((option) => { const existingIds = existingConnectionIdsBySource(connections, option.value); if (existingIds.length === 0) { return option; } initialValues.push(option.value); return { ...option, hint: `configured: ${existingIds.join(', ')}` }; }); return { options, initialValues }; } function defaultConnectionIdForSource( connections: Record, source: KtxSetupSourceType, ): string { const base = `${source}-main`; if (!connections[base]) { return base; } let index = 2; while (connections[`${base}-${index}`]) { index += 1; } return `${base}-${index}`; } function firstStringRecordEntry(value: unknown): [string, string] | undefined { if (!isRecord(value)) return undefined; for (const [key, raw] of Object.entries(value)) { if (typeof raw === 'string' && raw.trim().length > 0) { return [key, raw.trim()]; } } return undefined; } function applyRepoSourceArgs( args: KtxSetupSourcesArgs, input: { repoUrl?: string; sourceDir?: string; branch?: string; subpath?: string; authTokenRef?: string }, ): void { if (input.sourceDir) { args.sourcePath = input.sourceDir; } else if (input.repoUrl?.startsWith('file:')) { args.sourcePath = fileURLToPath(input.repoUrl); } else if (input.repoUrl) { args.sourceGitUrl = input.repoUrl; } if (input.branch) args.sourceBranch = input.branch; if (input.subpath) args.sourceSubpath = input.subpath; if (input.authTokenRef) args.sourceAuthTokenRef = input.authTokenRef; } function sourceArgsFromExistingConnection(input: { args: KtxSetupSourcesArgs; source: KtxSetupSourceType; connectionId: string; connection: KtxProjectConnectionConfig; }): KtxSetupSourcesArgs { const sourceArgs: KtxSetupSourcesArgs = { projectDir: input.args.projectDir, inputMode: input.args.inputMode, source: input.source, sourceConnectionId: input.connectionId, runInitialSourceIngest: input.args.runInitialSourceIngest, skipSources: input.args.skipSources, }; if (input.source === 'dbt') { applyRepoSourceArgs(sourceArgs, { sourceDir: stringField(input.connection.source_dir), repoUrl: stringField(input.connection.repo_url), branch: stringField(input.connection.branch), subpath: stringField(input.connection.path), authTokenRef: stringField(input.connection.auth_token_ref), }); const profilesPath = stringField(input.connection.profiles_path); const target = stringField(input.connection.target); const projectName = stringField(input.connection.project_name); if (profilesPath) sourceArgs.sourceProfilesPath = profilesPath; if (target) sourceArgs.sourceTarget = target; if (projectName) sourceArgs.sourceProjectName = projectName; return sourceArgs; } if (input.source === 'metricflow') { const metricflow = isRecord(input.connection.metricflow) ? input.connection.metricflow : {}; applyRepoSourceArgs(sourceArgs, { repoUrl: stringField(metricflow.repoUrl), branch: stringField(metricflow.branch), subpath: stringField(metricflow.path), authTokenRef: stringField(metricflow.auth_token_ref), }); return sourceArgs; } if (input.source === 'lookml') { applyRepoSourceArgs(sourceArgs, { repoUrl: stringField(input.connection.repoUrl), branch: stringField(input.connection.branch), subpath: stringField(input.connection.path), authTokenRef: stringField(input.connection.auth_token_ref), }); const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {}; const expectedLookerConnectionName = stringField(mappings.expectedLookerConnectionName); if (expectedLookerConnectionName) sourceArgs.sourceTarget = expectedLookerConnectionName; return sourceArgs; } if (input.source === 'metabase') { sourceArgs.sourceUrl = stringField(input.connection.api_url); sourceArgs.sourceApiKeyRef = stringField(input.connection.api_key_ref); const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {}; const databaseMapping = firstStringRecordEntry(mappings.databaseMappings); if (databaseMapping) { sourceArgs.metabaseDatabaseId = Number.parseInt(databaseMapping[0], 10); sourceArgs.sourceWarehouseConnectionId = databaseMapping[1]; } return sourceArgs; } if (input.source === 'looker') { sourceArgs.sourceUrl = stringField(input.connection.base_url); sourceArgs.sourceClientId = stringField(input.connection.client_id); sourceArgs.sourceClientSecretRef = stringField(input.connection.client_secret_ref); const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {}; const connectionMapping = firstStringRecordEntry(mappings.connectionMappings); if (connectionMapping) { sourceArgs.sourceTarget = connectionMapping[0]; sourceArgs.sourceWarehouseConnectionId = connectionMapping[1]; } return sourceArgs; } sourceArgs.sourceApiKeyRef = stringField(input.connection.auth_token_ref); sourceArgs.notionCrawlMode = input.connection.crawl_mode === 'all_accessible' ? 'all_accessible' : 'selected_roots'; if (Array.isArray(input.connection.root_page_ids)) { sourceArgs.notionRootPageIds = input.connection.root_page_ids.filter( (pageId): pageId is string => typeof pageId === 'string', ); } return sourceArgs; } async function promptEditedSourceConnection(input: { args: KtxSetupSourcesArgs; source: KtxSetupSourceType; connectionId: string; connection: KtxProjectConnectionConfig; prompts: KtxSetupSourcesPromptAdapter; io: KtxCliIo; testGitRepo?: KtxSetupSourcesDeps['testGitRepo']; pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages']; discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases']; }): Promise | 'back'> { const sourceArgs = await promptForInteractiveSource( sourceArgsFromExistingConnection({ args: input.args, source: input.source, connectionId: input.connectionId, connection: input.connection, }), input.source, input.prompts, input.io, { pickNotionRootPages: input.pickNotionRootPages, discoverMetabaseDatabases: input.discoverMetabaseDatabases, }, input.connectionId, input.testGitRepo, input.discoverMetabaseDatabases, ); return sourceArgs === 'back' ? 'back' : { kind: 'edited', connectionId: input.connectionId, args: sourceArgs }; } async function chooseContextSourceToEdit(input: { projectDir: string; prompts: KtxSetupSourcesPromptAdapter; }): Promise<{ connectionId: string; source: KtxSetupSourceType } | 'back'> { const project = await loadKtxProject({ projectDir: input.projectDir }); const targets = contextSourceEditTargets(project.config.connections); if (targets.length === 0) return 'back'; const choice = await input.prompts.select({ message: 'Context source to edit', options: [ ...targets.map((target) => ({ value: target.connectionId, label: `${target.connectionId} (${sourceLabel(target.source)})`, })), { value: 'back', label: 'Back' }, ], }); if (choice === 'back') return 'back'; const target = targets.find((candidate) => candidate.connectionId === choice); return target ?? 'back'; } async function chooseInteractiveSourceConnection(input: { args: KtxSetupSourcesArgs; source: KtxSetupSourceType; connections: Record; prompts: KtxSetupSourcesPromptAdapter; io: KtxCliIo; testGitRepo?: KtxSetupSourcesDeps['testGitRepo']; pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages']; discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases']; }): Promise { const existingIds = existingConnectionIdsBySource(input.connections, input.source); const defaultConnectionId = defaultConnectionIdForSource(input.connections, input.source); const label = sourceLabel(input.source); if (existingIds.length === 0) { const sourceArgs = await promptForInteractiveSource( input.args, input.source, input.prompts, input.io, { pickNotionRootPages: input.pickNotionRootPages, discoverMetabaseDatabases: input.discoverMetabaseDatabases, }, defaultConnectionId, input.testGitRepo, input.discoverMetabaseDatabases, ); return sourceArgs === 'back' ? 'back' : { kind: 'new', args: sourceArgs }; } while (true) { const choice = await input.prompts.select({ message: `Configure ${label}`, options: [ ...existingIds.map((connectionId) => ({ value: `existing:${connectionId}`, label: `Use existing ${label} connection: ${connectionId}`, })), ...existingIds.map((connectionId) => ({ value: `edit:${connectionId}`, label: `Edit existing ${label} connection: ${connectionId}`, })), { value: 'new', label: `Add new ${label} connection` }, { value: 'back', label: 'Back' }, ], }); if (choice === 'back') return 'back'; if (choice.startsWith('existing:')) { const connectionId = choice.slice('existing:'.length); const connection = input.connections[connectionId]; if (connection) { return { kind: 'existing', connectionId, connection }; } continue; } if (choice.startsWith('edit:')) { const connectionId = choice.slice('edit:'.length); const connection = input.connections[connectionId]; if (!connection) { continue; } const edited = await promptEditedSourceConnection({ args: input.args, source: input.source, connectionId, connection, prompts: input.prompts, io: input.io, testGitRepo: input.testGitRepo, pickNotionRootPages: input.pickNotionRootPages, discoverMetabaseDatabases: input.discoverMetabaseDatabases, }); if (edited === 'back') { continue; } return edited; } const sourceArgs = await promptForInteractiveSource( input.args, input.source, input.prompts, input.io, { pickNotionRootPages: input.pickNotionRootPages, discoverMetabaseDatabases: input.discoverMetabaseDatabases, }, defaultConnectionId, input.testGitRepo, input.discoverMetabaseDatabases, ); if (sourceArgs === 'back') { continue; } return { kind: 'new', args: sourceArgs }; } } function buildConnection(source: KtxSetupSourceType, args: KtxSetupSourcesArgs): KtxProjectConnectionConfig { if (source === 'dbt') { return buildDbtConnection(args); } if (source === 'metricflow') { return buildMetricflowConnection(args); } if (source === 'metabase') { return buildMetabaseConnection(args); } if (source === 'looker') { return buildLookerConnection(args); } if (source === 'lookml') { return buildLookmlConnection(args); } return buildNotionConnection(args); } async function validateSource( source: KtxSetupSourceType, args: { projectDir: string; connectionId: string; connection: KtxProjectConnectionConfig }, deps: KtxSetupSourcesDeps, ): Promise { if (source === 'dbt') { return await (deps.validateDbt ?? defaultValidateDbt)(args.connection); } if (source === 'metricflow') { return await (deps.validateMetricflow ?? defaultValidateMetricflow)(args.connection); } if (source === 'metabase') { return deps.validateMetabase ? await deps.validateMetabase(args.projectDir, args.connectionId) : { ok: true, detail: 'mapping validation runs after the connection is saved' }; } if (source === 'looker') { return await (deps.validateLooker ?? defaultValidateLooker)(args.projectDir, args.connectionId); } if (source === 'lookml') { return await (deps.validateLookml ?? defaultValidateLookml)(args.connection); } return await (deps.validateNotion ?? defaultValidateNotion)(args.connection); } async function saveValidateAndMaybeBuildSource(input: { args: KtxSetupSourcesArgs; source: KtxSetupSourceType; sourceChoice: Exclude; prompts: KtxSetupSourcesPromptAdapter; io: KtxCliIo; deps: KtxSetupSourcesDeps; }): Promise { const connectionId = input.sourceChoice.kind === 'existing' ? input.sourceChoice.connectionId : input.sourceChoice.kind === 'edited' ? input.sourceChoice.connectionId : (input.sourceChoice.args.sourceConnectionId ?? `${input.source}-main`); const connection = input.sourceChoice.kind === 'existing' ? input.sourceChoice.connection : buildConnection(input.source, input.sourceChoice.args); const rollback = input.sourceChoice.kind === 'existing' ? undefined : await writeSourceConnection( input.args.projectDir, connectionId, connection, sourceAdapter(input.source), ); if (input.sourceChoice.kind === 'existing') { await ensureSourceAdapterEnabled(input.args.projectDir, input.source); } const validation = await validateSource( input.source, { projectDir: input.args.projectDir, connectionId, connection }, input.deps, ); if (!validation.ok) { await rollback?.(); input.io.stderr.write(`${validation.message}\n`); return { status: 'failed' }; } if (input.source === 'metabase' || input.source === 'looker') { input.prompts.log?.(`Validating ${sourceLabel(input.source)} mapping…`); const mappingCode = await (input.deps.runMapping ?? defaultRunMapping)( input.args.projectDir, connectionId, createSetupPrefixedIo(input.io), ); if (mappingCode !== 0) { await rollback?.(); return { status: 'failed' }; } } if (input.args.runInitialSourceIngest) { const ingestResult = await runInitialSourceIngestWithRecovery({ args: input.args, connectionId, io: input.io, prompts: input.prompts, deps: input.deps, }); if (ingestResult === 'failed') { await rollback?.(); return { status: 'failed' }; } if (ingestResult === 'back') { await rollback?.(); return { status: 'back' }; } } else { input.io.stdout.write(`│ Context source ${connectionId} saved. It will be built during the context build step.\n`); } return { status: 'ready', connectionId }; } export async function runKtxSetupSourcesStep( args: KtxSetupSourcesArgs, io: KtxCliIo, deps: KtxSetupSourcesDeps = {}, ): Promise { try { if (args.skipSources) { await markSourcesComplete(args.projectDir); io.stdout.write('│ Context source setup skipped.\n'); return { status: 'skipped', projectDir: args.projectDir }; } const prompts = deps.prompts ?? createPromptAdapter(); const project = await loadKtxProject({ projectDir: args.projectDir }); if (!hasPrimarySource(project.config)) { const message = 'Connect a database before adding context sources.'; if (args.source) { io.stderr.write(`${message}\n`); return { status: 'failed', projectDir: args.projectDir }; } if (args.inputMode !== 'disabled') { io.stdout.write(`│ ${message}\n`); return { status: 'skipped', projectDir: args.projectDir }; } } while (true) { const contextSourceChecklist = sourceChecklistForConnections( (await loadKtxProject({ projectDir: args.projectDir })).config.connections, ); const selected = args.source ? [args.source] : args.inputMode === 'disabled' ? [] : await prompts.multiselect({ message: withMultiselectNavigation('Which context sources should KTX ingest?'), options: contextSourceChecklist.options, ...(contextSourceChecklist.initialValues.length > 0 ? { initialValues: contextSourceChecklist.initialValues } : {}), required: false, }); if (selected.includes('back')) { return { status: 'back', projectDir: args.projectDir }; } if (selected.length === 0) { if (args.inputMode === 'disabled') { io.stderr.write('Missing context source selection: pass --source or --skip-sources.\n'); return { status: 'missing-input', projectDir: args.projectDir }; } await markSourcesComplete(args.projectDir); io.stdout.write('│ No context sources selected.\n'); return { status: 'skipped', projectDir: args.projectDir }; } const readyConnectionIds: string[] = []; let returnToSourceSelection = false; for (const source of selected as KtxSetupSourceType[]) { const sourceChoice = args.source ? ({ kind: 'new', args } as const) : await chooseInteractiveSourceConnection({ args, source, connections: (await loadKtxProject({ projectDir: args.projectDir })).config.connections, prompts, io, testGitRepo: deps.testGitRepo, pickNotionRootPages: deps.pickNotionRootPages, discoverMetabaseDatabases: deps.discoverMetabaseDatabases, }); if (sourceChoice === 'back') { if (args.source) { return { status: 'back', projectDir: args.projectDir }; } returnToSourceSelection = true; break; } const choiceResult = await saveValidateAndMaybeBuildSource({ args, source, sourceChoice, prompts, io, deps, }); if (choiceResult.status === 'failed') { if (args.source) { return { status: 'failed', projectDir: args.projectDir }; } prompts.log?.('Edit the connection or pick a different source to continue.'); returnToSourceSelection = true; break; } if (choiceResult.status === 'back') { if (args.source) { return { status: 'back', projectDir: args.projectDir }; } returnToSourceSelection = true; break; } if (!readyConnectionIds.includes(choiceResult.connectionId)) { readyConnectionIds.push(choiceResult.connectionId); } } if (returnToSourceSelection) { continue; } if (readyConnectionIds.length > 0 && !args.source && args.inputMode !== 'disabled') { let restartSourceSelection = false; while (true) { const addMore = await prompts.select({ message: `${readyConnectionIds.length} context source${readyConnectionIds.length > 1 ? 's' : ''} configured (${readyConnectionIds.join(', ')}). Add another?`, options: [ { value: 'done', label: 'Done — continue to context build' }, { value: 'edit', label: 'Edit an existing context source' }, { value: 'add', label: 'Add another context source' }, ], }); if (addMore === 'add') { restartSourceSelection = true; break; } if (addMore === 'edit') { const editTarget = await chooseContextSourceToEdit({ projectDir: args.projectDir, prompts }); if (editTarget === 'back') { continue; } const projectForEdit = await loadKtxProject({ projectDir: args.projectDir }); const connection = projectForEdit.config.connections[editTarget.connectionId]; if (!connection) { continue; } const sourceChoice = await promptEditedSourceConnection({ args, source: editTarget.source, connectionId: editTarget.connectionId, connection, prompts, io, testGitRepo: deps.testGitRepo, pickNotionRootPages: deps.pickNotionRootPages, discoverMetabaseDatabases: deps.discoverMetabaseDatabases, }); if (sourceChoice === 'back') { continue; } const choiceResult = await saveValidateAndMaybeBuildSource({ args, source: editTarget.source, sourceChoice, prompts, io, deps, }); if (choiceResult.status === 'failed') { prompts.log?.('Edit the connection or pick a different source to continue.'); continue; } if (choiceResult.status === 'back') { continue; } if (!readyConnectionIds.includes(choiceResult.connectionId)) { readyConnectionIds.push(choiceResult.connectionId); } continue; } break; } if (restartSourceSelection) { continue; } } await markSourcesComplete(args.projectDir); return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds }; } } catch (error) { io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); return { status: 'failed', projectDir: args.projectDir }; } }