import { mkdtemp, readdir, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join, resolve } from 'node:path';
import { fileURLToPath, pathToFileURL } from 'node:url';

import { cancel, isCancel, log, multiselect, select, text } from '@clack/prompts';
import { resolveNotionAuthToken } from '@ktx/context/connections';
import { resolveKtxConfigReference } from '@ktx/context/core';
import {
  cloneOrPull,
  loadDbtSchemaFiles,
  loadProjectInfo,
  type NotionApi,
  NotionClient,
  parseLookmlStagedDir,
  parseMetricflowFiles,
  testRepoConnection,
} from '@ktx/context/ingest';
import {
  type KtxProjectConfig,
  type KtxProjectConnectionConfig,
  loadKtxProject,
  markKtxSetupStepComplete,
  serializeKtxProjectConfig,
} from '@ktx/context/project';

import type { KtxCliIo } from './cli-runtime.js';
import { runKtxConnectionMapping } from './commands/connection-mapping.js';
import { runKtxConnection } from './connection.js';
import { withMenuOptionsSpacing, withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
import { runKtxPublicIngest } from './public-ingest.js';
import { withSetupInterruptConfirmation } from './setup-interrupt.js';

export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion';

export interface KtxSetupSourcesArgs {
  projectDir: string;
  inputMode: 'auto' | 'disabled';
  source?: KtxSetupSourceType;
  sourceConnectionId?: string;
  sourcePath?: string;
  sourceGitUrl?: string;
  sourceBranch?: string;
  sourceSubpath?: string;
  sourceAuthTokenRef?: string;
  sourceUrl?: string;
  sourceApiKeyRef?: string;
  sourceClientId?: string;
  sourceClientSecretRef?: string;
  sourceWarehouseConnectionId?: string;
  sourceProjectName?: string;
  sourceProfilesPath?: string;
  sourceTarget?: string;
  metabaseDatabaseId?: number;
  notionCrawlMode?: 'all_accessible' | 'selected_roots';
  notionRootPageIds?: string[];
  runInitialSourceIngest: boolean;
  skipSources: boolean;
}

export type KtxSetupSourcesResult =
  | {
status: 'ready'; projectDir: string; connectionIds: string[] }
  | { status: 'skipped'; projectDir: string }
  | { status: 'back'; projectDir: string }
  | { status: 'missing-input'; projectDir: string }
  | { status: 'failed'; projectDir: string };

/**
 * Prompt primitives used by the sources setup step.
 *
 * The built-in adapter in this file resolves `multiselect` to `['back']` and
 * `select` to `'back'` when the user cancels, and resolves `text` to
 * `undefined` on cancel; callers in this file treat those values as "go back".
 */
export interface KtxSetupSourcesPromptAdapter {
  multiselect(options: {
    message: string;
    options: Array<{ value: string; label: string }>;
    required?: boolean;
  }): Promise<string[]>;
  select(options: { message: string; options: Array<{ value: string; label: string }> }): Promise<string>;
  text(options: { message: string; placeholder?: string; initialValue?: string }): Promise<string | undefined>;
  cancel(message: string): void;
  log?(message: string): void;
}

/** Outcome of validating one context source: success with optional detail, or failure with a message. */
export type SourceValidationResult = { ok: true; detail?: string } | { ok: false; message: string };

/**
 * Optional overrides for the side-effecting collaborators of the sources step.
 * Every member falls back to a default implementation defined in this file.
 * `runMapping` and `runInitialIngest` resolve to an exit code where `0` means success.
 */
export interface KtxSetupSourcesDeps {
  prompts?: KtxSetupSourcesPromptAdapter;
  testGitRepo?: (args: {
    repoUrl: string;
    authToken?: string | null;
  }) => Promise<{ ok: true } | { ok: false; error: string }>;
  validateDbt?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
  validateMetricflow?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
  validateMetabase?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
  validateLooker?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
  validateLookml?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
  validateNotion?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
  runMapping?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
  runInitialIngest?: (
    projectDir: string,
    connectionId: string,
    io: KtxCliIo,
    options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
  ) => Promise<number>;
}

/** Selectable context sources, in the order they are offered to the user. */
const SOURCE_OPTIONS: Array<{ value: KtxSetupSourceType; label: string }> = [
  { value: 'dbt', label: 'dbt' },
  { value: 'metricflow', label: 'MetricFlow' },
  { value: 'metabase', label: 'Metabase' },
  { value: 'looker', label: 'Looker' },
  { value: 'lookml', label: 'LookML' },
  { value: 'notion', label: 'Notion' },
];

const SOURCE_LABELS =
Object.fromEntries(SOURCE_OPTIONS.map((option) => [option.value, option.label])) as Record<
  KtxSetupSourceType,
  string
>;

/** Lower-cased `driver` values that `hasPrimarySource` accepts as a primary connection. */
const PRIMARY_SOURCE_DRIVERS = new Set<string>([
  'sqlite',
  'postgres',
  'mysql',
  'clickhouse',
  'sqlserver',
  'bigquery',
  'snowflake',
]);

/**
 * Builds the default prompt adapter on top of `@clack/prompts`.
 *
 * Every prompt runs through `withSetupInterruptConfirmation`. A cancelled
 * `multiselect`/`select` prints "Setup cancelled." and resolves to the `back`
 * sentinel; a cancelled `text` resolves to `undefined`.
 */
function createPromptAdapter(): KtxSetupSourcesPromptAdapter {
  return {
    async multiselect(options) {
      const picked = await withSetupInterruptConfirmation(() => multiselect(withMenuOptionsSpacing(options)));
      if (!isCancel(picked)) {
        return [...picked] as string[];
      }
      cancel('Setup cancelled.');
      return ['back'];
    },

    async select(options) {
      const picked = await withSetupInterruptConfirmation(() => select(withMenuOptionsSpacing(options)));
      if (!isCancel(picked)) {
        return String(picked);
      }
      cancel('Setup cancelled.');
      return 'back';
    },

    async text(options) {
      // NOTE(review): promptText() in this file also applies withTextInputNavigation
      // before delegating here, so the hint may be applied twice — confirm it is idempotent.
      const answer = await withSetupInterruptConfirmation(() =>
        text({ ...options, message: withTextInputNavigation(options.message) }),
      );
      if (isCancel(answer)) {
        return undefined;
      }
      return String(answer);
    },

    cancel(message) {
      cancel(message);
    },

    log(message) {
      log.info(message);
    },
  };
}

/** Type guard: `value` is a non-null, non-array object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  return !Array.isArray(value);
}

/** Returns the trimmed string when `value` is a non-blank string, otherwise `undefined`. */
function stringField(value: unknown): string | undefined {
  if (typeof value !== 'string') {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}

/** Human-readable label for a source type, taken from `SOURCE_LABELS`. */
function sourceLabel(source: KtxSetupSourceType): string {
  return SOURCE_LABELS[source];
}

/** Ingest adapter name for a source type; currently the source type itself. */
function sourceAdapter(source: KtxSetupSourceType): string {
  return source;
}

/** Two-line prompt asking the user to name a new connection of the given kind. */
function connectionNamePrompt(label: string): string {
  return `Name this ${label} connection\nKTX will use this short name in commands and config. You can rename it now.`;
}

/** Multi-line prompt asking for a git credential reference after a repository check failed. */
function gitAuthAfterFailurePrompt(source: KtxSetupSourceType): string {
  const label = source === 'dbt' ?
'This' : `This ${sourceLabel(source)}`;
  return [
    `${label} repo requires authentication.`,
    'Generate a token at: https://github.com/settings/tokens/new',
    'Store it in an env var, then enter env:VARIABLE_NAME here (e.g. env:GITHUB_TOKEN).',
    'Or use file:/absolute/path if the token is stored in a file.',
    'Press Enter to skip and try without authentication anyway.',
  ].join('\n');
}

/** Multi-line prompt for the optional project sub-folder; dbt gets dbt-specific wording. */
function sourceSubpathPrompt(source: KtxSetupSourceType): string {
  const lines =
    source === 'dbt'
      ? [
          'Folder containing dbt_project.yml (optional)',
          'Press Enter when dbt_project.yml is at the repo root.',
          'For monorepos, enter a relative path like analytics/dbt.',
        ]
      : [
          `${sourceLabel(source)} project folder (optional)`,
          'If the project files are inside a subfolder, enter that path.',
          'Press Enter if the path or repo already points at the project.',
        ];
  return lines.join('\n');
}

/**
 * Asks a free-text question through the adapter, decorating the message with
 * the text-input navigation hint.
 *
 * @returns The entered text, or `undefined` when the adapter reports a cancel.
 */
async function promptText(
  prompts: KtxSetupSourcesPromptAdapter,
  options: { message: string; placeholder?: string; initialValue?: string },
): Promise<string | undefined> {
  // NOTE(review): createPromptAdapter().text in this file applies withTextInputNavigation
  // as well, so the built-in adapter sees an already-decorated message — confirm idempotence.
  const decorated = { ...options, message: withTextInputNavigation(options.message) };
  return await prompts.text(decorated);
}

/**
 * Rejects connection ids that are not plain identifiers.
 *
 * @throws Error when the id does not start with an alphanumeric character or
 *   contains anything other than alphanumerics, `_` and `-`.
 */
function assertSafeConnectionId(connectionId: string): void {
  const isSafe = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId);
  if (!isSafe) {
    throw new Error(`Unsafe connection id: ${connectionId}`);
  }
}

/**
 * Validates a credential reference and returns it trimmed.
 *
 * @param value Raw reference; must start with `env:` or `file:`.
 * @param label Name of the credential, used in error messages.
 * @throws Error when the reference is missing/blank or uses another scheme.
 */
function credentialRef(value: string | undefined, label: string): string {
  const ref = value?.trim();
  if (!ref) {
    throw new Error(`Missing ${label}; use env:NAME or file:/absolute/path`);
  }
  const hasSupportedScheme = ref.startsWith('env:') || ref.startsWith('file:');
  if (!hasSupportedScheme) {
    throw new Error(`${label} must use env:NAME or file:/absolute/path`);
  }
  return ref;
}

/** Resolves the source location from args: exactly one of a local path or a git URL. */
function repoOrLocalSource(args: KtxSetupSourcesArgs): { sourceDir?: string; repoUrl?: string } {
  if (args.sourcePath && args.sourceGitUrl) {
    throw new Error('Choose only one source location: --source-path or --source-git-url.');
  }
  if (args.sourcePath) {
    return { sourceDir: resolve(args.sourcePath) };
  }
  if (args.sourceGitUrl) {
    return { repoUrl:
args.sourceGitUrl };
  }
  throw new Error('Missing source location: pass --source-path or --source-git-url.');
}

/** Converts a local directory path into its `file:` URL string. */
function fileRepoUrl(sourceDir: string): string {
  return pathToFileURL(sourceDir).href;
}

/** Serializes `config` and writes it to the config path of the project at `projectDir`. */
async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise<void> {
  const { configPath } = await loadKtxProject({ projectDir });
  await writeFile(configPath, serializeKtxProjectConfig(config), 'utf-8');
}

/**
 * Saves (adds or replaces) a source connection and makes sure its ingest
 * adapter is listed in the project config.
 *
 * @returns A rollback function. It reloads the project, restores the previous
 *   connection (or deletes the new one when none existed), and removes the
 *   adapter only if this call was the one that added it.
 * @throws Error when `connectionId` fails `assertSafeConnectionId`.
 */
async function writeSourceConnection(
  projectDir: string,
  connectionId: string,
  connection: KtxProjectConnectionConfig,
  adapter: string,
): Promise<() => Promise<void>> {
  assertSafeConnectionId(connectionId);
  const project = await loadKtxProject({ projectDir });
  const previousConnection = project.config.connections[connectionId];
  const hadPreviousConnection = previousConnection !== undefined;
  const shouldRemoveAdapterOnRollback = !project.config.ingest.adapters.includes(adapter);
  const config = {
    ...project.config,
    connections: {
      ...project.config.connections,
      [connectionId]: connection,
    },
    ingest: {
      ...project.config.ingest,
      adapters: project.config.ingest.adapters.includes(adapter)
        ? [...project.config.ingest.adapters]
        : [...project.config.ingest.adapters, adapter],
    },
  };
  await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
  return async () => {
    const latest = await loadKtxProject({ projectDir });
    const connections = { ...latest.config.connections };
    if (hadPreviousConnection) {
      connections[connectionId] = previousConnection;
    } else {
      delete connections[connectionId];
    }
    await writeProjectConfig(projectDir, {
      ...latest.config,
      connections,
      ingest: {
        ...latest.config.ingest,
        adapters: shouldRemoveAdapterOnRollback ?
latest.config.ingest.adapters.filter((candidate) => candidate !== adapter)
          : latest.config.ingest.adapters,
      },
    });
  };
}

/** Adds the adapter for `source` to the project's ingest adapters if it is not already listed. */
async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSourceType): Promise<void> {
  const adapter = sourceAdapter(source);
  const { config } = await loadKtxProject({ projectDir });
  const enabledAdapters = config.ingest.adapters;
  if (enabledAdapters.includes(adapter)) {
    return;
  }
  await writeProjectConfig(projectDir, {
    ...config,
    ingest: {
      ...config.ingest,
      adapters: [...enabledAdapters, adapter],
    },
  });
}

/** Marks the `sources` setup step as complete and persists the project config. */
async function markSourcesComplete(projectDir: string): Promise<void> {
  const { config, configPath } = await loadKtxProject({ projectDir });
  const updated = markKtxSetupStepComplete(config, 'sources');
  await writeFile(configPath, serializeKtxProjectConfig(updated), 'utf-8');
}

/**
 * Reports whether the project already has a primary source.
 *
 * True when any id in `setup.database_connection_ids` exists in `connections`,
 * or when any connection's lower-cased `driver` is in `PRIMARY_SOURCE_DRIVERS`.
 */
function hasPrimarySource(config: KtxProjectConfig): boolean {
  const { connections } = config;
  const recordedPrimaryIds = config.setup?.database_connection_ids ?? [];
  const hasRecordedPrimary = recordedPrimaryIds.some((connectionId) => Object.hasOwn(connections, connectionId));
  if (hasRecordedPrimary) {
    return true;
  }
  for (const connection of Object.values(connections)) {
    const driver = String(connection.driver ?? '').toLowerCase();
    if (PRIMARY_SOURCE_DRIVERS.has(driver)) {
      return true;
    }
  }
  return false;
}

/** Builds the dbt connection config from setup args; optional keys are only written when set. */
function buildDbtConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  const source = repoOrLocalSource(args);
  return {
    driver: 'dbt',
    ...(source.sourceDir ? { source_dir: source.sourceDir } : {}),
    ...(source.repoUrl ? { repo_url: source.repoUrl } : {}),
    ...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
    ...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
    ...(args.sourceAuthTokenRef
      ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'dbt private repo access token') }
      : {}),
    ...(args.sourceProfilesPath ? { profiles_path: resolve(args.sourceProfilesPath) } : {}),
    ...(args.sourceTarget ? { target: args.sourceTarget } : {}),
    ...(args.sourceProjectName ?
{ project_name: args.sourceProjectName } : {}),
  };
}

/**
 * Builds the MetricFlow connection config. A local path is stored as a
 * `file:` URL in `metricflow.repoUrl`; branch, path and auth token ref are
 * only written when provided.
 */
function buildMetricflowConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  const location = repoOrLocalSource(args);
  const repoUrl = location.repoUrl ?? fileRepoUrl(location.sourceDir ?? '');
  const branchPart = args.sourceBranch ? { branch: args.sourceBranch } : {};
  const pathPart = args.sourceSubpath ? { path: args.sourceSubpath } : {};
  const authPart = args.sourceAuthTokenRef
    ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'MetricFlow auth token ref') }
    : {};

  return {
    driver: 'metricflow',
    metricflow: { repoUrl, ...branchPart, ...pathPart, ...authPart },
  };
}

/**
 * Builds the Metabase connection config, mapping the given Metabase database
 * id to the warehouse connection and enabling sync for that database only.
 *
 * @throws Error when the URL, warehouse connection id, database id or API key
 *   reference is missing or invalid.
 */
function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  const { sourceUrl, sourceWarehouseConnectionId, metabaseDatabaseId } = args;
  if (!sourceUrl) {
    throw new Error('Missing Metabase URL: pass --source-url.');
  }
  if (!sourceWarehouseConnectionId) {
    throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
  }
  // A falsy id (undefined, 0, NaN) is treated as missing.
  if (!metabaseDatabaseId) {
    throw new Error('Missing Metabase database id: pass --metabase-database-id.');
  }

  const databaseKey = String(metabaseDatabaseId);
  return {
    driver: 'metabase',
    api_url: sourceUrl,
    api_key_ref: credentialRef(args.sourceApiKeyRef, 'Metabase API key ref'),
    mappings: {
      databaseMappings: { [databaseKey]: sourceWarehouseConnectionId },
      syncEnabled: { [databaseKey]: true },
      syncMode: 'ONLY',
    },
  };
}

/** Builds the Looker connection config; `sourceTarget`, when set, names the Looker connection to map. */
function buildLookerConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  if (!args.sourceUrl) {
    throw new Error('Missing Looker base URL: pass --source-url.');
  }
  if (!args.sourceClientId) {
    throw new Error('Missing Looker client id: pass --source-client-id.');
  }
  if (!args.sourceWarehouseConnectionId) {
    throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
  }
  return {
    driver: 'looker',
    base_url: args.sourceUrl,
    client_id: args.sourceClientId,
    client_secret_ref: credentialRef(args.sourceClientSecretRef, 'Looker client secret ref'),
    mappings: {
      connectionMappings: {
        [args.sourceTarget ??
args.sourceWarehouseConnectionId]: args.sourceWarehouseConnectionId,
      },
    },
  };
}

/**
 * Builds the LookML connection config. A local path is stored as a `file:`
 * URL in `repoUrl`; the expected Looker connection name falls back from
 * `sourceTarget` to `sourceWarehouseConnectionId` to `null`.
 */
function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  const location = repoOrLocalSource(args);
  const repoUrl = location.repoUrl ?? fileRepoUrl(location.sourceDir ?? '');
  const branchPart = args.sourceBranch ? { branch: args.sourceBranch } : {};
  const pathPart = args.sourceSubpath ? { path: args.sourceSubpath } : {};
  const authPart = args.sourceAuthTokenRef
    ? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'LookML auth token ref') }
    : {};

  return {
    driver: 'lookml',
    repoUrl,
    ...branchPart,
    ...pathPart,
    ...authPart,
    mappings: {
      expectedLookerConnectionName: args.sourceTarget ?? args.sourceWarehouseConnectionId ?? null,
    },
  };
}

/**
 * Builds the Notion connection config with the fixed per-run limits used by setup.
 *
 * @throws Error when crawl mode is `selected_roots` (the default) and no root
 *   page ids were given, or when the token reference is missing or invalid.
 */
function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  const crawlMode = args.notionCrawlMode ?? 'selected_roots';
  const rootPageIds = args.notionRootPageIds ?? [];
  const requiresRoots = crawlMode === 'selected_roots';
  if (requiresRoots && rootPageIds.length === 0) {
    throw new Error('Notion selected_roots requires --notion-root-page-id.');
  }

  const authTokenRef = credentialRef(args.sourceApiKeyRef, 'Notion token ref');
  return {
    driver: 'notion',
    auth_token_ref: authTokenRef,
    crawl_mode: crawlMode,
    root_page_ids: rootPageIds,
    root_database_ids: [],
    root_data_source_ids: [],
    max_pages_per_run: 1000,
    max_knowledge_creates_per_run: 5,
    max_knowledge_updates_per_run: 20,
    last_successful_cursor: null,
  };
}

/** Converts a `file:` repo URL to a filesystem path, appending `subpath` when given. */
function sourcePathFromFileRepoUrl(repoUrl: string, subpath?: string): string {
  const root = fileURLToPath(repoUrl);
  if (!subpath) {
    return root;
  }
  return join(root, subpath);
}

/**
 * Resolves the git auth token for a connection-like record: a literal
 * `authToken`/`auth_token` wins, otherwise the `auth_token_ref`/`authTokenRef`
 * reference is resolved against `process.env`.
 */
function repoAuthToken(connection: KtxProjectConnectionConfig | Record<string, unknown>): string | null {
  const ref = stringField(connection.auth_token_ref) ?? stringField(connection.authTokenRef);
  const literal = stringField(connection.authToken) ?? stringField(connection.auth_token);
  return literal ?? resolveKtxConfigReference(ref, process.env) ??
null;
}

/**
 * Recursively reads every `.yml`/`.yaml` file under `sourceRoot`.
 *
 * Files are read one at a time, in the order `readdir` reports them.
 *
 * @returns One `{ path, content }` entry per YAML file, content decoded as UTF-8.
 */
async function collectYamlFilesRecursive(sourceRoot: string): Promise<Array<{ content: string; path: string }>> {
  const entries = await readdir(sourceRoot, { withFileTypes: true, recursive: true });
  const yamlPaths = entries
    .filter((entry) => entry.isFile() && /\.ya?ml$/i.test(entry.name))
    .map((entry) => join(entry.parentPath, entry.name));

  const files: Array<{ content: string; path: string }> = [];
  for (const path of yamlPaths) {
    files.push({ path, content: await readFile(path, 'utf-8') });
  }
  return files;
}

/**
 * Default dbt validation: locates the project (local `source_dir`, a `file:`
 * repo URL, or a fresh clone of a remote repo), then loads project info and
 * schema files from it.
 *
 * A remote repo is cloned into a temporary directory that is removed again
 * before returning, whether validation succeeds, fails or throws.
 *
 * @returns `ok: true` with a `project=… schemas=…` detail, or `ok: false` when
 *   no location is configured or no project name can be determined.
 */
async function defaultValidateDbt(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
  const repoUrl = stringField(connection.repo_url) ?? stringField(connection.repoUrl);
  const subpath = stringField(connection.path);
  let sourceDir = stringField(connection.source_dir) ?? stringField(connection.sourceDir);
  let tempCloneDir: string | undefined;

  try {
    if (!sourceDir && repoUrl?.startsWith('file:')) {
      sourceDir = sourcePathFromFileRepoUrl(repoUrl, subpath);
    }
    if (!sourceDir && repoUrl) {
      tempCloneDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-'));
      await cloneOrPull({
        repoUrl,
        authToken: repoAuthToken(connection),
        cacheDir: tempCloneDir,
        branch: stringField(connection.branch) ?? 'main',
      });
      // Use the trimmed sub-path, matching how the file: branch above resolves it.
      sourceDir = subpath ? join(tempCloneDir, subpath) : tempCloneDir;
    }
    if (!sourceDir) {
      return { ok: false, message: 'dbt setup requires --source-path or --source-git-url.' };
    }

    const info = await loadProjectInfo(sourceDir);
    const schemaFiles = await loadDbtSchemaFiles(sourceDir);
    const configuredName = typeof connection.project_name === 'string' ? connection.project_name : undefined;
    if (!info.projectName && configuredName === undefined) {
      return { ok: false, message: 'dbt project metadata is missing project name.' };
    }
    // `||` so an empty project name from the metadata falls back to the configured one.
    return { ok: true, detail: `project=${info.projectName || configuredName} schemas=${schemaFiles.length}` };
  } finally {
    if (tempCloneDir) {
      // Best-effort cleanup: a failed removal must not change the validation outcome.
      await rm(tempCloneDir, { recursive: true, force: true }).catch(() => undefined);
    }
  }
}

/**
 * Default MetricFlow validation: a remote repo is only checked for
 * reachability; a `file:` repo is parsed from its YAML files.
 */
async function defaultValidateMetricflow(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
  const metricflow = isRecord(connection.metricflow) ? connection.metricflow : undefined;
  const repoUrl = stringField(metricflow?.repoUrl);
  if (!repoUrl) {
    return { ok: false, message: 'MetricFlow setup requires repoUrl.'
};
  }
  if (!repoUrl.startsWith('file:')) {
    const result = await testRepoConnection({
      repoUrl,
      authToken: metricflow ? repoAuthToken(metricflow) : null,
    });
    if (!result.ok) {
      return { ok: false, message: result.error };
    }
    return { ok: true, detail: 'repository reachable' };
  }
  const path = sourcePathFromFileRepoUrl(repoUrl, stringField(metricflow?.path));
  const parsed = parseMetricflowFiles(await collectYamlFilesRecursive(path));
  return {
    ok: true,
    detail: `semanticModels=${parsed.semanticModels.length} metrics=${parsed.crossModelMetrics.length}`,
  };
}

/**
 * Default Metabase validation: runs the connection `map` command in JSON mode
 * with all output discarded and treats exit code 0 as success.
 */
async function defaultValidateMetabase(projectDir: string, connectionId: string): Promise<SourceValidationResult> {
  // Output is intentionally swallowed; only the exit code is inspected.
  const silentIo = { stdout: { write() {} }, stderr: { write() {} } };
  const exitCode = await runKtxConnection(
    { command: 'map', projectDir, sourceConnectionId: connectionId, json: true },
    silentIo,
  );
  if (exitCode === 0) {
    return { ok: true, detail: 'mapping validated' };
  }
  return { ok: false, message: 'Metabase mapping validation failed' };
}

/**
 * Default Looker validation: runs the connection-mapping `refresh` command
 * with auto-accept and all output discarded; exit code 0 means success.
 */
async function defaultValidateLooker(projectDir: string, connectionId: string): Promise<SourceValidationResult> {
  // Output is intentionally swallowed; only the exit code is inspected.
  const silentIo = { stdout: { write() {} }, stderr: { write() {} } };
  const exitCode = await runKtxConnectionMapping(
    { command: 'refresh', projectDir, connectionId, autoAccept: true },
    silentIo,
  );
  if (exitCode === 0) {
    return { ok: true, detail: 'Looker mapping refreshed' };
  }
  return { ok: false, message: 'Looker validation failed' };
}

/**
 * Default LookML validation: a remote repo is only checked for reachability;
 * a `file:` repo must contain at least one model, view or dashboard.
 */
async function defaultValidateLookml(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
  const repoUrl = stringField(connection.repoUrl) ?? stringField(connection.repo_url);
  if (!repoUrl) {
    return { ok: false, message: 'LookML setup requires repoUrl.' };
  }
  if (!repoUrl.startsWith('file:')) {
    const result = await testRepoConnection({ repoUrl, authToken: repoAuthToken(connection) });
    return result.ok ?
{ ok: true, detail: 'repository reachable' } : { ok: false, message: result.error };
  }
  const parsed = await parseLookmlStagedDir(sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path)));
  const count = parsed.models.length + parsed.views.length + parsed.dashboards.length;
  return count > 0 ? { ok: true, detail: `lookmlFiles=${count}` } : { ok: false, message: 'No LookML files found' };
}

/**
 * Default Notion validation: resolves the token, fetches the bot user, then
 * retrieves each configured root page in order.
 *
 * @returns `ok: false` when `auth_token_ref` is missing or blank (instead of
 *   handing the literal string "undefined" to the token resolver); otherwise
 *   `ok: true` with the number of roots checked. Client errors propagate.
 */
async function defaultValidateNotion(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
  const authTokenRef = stringField(connection.auth_token_ref);
  if (!authTokenRef) {
    return { ok: false, message: 'Notion setup requires auth_token_ref.' };
  }

  const token = await resolveNotionAuthToken(authTokenRef);
  const client: NotionApi = new NotionClient(token);
  await client.retrieveBotUser();

  const roots = Array.isArray(connection.root_page_ids)
    ? connection.root_page_ids.filter((id): id is string => typeof id === 'string')
    : [];
  for (const root of roots) {
    await client.retrievePage(root);
  }
  return { ok: true, detail: `roots=${roots.length}` };
}

/** Default mapping runner: the connection `map` command in non-JSON mode, writing to `io`. */
async function defaultRunMapping(projectDir: string, connectionId: string, io: KtxCliIo): Promise<number> {
  const request = { command: 'map', projectDir, sourceConnectionId: connectionId, json: false } as const;
  return await runKtxConnection(request, io);
}

/** Default initial-ingest runner: a public ingest `run` targeting one connection. */
async function defaultRunInitialIngest(
  projectDir: string,
  connectionId: string,
  io: KtxCliIo,
  options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
): Promise<number> {
  return await runKtxPublicIngest(
    {
      command: 'run',
      projectDir,
      targetConnectionId: connectionId,
      all: false,
      json: false,
      inputMode: options.inputMode,
    },
    io,
  );
}

/**
 * Runs the initial ingest for one connection. On a non-zero exit code it
 * returns `'failed'` when input is disabled; otherwise it asks the user to
 * retry, continue without a completed build, or go back.
 */
async function runInitialSourceIngestWithRecovery(input: {
  args: KtxSetupSourcesArgs;
  connectionId: string;
  io: KtxCliIo;
  prompts: KtxSetupSourcesPromptAdapter;
  deps: KtxSetupSourcesDeps;
}): Promise<'ready' | 'continue' | 'back' | 'failed'> {
  while (true) {
    input.io.stdout.write(`Building context from ${input.connectionId}. Large sources can take a while.\n`);
    const ingestCode = await (input.deps.runInitialIngest ??
defaultRunInitialIngest)(
      input.args.projectDir,
      input.connectionId,
      input.io,
      {
        inputMode: input.args.inputMode,
      },
    );
    if (ingestCode === 0) {
      return 'ready';
    }
    if (input.args.inputMode === 'disabled') {
      return 'failed';
    }
    const action = await input.prompts.select({
      message: `Context build failed for ${input.connectionId}\nRetry now, continue setup and build this source later, or go back.`,
      options: [
        { value: 'retry', label: 'Retry context build' },
        { value: 'continue', label: 'Continue setup and build this source later' },
        { value: 'back', label: 'Back' },
      ],
    });
    if (action === 'retry') {
      continue;
    }
    if (action === 'continue') {
      input.io.stdout.write(`Context source saved without a completed context build for ${input.connectionId}.\n`);
      input.io.stdout.write(`Run later: ktx ingest ${input.connectionId}\n`);
      return 'continue';
    }
    return 'back';
  }
}

/** Where a repo-backed source lives: a local path or a git URL. */
type SourceLocationChoice = 'path' | 'git';

/** Setup args plus wizard-only state that is stripped before the args are returned. */
type SourcePromptState = KtxSetupSourcesArgs & {
  sourceLocation?: SourceLocationChoice;
};

/** One wizard step: mutates the state and reports whether to advance or step back. */
type SourcePromptStep = (state: SourcePromptState) => Promise<'next' | 'back'>;

/** Result of choosing a connection interactively: reuse one, create one, or go back. */
type InteractiveSourceConnectionChoice =
  | { kind: 'existing'; connectionId: string; connection: KtxProjectConnectionConfig }
  | { kind: 'new'; args: KtxSetupSourcesArgs }
  | 'back';

/**
 * Drives a back/next wizard over a step list that is recomputed from the
 * (mutated) state before every step, so earlier answers can add or remove
 * later steps.
 *
 * @param initialState State object handed to every step; steps mutate it in place.
 * @param stepsForState Produces the current step list for the given state.
 * @returns The collected args without `sourceLocation`, or `'back'` when the
 *   user steps back from the first step.
 */
async function runSourcePromptSteps(
  initialState: SourcePromptState,
  stepsForState: (state: SourcePromptState) => SourcePromptStep[],
): Promise<KtxSetupSourcesArgs | 'back'> {
  let cursor = 0;
  for (;;) {
    const steps = stepsForState(initialState);
    if (cursor >= steps.length) {
      // All steps answered: drop the wizard-only field before returning.
      const { sourceLocation: _sourceLocation, ...sourceArgs } = initialState;
      return sourceArgs;
    }

    const outcome = await steps[cursor]?.(initialState);
    if (outcome !== 'back') {
      cursor += 1;
      continue;
    }
    if (cursor === 0) {
      return 'back';
    }
    cursor -= 1;
  }
}

/** Clears every repo/location answer so a changed location choice starts clean. */
function resetRepoLocationFields(state: SourcePromptState): void {
  delete state.sourcePath;
  delete state.sourceGitUrl;
  delete state.sourceBranch;
  delete state.sourceAuthTokenRef;
  delete state.sourceSubpath;
  delete
state.sourceProjectName;
}

/**
 * Returns the wizard step that asks for a connection name, or no steps when
 * `args.sourceConnectionId` is already set. A blank answer falls back to
 * `defaultConnectionId`.
 */
function connectionIdPromptSteps(
  args: KtxSetupSourcesArgs,
  source: KtxSetupSourceType,
  prompts: KtxSetupSourcesPromptAdapter,
  defaultConnectionId: string,
): SourcePromptStep[] {
  if (args.sourceConnectionId) {
    return [];
  }

  const askConnectionName: SourcePromptStep = async (state) => {
    const answer = await promptText(prompts, {
      message: connectionNamePrompt(sourceLabel(source)),
      placeholder: defaultConnectionId,
      initialValue: defaultConnectionId,
    });
    if (answer === undefined) {
      return 'back';
    }
    const trimmed = answer.trim();
    state.sourceConnectionId = trimmed === '' ? defaultConnectionId : trimmed;
    return 'next';
  };
  return [askConnectionName];
}

/**
 * Interactive wizard that collects the setup args for one new source.
 *
 * @returns The collected args, or `'back'` when the user backs out of the first step.
 */
async function promptForInteractiveSource(
  args: KtxSetupSourcesArgs,
  source: KtxSetupSourceType,
  prompts: KtxSetupSourcesPromptAdapter,
  defaultConnectionId = `${source}-main`,
  testGitRepo: KtxSetupSourcesDeps['testGitRepo'] = testRepoConnection,
): Promise<KtxSetupSourcesArgs | 'back'> {
  const initialState: SourcePromptState = { ...args, source };
  if (args.sourceConnectionId) {
    initialState.sourceConnectionId = args.sourceConnectionId;
  }
  const connectionSteps = connectionIdPromptSteps(args, source, prompts, defaultConnectionId);
  if (source === 'dbt' || source === 'metricflow' || source === 'lookml') {
    return await runSourcePromptSteps(initialState, (state) => [
      ...connectionSteps,
      async () => {
        const selectedLocation = await prompts.select({
          message: `${source} source location`,
          options: [
            { value: 'path', label: 'Local path' },
            { value: 'git', label: 'Git URL' },
            { value: 'back', label: 'Back' },
          ],
        });
        if (selectedLocation !== 'path' && selectedLocation !== 'git') {
          return 'back';
        }
        if (state.sourceLocation !== selectedLocation) {
          resetRepoLocationFields(state);
        }
        state.sourceLocation = selectedLocation;
        return 'next';
      },
      ...(state.sourceLocation === 'path' ?
// Repo-backed sources (dbt / MetricFlow / LookML), continued. The step list depends on
// state.sourceLocation: 'path' adds a local-path step; 'git' adds git URL and branch steps
// (a blank branch falls back to 'main'). Any chosen location adds the optional sub-folder
// step, where a blank answer clears sourceSubpath. For 'git', the final step tests the
// repository without credentials. Every text step returns 'back' when the prompt is cancelled.
[ async (currentState: SourcePromptState) => { const sourcePath = await promptText(prompts, { message: `${source} local path` }); if (sourcePath === undefined) return 'back'; currentState.sourcePath = sourcePath; return 'next'; }, ] : []), ...(state.sourceLocation === 'git' ? [ async (currentState: SourcePromptState) => { const sourceGitUrl = await promptText(prompts, { message: `${source} git URL` }); if (sourceGitUrl === undefined) return 'back'; currentState.sourceGitUrl = sourceGitUrl; return 'next'; }, async (currentState: SourcePromptState) => { const branch = await promptText(prompts, { message: `${source} git branch`, initialValue: 'main' }); if (branch === undefined) return 'back'; currentState.sourceBranch = branch || 'main'; return 'next'; }, ] : []), ...(state.sourceLocation ? [ async (currentState: SourcePromptState) => { const subpath = await promptText(prompts, { message: sourceSubpathPrompt(source), placeholder: 'optional', }); if (subpath === undefined) return 'back'; if (subpath) { currentState.sourceSubpath = subpath; } else { delete currentState.sourceSubpath; } return 'next'; }, ] : []), ...(state.sourceLocation === 'git' ? [ async (currentState: SourcePromptState) => { const result = await testGitRepo!({ repoUrl: currentState.sourceGitUrl! 
// Git check, continued. If the unauthenticated test succeeds, any stored auth token ref is
// cleared. Otherwise the user is asked for a credential reference; a blank answer clears it.
// The repository is not re-tested here after a reference is entered.
// Metabase steps follow: URL, API key ref, mapped warehouse connection id, and database id.
// The database id is parsed with Number.parseInt, so non-numeric input is stored as NaN;
// buildMetabaseConnection later rejects a falsy id.
// Looker steps then begin: base URL and client id.
}); if (result.ok) { delete currentState.sourceAuthTokenRef; prompts.log?.('Repository connected.'); return 'next'; } const authRef = await promptText(prompts, { message: gitAuthAfterFailurePrompt(source), placeholder: 'env:GITHUB_TOKEN', }); if (authRef === undefined) return 'back'; if (authRef) { currentState.sourceAuthTokenRef = authRef; } else { delete currentState.sourceAuthTokenRef; } return 'next'; }, ] : []), ]); } if (source === 'metabase') { return await runSourcePromptSteps(initialState, () => [ ...connectionSteps, async (state) => { const sourceUrl = await promptText(prompts, { message: 'Metabase URL' }); if (sourceUrl === undefined) return 'back'; state.sourceUrl = sourceUrl; return 'next'; }, async (state) => { const sourceApiKeyRef = await promptText(prompts, { message: 'Metabase API key ref', placeholder: 'env:METABASE_API_KEY', }); if (sourceApiKeyRef === undefined) return 'back'; state.sourceApiKeyRef = sourceApiKeyRef; return 'next'; }, async (state) => { const sourceWarehouseConnectionId = await promptText(prompts, { message: 'Mapped warehouse connection id' }); if (sourceWarehouseConnectionId === undefined) return 'back'; state.sourceWarehouseConnectionId = sourceWarehouseConnectionId; return 'next'; }, async (state) => { const databaseId = await promptText(prompts, { message: 'Metabase database id' }); if (databaseId === undefined) return 'back'; state.metabaseDatabaseId = Number.parseInt(databaseId, 10); return 'next'; }, ]); } if (source === 'looker') { return await runSourcePromptSteps(initialState, () => [ ...connectionSteps, async (state) => { const sourceUrl = await promptText(prompts, { message: 'Looker base URL' }); if (sourceUrl === undefined) return 'back'; state.sourceUrl = sourceUrl; return 'next'; }, async (state) => { const sourceClientId = await promptText(prompts, { message: 'Looker client id' }); if (sourceClientId === undefined) return 'back'; state.sourceClientId = sourceClientId; return 'next'; }, async (state) => { const 
// Looker steps, continued: client secret ref, mapped warehouse connection id, and an optional
// Looker connection name (stored in sourceTarget; a blank answer clears it).
// Any remaining source type falls through to the Notion steps: token ref, then crawl mode.
// Choosing 'all_accessible' drops any previously entered root page ids; the root-page-ids
// step is only added while the crawl mode is 'selected_roots'.
sourceClientSecretRef = await promptText(prompts, { message: 'Looker client secret ref', placeholder: 'env:LOOKER_CLIENT_SECRET', }); if (sourceClientSecretRef === undefined) return 'back'; state.sourceClientSecretRef = sourceClientSecretRef; return 'next'; }, async (state) => { const sourceWarehouseConnectionId = await promptText(prompts, { message: 'Mapped warehouse connection id' }); if (sourceWarehouseConnectionId === undefined) return 'back'; state.sourceWarehouseConnectionId = sourceWarehouseConnectionId; return 'next'; }, async (state) => { const lookerConnectionName = await promptText(prompts, { message: 'Looker connection name', placeholder: 'optional', }); if (lookerConnectionName === undefined) return 'back'; if (lookerConnectionName) { state.sourceTarget = lookerConnectionName; } else { delete state.sourceTarget; } return 'next'; }, ]); } return await runSourcePromptSteps(initialState, (state) => [ ...connectionSteps, async (currentState) => { const sourceApiKeyRef = await promptText(prompts, { message: 'Notion token ref', placeholder: 'env:NOTION_TOKEN', }); if (sourceApiKeyRef === undefined) return 'back'; currentState.sourceApiKeyRef = sourceApiKeyRef; return 'next'; }, async (currentState) => { const crawlMode = await prompts.select({ message: 'Notion crawl mode', options: [ { value: 'selected_roots', label: 'Selected roots' }, { value: 'all_accessible', label: 'All accessible pages' }, { value: 'back', label: 'Back' }, ], }); if (crawlMode === 'back') return 'back'; currentState.notionCrawlMode = crawlMode === 'all_accessible' ? 'all_accessible' : 'selected_roots'; if (currentState.notionCrawlMode === 'all_accessible') { delete currentState.notionRootPageIds; } return 'next'; }, ...(state.notionCrawlMode === 'selected_roots' ? 
[
          async (currentState: SourcePromptState) => {
            const roots = await promptText(prompts, {
              message: 'Notion root page ids',
              placeholder: 'comma-separated ids',
            });
            if (roots === undefined) return 'back';
            currentState.notionRootPageIds = roots
              .split(',')
              .map((root) => root.trim())
              .filter(Boolean);
            return 'next';
          },
        ]
      : []),
  ]);
}

/** Ids of the connections whose lower-cased `driver` equals `source`, sorted with `localeCompare`. */
function existingConnectionIdsBySource(
  connections: Record<string, KtxProjectConnectionConfig>,
  source: KtxSetupSourceType,
): string[] {
  const matchingIds: string[] = [];
  for (const [connectionId, connection] of Object.entries(connections)) {
    const driver = String(connection.driver ?? '').toLowerCase();
    if (driver === source) {
      matchingIds.push(connectionId);
    }
  }
  return matchingIds.sort((left, right) => left.localeCompare(right));
}

/**
 * Picks the first unused id among `<source>-main`, `<source>-main-2`,
 * `<source>-main-3`, … for the given connections.
 */
function defaultConnectionIdForSource(
  connections: Record<string, KtxProjectConnectionConfig>,
  source: KtxSetupSourceType,
): string {
  const base = `${source}-main`;
  let candidate = base;
  for (let suffix = 2; connections[candidate]; suffix += 1) {
    candidate = `${base}-${suffix}`;
  }
  return candidate;
}

/**
 * Lets the user reuse an existing connection of this source type or create a
 * new one. With no existing connections it goes straight to the new-source wizard.
 */
async function chooseInteractiveSourceConnection(input: {
  args: KtxSetupSourcesArgs;
  source: KtxSetupSourceType;
  connections: Record<string, KtxProjectConnectionConfig>;
  prompts: KtxSetupSourcesPromptAdapter;
  testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
}): Promise<InteractiveSourceConnectionChoice> {
  const existingIds = existingConnectionIdsBySource(input.connections, input.source);
  const defaultConnectionId = defaultConnectionIdForSource(input.connections, input.source);
  const label = sourceLabel(input.source);
  if (existingIds.length === 0) {
    const sourceArgs = await promptForInteractiveSource(
      input.args,
      input.source,
      input.prompts,
      defaultConnectionId,
      input.testGitRepo,
    );
    return sourceArgs === 'back' ?
'back' : { kind: 'new', args: sourceArgs };
  }
  while (true) {
    const choice = await input.prompts.select({
      message: `Configure ${label}`,
      options: [
        ...existingIds.map((connectionId) => ({
          value: `existing:${connectionId}`,
          label: `Use existing ${label} connection: ${connectionId}`,
        })),
        { value: 'new', label: `Add new ${label} connection` },
        { value: 'back', label: 'Back' },
      ],
    });
    if (choice === 'back') return 'back';
    if (choice.startsWith('existing:')) {
      const connectionId = choice.slice('existing:'.length);
      const connection = input.connections[connectionId];
      if (connection) {
        return { kind: 'existing', connectionId, connection };
      }
      continue;
    }
    const sourceArgs = await promptForInteractiveSource(
      input.args,
      input.source,
      input.prompts,
      defaultConnectionId,
      input.testGitRepo,
    );
    if (sourceArgs === 'back') {
      continue;
    }
    return { kind: 'new', args: sourceArgs };
  }
}

/**
 * Dispatches to the connection builder for `source`. Any value that is not
 * one of the other five source types is built as a Notion connection.
 */
function buildConnection(source: KtxSetupSourceType, args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
  switch (source) {
    case 'dbt':
      return buildDbtConnection(args);
    case 'metricflow':
      return buildMetricflowConnection(args);
    case 'metabase':
      return buildMetabaseConnection(args);
    case 'looker':
      return buildLookerConnection(args);
    case 'lookml':
      return buildLookmlConnection(args);
    default:
      return buildNotionConnection(args);
  }
}

/**
 * Validates a saved source with the injected validator for its type, falling
 * back to the default validator defined in this file.
 */
async function validateSource(
  source: KtxSetupSourceType,
  args: { projectDir: string; connectionId: string; connection: KtxProjectConnectionConfig },
  deps: KtxSetupSourcesDeps,
): Promise<SourceValidationResult> {
  if (source === 'dbt') {
    return await (deps.validateDbt ?? defaultValidateDbt)(args.connection);
  }
  if (source === 'metricflow') {
    return await (deps.validateMetricflow ?? defaultValidateMetricflow)(args.connection);
  }
  if (source === 'metabase') {
    return await (deps.validateMetabase ?? defaultValidateMetabase)(args.projectDir, args.connectionId);
  }
  if (source === 'looker') {
    return await (deps.validateLooker ??
defaultValidateLooker)(args.projectDir, args.connectionId); } if (source === 'lookml') { return await (deps.validateLookml ?? defaultValidateLookml)(args.connection); } return await (deps.validateNotion ?? defaultValidateNotion)(args.connection); } export async function runKtxSetupSourcesStep( args: KtxSetupSourcesArgs, io: KtxCliIo, deps: KtxSetupSourcesDeps = {}, ): Promise { try { if (args.skipSources) { await markSourcesComplete(args.projectDir); io.stdout.write('Context source setup skipped.\n'); return { status: 'skipped', projectDir: args.projectDir }; } const prompts = deps.prompts ?? createPromptAdapter(); const project = await loadKtxProject({ projectDir: args.projectDir }); if (!hasPrimarySource(project.config)) { const message = 'Connect a primary source before adding context sources.'; if (args.source) { io.stderr.write(`${message}\n`); return { status: 'failed', projectDir: args.projectDir }; } if (args.inputMode !== 'disabled') { io.stdout.write(`${message}\n`); return { status: 'skipped', projectDir: args.projectDir }; } } while (true) { const selected = args.source ? [args.source] : args.inputMode === 'disabled' ? [] : await prompts.multiselect({ message: withMultiselectNavigation('Which context sources should KTX ingest?'), options: [...SOURCE_OPTIONS], required: false, }); if (selected.includes('back')) { return { status: 'back', projectDir: args.projectDir }; } if (selected.length === 0) { if (args.inputMode === 'disabled') { io.stderr.write('Missing context source selection: pass --source or --skip-sources.\n'); return { status: 'missing-input', projectDir: args.projectDir }; } await markSourcesComplete(args.projectDir); io.stdout.write('No context sources selected.\n'); return { status: 'skipped', projectDir: args.projectDir }; } const readyConnectionIds: string[] = []; let returnToSourceSelection = false; for (const source of selected as KtxSetupSourceType[]) { const sourceChoice = args.source ? 
({ kind: 'new', args } as const) : await chooseInteractiveSourceConnection({ args, source, connections: (await loadKtxProject({ projectDir: args.projectDir })).config.connections, prompts, testGitRepo: deps.testGitRepo, }); if (sourceChoice === 'back') { if (args.source) { return { status: 'back', projectDir: args.projectDir }; } returnToSourceSelection = true; break; } const connectionId = sourceChoice.kind === 'existing' ? sourceChoice.connectionId : (sourceChoice.args.sourceConnectionId ?? `${source}-main`); const connection = sourceChoice.kind === 'existing' ? sourceChoice.connection : buildConnection(source, sourceChoice.args); const rollback = sourceChoice.kind === 'existing' ? undefined : await writeSourceConnection(args.projectDir, connectionId, connection, sourceAdapter(source)); if (sourceChoice.kind === 'existing') { await ensureSourceAdapterEnabled(args.projectDir, source); } const validation = await validateSource(source, { projectDir: args.projectDir, connectionId, connection }, deps); if (!validation.ok) { await rollback?.(); io.stderr.write(`${validation.message}\n`); return { status: 'failed', projectDir: args.projectDir }; } if (source === 'metabase' || source === 'looker') { const mappingCode = await (deps.runMapping ?? defaultRunMapping)(args.projectDir, connectionId, io); if (mappingCode !== 0) { await rollback?.(); return { status: 'failed', projectDir: args.projectDir }; } } if (args.runInitialSourceIngest) { const ingestResult = await runInitialSourceIngestWithRecovery({ args, connectionId, io, prompts, deps, }); if (ingestResult === 'failed') { await rollback?.(); return { status: 'failed', projectDir: args.projectDir }; } if (ingestResult === 'back') { await rollback?.(); if (args.source) { return { status: 'back', projectDir: args.projectDir }; } returnToSourceSelection = true; break; } } else { io.stdout.write(`Context source ${connectionId} saved. 
It will be built during the context build step.\n`); } readyConnectionIds.push(connectionId); } if (returnToSourceSelection) { continue; } if (readyConnectionIds.length > 0 && !args.source && args.inputMode !== 'disabled') { const addMore = await prompts.select({ message: `${readyConnectionIds.length} context source${readyConnectionIds.length > 1 ? 's' : ''} configured (${readyConnectionIds.join(', ')}). Add another?`, options: [ { value: 'done', label: 'Done — continue to context build' }, { value: 'add', label: 'Add another context source' }, ], }); if (addMore === 'add') { continue; } } await markSourcesComplete(args.projectDir); return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds }; } } catch (error) { io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); return { status: 'failed', projectDir: args.projectDir }; } }