ktx/packages/cli/src/setup-sources.ts
Andrey Avtomonov f8db99811a
feat(context): add driver-discriminated connection schemas (#96)
* refactor(context): export and describe mapping shape schemas

* feat(context): add driver-schemas module with warehouse drivers

* feat(context): add metabase, looker, lookml driver schemas with mappings

* feat(context): add notion, dbt, metricflow driver schemas

* refactor(context): make connectionSchema a driver-discriminated union

* chore(context): re-export KtxConnectionConfig from project package

* docs(context): add connection driver schema plan

* chore(secrets): allowlist example credentials in driver-schemas fixtures

* test(cli): align metabase fixtures with required api_url field

The driver-discriminated union added in this branch now requires api_url
for metabase connections and a known driver for warehouses. Update slow
CLI test fixtures and assertions so they exercise the new schema:
- ingest.test-utils.ts: add api_url to the prod-metabase fixture.
- setup.test.ts: switch metabase fixture from 'url' to 'api_url'.
- local-scan-connectors.test.ts: invalid-driver/missing-driver tests now
  expect the schema error from loadKtxProject (parse-time rejection).
2026-05-15 00:08:11 +02:00

1974 lines
70 KiB
TypeScript

import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join, relative, resolve } from 'node:path';
import { fileURLToPath, pathToFileURL } from 'node:url';
import { localConnectionTypeForConfig, resolveNotionAuthToken } from '@ktx/context/connections';
import { resolveKtxConfigReference } from '@ktx/context/core';
import {
cloneOrPull,
DEFAULT_METABASE_CLIENT_CONFIG,
discoverMetabaseDatabases,
type DiscoveredMetabaseDatabase,
loadDbtSchemaFiles,
loadProjectInfo,
MetabaseClient,
type NotionApi,
NotionClient,
parseLookmlStagedDir,
parseMetricflowFiles,
testRepoConnection,
} from '@ktx/context/ingest';
import {
type KtxProjectConfig,
type KtxProjectConnectionConfig,
loadKtxProject,
markKtxSetupStateStepComplete,
serializeKtxProjectConfig,
} from '@ktx/context/project';
import type { KtxCliIo } from './cli-runtime.js';
import { pickNotionRootPages } from './notion-page-picker.js';
import { runKtxSourceMapping } from './source-mapping.js';
import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
import { runKtxPublicIngest } from './public-ingest.js';
import { writeProjectLocalSecretReference } from './setup-secrets.js';
import {
createKtxSetupPromptAdapter,
type KtxSetupPromptOption,
} from './setup-prompts.js';
export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion';
const DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN = 25;
export interface KtxSetupSourcesArgs {
projectDir: string;
inputMode: 'auto' | 'disabled';
source?: KtxSetupSourceType;
sourceConnectionId?: string;
sourcePath?: string;
sourceGitUrl?: string;
sourceBranch?: string;
sourceSubpath?: string;
sourceAuthTokenRef?: string;
sourceUrl?: string;
sourceApiKeyRef?: string;
sourceClientId?: string;
sourceClientSecretRef?: string;
sourceWarehouseConnectionId?: string;
sourceProjectName?: string;
sourceProfilesPath?: string;
sourceTarget?: string;
metabaseDatabaseId?: number;
notionCrawlMode?: 'all_accessible' | 'selected_roots';
notionRootPageIds?: string[];
runInitialSourceIngest: boolean;
skipSources: boolean;
}
export type KtxSetupSourcesResult =
| { status: 'ready'; projectDir: string; connectionIds: string[] }
| { status: 'skipped'; projectDir: string }
| { status: 'back'; projectDir: string }
| { status: 'missing-input'; projectDir: string }
| { status: 'failed'; projectDir: string };
export interface KtxSetupSourcesPromptAdapter {
multiselect(options: {
message: string;
options: KtxSetupPromptOption[];
initialValues?: string[];
required?: boolean;
}): Promise<string[]>;
select(options: { message: string; options: KtxSetupPromptOption[] }): Promise<string>;
text(options: { message: string; placeholder?: string; initialValue?: string }): Promise<string | undefined>;
password(options: { message: string }): Promise<string | undefined>;
cancel(message: string): void;
log?(message: string): void;
}
export type SourceValidationResult = { ok: true; detail?: string } | { ok: false; message: string };
export interface KtxSetupSourcesDeps {
prompts?: KtxSetupSourcesPromptAdapter;
testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>;
validateDbt?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateMetricflow?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateMetabase?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
validateLooker?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
validateLookml?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
validateNotion?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
pickNotionRootPages?: typeof pickNotionRootPages;
discoverMetabaseDatabases?: (args: {
sourceUrl: string;
sourceApiKeyRef: string;
sourceConnectionId: string;
}) => Promise<DiscoveredMetabaseDatabase[]>;
runMapping?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
runInitialIngest?: (
projectDir: string,
connectionId: string,
io: KtxCliIo,
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
) => Promise<number>;
}
const SOURCE_OPTIONS: Array<{ value: KtxSetupSourceType; label: string }> = [
{ value: 'dbt', label: 'dbt' },
{ value: 'metricflow', label: 'MetricFlow' },
{ value: 'metabase', label: 'Metabase' },
{ value: 'looker', label: 'Looker' },
{ value: 'lookml', label: 'LookML' },
{ value: 'notion', label: 'Notion' },
];
const SOURCE_LABELS = Object.fromEntries(SOURCE_OPTIONS.map((option) => [option.value, option.label])) as Record<
KtxSetupSourceType,
string
>;
const PRIMARY_SOURCE_DRIVERS = new Set([
'sqlite',
'postgres',
'mysql',
'clickhouse',
'sqlserver',
'bigquery',
'snowflake',
]);
function createPromptAdapter(): KtxSetupSourcesPromptAdapter {
return createKtxSetupPromptAdapter({
selectCancelValue: 'back',
multiselectCancelValue: 'back',
confirmEmptyOptionalMultiselect: true,
});
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function stringField(value: unknown): string | undefined {
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined;
}
function sourceLabel(source: KtxSetupSourceType): string {
return SOURCE_LABELS[source];
}
function sourceAdapter(source: KtxSetupSourceType): string {
return source;
}
function connectionNamePrompt(label: string): string {
return `Name this ${label} connection\nKTX will use this short name in commands and config. You can rename it now.`;
}
function sourceSubpathPrompt(source: KtxSetupSourceType): string {
if (source === 'dbt') {
return [
'Folder containing dbt_project.yml (optional)',
'Press Enter when dbt_project.yml is at the repo root.',
'For monorepos, enter a relative path like analytics/dbt.',
].join('\n');
}
return [
`${sourceLabel(source)} project folder (optional)`,
'If the project files are inside a subfolder, enter that path.',
'Press Enter if the path or repo already points at the project.',
].join('\n');
}
const SCAN_SKIP_DIRS = new Set(['.git', 'node_modules', '.venv', 'target', 'dbt_packages', 'dbt_modules', '__pycache__']);
async function findDbtProjectSubpaths(rootDir: string): Promise<string[]> {
const entries = await readdir(rootDir, { withFileTypes: true, recursive: true });
const subpaths: string[] = [];
for (const entry of entries) {
if (!entry.isFile()) continue;
if (entry.name !== 'dbt_project.yml' && entry.name !== 'dbt_project.yaml') continue;
const relDir = relative(rootDir, entry.parentPath);
if (relDir.split('/').some((part) => SCAN_SKIP_DIRS.has(part))) continue;
subpaths.push(relDir);
}
return subpaths;
}
async function promptText(
prompts: KtxSetupSourcesPromptAdapter,
options: { message: string; placeholder?: string; initialValue?: string },
): Promise<string | undefined> {
return await prompts.text({ ...options, message: withTextInputNavigation(options.message) });
}
function assertSafeConnectionId(connectionId: string): void {
if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) {
throw new Error(`Unsafe connection id: ${connectionId}`);
}
}
function credentialRef(value: string | undefined, label: string): string {
const ref = value?.trim();
if (!ref) {
throw new Error(`Missing ${label}; use env:NAME or file:/absolute/path`);
}
if (!ref.startsWith('env:') && !ref.startsWith('file:')) {
throw new Error(`${label} must use env:NAME or file:/absolute/path`);
}
return ref;
}
async function chooseSourceCredentialRef(input: {
prompts: KtxSetupSourcesPromptAdapter;
projectDir: string;
label: string;
envName: string;
secretFileName: string;
existingRef?: string;
}): Promise<string | 'back'> {
while (true) {
const choice = await input.prompts.select({
message: `How should KTX find your ${input.label}?`,
options: [
...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []),
{ value: 'env', label: `Use ${input.envName} from the environment` },
{ value: 'paste', label: 'Paste a key and save it as a local secret file' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice === 'keep' && input.existingRef) return input.existingRef;
if (choice === 'paste') {
const value = await input.prompts.password({ message: input.label });
if (value === undefined) continue;
if (!value.trim()) continue;
const ref = await writeProjectLocalSecretReference({
projectDir: input.projectDir,
fileName: input.secretFileName,
value,
});
input.prompts.log?.(`Saved to .ktx/secrets/${input.secretFileName}`);
return ref;
}
return `env:${input.envName}`;
}
}
async function chooseGitAuthCredentialRef(input: {
prompts: KtxSetupSourcesPromptAdapter;
projectDir: string;
source: KtxSetupSourceType;
connectionId: string;
existingRef?: string;
repoUrl?: string;
testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>;
}): Promise<string | undefined | 'back'> {
const label = input.source === 'dbt' ? 'This' : `This ${sourceLabel(input.source)}`;
while (true) {
const choice = await input.prompts.select({
message: `${label} repo requires authentication.`,
options: [
...(input.existingRef ? [{ value: 'keep', label: 'Keep existing credential' }] : []),
{ value: 'env', label: 'Use GITHUB_TOKEN from the environment' },
{ value: 'paste', label: 'Paste a token and save it as a local secret file' },
{ value: 'skip', label: 'Skip — try without authentication' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice === 'keep' && input.existingRef) return input.existingRef;
if (choice === 'skip') return undefined;
if (choice === 'paste') {
const value = await input.prompts.password({ message: 'Git access token' });
if (value === undefined) continue;
if (!value.trim()) continue;
if (input.testGitRepo && input.repoUrl) {
const result = await input.testGitRepo({ repoUrl: input.repoUrl, authToken: value });
if (!result.ok) {
input.prompts.log?.(`Authentication failed: ${result.error}`);
continue;
}
}
const fileName = `${input.connectionId}-auth-token`;
const ref = await writeProjectLocalSecretReference({
projectDir: input.projectDir,
fileName,
value,
});
input.prompts.log?.(`Saved to .ktx/secrets/${fileName}`);
return ref;
}
return 'env:GITHUB_TOKEN';
}
}
function repoOrLocalSource(args: KtxSetupSourcesArgs): { sourceDir?: string; repoUrl?: string } {
if (args.sourcePath && args.sourceGitUrl) {
throw new Error('Choose only one source location: --source-path or --source-git-url.');
}
if (args.sourcePath) {
return { sourceDir: resolve(args.sourcePath) };
}
if (args.sourceGitUrl) {
return { repoUrl: args.sourceGitUrl };
}
throw new Error('Missing source location: pass --source-path or --source-git-url.');
}
function fileRepoUrl(sourceDir: string): string {
return pathToFileURL(sourceDir).toString();
}
async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise<void> {
const project = await loadKtxProject({ projectDir });
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
}
async function writeSourceConnection(
projectDir: string,
connectionId: string,
connection: KtxProjectConnectionConfig,
adapter: string,
): Promise<() => Promise<void>> {
assertSafeConnectionId(connectionId);
const project = await loadKtxProject({ projectDir });
const previousConnection = project.config.connections[connectionId];
const hadPreviousConnection = previousConnection !== undefined;
const shouldRemoveAdapterOnRollback = !project.config.ingest.adapters.includes(adapter);
const config = {
...project.config,
connections: {
...project.config.connections,
[connectionId]: connection,
},
ingest: {
...project.config.ingest,
adapters: project.config.ingest.adapters.includes(adapter)
? [...project.config.ingest.adapters]
: [...project.config.ingest.adapters, adapter],
},
};
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
return async () => {
const latest = await loadKtxProject({ projectDir });
const connections = { ...latest.config.connections };
if (hadPreviousConnection) {
connections[connectionId] = previousConnection;
} else {
delete connections[connectionId];
}
await writeProjectConfig(projectDir, {
...latest.config,
connections,
ingest: {
...latest.config.ingest,
adapters: shouldRemoveAdapterOnRollback
? latest.config.ingest.adapters.filter((candidate) => candidate !== adapter)
: latest.config.ingest.adapters,
},
});
};
}
async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSourceType): Promise<void> {
const adapter = sourceAdapter(source);
const project = await loadKtxProject({ projectDir });
if (project.config.ingest.adapters.includes(adapter)) {
return;
}
await writeProjectConfig(projectDir, {
...project.config,
ingest: {
...project.config.ingest,
adapters: [...project.config.ingest.adapters, adapter],
},
});
}
async function markSourcesComplete(projectDir: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
await writeFile(project.configPath, serializeKtxProjectConfig(project.config), 'utf-8');
await markKtxSetupStateStepComplete(projectDir, 'sources');
}
function hasPrimarySource(config: KtxProjectConfig): boolean {
const setupPrimaryIds = config.setup?.database_connection_ids ?? [];
if (setupPrimaryIds.some((connectionId) => Object.hasOwn(config.connections, connectionId))) {
return true;
}
return Object.values(config.connections).some((connection) =>
PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()),
);
}
function buildDbtConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'dbt',
...(source.sourceDir ? { source_dir: source.sourceDir } : {}),
...(source.repoUrl ? { repo_url: source.repoUrl } : {}),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'dbt private repo access token') }
: {}),
...(args.sourceProfilesPath ? { profiles_path: resolve(args.sourceProfilesPath) } : {}),
...(args.sourceTarget ? { target: args.sourceTarget } : {}),
...(args.sourceProjectName ? { project_name: args.sourceProjectName } : {}),
};
}
function buildMetricflowConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'metricflow',
metricflow: {
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'MetricFlow auth token ref') }
: {}),
},
};
}
function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (!args.sourceUrl) {
throw new Error('Missing Metabase URL: pass --source-url.');
}
if (!args.sourceWarehouseConnectionId) {
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
}
if (!args.metabaseDatabaseId) {
throw new Error('Missing Metabase database id: pass --metabase-database-id.');
}
return {
driver: 'metabase',
api_url: args.sourceUrl,
api_key_ref: credentialRef(args.sourceApiKeyRef, 'Metabase API key ref'),
mappings: {
databaseMappings: { [String(args.metabaseDatabaseId)]: args.sourceWarehouseConnectionId },
syncEnabled: { [String(args.metabaseDatabaseId)]: true },
syncMode: 'ALL',
selections: { collections: [], items: [] },
defaultTagNames: [],
},
};
}
function buildLookerConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (!args.sourceUrl) {
throw new Error('Missing Looker base URL: pass --source-url.');
}
if (!args.sourceClientId) {
throw new Error('Missing Looker client id: pass --source-client-id.');
}
if (!args.sourceWarehouseConnectionId) {
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
}
return {
driver: 'looker',
base_url: args.sourceUrl,
client_id: args.sourceClientId,
client_secret_ref: credentialRef(args.sourceClientSecretRef, 'Looker client secret ref'),
mappings: {
connectionMappings: {
[args.sourceTarget ?? args.sourceWarehouseConnectionId]: args.sourceWarehouseConnectionId,
},
},
};
}
function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const source = repoOrLocalSource(args);
return {
driver: 'lookml',
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
...(args.sourceAuthTokenRef
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'LookML auth token ref') }
: {}),
mappings: {
expectedLookerConnectionName: args.sourceTarget ?? args.sourceWarehouseConnectionId ?? null,
},
};
}
function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const rootPageIds = args.notionRootPageIds ?? [];
const crawlMode = rootPageIds.length > 0 ? 'selected_roots' : (args.notionCrawlMode ?? 'selected_roots');
if (crawlMode === 'selected_roots' && rootPageIds.length === 0) {
throw new Error('Notion selected_roots requires --notion-root-page-id.');
}
return {
driver: 'notion',
auth_token_ref: credentialRef(args.sourceApiKeyRef, 'Notion token ref'),
crawl_mode: crawlMode,
...(rootPageIds.length > 0 ? { root_page_ids: rootPageIds } : {}),
root_database_ids: [],
root_data_source_ids: [],
max_pages_per_run: 1000,
max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN,
max_knowledge_updates_per_run: 20,
};
}
function sourcePathFromFileRepoUrl(repoUrl: string, subpath?: string): string {
const root = fileURLToPath(repoUrl);
return subpath ? join(root, subpath) : root;
}
function repoAuthToken(connection: KtxProjectConnectionConfig | Record<string, unknown>): string | null {
const ref = stringField(connection.auth_token_ref);
const literal = stringField(connection.auth_token);
return literal ?? resolveKtxConfigReference(ref, process.env) ?? null;
}
async function collectYamlFilesRecursive(sourceRoot: string): Promise<Array<{ content: string; path: string }>> {
const entries = await readdir(sourceRoot, { withFileTypes: true, recursive: true });
const files: Array<{ content: string; path: string }> = [];
for (const entry of entries) {
if (!entry.isFile() || !/\.ya?ml$/i.test(entry.name)) {
continue;
}
const path = join(entry.parentPath, entry.name);
files.push({ path, content: await readFile(path, 'utf-8') });
}
return files;
}
async function defaultValidateDbt(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
let sourceDir = stringField(connection.source_dir);
const repoUrl = stringField(connection.repo_url);
if (!sourceDir && repoUrl?.startsWith('file:')) {
sourceDir = sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path));
}
if (!sourceDir && repoUrl) {
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-'));
try {
await cloneOrPull({
repoUrl,
authToken: repoAuthToken(connection),
cacheDir,
branch: stringField(connection.branch) ?? 'main',
});
} catch (error) {
const reason = error instanceof Error ? error.message : String(error);
return { ok: false, message: `Failed to clone ${repoUrl}: ${reason}` };
}
sourceDir = stringField(connection.path) ? join(cacheDir, String(connection.path)) : cacheDir;
}
if (!sourceDir) {
return { ok: false, message: 'dbt setup requires --source-path or --source-git-url.' };
}
const info = await loadProjectInfo(sourceDir);
const schemaFiles = await loadDbtSchemaFiles(sourceDir);
if (!info.projectName && typeof connection.project_name !== 'string') {
return { ok: false, message: 'dbt project metadata is missing project name.' };
}
return { ok: true, detail: `project=${info.projectName ?? connection.project_name} schemas=${schemaFiles.length}` };
}
async function defaultValidateMetricflow(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const metricflow = isRecord(connection.metricflow) ? connection.metricflow : undefined;
const repoUrl = stringField(metricflow?.repoUrl);
if (!repoUrl) {
return { ok: false, message: 'MetricFlow setup requires repoUrl.' };
}
if (!repoUrl.startsWith('file:')) {
const result = await testRepoConnection({
repoUrl,
authToken: metricflow ? repoAuthToken(metricflow) : null,
});
if (!result.ok) {
return { ok: false, message: result.error };
}
return { ok: true, detail: 'repository reachable' };
}
const path = sourcePathFromFileRepoUrl(repoUrl, stringField(metricflow?.path));
const parsed = parseMetricflowFiles(await collectYamlFilesRecursive(path));
return {
ok: true,
detail: `semanticModels=${parsed.semanticModels.length} metrics=${parsed.crossModelMetrics.length}`,
};
}
async function defaultValidateLooker(projectDir: string, connectionId: string): Promise<SourceValidationResult> {
const code = await runKtxSourceMapping(
{ command: 'refresh', projectDir, connectionId, autoAccept: true },
{ stdout: { write() {} }, stderr: { write() {} } },
);
return code === 0
? { ok: true, detail: 'Looker mapping refreshed' }
: { ok: false, message: 'Looker validation failed' };
}
async function defaultValidateLookml(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const repoUrl = stringField(connection.repoUrl);
if (!repoUrl) {
return { ok: false, message: 'LookML setup requires repoUrl.' };
}
if (!repoUrl.startsWith('file:')) {
const result = await testRepoConnection({ repoUrl, authToken: repoAuthToken(connection) });
return result.ok ? { ok: true, detail: 'repository reachable' } : { ok: false, message: result.error };
}
const parsed = await parseLookmlStagedDir(sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path)));
const count = parsed.models.length + parsed.views.length + parsed.dashboards.length;
return count > 0 ? { ok: true, detail: `lookmlFiles=${count}` } : { ok: false, message: 'No LookML files found' };
}
async function defaultValidateNotion(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
const token = await resolveNotionAuthToken(String(connection.auth_token_ref));
const client: NotionApi = new NotionClient(token);
await client.retrieveBotUser();
const roots = Array.isArray(connection.root_page_ids)
? connection.root_page_ids.filter((id): id is string => typeof id === 'string')
: [];
for (const root of roots) {
await client.retrievePage(root);
}
return { ok: true, detail: `roots=${roots.length}` };
}
interface MappingJsonOutput {
connectionId: string;
refresh: { ok: boolean; output: string[] };
validation: { ok: boolean; output: string[] };
mappings: unknown[];
}
function splitOutputLines(output: string): string[] {
return output
.split('\n')
.map((line) => line.trim())
.filter(Boolean);
}
function writeSetupPrefixedLines(write: (chunk: string) => void, output: string): void {
for (const line of output.split(/\r?\n/)) {
if (line.length > 0) {
write(`${line}\n`);
}
}
}
function createSetupPrefixedIo(io: KtxCliIo): KtxCliIo {
return {
stdout: {
isTTY: io.stdout.isTTY,
columns: io.stdout.columns,
write(chunk: string) {
writeSetupPrefixedLines((line) => io.stdout.write(line), chunk);
},
},
stderr: {
write(chunk: string) {
writeSetupPrefixedLines((line) => io.stderr.write(line), chunk);
},
},
};
}
function parseMappingListJson(output: string): unknown[] {
const trimmed = output.trim();
if (!trimmed) {
return [];
}
const parsed = JSON.parse(trimmed) as unknown;
return Array.isArray(parsed) ? parsed : [];
}
function summarizeMappingResult(parsed: MappingJsonOutput): string {
const mappingCount = parsed.mappings.length;
const mappingNoun = mappingCount === 1 ? 'mapping' : 'mappings';
return `Mapping validated — ${mappingCount} ${mappingNoun} configured`;
}
async function defaultRunMapping(projectDir: string, connectionId: string, io: KtxCliIo): Promise<number> {
const outputs = {
refresh: '',
validation: '',
list: '',
};
const refreshCode = await runKtxSourceMapping(
{ command: 'refresh', projectDir, connectionId, autoAccept: true },
{
stdout: { write(chunk: string) { outputs.refresh += chunk; } },
stderr: io.stderr,
},
);
if (refreshCode !== 0) {
return refreshCode;
}
const validationCode = await runKtxSourceMapping(
{ command: 'validate', projectDir, connectionId },
{
stdout: { write(chunk: string) { outputs.validation += chunk; } },
stderr: io.stderr,
},
);
if (validationCode !== 0) {
return validationCode;
}
const listCode = await runKtxSourceMapping(
{ command: 'list', projectDir, connectionId, json: true },
{
stdout: { write(chunk: string) { outputs.list += chunk; } },
stderr: io.stderr,
},
);
if (listCode !== 0) {
return listCode;
}
const parsed: MappingJsonOutput = {
connectionId,
refresh: { ok: true, output: splitOutputLines(outputs.refresh) },
validation: { ok: true, output: splitOutputLines(outputs.validation) },
mappings: parseMappingListJson(outputs.list),
};
io.stdout.write(`${summarizeMappingResult(parsed)}\n`);
return 0;
}
async function defaultRunInitialIngest(
projectDir: string,
connectionId: string,
io: KtxCliIo,
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
): Promise<number> {
return await runKtxPublicIngest(
{
command: 'run',
projectDir,
targetConnectionId: connectionId,
all: false,
json: false,
inputMode: options.inputMode,
},
io,
);
}
async function runInitialSourceIngestWithRecovery(input: {
args: KtxSetupSourcesArgs;
connectionId: string;
io: KtxCliIo;
prompts: KtxSetupSourcesPromptAdapter;
deps: KtxSetupSourcesDeps;
}): Promise<'ready' | 'continue' | 'back' | 'failed'> {
while (true) {
input.io.stdout.write(`│ Building context from ${input.connectionId}. Large sources can take a while.\n`);
const ingestCode = await (input.deps.runInitialIngest ?? defaultRunInitialIngest)(
input.args.projectDir,
input.connectionId,
input.io,
{
inputMode: input.args.inputMode,
},
);
if (ingestCode === 0) {
return 'ready';
}
if (input.args.inputMode === 'disabled') {
return 'failed';
}
const action = await input.prompts.select({
message: `Context build failed for ${input.connectionId}\nRetry now, continue setup and build this source later, or go back.`,
options: [
{ value: 'retry', label: 'Retry context build' },
{ value: 'continue', label: 'Continue setup and build this source later' },
{ value: 'back', label: 'Back' },
],
});
if (action === 'retry') {
continue;
}
if (action === 'continue') {
input.io.stdout.write(`│ Context source saved without a completed context build for ${input.connectionId}.\n`);
input.io.stdout.write(`│ Run later: ktx ingest ${input.connectionId}\n`);
return 'continue';
}
return 'back';
}
}
type SourceLocationChoice = 'path' | 'git';
type SourcePromptState = KtxSetupSourcesArgs & {
sourceLocation?: SourceLocationChoice;
};
type SourcePromptStep = (state: SourcePromptState) => Promise<'next' | 'back'>;
interface WarehouseConnectionChoice {
id: string;
connectionType: string;
}
type InteractiveSourceConnectionChoice =
| { kind: 'existing'; connectionId: string; connection: KtxProjectConnectionConfig }
| { kind: 'new'; args: KtxSetupSourcesArgs }
| { kind: 'edited'; connectionId: string; args: KtxSetupSourcesArgs }
| 'back';
type SourceSetupChoiceResult =
| { status: 'ready'; connectionId: string }
| { status: 'back' }
| { status: 'failed' };
async function runSourcePromptSteps(
initialState: SourcePromptState,
stepsForState: (state: SourcePromptState) => SourcePromptStep[],
): Promise<KtxSetupSourcesArgs | 'back'> {
let stepIndex = 0;
while (true) {
const steps = stepsForState(initialState);
if (stepIndex >= steps.length) {
const { sourceLocation: _sourceLocation, ...sourceArgs } = initialState;
return sourceArgs;
}
const result = await steps[stepIndex]?.(initialState);
if (result === 'back') {
if (stepIndex === 0) {
return 'back';
}
stepIndex -= 1;
continue;
}
stepIndex += 1;
}
}
function resetRepoLocationFields(state: SourcePromptState): void {
delete state.sourcePath;
delete state.sourceGitUrl;
delete state.sourceBranch;
delete state.sourceAuthTokenRef;
delete state.sourceSubpath;
delete state.sourceProjectName;
}
function sourceLocationFromArgs(args: KtxSetupSourcesArgs): SourceLocationChoice | undefined {
if (args.sourcePath) return 'path';
if (args.sourceGitUrl) return 'git';
return undefined;
}
function warehouseConnectionChoices(config: KtxProjectConfig): WarehouseConnectionChoice[] {
return Object.entries(config.connections)
.filter(([, connection]) => PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()))
.map(([id, connection]) => ({ id, connectionType: localConnectionTypeForConfig(id, connection) }))
.sort((left, right) => left.id.localeCompare(right.id));
}
async function chooseMappedWarehouseConnectionId(input: {
projectDir: string;
prompts: KtxSetupSourcesPromptAdapter;
}): Promise<string | 'back'> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const choices = warehouseConnectionChoices(project.config);
if (choices.length === 1) {
return choices[0].id;
}
if (choices.length === 0) {
const entered = await promptText(input.prompts, { message: 'Mapped warehouse connection id' });
return entered === undefined ? 'back' : entered;
}
const selected = await input.prompts.select({
message: 'Mapped warehouse connection',
options: [
...choices.map((choice) => ({
value: choice.id,
label: `${choice.id} (${choice.connectionType})`,
})),
{ value: 'back', label: 'Back' },
],
});
return selected === 'back' ? 'back' : selected;
}
async function defaultDiscoverMetabaseDatabases(input: {
sourceUrl: string;
sourceApiKeyRef: string;
}): Promise<DiscoveredMetabaseDatabase[]> {
const apiKey = resolveKtxConfigReference(input.sourceApiKeyRef, process.env);
if (!apiKey) {
throw new Error('Metabase API key ref could not be resolved');
}
const client = new MetabaseClient(
{ apiUrl: input.sourceUrl, apiKey },
DEFAULT_METABASE_CLIENT_CONFIG,
);
try {
return await discoverMetabaseDatabases(client);
} finally {
await client.cleanup();
}
}
function metabaseDatabaseLabel(database: DiscoveredMetabaseDatabase): string {
const detail = [database.engine].filter(Boolean).join(', ');
return detail ? `${database.id}: ${database.name} (${detail})` : `${database.id}: ${database.name}`;
}
async function chooseMetabaseDatabaseId(input: {
state: SourcePromptState;
prompts: KtxSetupSourcesPromptAdapter;
deps: KtxSetupSourcesDeps;
}): Promise<number | 'back'> {
const sourceUrl = input.state.sourceUrl;
const sourceApiKeyRef = input.state.sourceApiKeyRef;
if (sourceUrl && sourceApiKeyRef) {
try {
const discovered = await (input.deps.discoverMetabaseDatabases ?? defaultDiscoverMetabaseDatabases)({
sourceUrl,
sourceApiKeyRef,
sourceConnectionId: input.state.sourceConnectionId ?? 'metabase-main',
});
if (discovered.length === 1) {
return discovered[0].id;
}
if (discovered.length > 1) {
const selected = await input.prompts.select({
message: 'Metabase database',
options: [
...discovered
.slice()
.sort((left, right) => left.id - right.id)
.map((database) => ({
value: String(database.id),
label: metabaseDatabaseLabel(database),
})),
{ value: 'back', label: 'Back' },
],
});
return selected === 'back' ? 'back' : Number.parseInt(selected, 10);
}
} catch {
// Discovery is a convenience. Fall back to the raw id prompt when credentials
// are unavailable locally or the Metabase API cannot be reached yet.
}
}
const databaseId = await promptText(input.prompts, { message: 'Metabase database id' });
return databaseId === undefined ? 'back' : Number.parseInt(databaseId, 10);
}
function connectionIdPromptSteps(
args: KtxSetupSourcesArgs,
source: KtxSetupSourceType,
prompts: KtxSetupSourcesPromptAdapter,
defaultConnectionId: string,
): SourcePromptStep[] {
if (args.sourceConnectionId) {
return [];
}
return [
async (state) => {
const enteredConnectionId = await promptText(prompts, {
message: connectionNamePrompt(sourceLabel(source)),
placeholder: defaultConnectionId,
initialValue: defaultConnectionId,
});
if (enteredConnectionId === undefined) {
return 'back';
}
state.sourceConnectionId = enteredConnectionId.trim() || defaultConnectionId;
return 'next';
},
];
}
async function promptForInteractiveSource(
args: KtxSetupSourcesArgs,
source: KtxSetupSourceType,
prompts: KtxSetupSourcesPromptAdapter,
io: KtxCliIo,
deps: KtxSetupSourcesDeps,
defaultConnectionId = `${source}-main`,
testGitRepo: KtxSetupSourcesDeps['testGitRepo'] = testRepoConnection,
discoverMetabaseDatabaseList?: KtxSetupSourcesDeps['discoverMetabaseDatabases'],
): Promise<KtxSetupSourcesArgs | 'back'> {
const initialState: SourcePromptState = { ...args, source, sourceLocation: sourceLocationFromArgs(args) };
if (args.sourceConnectionId) {
initialState.sourceConnectionId = args.sourceConnectionId;
}
const connectionSteps = connectionIdPromptSteps(args, source, prompts, defaultConnectionId);
if (source === 'dbt' || source === 'metricflow' || source === 'lookml') {
return await runSourcePromptSteps(initialState, (state) => [
...connectionSteps,
async () => {
const selectedLocation = await prompts.select({
message: `${source} source location`,
options: [
{ value: 'path', label: 'Local path' },
{ value: 'git', label: 'Git URL' },
{ value: 'back', label: 'Back' },
],
});
if (selectedLocation !== 'path' && selectedLocation !== 'git') {
return 'back';
}
if (state.sourceLocation !== selectedLocation) {
resetRepoLocationFields(state);
}
state.sourceLocation = selectedLocation;
return 'next';
},
...(state.sourceLocation === 'path'
? [
async (currentState: SourcePromptState) => {
const sourcePath = await promptText(prompts, {
message: `${source} local path`,
...(currentState.sourcePath ? { initialValue: currentState.sourcePath } : {}),
});
if (sourcePath === undefined) return 'back';
currentState.sourcePath = sourcePath;
return 'next';
},
]
: []),
...(state.sourceLocation === 'git'
? [
async (currentState: SourcePromptState) => {
const sourceGitUrl = await promptText(prompts, {
message: `${source} git URL`,
...(currentState.sourceGitUrl ? { initialValue: currentState.sourceGitUrl } : {}),
});
if (sourceGitUrl === undefined) return 'back';
currentState.sourceGitUrl = sourceGitUrl;
return 'next';
},
async (currentState: SourcePromptState) => {
const branch = await promptText(prompts, {
message: `${source} git branch`,
initialValue: currentState.sourceBranch ?? 'main',
});
if (branch === undefined) return 'back';
currentState.sourceBranch = branch || 'main';
return 'next';
},
]
: []),
...(state.sourceLocation === 'git'
? [
async (currentState: SourcePromptState) => {
const result = await testGitRepo!({ repoUrl: currentState.sourceGitUrl! });
if (result.ok) {
delete currentState.sourceAuthTokenRef;
prompts.log?.('Repository connected.');
return 'next';
}
const authRef = await chooseGitAuthCredentialRef({
prompts,
projectDir: args.projectDir,
source,
connectionId: currentState.sourceConnectionId ?? `${source}-main`,
existingRef: currentState.sourceAuthTokenRef,
repoUrl: currentState.sourceGitUrl,
testGitRepo,
});
if (authRef === 'back') return 'back';
if (authRef) {
currentState.sourceAuthTokenRef = authRef;
} else {
delete currentState.sourceAuthTokenRef;
}
return 'next';
},
]
: []),
...(state.sourceLocation
? [
async (currentState: SourcePromptState) => {
if (source === 'dbt') {
let scanDir: string | undefined;
if (currentState.sourceLocation === 'path' && currentState.sourcePath) {
scanDir = currentState.sourcePath;
} else if (currentState.sourceLocation === 'git' && currentState.sourceGitUrl) {
try {
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-scan-'));
const authToken = currentState.sourceAuthTokenRef
? resolveKtxConfigReference(currentState.sourceAuthTokenRef, process.env)
: null;
await cloneOrPull({
repoUrl: currentState.sourceGitUrl,
authToken,
cacheDir,
branch: currentState.sourceBranch ?? 'main',
});
scanDir = cacheDir;
} catch {
// Clone failed — fall through to manual prompt
}
}
if (scanDir) {
try {
const subpaths = await findDbtProjectSubpaths(scanDir);
if (subpaths.length === 1) {
const found = subpaths[0]!;
if (found) {
currentState.sourceSubpath = found;
prompts.log?.(`Found dbt_project.yml in ${found}/`);
} else {
delete currentState.sourceSubpath;
}
return 'next';
}
if (subpaths.length > 1) {
const selected = await prompts.select({
message: 'Multiple dbt projects found — which one should KTX use?',
options: [
...subpaths.map((p) => ({ value: p || '.', label: p || '(project root)' })),
{ value: 'back', label: 'Back' },
],
});
if (selected === 'back') return 'back';
const subpath = selected === '.' ? '' : selected;
if (subpath) {
currentState.sourceSubpath = subpath;
} else {
delete currentState.sourceSubpath;
}
return 'next';
}
} catch {
// Directory unreadable — fall through to manual prompt
}
}
}
const subpath = await promptText(prompts, {
message: sourceSubpathPrompt(source),
placeholder: 'optional',
...(currentState.sourceSubpath ? { initialValue: currentState.sourceSubpath } : {}),
});
if (subpath === undefined) return 'back';
if (subpath) {
currentState.sourceSubpath = subpath;
} else {
delete currentState.sourceSubpath;
}
return 'next';
},
]
: []),
]);
}
if (source === 'metabase') {
return await runSourcePromptSteps(initialState, () => [
...connectionSteps,
async (state) => {
const sourceUrl = await promptText(prompts, {
message: 'Metabase URL',
...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}),
});
if (sourceUrl === undefined) return 'back';
state.sourceUrl = sourceUrl;
return 'next';
},
async (state) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Metabase API key',
envName: 'METABASE_API_KEY',
secretFileName: `${state.sourceConnectionId ?? 'metabase-main'}-api-key`,
existingRef: state.sourceApiKeyRef,
});
if (ref === 'back') return 'back';
state.sourceApiKeyRef = ref;
return 'next';
},
async (state) => {
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
projectDir: args.projectDir,
prompts,
});
if (sourceWarehouseConnectionId === 'back') return 'back';
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
return 'next';
},
async (state) => {
const databaseId = await chooseMetabaseDatabaseId({
state,
prompts,
deps: { discoverMetabaseDatabases: discoverMetabaseDatabaseList },
});
if (databaseId === 'back') return 'back';
state.metabaseDatabaseId = databaseId;
return 'next';
},
]);
}
if (source === 'looker') {
return await runSourcePromptSteps(initialState, () => [
...connectionSteps,
async (state) => {
const sourceUrl = await promptText(prompts, {
message: 'Looker base URL',
...(state.sourceUrl ? { initialValue: state.sourceUrl } : {}),
});
if (sourceUrl === undefined) return 'back';
state.sourceUrl = sourceUrl;
return 'next';
},
async (state) => {
const sourceClientId = await promptText(prompts, {
message: 'Looker client id',
...(state.sourceClientId ? { initialValue: state.sourceClientId } : {}),
});
if (sourceClientId === undefined) return 'back';
state.sourceClientId = sourceClientId;
return 'next';
},
async (state) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Looker client secret',
envName: 'LOOKER_CLIENT_SECRET',
secretFileName: `${state.sourceConnectionId ?? 'looker-main'}-client-secret`,
existingRef: state.sourceClientSecretRef,
});
if (ref === 'back') return 'back';
state.sourceClientSecretRef = ref;
return 'next';
},
async (state) => {
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
projectDir: args.projectDir,
prompts,
});
if (sourceWarehouseConnectionId === 'back') return 'back';
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
return 'next';
},
async (state) => {
const lookerConnectionName = await promptText(prompts, {
message: 'Looker connection name',
placeholder: 'optional',
...(state.sourceTarget ? { initialValue: state.sourceTarget } : {}),
});
if (lookerConnectionName === undefined) return 'back';
if (lookerConnectionName) {
state.sourceTarget = lookerConnectionName;
} else {
delete state.sourceTarget;
}
return 'next';
},
]);
}
return await runSourcePromptSteps(initialState, (state) => [
...connectionSteps,
async (currentState) => {
const ref = await chooseSourceCredentialRef({
prompts,
projectDir: args.projectDir,
label: 'Notion integration token',
envName: 'NOTION_TOKEN',
secretFileName: `${currentState.sourceConnectionId ?? 'notion-main'}-token`,
existingRef: currentState.sourceApiKeyRef,
});
if (ref === 'back') return 'back';
currentState.sourceApiKeyRef = ref;
return 'next';
},
async (currentState) => {
const crawlMode = await prompts.select({
message: 'Which Notion pages should KTX ingest?',
options: [
{ value: 'selected_roots', label: 'Specific pages and their subpages (choose them in a picker)' },
{ value: 'all_accessible', label: 'All pages the integration can access' },
{ value: 'back', label: 'Back' },
],
});
if (crawlMode === 'back') return 'back';
currentState.notionCrawlMode = crawlMode === 'all_accessible' ? 'all_accessible' : 'selected_roots';
if (currentState.notionCrawlMode === 'all_accessible') {
delete currentState.notionRootPageIds;
}
return 'next';
},
...(state.notionCrawlMode === 'selected_roots'
? [
async (currentState: SourcePromptState) => {
const connectionId = currentState.sourceConnectionId ?? 'notion-main';
const result = await (deps.pickNotionRootPages ?? pickNotionRootPages)(
{
connectionId,
connection: {
driver: 'notion',
auth_token_ref: credentialRef(currentState.sourceApiKeyRef, 'Notion token ref'),
crawl_mode: 'selected_roots',
root_page_ids: currentState.notionRootPageIds ?? [],
root_database_ids: [],
root_data_source_ids: [],
},
},
io,
);
if (result.kind === 'back') {
return 'back';
}
if (result.kind === 'unavailable') {
io.stderr.write(`${result.message}\n`);
return 'back';
}
currentState.notionRootPageIds = result.rootPageIds;
return 'next';
},
]
: []),
]);
}
function existingConnectionIdsBySource(
connections: Record<string, KtxProjectConnectionConfig>,
source: KtxSetupSourceType,
): string[] {
return Object.entries(connections)
.filter(([, connection]) => String(connection.driver ?? '').toLowerCase() === source)
.map(([connectionId]) => connectionId)
.sort((left, right) => left.localeCompare(right));
}
function sourceTypeForConnection(connection: KtxProjectConnectionConfig): KtxSetupSourceType | null {
const driver = String(connection.driver ?? '').toLowerCase();
return SOURCE_OPTIONS.some((option) => option.value === driver) ? (driver as KtxSetupSourceType) : null;
}
function contextSourceEditTargets(connections: Record<string, KtxProjectConnectionConfig>): Array<{
connectionId: string;
source: KtxSetupSourceType;
}> {
return Object.entries(connections)
.map(([connectionId, connection]) => {
const source = sourceTypeForConnection(connection);
return source ? { connectionId, source } : null;
})
.filter((target): target is { connectionId: string; source: KtxSetupSourceType } => target !== null)
.sort((left, right) => left.connectionId.localeCompare(right.connectionId));
}
function sourceChecklistForConnections(connections: Record<string, KtxProjectConnectionConfig>): {
options: Array<{ value: KtxSetupSourceType; label: string; hint?: string }>;
initialValues: KtxSetupSourceType[];
} {
const initialValues: KtxSetupSourceType[] = [];
const options = SOURCE_OPTIONS.map((option) => {
const existingIds = existingConnectionIdsBySource(connections, option.value);
if (existingIds.length === 0) {
return option;
}
initialValues.push(option.value);
return { ...option, hint: `configured: ${existingIds.join(', ')}` };
});
return { options, initialValues };
}
function defaultConnectionIdForSource(
connections: Record<string, KtxProjectConnectionConfig>,
source: KtxSetupSourceType,
): string {
const base = `${source}-main`;
if (!connections[base]) {
return base;
}
let index = 2;
while (connections[`${base}-${index}`]) {
index += 1;
}
return `${base}-${index}`;
}
function firstStringRecordEntry(value: unknown): [string, string] | undefined {
if (!isRecord(value)) return undefined;
for (const [key, raw] of Object.entries(value)) {
if (typeof raw === 'string' && raw.trim().length > 0) {
return [key, raw.trim()];
}
}
return undefined;
}
function applyRepoSourceArgs(
args: KtxSetupSourcesArgs,
input: { repoUrl?: string; sourceDir?: string; branch?: string; subpath?: string; authTokenRef?: string },
): void {
if (input.sourceDir) {
args.sourcePath = input.sourceDir;
} else if (input.repoUrl?.startsWith('file:')) {
args.sourcePath = fileURLToPath(input.repoUrl);
} else if (input.repoUrl) {
args.sourceGitUrl = input.repoUrl;
}
if (input.branch) args.sourceBranch = input.branch;
if (input.subpath) args.sourceSubpath = input.subpath;
if (input.authTokenRef) args.sourceAuthTokenRef = input.authTokenRef;
}
function sourceArgsFromExistingConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connectionId: string;
connection: KtxProjectConnectionConfig;
}): KtxSetupSourcesArgs {
const sourceArgs: KtxSetupSourcesArgs = {
projectDir: input.args.projectDir,
inputMode: input.args.inputMode,
source: input.source,
sourceConnectionId: input.connectionId,
runInitialSourceIngest: input.args.runInitialSourceIngest,
skipSources: input.args.skipSources,
};
if (input.source === 'dbt') {
applyRepoSourceArgs(sourceArgs, {
sourceDir: stringField(input.connection.source_dir),
repoUrl: stringField(input.connection.repo_url),
branch: stringField(input.connection.branch),
subpath: stringField(input.connection.path),
authTokenRef: stringField(input.connection.auth_token_ref),
});
const profilesPath = stringField(input.connection.profiles_path);
const target = stringField(input.connection.target);
const projectName = stringField(input.connection.project_name);
if (profilesPath) sourceArgs.sourceProfilesPath = profilesPath;
if (target) sourceArgs.sourceTarget = target;
if (projectName) sourceArgs.sourceProjectName = projectName;
return sourceArgs;
}
if (input.source === 'metricflow') {
const metricflow = isRecord(input.connection.metricflow) ? input.connection.metricflow : {};
applyRepoSourceArgs(sourceArgs, {
repoUrl: stringField(metricflow.repoUrl),
branch: stringField(metricflow.branch),
subpath: stringField(metricflow.path),
authTokenRef: stringField(metricflow.auth_token_ref),
});
return sourceArgs;
}
if (input.source === 'lookml') {
applyRepoSourceArgs(sourceArgs, {
repoUrl: stringField(input.connection.repoUrl),
branch: stringField(input.connection.branch),
subpath: stringField(input.connection.path),
authTokenRef: stringField(input.connection.auth_token_ref),
});
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const expectedLookerConnectionName = stringField(mappings.expectedLookerConnectionName);
if (expectedLookerConnectionName) sourceArgs.sourceTarget = expectedLookerConnectionName;
return sourceArgs;
}
if (input.source === 'metabase') {
sourceArgs.sourceUrl = stringField(input.connection.api_url);
sourceArgs.sourceApiKeyRef = stringField(input.connection.api_key_ref);
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const databaseMapping = firstStringRecordEntry(mappings.databaseMappings);
if (databaseMapping) {
sourceArgs.metabaseDatabaseId = Number.parseInt(databaseMapping[0], 10);
sourceArgs.sourceWarehouseConnectionId = databaseMapping[1];
}
return sourceArgs;
}
if (input.source === 'looker') {
sourceArgs.sourceUrl = stringField(input.connection.base_url);
sourceArgs.sourceClientId = stringField(input.connection.client_id);
sourceArgs.sourceClientSecretRef = stringField(input.connection.client_secret_ref);
const mappings = isRecord(input.connection.mappings) ? input.connection.mappings : {};
const connectionMapping = firstStringRecordEntry(mappings.connectionMappings);
if (connectionMapping) {
sourceArgs.sourceTarget = connectionMapping[0];
sourceArgs.sourceWarehouseConnectionId = connectionMapping[1];
}
return sourceArgs;
}
sourceArgs.sourceApiKeyRef = stringField(input.connection.auth_token_ref);
sourceArgs.notionCrawlMode =
input.connection.crawl_mode === 'all_accessible' ? 'all_accessible' : 'selected_roots';
if (Array.isArray(input.connection.root_page_ids)) {
sourceArgs.notionRootPageIds = input.connection.root_page_ids.filter(
(pageId): pageId is string => typeof pageId === 'string',
);
}
return sourceArgs;
}
async function promptEditedSourceConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connectionId: string;
connection: KtxProjectConnectionConfig;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages'];
discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases'];
}): Promise<Extract<InteractiveSourceConnectionChoice, { kind: 'edited' }> | 'back'> {
const sourceArgs = await promptForInteractiveSource(
sourceArgsFromExistingConnection({
args: input.args,
source: input.source,
connectionId: input.connectionId,
connection: input.connection,
}),
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
input.connectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
return sourceArgs === 'back'
? 'back'
: { kind: 'edited', connectionId: input.connectionId, args: sourceArgs };
}
async function chooseContextSourceToEdit(input: {
projectDir: string;
prompts: KtxSetupSourcesPromptAdapter;
}): Promise<{ connectionId: string; source: KtxSetupSourceType } | 'back'> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const targets = contextSourceEditTargets(project.config.connections);
if (targets.length === 0) return 'back';
const choice = await input.prompts.select({
message: 'Context source to edit',
options: [
...targets.map((target) => ({
value: target.connectionId,
label: `${target.connectionId} (${sourceLabel(target.source)})`,
})),
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
const target = targets.find((candidate) => candidate.connectionId === choice);
return target ?? 'back';
}
async function chooseInteractiveSourceConnection(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connections: Record<string, KtxProjectConnectionConfig>;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
pickNotionRootPages?: KtxSetupSourcesDeps['pickNotionRootPages'];
discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases'];
}): Promise<InteractiveSourceConnectionChoice> {
const existingIds = existingConnectionIdsBySource(input.connections, input.source);
const defaultConnectionId = defaultConnectionIdForSource(input.connections, input.source);
const label = sourceLabel(input.source);
if (existingIds.length === 0) {
const sourceArgs = await promptForInteractiveSource(
input.args,
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
defaultConnectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
return sourceArgs === 'back' ? 'back' : { kind: 'new', args: sourceArgs };
}
while (true) {
const choice = await input.prompts.select({
message: `Configure ${label}`,
options: [
...existingIds.map((connectionId) => ({
value: `existing:${connectionId}`,
label: `Use existing ${label} connection: ${connectionId}`,
})),
...existingIds.map((connectionId) => ({
value: `edit:${connectionId}`,
label: `Edit existing ${label} connection: ${connectionId}`,
})),
{ value: 'new', label: `Add new ${label} connection` },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'back') return 'back';
if (choice.startsWith('existing:')) {
const connectionId = choice.slice('existing:'.length);
const connection = input.connections[connectionId];
if (connection) {
return { kind: 'existing', connectionId, connection };
}
continue;
}
if (choice.startsWith('edit:')) {
const connectionId = choice.slice('edit:'.length);
const connection = input.connections[connectionId];
if (!connection) {
continue;
}
const edited = await promptEditedSourceConnection({
args: input.args,
source: input.source,
connectionId,
connection,
prompts: input.prompts,
io: input.io,
testGitRepo: input.testGitRepo,
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
});
if (edited === 'back') {
continue;
}
return edited;
}
const sourceArgs = await promptForInteractiveSource(
input.args,
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.pickNotionRootPages,
discoverMetabaseDatabases: input.discoverMetabaseDatabases,
},
defaultConnectionId,
input.testGitRepo,
input.discoverMetabaseDatabases,
);
if (sourceArgs === 'back') {
continue;
}
return { kind: 'new', args: sourceArgs };
}
}
function buildConnection(source: KtxSetupSourceType, args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
if (source === 'dbt') {
return buildDbtConnection(args);
}
if (source === 'metricflow') {
return buildMetricflowConnection(args);
}
if (source === 'metabase') {
return buildMetabaseConnection(args);
}
if (source === 'looker') {
return buildLookerConnection(args);
}
if (source === 'lookml') {
return buildLookmlConnection(args);
}
return buildNotionConnection(args);
}
async function validateSource(
source: KtxSetupSourceType,
args: { projectDir: string; connectionId: string; connection: KtxProjectConnectionConfig },
deps: KtxSetupSourcesDeps,
): Promise<SourceValidationResult> {
if (source === 'dbt') {
return await (deps.validateDbt ?? defaultValidateDbt)(args.connection);
}
if (source === 'metricflow') {
return await (deps.validateMetricflow ?? defaultValidateMetricflow)(args.connection);
}
if (source === 'metabase') {
return deps.validateMetabase
? await deps.validateMetabase(args.projectDir, args.connectionId)
: { ok: true, detail: 'mapping validation runs after the connection is saved' };
}
if (source === 'looker') {
return await (deps.validateLooker ?? defaultValidateLooker)(args.projectDir, args.connectionId);
}
if (source === 'lookml') {
return await (deps.validateLookml ?? defaultValidateLookml)(args.connection);
}
return await (deps.validateNotion ?? defaultValidateNotion)(args.connection);
}
async function saveValidateAndMaybeBuildSource(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
sourceChoice: Exclude<InteractiveSourceConnectionChoice, 'back'>;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
deps: KtxSetupSourcesDeps;
}): Promise<SourceSetupChoiceResult> {
const connectionId =
input.sourceChoice.kind === 'existing'
? input.sourceChoice.connectionId
: input.sourceChoice.kind === 'edited'
? input.sourceChoice.connectionId
: (input.sourceChoice.args.sourceConnectionId ?? `${input.source}-main`);
const connection =
input.sourceChoice.kind === 'existing'
? input.sourceChoice.connection
: buildConnection(input.source, input.sourceChoice.args);
const rollback =
input.sourceChoice.kind === 'existing'
? undefined
: await writeSourceConnection(
input.args.projectDir,
connectionId,
connection,
sourceAdapter(input.source),
);
if (input.sourceChoice.kind === 'existing') {
await ensureSourceAdapterEnabled(input.args.projectDir, input.source);
}
const validation = await validateSource(
input.source,
{ projectDir: input.args.projectDir, connectionId, connection },
input.deps,
);
if (!validation.ok) {
await rollback?.();
input.io.stderr.write(`${validation.message}\n`);
return { status: 'failed' };
}
if (input.source === 'metabase' || input.source === 'looker') {
input.prompts.log?.(`Validating ${sourceLabel(input.source)} mapping…`);
const mappingCode = await (input.deps.runMapping ?? defaultRunMapping)(
input.args.projectDir,
connectionId,
createSetupPrefixedIo(input.io),
);
if (mappingCode !== 0) {
await rollback?.();
return { status: 'failed' };
}
}
if (input.args.runInitialSourceIngest) {
const ingestResult = await runInitialSourceIngestWithRecovery({
args: input.args,
connectionId,
io: input.io,
prompts: input.prompts,
deps: input.deps,
});
if (ingestResult === 'failed') {
await rollback?.();
return { status: 'failed' };
}
if (ingestResult === 'back') {
await rollback?.();
return { status: 'back' };
}
} else {
input.io.stdout.write(`│ Context source ${connectionId} saved. It will be built during the context build step.\n`);
}
return { status: 'ready', connectionId };
}
export async function runKtxSetupSourcesStep(
args: KtxSetupSourcesArgs,
io: KtxCliIo,
deps: KtxSetupSourcesDeps = {},
): Promise<KtxSetupSourcesResult> {
try {
if (args.skipSources) {
await markSourcesComplete(args.projectDir);
io.stdout.write('│ Context source setup skipped.\n');
return { status: 'skipped', projectDir: args.projectDir };
}
const prompts = deps.prompts ?? createPromptAdapter();
const project = await loadKtxProject({ projectDir: args.projectDir });
if (!hasPrimarySource(project.config)) {
const message = 'Connect a database before adding context sources.';
if (args.source) {
io.stderr.write(`${message}\n`);
return { status: 'failed', projectDir: args.projectDir };
}
if (args.inputMode !== 'disabled') {
io.stdout.write(`${message}\n`);
return { status: 'skipped', projectDir: args.projectDir };
}
}
while (true) {
const contextSourceChecklist = sourceChecklistForConnections(
(await loadKtxProject({ projectDir: args.projectDir })).config.connections,
);
const selected = args.source
? [args.source]
: args.inputMode === 'disabled'
? []
: await prompts.multiselect({
message: withMultiselectNavigation('Which context sources should KTX ingest?'),
options: contextSourceChecklist.options,
...(contextSourceChecklist.initialValues.length > 0
? { initialValues: contextSourceChecklist.initialValues }
: {}),
required: false,
});
if (selected.includes('back')) {
return { status: 'back', projectDir: args.projectDir };
}
if (selected.length === 0) {
if (args.inputMode === 'disabled') {
io.stderr.write('Missing context source selection: pass --source or --skip-sources.\n');
return { status: 'missing-input', projectDir: args.projectDir };
}
await markSourcesComplete(args.projectDir);
io.stdout.write('│ No context sources selected.\n');
return { status: 'skipped', projectDir: args.projectDir };
}
const readyConnectionIds: string[] = [];
let returnToSourceSelection = false;
for (const source of selected as KtxSetupSourceType[]) {
const sourceChoice = args.source
? ({ kind: 'new', args } as const)
: await chooseInteractiveSourceConnection({
args,
source,
connections: (await loadKtxProject({ projectDir: args.projectDir })).config.connections,
prompts,
io,
testGitRepo: deps.testGitRepo,
pickNotionRootPages: deps.pickNotionRootPages,
discoverMetabaseDatabases: deps.discoverMetabaseDatabases,
});
if (sourceChoice === 'back') {
if (args.source) {
return { status: 'back', projectDir: args.projectDir };
}
returnToSourceSelection = true;
break;
}
const choiceResult = await saveValidateAndMaybeBuildSource({
args,
source,
sourceChoice,
prompts,
io,
deps,
});
if (choiceResult.status === 'failed') {
if (args.source) {
return { status: 'failed', projectDir: args.projectDir };
}
prompts.log?.('Edit the connection or pick a different source to continue.');
returnToSourceSelection = true;
break;
}
if (choiceResult.status === 'back') {
if (args.source) {
return { status: 'back', projectDir: args.projectDir };
}
returnToSourceSelection = true;
break;
}
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
}
if (returnToSourceSelection) {
continue;
}
if (readyConnectionIds.length > 0 && !args.source && args.inputMode !== 'disabled') {
let restartSourceSelection = false;
while (true) {
const addMore = await prompts.select({
message: `${readyConnectionIds.length} context source${readyConnectionIds.length > 1 ? 's' : ''} configured (${readyConnectionIds.join(', ')}). Add another?`,
options: [
{ value: 'done', label: 'Done — continue to context build' },
{ value: 'edit', label: 'Edit an existing context source' },
{ value: 'add', label: 'Add another context source' },
],
});
if (addMore === 'add') {
restartSourceSelection = true;
break;
}
if (addMore === 'edit') {
const editTarget = await chooseContextSourceToEdit({ projectDir: args.projectDir, prompts });
if (editTarget === 'back') {
continue;
}
const projectForEdit = await loadKtxProject({ projectDir: args.projectDir });
const connection = projectForEdit.config.connections[editTarget.connectionId];
if (!connection) {
continue;
}
const sourceChoice = await promptEditedSourceConnection({
args,
source: editTarget.source,
connectionId: editTarget.connectionId,
connection,
prompts,
io,
testGitRepo: deps.testGitRepo,
pickNotionRootPages: deps.pickNotionRootPages,
discoverMetabaseDatabases: deps.discoverMetabaseDatabases,
});
if (sourceChoice === 'back') {
continue;
}
const choiceResult = await saveValidateAndMaybeBuildSource({
args,
source: editTarget.source,
sourceChoice,
prompts,
io,
deps,
});
if (choiceResult.status === 'failed') {
prompts.log?.('Edit the connection or pick a different source to continue.');
continue;
}
if (choiceResult.status === 'back') {
continue;
}
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
continue;
}
break;
}
if (restartSourceSelection) {
continue;
}
}
await markSourcesComplete(args.projectDir);
return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds };
}
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return { status: 'failed', projectDir: args.projectDir };
}
}