ktx/packages/cli/src/setup-sources.ts
Luca Martial 9ecb8cb119
feat(cli): add edit flow for setup connections (#77)
* feat(cli): add edit flow for primary database connections in setup

Allow users to edit existing primary database connections during setup
instead of only adding new ones. Preselects existing values (URL, schemas,
tables) so users can adjust without re-entering everything.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(cli): add edit flow for context source connections in setup

Allow users to edit existing context source connections during setup.
Preselects existing values (URLs, credentials, repo details) and offers
a "Keep existing credential" option for sensitive fields.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(cli): rename "Add more" to "Add additional" in primary sources menu

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-13 17:22:59 -04:00

1951 lines
69 KiB
TypeScript

import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join, relative, resolve } from 'node:path';
import { fileURLToPath, pathToFileURL } from 'node:url';
import { localConnectionTypeForConfig, resolveNotionAuthToken } from '@ktx/context/connections';
import { resolveKtxConfigReference } from '@ktx/context/core';
import {
cloneOrPull,
DEFAULT_METABASE_CLIENT_CONFIG,
discoverMetabaseDatabases,
type DiscoveredMetabaseDatabase,
loadDbtSchemaFiles,
loadProjectInfo,
MetabaseClient,
type NotionApi,
NotionClient,
parseLookmlStagedDir,
parseMetricflowFiles,
testRepoConnection,
} from '@ktx/context/ingest';
import {
type KtxProjectConfig,
type KtxProjectConnectionConfig,
loadKtxProject,
markKtxSetupStateStepComplete,
serializeKtxProjectConfig,
} from '@ktx/context/project';
import type { KtxCliIo } from './cli-runtime.js';
import { pickNotionRootPages } from './notion-page-picker.js';
import { runKtxSourceMapping } from './source-mapping.js';
import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
import { runKtxPublicIngest } from './public-ingest.js';
import { writeProjectLocalSecretReference } from './setup-secrets.js';
import {
createKtxSetupPromptAdapter,
type KtxSetupPromptOption,
} from './setup-prompts.js';
export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion';
const DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN = 25;
export interface KtxSetupSourcesArgs {
projectDir: string;
inputMode: 'auto' | 'disabled';
source?: KtxSetupSourceType;
sourceConnectionId?: string;
sourcePath?: string;
sourceGitUrl?: string;
sourceBranch?: string;
sourceSubpath?: string;
sourceAuthTokenRef?: string;
sourceUrl?: string;
sourceApiKeyRef?: string;
sourceClientId?: string;
sourceClientSecretRef?: string;
sourceWarehouseConnectionId?: string;
sourceProjectName?: string;
sourceProfilesPath?: string;
sourceTarget?: string;
metabaseDatabaseId?: number;
notionCrawlMode?: 'all_accessible' | 'selected_roots';
notionRootPageIds?: string[];
runInitialSourceIngest: boolean;
skipSources: boolean;
}
export type KtxSetupSourcesResult =
| { status: 'ready'; projectDir: string; connectionIds: string[] }
| { status: 'skipped'; projectDir: string }
| { status: 'back'; projectDir: string }
| { status: 'missing-input'; projectDir: string }
| { status: 'failed'; projectDir: string };
export interface KtxSetupSourcesPromptAdapter {
multiselect(options: {
message: string;
options: KtxSetupPromptOption[];
initialValues?: string[];
required?: boolean;
}): Promise<string[]>;
select(options: { message: string; options: KtxSetupPromptOption[] }): Promise<string>;
text(options: { message: string; placeholder?: string; initialValue?: string }): Promise<string | undefined>;
password(options: { message: string }): Promise<string | undefined>;
cancel(message: string): void;
log?(message: string): void;
}
export type SourceValidationResult = { ok: true; detail?: string } | { ok: false; message: string };
export interface KtxSetupSourcesDeps {
prompts?: KtxSetupSourcesPromptAdapter;
testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>;
validateDbt?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateMetricflow?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateMetabase?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
validateLooker?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
validateLookml?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateNotion?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
pickNotionRootPages?: typeof pickNotionRootPages;
discoverMetabaseDatabases?: (args: {
sourceUrl: string;
sourceApiKeyRef: string;
sourceConnectionId: string;
}) => Promise<DiscoveredMetabaseDatabase[]>;
runMapping?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
runInitialIngest?: (
projectDir: string,
connectionId: string,
io: KtxCliIo,
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
) => Promise<number>;
}
const SOURCE_OPTIONS: Array<{ value: KtxSetupSourceType; label: string }> = [
{ value: 'dbt', label: 'dbt' },
{ value: 'metricflow', label: 'MetricFlow' },
{ value: 'metabase', label: 'Metabase' },
{ value: 'looker', label: 'Looker' },
{ value: 'lookml', label: 'LookML' },
{ value: 'notion', label: 'Notion' },
];
const SOURCE_LABELS = Object.fromEntries(SOURCE_OPTIONS.map((option) => [option.value, option.label])) as Record<
KtxSetupSourceType,
string
>;
const PRIMARY_SOURCE_DRIVERS = new Set([
'sqlite',
'postgres',
'mysql',
'clickhouse',
'sqlserver',
'bigquery',
'snowflake',
]);
function createPromptAdapter(): KtxSetupSourcesPromptAdapter {
return createKtxSetupPromptAdapter({
selectCancelValue: 'back',
multiselectCancelValue: 'back',
confirmEmptyOptionalMultiselect: true,
});
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function stringField(value: unknown): string | undefined {
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined;
}
function sourceLabel(source: KtxSetupSourceType): string {
return SOURCE_LABELS[source];
}
function sourceAdapter(source: KtxSetupSourceType): string {
return source;
}
function connectionNamePrompt(label: string): string {
return `Name this ${label} connection\nKTX will use this short name in commands and config. You can rename it now.`;
}
function sourceSubpathPrompt(source: KtxSetupSourceType): string {
if (source === 'dbt') {
return [
'Folder containing dbt_project.yml (optional)',
'Press Enter when dbt_project.yml is at the repo root.',
'For monorepos, enter a relative path like analytics/dbt.',
].join('\n');
}
return [
`${sourceLabel(source)} project folder (optional)`,
'If the project files are inside a subfolder, enter that path.',
'Press Enter if the path or repo already points at the project.',
].join('\n');
}
const SCAN_SKIP_DIRS = new Set(['.git', 'node_modules', '.venv', 'target', 'dbt_packages', 'dbt_modules', '__pycache__']);
async function findDbtProjectSubpaths(rootDir: string): Promise<string[]> {
const entries = await readdir(rootDir, { withFileTypes: true, recursive: true });
const subpaths: string[] = [];
for (const entry of entries) {
if (!entry.isFile()) continue;
if (entry.name !== 'dbt_project.yml' && entry.name !== 'dbt_project.yaml') continue;
const relDir = relative(rootDir, entry.parentPath);
if (relDir.split('/').some((part) => SCAN_SKIP_DIRS.has(part))) continue;
subpaths.push(relDir);
}
return subpaths;
}
async function promptText(
prompts: KtxSetupSourcesPromptAdapter,
options: { message: string; placeholder?: string; initialValue?: string },
): Promise<string | undefined> {
return await prompts.text({ ...options, message: withTextInputNavigation(options.message) });
}
function assertSafeConnectionId(connectionId: string): void {
if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) {
throw new Error(`Unsafe connection id: ${connectionId}`);
}
}
function credentialRef(value: string | undefined, label: string): string {
const ref = value?.trim();
if (!ref) {
throw new Error(`Missing ${label}; use env:NAME or file:/absolute/path`);
}
if (!ref.startsWith('env:') && !ref.startsWith('file:')) {
throw new Error(`${label} must use env:NAME or file:/absolute/path`);
}
return ref;
}
async function chooseSourceCredentialRef(input: {
prompts: KtxSetupSourcesPromptAdapter;
projectDir: string;
label: string;
envName: string;
secretFileName: string;
existingRef?: string;
}): Promise<string | 'back'> {
while (true) {
const choice = await input.prompts.select({
message: `How should KTX find your ${input.label}?`,
options: [
...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []),
{ value: 'env', label: `Use ${input.envName} from the environment` },
{ value: 'paste', label: 'Paste a key and save it as a local secret file' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice === 'keep' && input.existingRef) return input.existingRef;
if (choice === 'paste') {
const value = await input.prompts.password({ message: input.label });
if (value === undefined) continue;
if (!value.trim()) continue;
const ref = await writeProjectLocalSecretReference({
projectDir: input.projectDir,
fileName: input.secretFileName,
value,
});
input.prompts.log?.(`Saved to .ktx/secrets/${input.secretFileName}`);
return ref;
}
return `env:${input.envName}`;
}
}
async function chooseGitAuthCredentialRef(input: {
prompts: KtxSetupSourcesPromptAdapter;
projectDir: string;
source: KtxSetupSourceType;
connectionId: string;
existingRef?: string;
}): Promise<string | undefined | 'back'> {
const label = input.source === 'dbt' ? 'This' : `This ${sourceLabel(input.source)}`;
while (true) {
const choice = await input.prompts.select({
message: `${label} repo requires authentication.`,
options: [
...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []),
{ value: 'env', label: 'Use GITHUB_TOKEN from the environment' },
{ value: 'paste', label: 'Paste a token and save it as a local secret file' },
{ value: 'skip', label: 'Skip — try without authentication' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice === 'keep' && input.existingRef) return input.existingRef;
if (choice === 'skip') return undefined;
if (choice === 'paste') {
const value = await input.prompts.password({ message: 'Git access token' });
if (value === undefined) continue;
if (!value.trim()) continue;
const fileName = `${input.connectionId}-auth-token`;
const ref = await writeProjectLocalSecretReference({
projectDir: input.projectDir,
fileName,
value,
});
input.prompts.log?.(`Saved to .ktx/secrets/${fileName}`);
return ref;
}
return 'env:GITHUB_TOKEN';
}
}
function repoOrLocalSource(args: KtxSetupSourcesArgs): { sourceDir?: string; repoUrl?: string } {
if (args.sourcePath && args.sourceGitUrl) {
throw new Error('Choose only one source location: --source-path or --source-git-url.');
}
if (args.sourcePath) {
return { sourceDir: resolve(args.sourcePath) };
}
if (args.sourceGitUrl) {
return { repoUrl: args.sourceGitUrl };
}
throw new Error('Missing source location: pass --source-path or --source-git-url.');
}
function fileRepoUrl(sourceDir: string): string {
return pathToFileURL(sourceDir).toString();
}
async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise<void> {
const project = await loadKtxProject({ projectDir });
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
}
async function writeSourceConnection(
projectDir: string,
connectionId: string,
connection: KtxProjectConnectionConfig,
adapter: string,
): Promise<() => Promise<void>> {
assertSafeConnectionId(connectionId);
const project = await loadKtxProject({ projectDir });
const previousConnection = project.config.connections[connectionId];
const hadPreviousConnection = previousConnection !== undefined;
const shouldRemoveAdapterOnRollback = !project.config.ingest.adapters.includes(adapter);
const config = {
...project.config,
connections: {
...project.config.connections,
[connectionId]: connection,
},
ingest: {
...project.config.ingest,
adapters: project.config.ingest.adapters.includes(adapter)
? [...project.config.ingest.adapters]
: [...project.config.ingest.adapters, adapter],
},
};
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
return async () => {
const latest = await loadKtxProject({ projectDir });
const connections = { ...latest.config.connections };
if (hadPreviousConnection) {
connections[connectionId] = previousConnection;
} else {
delete connections[connectionId];
}
await writeProjectConfig(projectDir, {
...latest.config,
connections,
ingest: {
...latest.config.ingest,
adapters: shouldRemoveAdapterOnRollback
? latest.config.ingest.adapters.filter((candidate) => candidate !== adapter)
: latest.config.ingest.adapters,
},
});
};
}
async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSourceType): Promise<void> {
const adapter = sourceAdapter(source);
const project = await loadKtxProject({ projectDir });
if (project.config.ingest.adapters.includes(adapter)) {
return;
}
await writeProjectConfig(projectDir, {
...project.config,
ingest: {
...project.config.ingest,
adapters: [...project.config.ingest.adapters, adapter],
},
});
}
async function markSourcesComplete(projectDir: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8');
await markKtxSetupStateStepComplete(projectDir, 'sources');
}
function hasPrimarySource(config: KtxProjectConfig): boolean {
const setupPrimaryIds = config.setup?.database_connection_ids ?? [];
if (setupPrimaryIds.some((connectionId) => Object.hasOwn(config.connections, connectionId))) {
return true;
}
return Object.values(config.connections).some((connection) =>
PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()),
);
}
function buildDbtConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'dbt',
...(source.sourceDir ? { source_dir: source.sourceDir } : {}),
...(source.repoUrl ? { repo_url: source.repoUrl } : {}),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'dbt private repo access token') }
: {}),
...(args.sourceProfilesPath ? { profiles_path: resolve(args.sourceProfilesPath) } : {}),
...(args.sourceTarget ? { target: args.sourceTarget } : {}),
...(args.sourceProjectName ? { project_name: args.sourceProjectName } : {}),
};
}
function buildMetricflowConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'metricflow',
metricflow: {
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'MetricFlow auth token ref') }
: {}),
},
};
}
function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (!args.sourceUrl) {
throw new Error('Missing Metabase URL: pass --source-url.');
}
if (!args.sourceWarehouseConnectionId) {
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
}
if (!args.metabaseDatabaseId) {
throw new Error('Missing Metabase database id: pass --metabase-database-id.');
}
return {
driver: 'metabase',
api_url: args.sourceUrl,
api_key_ref: credentialRef(args.sourceApiKeyRef, 'Metabase API key ref'),
mappings: {
databaseMappings: { [String(args.metabaseDatabaseId)]: args.sourceWarehouseConnectionId },
syncEnabled: { [String(args.metabaseDatabaseId)]: true },
syncMode: 'ALL',
},
};
}
function buildLookerConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (!args.sourceUrl) {
throw new Error('Missing Looker base URL: pass --source-url.');
}
if (!args.sourceClientId) {
throw new Error('Missing Looker client id: pass --source-client-id.');
}
if (!args.sourceWarehouseConnectionId) {
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
}
return {
driver: 'looker',
base_url: args.sourceUrl,
client_id: args.sourceClientId,
client_secret_ref: credentialRef(args.sourceClientSecretRef, 'Looker client secret ref'),
mappings: {
connectionMappings: {
[args.sourceTarget ?? args.sourceWarehouseConnectionId]: args.sourceWarehouseConnectionId,
},
},
};
}
function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'lookml',
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'LookML auth token ref') }
: {}),
mappings: {
expectedLookerConnectionName: args.sourceTarget ?? args.sourceWarehouseConnectionId ?? null,
},
};
}
function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const rootPageIds = args.notionRootPageIds ?? [];
const crawlMode = rootPageIds.length > 0 ? 'selected_roots' : (args.notionCrawlMode ?? 'selected_roots');
if (crawlMode === 'selected_roots' && rootPageIds.length === 0) {
throw new Error('Notion selected_roots requires --notion-root-page-id.');
}
return {
driver: 'notion',
auth_token_ref: credentialRef(args.sourceApiKeyRef, 'Notion token ref'),
crawl_mode: crawlMode,
...(rootPageIds.length > 0 ? { root_page_ids: rootPageIds } : {}),
root_database_ids: [],
root_data_source_ids: [],
max_pages_per_run: 1000,
max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN,
max_knowledge_updates_per_run: 20,
last_successful_cursor: null,
};
}
function sourcePathFromFileRepoUrl(repoUrl: string, subpath?: string): string {
const root = fileURLToPath(repoUrl);
return subpath ? join(root, subpath) : root;
}
function repoAuthToken(connection: KtxProjectConnectionConfig | Record<string, unknown>): string | null {
const ref = stringField(connection.auth_token_ref);
const literal = stringField(connection.auth_token);
return literal ?? resolveKtxConfigReference(ref, process.env) ?? null;
}
async function collectYamlFilesRecursive(sourceRoot: string): Promise<Array<{ content: string; path: string }>> {
const entries = await readdir(sourceRoot, { withFileTypes: true, recursive: true });
const files: Array<{ content: string; path: string }> = [];
for (const entry of entries) {
if (!entry.isFile() || !/\.ya?ml$/i.test(entry.name)) {
continue;
}
const path = join(entry.parentPath, entry.name);
files.push({ path, content: await readFile(path, 'utf-8') });
}
return files;
}
async function defaultValidateDbt(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
let sourceDir = stringField(connection.source_dir);
const repoUrl = stringField(connection.repo_url);
if (!sourceDir && repoUrl?.startsWith('file:')) {
sourceDir = sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path));
}
if (!sourceDir && repoUrl) {
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-'));
await cloneOrPull({
repoUrl,
authToken: repoAuthToken(connection),
cacheDir,
branch: stringField(connection.branch) ?? 'main',
});
sourceDir = stringField(connection.path) ? join(cacheDir, String(connection.path)) : cacheDir;
}
if (!sourceDir) {
return { ok: false, message: 'dbt setup requires --source-path or --source-git-url.' };
}
const info = await loadProjectInfo(sourceDir);
const schemaFiles = await loadDbtSchemaFiles(sourceDir);
if (!info.projectName && typeof connection.project_name !== 'string') {
return { ok: false, message: 'dbt project metadata is missing project name.' };
}
return { ok: true, detail: `project=${info.projectName ?? connection.project_name} schemas=${schemaFiles.length}` };
}
async function defaultValidateMetricflow(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const metricflow = isRecord(connection.metricflow) ? connection.metricflow : undefined;
const repoUrl = stringField(metricflow?.repoUrl);
if (!repoUrl) {
return { ok: false, message: 'MetricFlow setup requires repoUrl.' };
}
if (!repoUrl.startsWith('file:')) {
const result = await testRepoConnection({
repoUrl,
authToken: metricflow ? repoAuthToken(metricflow) : null,
});
if (!result.ok) {
return { ok: false, message: result.error };
}
return { ok: true, detail: 'repository reachable' };
}
const path = sourcePathFromFileRepoUrl(repoUrl, stringField(metricflow?.path));
const parsed = parseMetricflowFiles(await collectYamlFilesRecursive(path));
return {
ok: true,
detail: `semanticModels=${parsed.semanticModels.length} metrics=${parsed.crossModelMetrics.length}`,
};
}
async function defaultValidateLooker(projectDir: string, connectionId: string): Promise<SourceValidationResult> {
const code = await runKtxSourceMapping(
{ command: 'refresh', projectDir, connectionId, autoAccept: true },
{ stdout: { write() {} }, stderr: { write() {} } },
);
return code === 0
? { ok: true, detail: 'Looker mapping refreshed' }
: { ok: false, message: 'Looker validation failed' };
}
async function defaultValidateLookml(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const repoUrl = stringField(connection.repoUrl);
if (!repoUrl) {
return { ok: false, message: 'LookML setup requires repoUrl.' };
}
if (!repoUrl.startsWith('file:')) {
const result = await testRepoConnection({ repoUrl, authToken: repoAuthToken(connection) });
return result.ok ? { ok: true, detail: 'repository reachable' } : { ok: false, message: result.error };
}
const parsed = await parseLookmlStagedDir(sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path)));
const count = parsed.models.length + parsed.views.length + parsed.dashboards.length;
return count > 0 ? { ok: true, detail: `lookmlFiles=${count}` } : { ok: false, message: 'No LookML files found' };
}
async function defaultValidateNotion(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const token = await resolveNotionAuthToken(String(connection.auth_token_ref));
const client: NotionApi = new NotionClient(token);
await client.retrieveBotUser();
const roots = Array.isArray(connection.root_page_ids)
? connection.root_page_ids.filter((id): id is string => typeof id === 'string')
: [];
for (const root of roots) {
await client.retrievePage(root);
}
return { ok: true, detail: `roots=${roots.length}` };
}
interface MappingJsonOutput {
connectionId: string;
refresh: { ok: boolean; output: string[] };
validation: { ok: boolean; output: string[] };
mappings: unknown[];
}
function splitOutputLines(output: string): string[] {
return output
.split('\n')
.map((line) => line.trim())
.filter(Boolean);
}
function writeSetupPrefixedLines(write: (chunk: string) => void, output: string): void {
for (const line of output.split(/\r?\n/)) {
if (line.length > 0) {
write(`${line}\n`);
}
}
}
function createSetupPrefixedIo(io: KtxCliIo): KtxCliIo {
return {
stdout: {
isTTY: io.stdout.isTTY,
columns: io.stdout.columns,
write(chunk: string) {
writeSetupPrefixedLines((line) => io.stdout.write(line), chunk);
},
},
stderr: {
write(chunk: string) {
writeSetupPrefixedLines((line) => io.stderr.write(line), chunk);
},
},
};
}
function parseMappingListJson(output: string): unknown[] {
const trimmed = output.trim();
if (!trimmed) {
return [];
}
const parsed = JSON.parse(trimmed) as unknown;
return Array.isArray(parsed) ? parsed : [];
}
function summarizeMappingResult(parsed: MappingJsonOutput): string {
const mappingCount = parsed.mappings.length;
const mappingNoun = mappingCount === 1 ? 'mapping' : 'mappings';
return `Mapping validated — ${mappingCount} ${mappingNoun} configured`;
}
async function defaultRunMapping(projectDir: string, connectionId: string, io: KtxCliIo): Promise<number> {
const outputs = {
refresh: '',
validation: '',
list: '',
};
const refreshCode = await runKtxSourceMapping(
{ command: 'refresh', projectDir, connectionId, autoAccept: true },
{
stdout: { write(chunk: string) { outputs.refresh += chunk; } },
stderr: io.stderr,
},
);
if (refreshCode !== 0) {
return refreshCode;
}
const validationCode = await runKtxSourceMapping(
{ command: 'validate', projectDir, connectionId },
{
stdout: { write(chunk: string) { outputs.validation += chunk; } },
stderr: io.stderr,
},
);
if (validationCode !== 0) {
return validationCode;
}
const listCode = await runKtxSourceMapping(
{ command: 'list', projectDir, connectionId, json: true },
{
stdout: { write(chunk: string) { outputs.list += chunk; } },
stderr: io.stderr,
},
);
if (listCode !== 0) {
return listCode;
}
const parsed: MappingJsonOutput = {
connectionId,
refresh: { ok: true, output: splitOutputLines(outputs.refresh) },
validation: { ok: true, output: splitOutputLines(outputs.validation) },
mappings: parseMappingListJson(outputs.list),
};
io.stdout.write(`${summarizeMappingResult(parsed)}\n`);
return 0;
}
async function defaultRunInitialIngest(
projectDir: string,
connectionId: string,
io: KtxCliIo,
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
): Promise<number> {
return await runKtxPublicIngest(
{
command: 'run',
projectDir,
targetConnectionId: connectionId,
all: false,
json: false,
inputMode: options.inputMode,
},
io,
);
}
async function runInitialSourceIngestWithRecovery(input: {
args: KtxSetupSourcesArgs;
connectionId: string;
io: KtxCliIo;
prompts: KtxSetupSourcesPromptAdapter;
deps: KtxSetupSourcesDeps;
}): Promise<'ready' | 'continue' | 'back' | 'failed'> {
while (true) {
input.io.stdout.write(`│ Building context from ${input.connectionId}. Large sources can take a while.\n`);
const ingestCode = await (input.deps.runInitialIngest ?? defaultRunInitialIngest)(
input.args.projectDir,
input.connectionId,
input.io,
{
inputMode: input.args.inputMode,
},
);
if (ingestCode === 0) {
return 'ready';
}
if (input.args.inputMode === 'disabled') {
return 'failed';
}
const action = await input.prompts.select({
message: `Context build failed for ${input.connectionId}\nRetry now, continue setup and build this source later, or go back.`,
options: [
{ value: 'retry', label: 'Retry context build' },
{ value: 'continue', label: 'Continue setup and build this source later' },
{ value: 'back', label: 'Back' },
],
});
if (action === 'retry') {
continue;
}
if (action === 'continue') {
input.io.stdout.write(`│ Context source saved without a completed context build for ${input.connectionId}.\n`);
input.io.stdout.write(`│ Run later: ktx ingest run --connection-id ${input.connectionId} --adapter <adapter>\n`);
return 'continue';
}
return 'back';
}
}
type SourceLocationChoice = 'path' | 'git';
type SourcePromptState = KtxSetupSourcesArgs & {
sourceLocation?: SourceLocationChoice;
};
type SourcePromptStep = (state: SourcePromptState) => Promise<'next' | 'back'>;
interface WarehouseConnectionChoice {
id: string;
connectionType: string;
}
type InteractiveSourceConnectionChoice =
| { kind: 'existing'; connectionId: string; connection: KtxProjectConnectionConfig }
| { kind: 'new'; args: KtxSetupSourcesArgs }
| { kind: 'edited'; connectionId: string; args: KtxSetupSourcesArgs }
| 'back';
type SourceSetupChoiceResult =
| { status: 'ready'; connectionId: string }
| { status: 'back' }
| { status: 'failed' };
async function runSourcePromptSteps(
initialState: SourcePromptState,
stepsForState: (state: SourcePromptState) => SourcePromptStep[],
): Promise<KtxSetupSourcesArgs | 'back'> {
let stepIndex = 0;
while (true) {
const steps = stepsForState(initialState);
if (stepIndex >= steps.length) {
const { sourceLocation: _sourceLocation, ...sourceArgs } = initialState;
return sourceArgs;
}
const result = await steps[stepIndex]?.(initialState);
if (result === 'back') {
if (stepIndex === 0) {
return 'back';
}
stepIndex -= 1;
continue;
}
stepIndex += 1;
}
}
function resetRepoLocationFields(state: SourcePromptState): void {
delete state.sourcePath;
delete state.sourceGitUrl;
delete state.sourceBranch;
delete state.sourceAuthTokenRef;
delete state.sourceSubpath;
delete state.sourceProjectName;
}
function sourceLocationFromArgs(args: KtxSetupSourcesArgs): SourceLocationChoice | undefined {
if (args.sourcePath) return 'path';
if (args.sourceGitUrl) return 'git';
return undefined;
}
function warehouseConnectionChoices(config: KtxProjectConfig): WarehouseConnectionChoice[] {
return Object.entries(config.connections)
.filter(([, connection]) => PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()))
.map(([id, connection]) => ({ id, connectionType: localConnectionTypeForConfig(id, connection) }))
.sort((left, right) => left.id.localeCompare(right.id));
}
async function chooseMappedWarehouseConnectionId(input: {
projectDir: string;
prompts: KtxSetupSourcesPromptAdapter;
}): Promise<string | 'back'> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const choices = warehouseConnectionChoices(project.config);
if (choices.length === 1) {
return choices[0].id;
}
if (choices.length === 0) {
const entered = await promptText(input.prompts, { message: 'Mapped warehouse connection id' });
return entered === undefined ? 'back' : entered;
}
const selected = await input.prompts.select({
message: 'Mapped warehouse connection',
options: [
...choices.map((choice) => ({
value: choice.id,
label: `${choice.id} (${choice.connectionType})`,
})),
{ value: 'back', label: 'Back' },
],
});
return selected === 'back' ? 'back' : selected;
}
async function defaultDiscoverMetabaseDatabases(input: {
sourceUrl: string;
sourceApiKeyRef: string;
}): Promise<DiscoveredMetabaseDatabase[]> {
const apiKey = resolveKtxConfigReference(input.sourceApiKeyRef, process.env);
if (!apiKey) {
throw new Error('Metabase API key ref could not be resolved');
}
const client = new MetabaseClient(
{ apiUrl: input.sourceUrl, apiKey },
DEFAULT_METABASE_CLIENT_CONFIG,
);
try {
return await discoverMetabaseDatabases(client);
} finally {
await client.cleanup();
}
}
function metabaseDatabaseLabel(database: DiscoveredMetabaseDatabase): string {
const detail = [database.engine].filter(Boolean).join(', ');
return detail ? `${database.id}: ${database.name} (${detail})` : `${database.id}: ${database.name}`;
}
async function chooseMetabaseDatabaseId(input: {
state: SourcePromptState;
prompts: KtxSetupSourcesPromptAdapter;
deps: KtxSetupSourcesDeps;
}): Promise<number | 'back'> {
const sourceUrl = input.state.sourceUrl;
const sourceApiKeyRef = input.state.sourceApiKeyRef;
if (sourceUrl && sourceApiKeyRef) {
try {
const discovered = await (input.deps.discoverMetabaseDatabases ?? defaultDiscoverMetabaseDatabases)({
sourceUrl,
sourceApiKeyRef,
sourceConnectionId: input.state.sourceConnectionId ?? 'metabase-main',
});
if (discovered.length === 1) {
return discovered[0].id;
}
if (discovered.length > 1) {
const selected = await input.prompts.select({
message: 'Metabase database',
options: [
...discovered
.slice()
.sort((left, right) => left.id - right.id)
.map((database) => ({
value: String(database.id),
label: metabaseDatabaseLabel(database),
})),
{ value: 'back', label: 'Back' },
],
});
return selected === 'back' ? 'back' : Number.parseInt(selected, 10);
}
} catch {
// Discovery is a convenience. Fall back to the raw id prompt when credentials
// are unavailable locally or the Metabase API cannot be reached yet.
}
}
const databaseId = await promptText(input.prompts, { message: 'Metabase database id' });
return databaseId === undefined ? 'back' : Number.parseInt(databaseId, 10);
}
function connectionIdPromptSteps(
args: KtxSetupSourcesArgs,
source: KtxSetupSourceType,
prompts: KtxSetupSourcesPromptAdapter,
defaultConnectionId: string,
): SourcePromptStep[] {
if (args.sourceConnectionId) {
return [];
}
return [
async (state) => {
const enteredConnectionId = await promptText(prompts, {
message: connectionNamePrompt(sourceLabel(source)),
placeholder: defaultConnectionId,
initialValue: defaultConnectionId,
});
if (enteredConnectionId === undefined) {
return 'back';
}
state.sourceConnectionId = enteredConnectionId.trim() || defaultConnectionId;
return 'next';
},
];
}
async function promptForInteractiveSource(
args: KtxSetupSourcesArgs,
source: KtxSetupSourceType,
prompts: KtxSetupSourcesPromptAdapter,
io: KtxCliIo,
deps: KtxSetupSourcesDeps,
defaultConnectionId = `${source}-main`,
testGitRepo: KtxSetupSourcesDeps['testGitRepo'] = testRepoConnection,
discoverMetabaseDatabaseList?: KtxSetupSourcesDeps['discoverMetabaseDatabases'],
): Promise<KtxSetupSourcesArgs | 'back'> {
const initialState: SourcePromptState = { ...args, source, sourceLocation: sourceLocationFromArgs(args) };
if (args.sourceConnectionId) {
initialState.sourceConnectionId = args.sourceConnectionId;
}
const connectionSteps = connectionIdPromptSteps(args, source, prompts, defaultConnectionId);
if (source === 'dbt' || source === 'metricflow' || source === 'lookml') {
return await runSourcePromptSteps(initialState, (state) => [
...connectionSteps,
async () => {
const selectedLocation = await prompts.select({
message: `${source} source location`,
options: [
{ value: 'path', label: 'Local path' },
{ value: 'git', label: 'Git URL' },
{ value: 'back', label: 'Back' },
],
});
if (selectedLocation !== 'path' && selectedLocation !== 'git') {
return 'back';
}
if (state.sourceLocation !== selectedLocation) {
resetRepoLocationFields(state);
}
state.sourceLocation = selectedLocation;
return 'next';
},
...(state.sourceLocation === 'path'
? [
async (currentState: SourcePromptState) => {
const sourcePath = await promptText(prompts, {
message: `${source} local path`,
...(currentState.sourcePath ? { initialValue: currentState.sourcePath } : {}),
});
if (sourcePath === undefined) return 'back';
currentState.sourcePath = sourcePath;
return 'next';
},
]
: []),
...(state.sourceLocation === 'git'
? [
async (currentState: SourcePromptState) => {
const sourceGitUrl = await promptText(prompts, {
message: `${source} git URL`,
...(currentState.sourceGitUrl ? { initialValue: currentState.sourceGitUrl } : {}),
});
if (sourceGitUrl === undefined) return 'back';
currentState.sourceGitUrl = sourceGitUrl;
return 'next';
},
async (currentState: SourcePromptState) => {
const branch = await promptText(prompts, {
message: `${source} git branch`,
initialValue: currentState.sourceBranch ?? 'main',
});
if (branch === undefined) return 'back';
currentState.sourceBranch = branch || 'main';
return 'next';
},
]
: []),
...(state.sourceLocation === 'git'
? [
async (currentState: SourcePromptState) => {
const result = await testGitRepo!({ repoUrl: currentState.sourceGitUrl! });
if (result.ok) {
delete currentState.sourceAuthTokenRef;
prompts.log?.('Repository connected.');
return 'next';
}
const authRef = await chooseGitAuthCredentialRef({
prompts,
projectDir: args.projectDir,
source,
connectionId: currentState.sourceConnectionId ?? `${source}-main`,
existingRef: currentState.sourceAuthTokenRef,
});
if (authRef === 'back') return 'back';
if (authRef) {
currentState.sourceAuthTokenRef = authRef;
} else {
delete currentState.sourceAuthTokenRef;
}
return 'next';
},
]
: []),
...(state.sourceLocation
? [
async (currentState: SourcePromptState) => {
if (source === 'dbt') {
let scanDir: string | undefined;
if (currentState.sourceLocation === 'path' && currentState.sourcePath) {
scanDir = currentState.sourcePath;
} else if (currentState.sourceLocation === 'git' && currentState.sourceGitUrl) {
try {
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-scan-'));
const authToken = currentState.sourceAuthTokenRef
? resolveKtxConfigReference(currentState.sourceAuthTokenRef, process.env)
: null;
await cloneOrPull({
repoUrl: currentState.sourceGitUrl,
authToken,
cacheDir,
branch: currentState.sourceBranch ?? 'main',
});
scanDir = cacheDir;
} catch {
// Clone failed — fall through to manual prompt
}
}
if (scanDir) {
try {
const subpaths = await findDbtProjectSubpaths(scanDir);
if (subpaths.length === 1) {
const found = subpaths[0]!;
if (found) {
currentState.sourceSubpath = found;
prompts.log?.(`Found dbt_project.yml in ${found}/`);
} else {
delete currentState.sourceSubpath;
}
return 'next';
}
if (subpaths.length > 1) {
const selected = await prompts.select({
message: 'Multiple dbt projects found — which one should KTX use?',
options: [
...subpaths.map((p) => ({ value: p || '.', label: p || '(project root)' })),
{ value: 'back', label: 'Back' },
],
});
if (selected === 'back') return 'back';
const subpath = selected === '.' ? '' : selected;
if (subpath) {
currentState.sourceSubpath = subpath;
} else {
delete currentState.sourceSubpath;
}
return 'next';
}
} catch {
// Directory unreadable — fall through to manual prompt
}
}
}
const subpath = await promptText(prompts, {
message: sourceSubpathPrompt(source),
placeholder: 'optional',
...(currentState.sourceSubpath ? { initialValue: currentState.sourceSubpath } : {}),
});
if (subpath === undefined) return 'back';
if (subpath) {
currentState.sourceSubpath = subpath;
} else {
delete currentState.sourceSubpath;
}
return 'next';
},
]
: []),
]);
}
if (source === 'metabase') {
return await runSourcePromptSteps(initialState, () => [
...connectionSteps,
async (state) => {
const sourceUrl = await promptText(prompts, {
message: 'Metabase URL',
...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}),
});
if (sourceUrl === undefined) return 'back';
state.sourceUrl = sourceUrl;
return 'next';
},
async (state) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Metabase API key',
envName: 'METABASE_API_KEY',
secretFileName: `${state.sourceConnectionId ?? 'metabase-main'}-api-key`,
existingRef: state.sourceApiKeyRef,
});
if (ref === 'back') return 'back';
state.sourceApiKeyRef = ref;
return 'next';
},
async (state) => {
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
projectDir: args.projectDir,
prompts,
});
if (sourceWarehouseConnectionId === 'back') return 'back';
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
return 'next';
},
async (state) => {
const databaseId = await chooseMetabaseDatabaseId({
state,
prompts,
deps: { discoverMetabaseDatabases: discoverMetabaseDatabaseList },
});
if (databaseId === 'back') return 'back';
state.metabaseDatabaseId = databaseId;
return 'next';
},
]);
}
if (source === 'looker') {
return await runSourcePromptSteps(initialState, () => [
...connectionSteps,
async (state) => {
const sourceUrl = await promptText(prompts, {
message: 'Looker base URL',
...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}),
});
if (sourceUrl === undefined) return 'back';
state.sourceUrl = sourceUrl;
return 'next';
},
async (state) => {
const sourceClientId = await promptText(prompts, {
message: 'Looker client id',
...(state.sourceClientId ? { initialValue: state.sourceClientId } : {}),
});
if (sourceClientId === undefined) return 'back';
state.sourceClientId = sourceClientId;
return 'next';
},
async (state) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Looker client secret',
envName: 'LOOKER_CLIENT_SECRET',
secretFileName: `${state.sourceConnectionId ?? 'looker-main'}-client-secret`,
existingRef: state.sourceClientSecretRef,
});
if (ref === 'back') return 'back';
state.sourceClientSecretRef = ref;
return 'next';
},
async (state) => {
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
projectDir: args.projectDir,
prompts,
});
if (sourceWarehouseConnectionId === 'back') return 'back';
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
return 'next';
},
async (state) => {
const lookerConnectionName = await promptText(prompts, {
message: 'Looker connection name',
placeholder: 'optional',
...(state.sourceTarget ? { initialValue: state.sourceTarget } : {}),
});
if (lookerConnectionName === undefined) return 'back';
if (lookerConnectionName) {
state.sourceTarget = lookerConnectionName;
} else {
delete state.sourceTarget;
}
return 'next';
},
]);
}
return await runSourcePromptSteps(initialState, (state) => [
...connectionSteps,
async (currentState) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Notion integration token',
envName: 'NOTION_TOKEN',
secretFileName: `${currentState.sourceConnectionId ?? 'notion-main'}-token`,
existingRef: currentState.sourceApiKeyRef,
});
if (ref === 'back') return 'back';
currentState.sourceApiKeyRef = ref;
return 'next';
},
async (currentState) => {
const crawlMode = await prompts.select({
message: 'Which Notion pages should KTX ingest?',
options: [
{ value: 'selected_roots', label: 'Specific pages and their subpages (choose them in a picker)' },
{ value: 'all_accessible', label: 'All pages the integration can access' },
{ value: 'back', label: 'Back' },
],
});
if (crawlMode === 'back') return 'back';
currentState.notionCrawlMode = crawlMode === 'all_accessible' ? 'all_accessible' : 'selected_roots';
if (currentState.notionCrawlMode === 'all_accessible') {
delete currentState.notionRootPageIds;
}
return 'next';
},
...(state.notionCrawlMode === 'selected_roots'
? [
async (currentState: SourcePromptState) => {
const connectionId = currentState.sourceConnectionId ?? 'notion-main';
const result = await (deps.pickNotionRootPages ?? pickNotionRootPages)(
{
connectionId,
connection: {
driver: 'notion',
auth_token_ref: credentialRef(currentState.sourceApiKeyRef, 'Notion token ref'),
crawl_mode: 'selected_roots',
root_page_ids: currentState.notionRootPageIds ?? [],
root_database_ids: [],
root_data_source_ids: [],
},
},
io,
);
if (result.kind === 'back') {
return 'back';
}
if (result.kind === 'unavailable') {
io.stderr.write(`${result.message}\n`);
return 'back';
}
currentState.notionRootPageIds = result.rootPageIds;
return 'next';
},
]
: []),
]);
}
function existingConnectionIdsBySource(
connections: Record<string, KtxProjectConnectionConfig>,
source: KtxSetupSourceType,
): string[] {
return Object.entries(connections)
.filter(([, connection]) => String(connection.driver ?? '').toLowerCase() === source)
.map(([connectionId]) => connectionId)
.sort((left, right) => left.localeCompare(right));
}
function sourceTypeForConnection(connection: KtxProjectConnectionConfig): KtxSetupSourceType | null {
const driver = String(connection.driver ?? '').toLowerCase();
return SOURCE_OPTIONS.some((option) => option.value === driver) ? (driver as KtxSetupSourceType) : null;
}
function contextSourceEditTargets(connections: Record<string, KtxProjectConnectionConfig>): Array<{
connectionId: string;
source: KtxSetupSourceType;
}> {
return Object.entries(connections)
.map(([connectionId, connection]) => {
const source = sourceTypeForConnection(connection);
return source ? { connectionId, source } : null;
})
.filter((target): target is { connectionId: string; source: KtxSetupSourceType } => target !== null)
.sort((left, right) => left.connectionId.localeCompare(right.connectionId));
}
function sourceChecklistForConnections(connections: Record<string, KtxProjectConnectionConfig>): {
options: Array<{ value: KtxSetupSourceType; label: string; hint?: string }>;
initialValues: KtxSetupSourceType[];
} {
const initialValues: KtxSetupSourceType[] = [];
const options = SOURCE_OPTIONS.map((option) => {
const existingIds = existingConnectionIdsBySource(connections, option.value);
if (existingIds.length === 0) {
return option;
}
initialValues.push(option.value);
return { ...option, hint: `configured: ${existingIds.join(', ')}` };
});
return { options, initialValues };
}
function defaultConnectionIdForSource(
connections: Record<string, KtxProjectConnectionConfig>,
source: KtxSetupSourceType,
): string {
const base = `${source}-main`;
if (!connections[base]) {
return base;
}
let index = 2;
while (connections[`${base}-${index}`]) {
index += 1;
}
return `${base}-${index}`;
}
function firstStringRecordEntry(value: unknown): [string, string] | undefined {
if (!isRecord(value)) return undefined;
for (const [key, raw] of Object.entries(value)) {
if (typeof raw === 'string' && raw.trim().length > 0) {
return [key, raw.trim()];
}
}
return undefined;
}
function applyRepoSourceArgs(
args: KtxSetupSourcesArgs,
input: { repoUrl?: string; sourceDir?: string; branch?: string; subpath?: string; authTokenRef?: string },
): void {
if (input.sourceDir) {
args.sourcePath = input.sourceDir;
} else if (input.repoUrl?.startsWith('file:')) {
args.sourcePath = fileURLToPath(input.repoUrl);
} else if (input.repoUrl) {
args.sourceGitUrl = input.repoUrl;
}
if (input.branch) args.sourceBranch = input.branch;
if (input.subpath) args.sourceSubpath = input.subpath;
if (input.authTokenRef) args.sourceAuthTokenRef = input.authTokenRef;
}
function sourceArgsFromExistingConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connectionId: string;
connection: KtxProjectConnectionConfig;
}): KtxSetupSourcesArgs {
const sourceArgs: KtxSetupSourcesArgs = {
projectDir: input.args.projectDir,
inputMode: input.args.inputMode,
source: input.source,
sourceConnectionId: input.connectionId,
runInitialSourceIngest: input.args.runInitialSourceIngest,
skipSources: input.args.skipSources,
};
if (input.source === 'dbt') {
applyRepoSourceArgs(sourceArgs, {
sourceDir: stringField(input.connection.source_dir),
repoUrl: stringField(input.connection.repo_url),
branch: stringField(input.connection.branch),
subpath: stringField(input.connection.path),
authTokenRef: stringField(input.connection.auth_token_ref),
});
const profilesPath = stringField(input.connection.profiles_path);
const target = stringField(input.connection.target);
const projectName = stringField(input.connection.project_name);
if (profilesPath) sourceArgs.sourceProfilesPath = profilesPath;
if (target) sourceArgs.sourceTarget = target;
if (projectName) sourceArgs.sourceProjectName = projectName;
return sourceArgs;
}
if (input.source === 'metricflow') {
const metricflow = isRecord(input.connection.metricflow) ? input.connection.metricflow : {};
applyRepoSourceArgs(sourceArgs, {
repoUrl: stringField(metricflow.repoUrl),
branch: stringField(metricflow.branch),
subpath: stringField(metricflow.path),
authTokenRef: stringField(metricflow.auth_token_ref),
});
return sourceArgs;
}
if (input.source === 'lookml') {
applyRepoSourceArgs(sourceArgs, {
repoUrl: stringField(input.connection.repoUrl),
branch: stringField(input.connection.branch),
subpath: stringField(input.connection.path),
authTokenRef: stringField(input.connection.auth_token_ref),
});
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const expectedLookerConnectionName = stringField(mappings.expectedLookerConnectionName);
if (expectedLookerConnectionName) sourceArgs.sourceTarget = expectedLookerConnectionName;
return sourceArgs;
}
if (input.source === 'metabase') {
sourceArgs.sourceUrl = stringField(input.connection.api_url);
sourceArgs.sourceApiKeyRef = stringField(input.connection.api_key_ref);
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const databaseMapping = firstStringRecordEntry(mappings.databaseMappings);
if (databaseMapping) {
sourceArgs.metabaseDatabaseId = Number.parseInt(databaseMapping[0], 10);
sourceArgs.sourceWarehouseConnectionId = databaseMapping[1];
}
return sourceArgs;
}
if (input.source === 'looker') {
sourceArgs.sourceUrl = stringField(input.connection.base_url);
sourceArgs.sourceClientId = stringField(input.connection.client_id);
sourceArgs.sourceClientSecretRef = stringField(input.connection.client_secret_ref);
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const connectionMapping = firstStringRecordEntry(mappings.connectionMappings);
if (connectionMapping) {
sourceArgs.sourceTarget = connectionMapping[0];
sourceArgs.sourceWarehouseConnectionId = connectionMapping[1];
}
return sourceArgs;
}
sourceArgs.sourceApiKeyRef = stringField(input.connection.auth_token_ref);
sourceArgs.notionCrawlMode =
input.connection.crawl_mode === 'all_accessible' ? 'all_accessible' : 'selected_roots';
if (Array.isArray(input.connection.root_page_ids)) {
sourceArgs.notionRootPageIds = input.connection.root_page_ids.filter(
(pageId): pageId is string => typeof pageId === 'string',
);
}
return sourceArgs;
}
async function promptEditedSourceConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connectionId: string;
connection: KtxProjectConnectionConfig;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages'];
discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases'];
}): Promise<Extract<InteractiveSourceConnectionChoice, { kind: 'edited' }> | 'back'> {
const sourceArgs = await promptForInteractiveSource(
sourceArgsFromExistingConnection({
args: input.args,
source: input.source,
connectionId: input.connectionId,
connection: input.connection,
}),
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
input.connectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
return sourceArgs === 'back'
? 'back'
: { kind: 'edited', connectionId: input.connectionId, args: sourceArgs };
}
async function chooseContextSourceToEdit(input: {
projectDir: string;
prompts: KtxSetupSourcesPromptAdapter;
}): Promise<{ connectionId: string; source: KtxSetupSourceType } | 'back'> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const targets = contextSourceEditTargets(project.config.connections);
if (targets.length === 0) return 'back';
const choice = await input.prompts.select({
message: 'Context source to edit',
options: [
...targets.map((target) => ({
value: target.connectionId,
label: `${target.connectionId} (${sourceLabel(target.source)})`,
})),
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
const target = targets.find((candidate) => candidate.connectionId === choice);
return target ?? 'back';
}
async function chooseInteractiveSourceConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connections: Record<string, KtxProjectConnectionConfig>;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages'];
discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases'];
}): Promise<InteractiveSourceConnectionChoice> {
const existingIds = existingConnectionIdsBySource(input.connections, input.source);
const defaultConnectionId = defaultConnectionIdForSource(input.connections, input.source);
const label = sourceLabel(input.source);
if (existingIds.length === 0) {
const sourceArgs = await promptForInteractiveSource(
input.args,
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
defaultConnectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
return sourceArgs === 'back' ? 'back' : { kind: 'new', args: sourceArgs };
}
while (true) {
const choice = await input.prompts.select({
message: `Configure ${label}`,
options: [
...existingIds.map((connectionId) => ({
value: `existing:${connectionId}`,
label: `Use existing ${label} connection: ${connectionId}`,
})),
...existingIds.map((connectionId) => ({
value: `edit:${connectionId}`,
label: `Edit existing ${label} connection: ${connectionId}`,
})),
{ value: 'new', label: `Add new ${label} connection` },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice.startsWith('existing:')) {
const connectionId = choice.slice('existing:'.length);
const connection = input.connections[connectionId];
if (connection) {
return { kind: 'existing', connectionId, connection };
}
continue;
}
if (choice.startsWith('edit:')) {
const connectionId = choice.slice('edit:'.length);
const connection = input.connections[connectionId];
if (!connection) {
continue;
}
const edited = await promptEditedSourceConnection({
args: input.args,
source: input.source,
connectionId,
connection,
prompts: input.prompts,
io: input.io,
testGitRepo: input.testGitRepo,
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
});
if (edited === 'back') {
continue;
}
return edited;
}
const sourceArgs = await promptForInteractiveSource(
input.args,
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
defaultConnectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
if (sourceArgs === 'back') {
continue;
}
return { kind: 'new', args: sourceArgs };
}
}
function buildConnection(source: KtxSetupSourceType, args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (source === 'dbt') {
return buildDbtConnection(args);
}
if (source === 'metricflow') {
return buildMetricflowConnection(args);
}
if (source === 'metabase') {
return buildMetabaseConnection(args);
}
if (source === 'looker') {
return buildLookerConnection(args);
}
if (source === 'lookml') {
return buildLookmlConnection(args);
}
return buildNotionConnection(args);
}
async function validateSource(
source: KtxSetupSourceType,
args: { projectDir: string; connectionId: string; connection: KtxProjectConnectionConfig },
deps: KtxSetupSourcesDeps,
): Promise<SourceValidationResult> {
if (source === 'dbt') {
return await (deps.validateDbt ?? defaultValidateDbt)(args.connection);
}
if (source === 'metricflow') {
return await (deps.validateMetricflow ?? defaultValidateMetricflow)(args.connection);
}
if (source === 'metabase') {
return deps.validateMetabase
? await deps.validateMetabase(args.projectDir, args.connectionId)
: { ok: true, detail: 'mapping validation runs after the connection is saved' };
}
if (source === 'looker') {
return await (deps.validateLooker ?? defaultValidateLooker)(args.projectDir, args.connectionId);
}
if (source === 'lookml') {
return await (deps.validateLookml ?? defaultValidateLookml)(args.connection);
}
return await (deps.validateNotion ?? defaultValidateNotion)(args.connection);
}
async function saveValidateAndMaybeBuildSource(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
sourceChoice: Exclude<InteractiveSourceConnectionChoice, 'back'>;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
deps: KtxSetupSourcesDeps;
}): Promise<SourceSetupChoiceResult> {
const connectionId =
input.sourceChoice.kind === 'existing'
? input.sourceChoice.connectionId
: input.sourceChoice.kind === 'edited'
? input.sourceChoice.connectionId
: (input.sourceChoice.args.sourceConnectionId ?? `${input.source}-main`);
const connection =
input.sourceChoice.kind === 'existing'
? input.sourceChoice.connection
: buildConnection(input.source, input.sourceChoice.args);
const rollback =
input.sourceChoice.kind === 'existing'
? undefined
: await writeSourceConnection(
input.args.projectDir,
connectionId,
connection,
sourceAdapter(input.source),
);
if (input.sourceChoice.kind === 'existing') {
await ensureSourceAdapterEnabled(input.args.projectDir, input.source);
}
const validation = await validateSource(
input.source,
{ projectDir: input.args.projectDir, connectionId, connection },
input.deps,
);
if (!validation.ok) {
await rollback?.();
input.io.stderr.write(`${validation.message}\n`);
return { status: 'failed' };
}
if (input.source === 'metabase' || input.source === 'looker') {
input.prompts.log?.(`Validating ${sourceLabel(input.source)} mapping…`);
const mappingCode = await (input.deps.runMapping ?? defaultRunMapping)(
input.args.projectDir,
connectionId,
createSetupPrefixedIo(input.io),
);
if (mappingCode !== 0) {
await rollback?.();
return { status: 'failed' };
}
}
if (input.args.runInitialSourceIngest) {
const ingestResult = await runInitialSourceIngestWithRecovery({
args: input.args,
connectionId,
io: input.io,
prompts: input.prompts,
deps: input.deps,
});
if (ingestResult === 'failed') {
await rollback?.();
return { status: 'failed' };
}
if (ingestResult === 'back') {
await rollback?.();
return { status: 'back' };
}
} else {
input.io.stdout.write(`│ Context source ${connectionId} saved. It will be built during the context build step.\n`);
}
return { status: 'ready', connectionId };
}
export async function runKtxSetupSourcesStep(
args: KtxSetupSourcesArgs,
io: KtxCliIo,
deps: KtxSetupSourcesDeps = {},
): Promise<KtxSetupSourcesResult> {
try {
if (args.skipSources) {
await markSourcesComplete(args.projectDir);
io.stdout.write('│ Context source setup skipped.\n');
return { status: 'skipped', projectDir: args.projectDir };
}
const prompts = deps.prompts ?? createPromptAdapter();
const project = await loadKtxProject({ projectDir: args.projectDir });
if (!hasPrimarySource(project.config)) {
const message = 'Connect a primary source before adding context sources.';
if (args.source) {
io.stderr.write(`${message}\n`);
return { status: 'failed', projectDir: args.projectDir };
}
if (args.inputMode !== 'disabled') {
io.stdout.write(`${message}\n`);
return { status: 'skipped', projectDir: args.projectDir };
}
}
while (true) {
const contextSourceChecklist = sourceChecklistForConnections(
(await loadKtxProject({ projectDir: args.projectDir })).config.connections,
);
const selected = args.source
? [args.source]
: args.inputMode === 'disabled'
? []
: await prompts.multiselect({
message: withMultiselectNavigation('Which context sources should KTX ingest?'),
options: contextSourceChecklist.options,
...(contextSourceChecklist.initialValues.length > 0
? { initialValues: contextSourceChecklist.initialValues }
: {}),
required: false,
});
if (selected.includes('back')) {
return { status: 'back', projectDir: args.projectDir };
}
if (selected.length === 0) {
if (args.inputMode === 'disabled') {
io.stderr.write('Missing context source selection: pass --source or --skip-sources.\n');
return { status: 'missing-input', projectDir: args.projectDir };
}
await markSourcesComplete(args.projectDir);
io.stdout.write('│ No context sources selected.\n');
return { status: 'skipped', projectDir: args.projectDir };
}
const readyConnectionIds: string[] = [];
let returnToSourceSelection = false;
for (const source of selected as KtxSetupSourceType[]) {
const sourceChoice = args.source
? ({ kind: 'new', args } as const)
: await chooseInteractiveSourceConnection({
args,
source,
connections: (await loadKtxProject({ projectDir: args.projectDir })).config.connections,
prompts,
io,
testGitRepo: deps.testGitRepo,
pickNotionRootPages: deps.pickNotionRootPages,
discoverMetabaseDatabases: deps.discoverMetabaseDatabases,
});
if (sourceChoice === 'back') {
if (args.source) {
return { status: 'back', projectDir: args.projectDir };
}
returnToSourceSelection = true;
break;
}
const choiceResult = await saveValidateAndMaybeBuildSource({
args,
source,
sourceChoice,
prompts,
io,
deps,
});
if (choiceResult.status === 'failed') {
return { status: 'failed', projectDir: args.projectDir };
}
if (choiceResult.status === 'back') {
if (args.source) {
return { status: 'back', projectDir: args.projectDir };
}
returnToSourceSelection = true;
break;
}
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
}
if (returnToSourceSelection) {
continue;
}
if (readyConnectionIds.length > 0 && !args.source && args.inputMode !== 'disabled') {
let restartSourceSelection = false;
while (true) {
const addMore = await prompts.select({
message: `${readyConnectionIds.length} context source${readyConnectionIds.length > 1 ? 's' : ''} configured (${readyConnectionIds.join(', ')}). Add another?`,
options: [
{ value: 'done', label: 'Done — continue to context build' },
{ value: 'edit', label: 'Edit an existing context source' },
{ value: 'add', label: 'Add another context source' },
],
});
if (addMore === 'add') {
restartSourceSelection = true;
break;
}
if (addMore === 'edit') {
const editTarget = await chooseContextSourceToEdit({ projectDir: args.projectDir, prompts });
if (editTarget === 'back') {
continue;
}
const projectForEdit = await loadKtxProject({ projectDir: args.projectDir });
const connection = projectForEdit.config.connections[editTarget.connectionId];
if (!connection) {
continue;
}
const sourceChoice = await promptEditedSourceConnection({
args,
source: editTarget.source,
connectionId: editTarget.connectionId,
connection,
prompts,
io,
testGitRepo: deps.testGitRepo,
pickNotionRootPages: deps.pickNotionRootPages,
discoverMetabaseDatabases: deps.discoverMetabaseDatabases,
});
if (sourceChoice === 'back') {
continue;
}
const choiceResult = await saveValidateAndMaybeBuildSource({
args,
source: editTarget.source,
sourceChoice,
prompts,
io,
deps,
});
if (choiceResult.status === 'failed') {
return { status: 'failed', projectDir: args.projectDir };
}
if (choiceResult.status === 'back') {
continue;
}
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
continue;
}
break;
}
if (restartSourceSelection) {
continue;
}
}
await markSourcesComplete(args.projectDir);
return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds };
}
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return { status: 'failed', projectDir: args.projectDir };
}
}