mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
1508 lines
54 KiB
TypeScript
1508 lines
54 KiB
TypeScript
import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises';
|
|
import { tmpdir } from 'node:os';
|
|
import { join, relative, resolve } from 'node:path';
|
|
import { fileURLToPath, pathToFileURL } from 'node:url';
|
|
import { cancel, confirm, isCancel, log, multiselect, password, select, text } from '@clack/prompts';
|
|
import { localConnectionTypeForConfig, resolveNotionAuthToken } from '@ktx/context/connections';
|
|
import { resolveKtxConfigReference } from '@ktx/context/core';
|
|
import {
|
|
cloneOrPull,
|
|
DEFAULT_METABASE_CLIENT_CONFIG,
|
|
discoverMetabaseDatabases,
|
|
type DiscoveredMetabaseDatabase,
|
|
loadDbtSchemaFiles,
|
|
loadProjectInfo,
|
|
MetabaseClient,
|
|
type NotionApi,
|
|
NotionClient,
|
|
parseLookmlStagedDir,
|
|
parseMetricflowFiles,
|
|
testRepoConnection,
|
|
} from '@ktx/context/ingest';
|
|
import {
|
|
type KtxProjectConfig,
|
|
type KtxProjectConnectionConfig,
|
|
loadKtxProject,
|
|
markKtxSetupStateStepComplete,
|
|
serializeKtxProjectConfig,
|
|
stripKtxSetupCompletedSteps,
|
|
} from '@ktx/context/project';
|
|
import type { KtxCliIo } from './cli-runtime.js';
|
|
import { runKtxConnectionMapping } from './commands/connection-mapping.js';
|
|
import { runKtxConnection } from './connection.js';
|
|
import { withMenuOptionsSpacing, withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
|
|
import { runKtxPublicIngest } from './public-ingest.js';
|
|
import { withSetupInterruptConfirmation } from './setup-interrupt.js';
|
|
import { writeProjectLocalSecretReference } from './setup-secrets.js';
|
|
|
|
export type KtxSetupSourceType = 'dbt' | 'metricflow' | 'metabase' | 'looker' | 'lookml' | 'notion';
|
|
|
|
const DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN = 25;
|
|
|
|
export interface KtxSetupSourcesArgs {
|
|
projectDir: string;
|
|
inputMode: 'auto' | 'disabled';
|
|
source?: KtxSetupSourceType;
|
|
sourceConnectionId?: string;
|
|
sourcePath?: string;
|
|
sourceGitUrl?: string;
|
|
sourceBranch?: string;
|
|
sourceSubpath?: string;
|
|
sourceAuthTokenRef?: string;
|
|
sourceUrl?: string;
|
|
sourceApiKeyRef?: string;
|
|
sourceClientId?: string;
|
|
sourceClientSecretRef?: string;
|
|
sourceWarehouseConnectionId?: string;
|
|
sourceProjectName?: string;
|
|
sourceProfilesPath?: string;
|
|
sourceTarget?: string;
|
|
metabaseDatabaseId?: number;
|
|
notionCrawlMode?: 'all_accessible' | 'selected_roots';
|
|
notionRootPageIds?: string[];
|
|
runInitialSourceIngest: boolean;
|
|
skipSources: boolean;
|
|
}
|
|
|
|
export type KtxSetupSourcesResult =
|
|
| { status: 'ready'; projectDir: string; connectionIds: string[] }
|
|
| { status: 'skipped'; projectDir: string }
|
|
| { status: 'back'; projectDir: string }
|
|
| { status: 'missing-input'; projectDir: string }
|
|
| { status: 'failed'; projectDir: string };
|
|
|
|
export interface KtxSetupSourcesPromptAdapter {
|
|
multiselect(options: {
|
|
message: string;
|
|
options: Array<{ value: string; label: string }>;
|
|
required?: boolean;
|
|
}): Promise<string[]>;
|
|
select(options: { message: string; options: Array<{ value: string; label: string }> }): Promise<string>;
|
|
text(options: { message: string; placeholder?: string; initialValue?: string }): Promise<string | undefined>;
|
|
password(options: { message: string }): Promise<string | undefined>;
|
|
cancel(message: string): void;
|
|
log?(message: string): void;
|
|
}
|
|
|
|
export type SourceValidationResult = { ok: true; detail?: string } | { ok: false; message: string };
|
|
|
|
export interface KtxSetupSourcesDeps {
|
|
prompts?: KtxSetupSourcesPromptAdapter;
|
|
testGitRepo?: (args: { repoUrl: string; authToken?: string | null }) => Promise<{ ok: true } | { ok: false; error: string }>;
|
|
validateDbt?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
|
|
validateMetricflow?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
|
|
validateMetabase?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
|
|
validateLooker?: (projectDir: string, connectionId: string) => Promise<SourceValidationResult>;
|
|
validateLookml?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
|
|
validateNotion?: (connection: KtxProjectConnectionConfig) => Promise<SourceValidationResult>;
|
|
discoverMetabaseDatabases?: (args: {
|
|
sourceUrl: string;
|
|
sourceApiKeyRef: string;
|
|
sourceConnectionId: string;
|
|
}) => Promise<DiscoveredMetabaseDatabase[]>;
|
|
runMapping?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
|
|
runInitialIngest?: (
|
|
projectDir: string,
|
|
connectionId: string,
|
|
io: KtxCliIo,
|
|
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
|
|
) => Promise<number>;
|
|
}
|
|
|
|
const SOURCE_OPTIONS: Array<{ value: KtxSetupSourceType; label: string }> = [
|
|
{ value: 'dbt', label: 'dbt' },
|
|
{ value: 'metricflow', label: 'MetricFlow' },
|
|
{ value: 'metabase', label: 'Metabase' },
|
|
{ value: 'looker', label: 'Looker' },
|
|
{ value: 'lookml', label: 'LookML' },
|
|
{ value: 'notion', label: 'Notion' },
|
|
];
|
|
|
|
const SOURCE_LABELS = Object.fromEntries(SOURCE_OPTIONS.map((option) => [option.value, option.label])) as Record<
|
|
KtxSetupSourceType,
|
|
string
|
|
>;
|
|
|
|
const PRIMARY_SOURCE_DRIVERS = new Set([
|
|
'sqlite',
|
|
'postgres',
|
|
'mysql',
|
|
'clickhouse',
|
|
'sqlserver',
|
|
'bigquery',
|
|
'snowflake',
|
|
]);
|
|
|
|
function createPromptAdapter(): KtxSetupSourcesPromptAdapter {
|
|
return {
|
|
async multiselect(options) {
|
|
while (true) {
|
|
const value = await withSetupInterruptConfirmation(() => multiselect(withMenuOptionsSpacing(options)));
|
|
if (isCancel(value)) {
|
|
cancel('Setup cancelled.');
|
|
return ['back'];
|
|
}
|
|
const selected = [...value] as string[];
|
|
if (selected.length === 0 && !options.required) {
|
|
const skipConfirmed = await confirm({ message: 'Nothing selected. Skip this step?', initialValue: false });
|
|
if (isCancel(skipConfirmed)) {
|
|
cancel('Setup cancelled.');
|
|
return ['back'];
|
|
}
|
|
if (!skipConfirmed) continue;
|
|
}
|
|
return selected;
|
|
}
|
|
},
|
|
async select(options) {
|
|
const value = await withSetupInterruptConfirmation(() => select(withMenuOptionsSpacing(options)));
|
|
if (isCancel(value)) {
|
|
cancel('Setup cancelled.');
|
|
return 'back';
|
|
}
|
|
return String(value);
|
|
},
|
|
async text(options) {
|
|
const value = await withSetupInterruptConfirmation(() =>
|
|
text({ ...options, message: withTextInputNavigation(options.message) }),
|
|
);
|
|
return isCancel(value) ? undefined : String(value);
|
|
},
|
|
async password(options) {
|
|
const value = await withSetupInterruptConfirmation(() =>
|
|
password({ ...options, message: withTextInputNavigation(options.message) }),
|
|
);
|
|
return isCancel(value) ? undefined : String(value);
|
|
},
|
|
cancel(message) {
|
|
cancel(message);
|
|
},
|
|
log(message) {
|
|
log.info(message);
|
|
},
|
|
};
|
|
}
|
|
|
|
function isRecord(value: unknown): value is Record<string, unknown> {
|
|
return typeof value === 'object' && value !== null && !Array.isArray(value);
|
|
}
|
|
|
|
function stringField(value: unknown): string | undefined {
|
|
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined;
|
|
}
|
|
|
|
function sourceLabel(source: KtxSetupSourceType): string {
|
|
return SOURCE_LABELS[source];
|
|
}
|
|
|
|
function sourceAdapter(source: KtxSetupSourceType): string {
|
|
return source;
|
|
}
|
|
|
|
function connectionNamePrompt(label: string): string {
|
|
return `Name this ${label} connection\nKTX will use this short name in commands and config. You can rename it now.`;
|
|
}
|
|
|
|
function sourceSubpathPrompt(source: KtxSetupSourceType): string {
|
|
if (source === 'dbt') {
|
|
return [
|
|
'Folder containing dbt_project.yml (optional)',
|
|
'Press Enter when dbt_project.yml is at the repo root.',
|
|
'For monorepos, enter a relative path like analytics/dbt.',
|
|
].join('\n');
|
|
}
|
|
return [
|
|
`${sourceLabel(source)} project folder (optional)`,
|
|
'If the project files are inside a subfolder, enter that path.',
|
|
'Press Enter if the path or repo already points at the project.',
|
|
].join('\n');
|
|
}
|
|
|
|
const SCAN_SKIP_DIRS = new Set(['.git', 'node_modules', '.venv', 'target', 'dbt_packages', 'dbt_modules', '__pycache__']);
|
|
|
|
async function findDbtProjectSubpaths(rootDir: string): Promise<string[]> {
|
|
const entries = await readdir(rootDir, { withFileTypes: true, recursive: true });
|
|
const subpaths: string[] = [];
|
|
for (const entry of entries) {
|
|
if (!entry.isFile()) continue;
|
|
if (entry.name !== 'dbt_project.yml' && entry.name !== 'dbt_project.yaml') continue;
|
|
const relDir = relative(rootDir, entry.parentPath);
|
|
if (relDir.split('/').some((part) => SCAN_SKIP_DIRS.has(part))) continue;
|
|
subpaths.push(relDir);
|
|
}
|
|
return subpaths;
|
|
}
|
|
|
|
async function promptText(
|
|
prompts: KtxSetupSourcesPromptAdapter,
|
|
options: { message: string; placeholder?: string; initialValue?: string },
|
|
): Promise<string | undefined> {
|
|
return await prompts.text({ ...options, message: withTextInputNavigation(options.message) });
|
|
}
|
|
|
|
function assertSafeConnectionId(connectionId: string): void {
|
|
if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) {
|
|
throw new Error(`Unsafe connection id: ${connectionId}`);
|
|
}
|
|
}
|
|
|
|
function credentialRef(value: string | undefined, label: string): string {
|
|
const ref = value?.trim();
|
|
if (!ref) {
|
|
throw new Error(`Missing ${label}; use env:NAME or file:/absolute/path`);
|
|
}
|
|
if (!ref.startsWith('env:') && !ref.startsWith('file:')) {
|
|
throw new Error(`${label} must use env:NAME or file:/absolute/path`);
|
|
}
|
|
return ref;
|
|
}
|
|
|
|
async function chooseSourceCredentialRef(input: {
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
projectDir: string;
|
|
label: string;
|
|
envName: string;
|
|
secretFileName: string;
|
|
}): Promise<string | 'back'> {
|
|
while (true) {
|
|
const choice = await input.prompts.select({
|
|
message: `How should KTX find your ${input.label}?`,
|
|
options: [
|
|
{ value: 'env', label: `Use ${input.envName} from the environment` },
|
|
{ value: 'paste', label: 'Paste a key and save it as a local secret file' },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (choice === 'back') return 'back';
|
|
if (choice === 'paste') {
|
|
const value = await input.prompts.password({ message: input.label });
|
|
if (value === undefined) continue;
|
|
if (!value.trim()) continue;
|
|
const ref = await writeProjectLocalSecretReference({
|
|
projectDir: input.projectDir,
|
|
fileName: input.secretFileName,
|
|
value,
|
|
});
|
|
input.prompts.log?.(`Saved to .ktx/secrets/${input.secretFileName}`);
|
|
return ref;
|
|
}
|
|
return `env:${input.envName}`;
|
|
}
|
|
}
|
|
|
|
async function chooseGitAuthCredentialRef(input: {
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
projectDir: string;
|
|
source: KtxSetupSourceType;
|
|
connectionId: string;
|
|
}): Promise<string | undefined | 'back'> {
|
|
const label = input.source === 'dbt' ? 'This' : `This ${sourceLabel(input.source)}`;
|
|
while (true) {
|
|
const choice = await input.prompts.select({
|
|
message: `${label} repo requires authentication.`,
|
|
options: [
|
|
{ value: 'env', label: 'Use GITHUB_TOKEN from the environment' },
|
|
{ value: 'paste', label: 'Paste a token and save it as a local secret file' },
|
|
{ value: 'skip', label: 'Skip — try without authentication' },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (choice === 'back') return 'back';
|
|
if (choice === 'skip') return undefined;
|
|
if (choice === 'paste') {
|
|
const value = await input.prompts.password({ message: 'Git access token' });
|
|
if (value === undefined) continue;
|
|
if (!value.trim()) continue;
|
|
const fileName = `${input.connectionId}-auth-token`;
|
|
const ref = await writeProjectLocalSecretReference({
|
|
projectDir: input.projectDir,
|
|
fileName,
|
|
value,
|
|
});
|
|
input.prompts.log?.(`Saved to .ktx/secrets/${fileName}`);
|
|
return ref;
|
|
}
|
|
return 'env:GITHUB_TOKEN';
|
|
}
|
|
}
|
|
|
|
function repoOrLocalSource(args: KtxSetupSourcesArgs): { sourceDir?: string; repoUrl?: string } {
|
|
if (args.sourcePath && args.sourceGitUrl) {
|
|
throw new Error('Choose only one source location: --source-path or --source-git-url.');
|
|
}
|
|
if (args.sourcePath) {
|
|
return { sourceDir: resolve(args.sourcePath) };
|
|
}
|
|
if (args.sourceGitUrl) {
|
|
return { repoUrl: args.sourceGitUrl };
|
|
}
|
|
throw new Error('Missing source location: pass --source-path or --source-git-url.');
|
|
}
|
|
|
|
function fileRepoUrl(sourceDir: string): string {
|
|
return pathToFileURL(sourceDir).toString();
|
|
}
|
|
|
|
async function writeProjectConfig(projectDir: string, config: KtxProjectConfig): Promise<void> {
|
|
const project = await loadKtxProject({ projectDir });
|
|
await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8');
|
|
}
|
|
|
|
async function writeSourceConnection(
|
|
projectDir: string,
|
|
connectionId: string,
|
|
connection: KtxProjectConnectionConfig,
|
|
adapter: string,
|
|
): Promise<() => Promise<void>> {
|
|
assertSafeConnectionId(connectionId);
|
|
const project = await loadKtxProject({ projectDir });
|
|
const previousConnection = project.config.connections[connectionId];
|
|
const hadPreviousConnection = previousConnection !== undefined;
|
|
const shouldRemoveAdapterOnRollback = !project.config.ingest.adapters.includes(adapter);
|
|
const config = {
|
|
...project.config,
|
|
connections: {
|
|
...project.config.connections,
|
|
[connectionId]: connection,
|
|
},
|
|
ingest: {
|
|
...project.config.ingest,
|
|
adapters: project.config.ingest.adapters.includes(adapter)
|
|
? [...project.config.ingest.adapters]
|
|
: [...project.config.ingest.adapters, adapter],
|
|
},
|
|
};
|
|
await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(config)), 'utf-8');
|
|
return async () => {
|
|
const latest = await loadKtxProject({ projectDir });
|
|
const connections = { ...latest.config.connections };
|
|
if (hadPreviousConnection) {
|
|
connections[connectionId] = previousConnection;
|
|
} else {
|
|
delete connections[connectionId];
|
|
}
|
|
await writeProjectConfig(projectDir, {
|
|
...latest.config,
|
|
connections,
|
|
ingest: {
|
|
...latest.config.ingest,
|
|
adapters: shouldRemoveAdapterOnRollback
|
|
? latest.config.ingest.adapters.filter((candidate) => candidate !== adapter)
|
|
: latest.config.ingest.adapters,
|
|
},
|
|
});
|
|
};
|
|
}
|
|
|
|
async function ensureSourceAdapterEnabled(projectDir: string, source: KtxSetupSourceType): Promise<void> {
|
|
const adapter = sourceAdapter(source);
|
|
const project = await loadKtxProject({ projectDir });
|
|
if (project.config.ingest.adapters.includes(adapter)) {
|
|
return;
|
|
}
|
|
await writeProjectConfig(projectDir, {
|
|
...project.config,
|
|
ingest: {
|
|
...project.config.ingest,
|
|
adapters: [...project.config.ingest.adapters, adapter],
|
|
},
|
|
});
|
|
}
|
|
|
|
async function markSourcesComplete(projectDir: string): Promise<void> {
|
|
const project = await loadKtxProject({ projectDir });
|
|
await writeFile(project.configPath, serializeKtxProjectConfig(stripKtxSetupCompletedSteps(project.config)), 'utf-8');
|
|
await markKtxSetupStateStepComplete(projectDir, 'sources');
|
|
}
|
|
|
|
function hasPrimarySource(config: KtxProjectConfig): boolean {
|
|
const setupPrimaryIds = config.setup?.database_connection_ids ?? [];
|
|
if (setupPrimaryIds.some((connectionId) => Object.hasOwn(config.connections, connectionId))) {
|
|
return true;
|
|
}
|
|
return Object.values(config.connections).some((connection) =>
|
|
PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()),
|
|
);
|
|
}
|
|
|
|
function buildDbtConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
const source = repoOrLocalSource(args);
|
|
return {
|
|
driver: 'dbt',
|
|
...(source.sourceDir ? { source_dir: source.sourceDir } : {}),
|
|
...(source.repoUrl ? { repo_url: source.repoUrl } : {}),
|
|
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
|
|
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
|
|
...(args.sourceAuthTokenRef
|
|
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'dbt private repo access token') }
|
|
: {}),
|
|
...(args.sourceProfilesPath ? { profiles_path: resolve(args.sourceProfilesPath) } : {}),
|
|
...(args.sourceTarget ? { target: args.sourceTarget } : {}),
|
|
...(args.sourceProjectName ? { project_name: args.sourceProjectName } : {}),
|
|
};
|
|
}
|
|
|
|
function buildMetricflowConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
const source = repoOrLocalSource(args);
|
|
return {
|
|
driver: 'metricflow',
|
|
metricflow: {
|
|
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
|
|
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
|
|
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
|
|
...(args.sourceAuthTokenRef
|
|
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'MetricFlow auth token ref') }
|
|
: {}),
|
|
},
|
|
};
|
|
}
|
|
|
|
function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
if (!args.sourceUrl) {
|
|
throw new Error('Missing Metabase URL: pass --source-url.');
|
|
}
|
|
if (!args.sourceWarehouseConnectionId) {
|
|
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
|
|
}
|
|
if (!args.metabaseDatabaseId) {
|
|
throw new Error('Missing Metabase database id: pass --metabase-database-id.');
|
|
}
|
|
return {
|
|
driver: 'metabase',
|
|
api_url: args.sourceUrl,
|
|
api_key_ref: credentialRef(args.sourceApiKeyRef, 'Metabase API key ref'),
|
|
mappings: {
|
|
databaseMappings: { [String(args.metabaseDatabaseId)]: args.sourceWarehouseConnectionId },
|
|
syncEnabled: { [String(args.metabaseDatabaseId)]: true },
|
|
syncMode: 'ALL',
|
|
},
|
|
};
|
|
}
|
|
|
|
function buildLookerConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
if (!args.sourceUrl) {
|
|
throw new Error('Missing Looker base URL: pass --source-url.');
|
|
}
|
|
if (!args.sourceClientId) {
|
|
throw new Error('Missing Looker client id: pass --source-client-id.');
|
|
}
|
|
if (!args.sourceWarehouseConnectionId) {
|
|
throw new Error('Missing mapped warehouse: pass --source-warehouse-connection-id.');
|
|
}
|
|
return {
|
|
driver: 'looker',
|
|
base_url: args.sourceUrl,
|
|
client_id: args.sourceClientId,
|
|
client_secret_ref: credentialRef(args.sourceClientSecretRef, 'Looker client secret ref'),
|
|
mappings: {
|
|
connectionMappings: {
|
|
[args.sourceTarget ?? args.sourceWarehouseConnectionId]: args.sourceWarehouseConnectionId,
|
|
},
|
|
},
|
|
};
|
|
}
|
|
|
|
function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
const source = repoOrLocalSource(args);
|
|
return {
|
|
driver: 'lookml',
|
|
repoUrl: source.repoUrl ?? fileRepoUrl(source.sourceDir ?? ''),
|
|
...(args.sourceBranch ? { branch: args.sourceBranch } : {}),
|
|
...(args.sourceSubpath ? { path: args.sourceSubpath } : {}),
|
|
...(args.sourceAuthTokenRef
|
|
? { auth_token_ref: credentialRef(args.sourceAuthTokenRef, 'LookML auth token ref') }
|
|
: {}),
|
|
mappings: {
|
|
expectedLookerConnectionName: args.sourceTarget ?? args.sourceWarehouseConnectionId ?? null,
|
|
},
|
|
};
|
|
}
|
|
|
|
function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
const rootPageIds = args.notionRootPageIds ?? [];
|
|
const crawlMode = rootPageIds.length > 0 ? 'selected_roots' : (args.notionCrawlMode ?? 'selected_roots');
|
|
if (crawlMode === 'selected_roots' && rootPageIds.length === 0) {
|
|
throw new Error('Notion selected_roots requires --notion-root-page-id.');
|
|
}
|
|
return {
|
|
driver: 'notion',
|
|
auth_token_ref: credentialRef(args.sourceApiKeyRef, 'Notion token ref'),
|
|
crawl_mode: crawlMode,
|
|
root_page_ids: rootPageIds,
|
|
root_database_ids: [],
|
|
root_data_source_ids: [],
|
|
max_pages_per_run: 1000,
|
|
max_knowledge_creates_per_run: DEFAULT_NOTION_MAX_KNOWLEDGE_CREATES_PER_RUN,
|
|
max_knowledge_updates_per_run: 20,
|
|
last_successful_cursor: null,
|
|
};
|
|
}
|
|
|
|
function sourcePathFromFileRepoUrl(repoUrl: string, subpath?: string): string {
|
|
const root = fileURLToPath(repoUrl);
|
|
return subpath ? join(root, subpath) : root;
|
|
}
|
|
|
|
function repoAuthToken(connection: KtxProjectConnectionConfig | Record<string, unknown>): string | null {
|
|
const ref = stringField(connection.auth_token_ref);
|
|
const literal = stringField(connection.auth_token);
|
|
return literal ?? resolveKtxConfigReference(ref, process.env) ?? null;
|
|
}
|
|
|
|
async function collectYamlFilesRecursive(sourceRoot: string): Promise<Array<{ content: string; path: string }>> {
|
|
const entries = await readdir(sourceRoot, { withFileTypes: true, recursive: true });
|
|
const files: Array<{ content: string; path: string }> = [];
|
|
for (const entry of entries) {
|
|
if (!entry.isFile() || !/\.ya?ml$/i.test(entry.name)) {
|
|
continue;
|
|
}
|
|
const path = join(entry.parentPath, entry.name);
|
|
files.push({ path, content: await readFile(path, 'utf-8') });
|
|
}
|
|
return files;
|
|
}
|
|
|
|
async function defaultValidateDbt(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
|
|
let sourceDir = stringField(connection.source_dir);
|
|
const repoUrl = stringField(connection.repo_url);
|
|
if (!sourceDir && repoUrl?.startsWith('file:')) {
|
|
sourceDir = sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path));
|
|
}
|
|
if (!sourceDir && repoUrl) {
|
|
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-'));
|
|
await cloneOrPull({
|
|
repoUrl,
|
|
authToken: repoAuthToken(connection),
|
|
cacheDir,
|
|
branch: stringField(connection.branch) ?? 'main',
|
|
});
|
|
sourceDir = stringField(connection.path) ? join(cacheDir, String(connection.path)) : cacheDir;
|
|
}
|
|
if (!sourceDir) {
|
|
return { ok: false, message: 'dbt setup requires --source-path or --source-git-url.' };
|
|
}
|
|
const info = await loadProjectInfo(sourceDir);
|
|
const schemaFiles = await loadDbtSchemaFiles(sourceDir);
|
|
if (!info.projectName && typeof connection.project_name !== 'string') {
|
|
return { ok: false, message: 'dbt project metadata is missing project name.' };
|
|
}
|
|
return { ok: true, detail: `project=${info.projectName ?? connection.project_name} schemas=${schemaFiles.length}` };
|
|
}
|
|
|
|
async function defaultValidateMetricflow(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
|
|
const metricflow = isRecord(connection.metricflow) ? connection.metricflow : undefined;
|
|
const repoUrl = stringField(metricflow?.repoUrl);
|
|
if (!repoUrl) {
|
|
return { ok: false, message: 'MetricFlow setup requires repoUrl.' };
|
|
}
|
|
if (!repoUrl.startsWith('file:')) {
|
|
const result = await testRepoConnection({
|
|
repoUrl,
|
|
authToken: metricflow ? repoAuthToken(metricflow) : null,
|
|
});
|
|
if (!result.ok) {
|
|
return { ok: false, message: result.error };
|
|
}
|
|
return { ok: true, detail: 'repository reachable' };
|
|
}
|
|
const path = sourcePathFromFileRepoUrl(repoUrl, stringField(metricflow?.path));
|
|
const parsed = parseMetricflowFiles(await collectYamlFilesRecursive(path));
|
|
return {
|
|
ok: true,
|
|
detail: `semanticModels=${parsed.semanticModels.length} metrics=${parsed.crossModelMetrics.length}`,
|
|
};
|
|
}
|
|
|
|
async function defaultValidateLooker(projectDir: string, connectionId: string): Promise<SourceValidationResult> {
|
|
const code = await runKtxConnectionMapping(
|
|
{ command: 'refresh', projectDir, connectionId, autoAccept: true },
|
|
{ stdout: { write() {} }, stderr: { write() {} } },
|
|
);
|
|
return code === 0
|
|
? { ok: true, detail: 'Looker mapping refreshed' }
|
|
: { ok: false, message: 'Looker validation failed' };
|
|
}
|
|
|
|
async function defaultValidateLookml(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
|
|
const repoUrl = stringField(connection.repoUrl);
|
|
if (!repoUrl) {
|
|
return { ok: false, message: 'LookML setup requires repoUrl.' };
|
|
}
|
|
if (!repoUrl.startsWith('file:')) {
|
|
const result = await testRepoConnection({ repoUrl, authToken: repoAuthToken(connection) });
|
|
return result.ok ? { ok: true, detail: 'repository reachable' } : { ok: false, message: result.error };
|
|
}
|
|
const parsed = await parseLookmlStagedDir(sourcePathFromFileRepoUrl(repoUrl, stringField(connection.path)));
|
|
const count = parsed.models.length + parsed.views.length + parsed.dashboards.length;
|
|
return count > 0 ? { ok: true, detail: `lookmlFiles=${count}` } : { ok: false, message: 'No LookML files found' };
|
|
}
|
|
|
|
async function defaultValidateNotion(connection: KtxProjectConnectionConfig): Promise<SourceValidationResult> {
|
|
const token = await resolveNotionAuthToken(String(connection.auth_token_ref));
|
|
const client: NotionApi = new NotionClient(token);
|
|
await client.retrieveBotUser();
|
|
const roots = Array.isArray(connection.root_page_ids)
|
|
? connection.root_page_ids.filter((id): id is string => typeof id === 'string')
|
|
: [];
|
|
for (const root of roots) {
|
|
await client.retrievePage(root);
|
|
}
|
|
return { ok: true, detail: `roots=${roots.length}` };
|
|
}
|
|
|
|
interface MappingJsonOutput {
|
|
connectionId: string;
|
|
refresh: { ok: boolean; output: string[] };
|
|
validation: { ok: boolean; output: string[] };
|
|
mappings: unknown[];
|
|
}
|
|
|
|
function summarizeMappingResult(parsed: MappingJsonOutput): string {
|
|
const mappingCount = parsed.mappings.length;
|
|
const mappingNoun = mappingCount === 1 ? 'mapping' : 'mappings';
|
|
return `Mapping validated — ${mappingCount} ${mappingNoun} configured`;
|
|
}
|
|
|
|
async function defaultRunMapping(projectDir: string, connectionId: string, io: KtxCliIo): Promise<number> {
|
|
let captured = '';
|
|
const captureIo: KtxCliIo = {
|
|
stdout: { write(chunk: string) { captured += chunk; } },
|
|
stderr: io.stderr,
|
|
};
|
|
const code = await runKtxConnection(
|
|
{ command: 'map', projectDir, sourceConnectionId: connectionId, json: true },
|
|
captureIo,
|
|
);
|
|
if (code !== 0) return code;
|
|
try {
|
|
const parsed = JSON.parse(captured.trim()) as MappingJsonOutput;
|
|
io.stdout.write(`${summarizeMappingResult(parsed)}\n`);
|
|
} catch {
|
|
io.stdout.write(captured);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
async function defaultRunInitialIngest(
|
|
projectDir: string,
|
|
connectionId: string,
|
|
io: KtxCliIo,
|
|
options: { inputMode: KtxSetupSourcesArgs['inputMode'] },
|
|
): Promise<number> {
|
|
return await runKtxPublicIngest(
|
|
{
|
|
command: 'run',
|
|
projectDir,
|
|
targetConnectionId: connectionId,
|
|
all: false,
|
|
json: false,
|
|
inputMode: options.inputMode,
|
|
},
|
|
io,
|
|
);
|
|
}
|
|
|
|
async function runInitialSourceIngestWithRecovery(input: {
|
|
args: KtxSetupSourcesArgs;
|
|
connectionId: string;
|
|
io: KtxCliIo;
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
deps: KtxSetupSourcesDeps;
|
|
}): Promise<'ready' | 'continue' | 'back' | 'failed'> {
|
|
while (true) {
|
|
input.io.stdout.write(`│ Building context from ${input.connectionId}. Large sources can take a while.\n`);
|
|
const ingestCode = await (input.deps.runInitialIngest ?? defaultRunInitialIngest)(
|
|
input.args.projectDir,
|
|
input.connectionId,
|
|
input.io,
|
|
{
|
|
inputMode: input.args.inputMode,
|
|
},
|
|
);
|
|
if (ingestCode === 0) {
|
|
return 'ready';
|
|
}
|
|
if (input.args.inputMode === 'disabled') {
|
|
return 'failed';
|
|
}
|
|
|
|
const action = await input.prompts.select({
|
|
message: `Context build failed for ${input.connectionId}\nRetry now, continue setup and build this source later, or go back.`,
|
|
options: [
|
|
{ value: 'retry', label: 'Retry context build' },
|
|
{ value: 'continue', label: 'Continue setup and build this source later' },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (action === 'retry') {
|
|
continue;
|
|
}
|
|
if (action === 'continue') {
|
|
input.io.stdout.write(`│ Context source saved without a completed context build for ${input.connectionId}.\n`);
|
|
input.io.stdout.write(`│ Run later: ktx ingest run --connection-id ${input.connectionId} --adapter <adapter>\n`);
|
|
return 'continue';
|
|
}
|
|
return 'back';
|
|
}
|
|
}
|
|
|
|
type SourceLocationChoice = 'path' | 'git';
|
|
|
|
type SourcePromptState = KtxSetupSourcesArgs & {
|
|
sourceLocation?: SourceLocationChoice;
|
|
};
|
|
|
|
type SourcePromptStep = (state: SourcePromptState) => Promise<'next' | 'back'>;
|
|
|
|
interface WarehouseConnectionChoice {
|
|
id: string;
|
|
connectionType: string;
|
|
}
|
|
|
|
type InteractiveSourceConnectionChoice =
|
|
| { kind: 'existing'; connectionId: string; connection: KtxProjectConnectionConfig }
|
|
| { kind: 'new'; args: KtxSetupSourcesArgs }
|
|
| 'back';
|
|
|
|
async function runSourcePromptSteps(
|
|
initialState: SourcePromptState,
|
|
stepsForState: (state: SourcePromptState) => SourcePromptStep[],
|
|
): Promise<KtxSetupSourcesArgs | 'back'> {
|
|
let stepIndex = 0;
|
|
while (true) {
|
|
const steps = stepsForState(initialState);
|
|
if (stepIndex >= steps.length) {
|
|
const { sourceLocation: _sourceLocation, ...sourceArgs } = initialState;
|
|
return sourceArgs;
|
|
}
|
|
|
|
const result = await steps[stepIndex]?.(initialState);
|
|
if (result === 'back') {
|
|
if (stepIndex === 0) {
|
|
return 'back';
|
|
}
|
|
stepIndex -= 1;
|
|
continue;
|
|
}
|
|
stepIndex += 1;
|
|
}
|
|
}
|
|
|
|
function resetRepoLocationFields(state: SourcePromptState): void {
|
|
delete state.sourcePath;
|
|
delete state.sourceGitUrl;
|
|
delete state.sourceBranch;
|
|
delete state.sourceAuthTokenRef;
|
|
delete state.sourceSubpath;
|
|
delete state.sourceProjectName;
|
|
}
|
|
|
|
function warehouseConnectionChoices(config: KtxProjectConfig): WarehouseConnectionChoice[] {
|
|
return Object.entries(config.connections)
|
|
.filter(([, connection]) => PRIMARY_SOURCE_DRIVERS.has(String(connection.driver ?? '').toLowerCase()))
|
|
.map(([id, connection]) => ({ id, connectionType: localConnectionTypeForConfig(id, connection) }))
|
|
.sort((left, right) => left.id.localeCompare(right.id));
|
|
}
|
|
|
|
async function chooseMappedWarehouseConnectionId(input: {
|
|
projectDir: string;
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
}): Promise<string | 'back'> {
|
|
const project = await loadKtxProject({ projectDir: input.projectDir });
|
|
const choices = warehouseConnectionChoices(project.config);
|
|
if (choices.length === 1) {
|
|
return choices[0].id;
|
|
}
|
|
if (choices.length === 0) {
|
|
const entered = await promptText(input.prompts, { message: 'Mapped warehouse connection id' });
|
|
return entered === undefined ? 'back' : entered;
|
|
}
|
|
|
|
const selected = await input.prompts.select({
|
|
message: 'Mapped warehouse connection',
|
|
options: [
|
|
...choices.map((choice) => ({
|
|
value: choice.id,
|
|
label: `${choice.id} (${choice.connectionType})`,
|
|
})),
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
return selected === 'back' ? 'back' : selected;
|
|
}
|
|
|
|
async function defaultDiscoverMetabaseDatabases(input: {
|
|
sourceUrl: string;
|
|
sourceApiKeyRef: string;
|
|
}): Promise<DiscoveredMetabaseDatabase[]> {
|
|
const apiKey = resolveKtxConfigReference(input.sourceApiKeyRef, process.env);
|
|
if (!apiKey) {
|
|
throw new Error('Metabase API key ref could not be resolved');
|
|
}
|
|
const client = new MetabaseClient(
|
|
{ apiUrl: input.sourceUrl, apiKey },
|
|
DEFAULT_METABASE_CLIENT_CONFIG,
|
|
);
|
|
try {
|
|
return await discoverMetabaseDatabases(client);
|
|
} finally {
|
|
await client.cleanup();
|
|
}
|
|
}
|
|
|
|
function metabaseDatabaseLabel(database: DiscoveredMetabaseDatabase): string {
|
|
const detail = [database.engine].filter(Boolean).join(', ');
|
|
return detail ? `${database.id}: ${database.name} (${detail})` : `${database.id}: ${database.name}`;
|
|
}
|
|
|
|
async function chooseMetabaseDatabaseId(input: {
|
|
state: SourcePromptState;
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
deps: KtxSetupSourcesDeps;
|
|
}): Promise<number | 'back'> {
|
|
const sourceUrl = input.state.sourceUrl;
|
|
const sourceApiKeyRef = input.state.sourceApiKeyRef;
|
|
if (sourceUrl && sourceApiKeyRef) {
|
|
try {
|
|
const discovered = await (input.deps.discoverMetabaseDatabases ?? defaultDiscoverMetabaseDatabases)({
|
|
sourceUrl,
|
|
sourceApiKeyRef,
|
|
sourceConnectionId: input.state.sourceConnectionId ?? 'metabase-main',
|
|
});
|
|
if (discovered.length === 1) {
|
|
return discovered[0].id;
|
|
}
|
|
if (discovered.length > 1) {
|
|
const selected = await input.prompts.select({
|
|
message: 'Metabase database',
|
|
options: [
|
|
...discovered
|
|
.slice()
|
|
.sort((left, right) => left.id - right.id)
|
|
.map((database) => ({
|
|
value: String(database.id),
|
|
label: metabaseDatabaseLabel(database),
|
|
})),
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
return selected === 'back' ? 'back' : Number.parseInt(selected, 10);
|
|
}
|
|
} catch {
|
|
// Discovery is a convenience. Fall back to the raw id prompt when credentials
|
|
// are unavailable locally or the Metabase API cannot be reached yet.
|
|
}
|
|
}
|
|
|
|
const databaseId = await promptText(input.prompts, { message: 'Metabase database id' });
|
|
return databaseId === undefined ? 'back' : Number.parseInt(databaseId, 10);
|
|
}
|
|
|
|
function connectionIdPromptSteps(
|
|
args: KtxSetupSourcesArgs,
|
|
source: KtxSetupSourceType,
|
|
prompts: KtxSetupSourcesPromptAdapter,
|
|
defaultConnectionId: string,
|
|
): SourcePromptStep[] {
|
|
if (args.sourceConnectionId) {
|
|
return [];
|
|
}
|
|
return [
|
|
async (state) => {
|
|
const enteredConnectionId = await promptText(prompts, {
|
|
message: connectionNamePrompt(sourceLabel(source)),
|
|
placeholder: defaultConnectionId,
|
|
initialValue: defaultConnectionId,
|
|
});
|
|
if (enteredConnectionId === undefined) {
|
|
return 'back';
|
|
}
|
|
state.sourceConnectionId = enteredConnectionId.trim() || defaultConnectionId;
|
|
return 'next';
|
|
},
|
|
];
|
|
}
|
|
|
|
async function promptForInteractiveSource(
|
|
args: KtxSetupSourcesArgs,
|
|
source: KtxSetupSourceType,
|
|
prompts: KtxSetupSourcesPromptAdapter,
|
|
defaultConnectionId = `${source}-main`,
|
|
testGitRepo: KtxSetupSourcesDeps['testGitRepo'] = testRepoConnection,
|
|
discoverMetabaseDatabaseList?: KtxSetupSourcesDeps['discoverMetabaseDatabases'],
|
|
): Promise<KtxSetupSourcesArgs | 'back'> {
|
|
const initialState: SourcePromptState = { ...args, source };
|
|
if (args.sourceConnectionId) {
|
|
initialState.sourceConnectionId = args.sourceConnectionId;
|
|
}
|
|
const connectionSteps = connectionIdPromptSteps(args, source, prompts, defaultConnectionId);
|
|
|
|
if (source === 'dbt' || source === 'metricflow' || source === 'lookml') {
|
|
return await runSourcePromptSteps(initialState, (state) => [
|
|
...connectionSteps,
|
|
async () => {
|
|
const selectedLocation = await prompts.select({
|
|
message: `${source} source location`,
|
|
options: [
|
|
{ value: 'path', label: 'Local path' },
|
|
{ value: 'git', label: 'Git URL' },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (selectedLocation !== 'path' && selectedLocation !== 'git') {
|
|
return 'back';
|
|
}
|
|
if (state.sourceLocation !== selectedLocation) {
|
|
resetRepoLocationFields(state);
|
|
}
|
|
state.sourceLocation = selectedLocation;
|
|
return 'next';
|
|
},
|
|
...(state.sourceLocation === 'path'
|
|
? [
|
|
async (currentState: SourcePromptState) => {
|
|
const sourcePath = await promptText(prompts, { message: `${source} local path` });
|
|
if (sourcePath === undefined) return 'back';
|
|
currentState.sourcePath = sourcePath;
|
|
return 'next';
|
|
},
|
|
]
|
|
: []),
|
|
...(state.sourceLocation === 'git'
|
|
? [
|
|
async (currentState: SourcePromptState) => {
|
|
const sourceGitUrl = await promptText(prompts, { message: `${source} git URL` });
|
|
if (sourceGitUrl === undefined) return 'back';
|
|
currentState.sourceGitUrl = sourceGitUrl;
|
|
return 'next';
|
|
},
|
|
async (currentState: SourcePromptState) => {
|
|
const branch = await promptText(prompts, { message: `${source} git branch`, initialValue: 'main' });
|
|
if (branch === undefined) return 'back';
|
|
currentState.sourceBranch = branch || 'main';
|
|
return 'next';
|
|
},
|
|
]
|
|
: []),
|
|
...(state.sourceLocation === 'git'
|
|
? [
|
|
async (currentState: SourcePromptState) => {
|
|
const result = await testGitRepo!({ repoUrl: currentState.sourceGitUrl! });
|
|
if (result.ok) {
|
|
delete currentState.sourceAuthTokenRef;
|
|
prompts.log?.('Repository connected.');
|
|
return 'next';
|
|
}
|
|
const authRef = await chooseGitAuthCredentialRef({
|
|
prompts,
|
|
projectDir: args.projectDir,
|
|
source,
|
|
connectionId: currentState.sourceConnectionId ?? `${source}-main`,
|
|
});
|
|
if (authRef === 'back') return 'back';
|
|
if (authRef) {
|
|
currentState.sourceAuthTokenRef = authRef;
|
|
} else {
|
|
delete currentState.sourceAuthTokenRef;
|
|
}
|
|
return 'next';
|
|
},
|
|
]
|
|
: []),
|
|
...(state.sourceLocation
|
|
? [
|
|
async (currentState: SourcePromptState) => {
|
|
if (source === 'dbt') {
|
|
let scanDir: string | undefined;
|
|
if (currentState.sourceLocation === 'path' && currentState.sourcePath) {
|
|
scanDir = currentState.sourcePath;
|
|
} else if (currentState.sourceLocation === 'git' && currentState.sourceGitUrl) {
|
|
try {
|
|
const cacheDir = await mkdtemp(join(tmpdir(), 'ktx-setup-dbt-scan-'));
|
|
const authToken = currentState.sourceAuthTokenRef
|
|
? resolveKtxConfigReference(currentState.sourceAuthTokenRef, process.env)
|
|
: null;
|
|
await cloneOrPull({
|
|
repoUrl: currentState.sourceGitUrl,
|
|
authToken,
|
|
cacheDir,
|
|
branch: currentState.sourceBranch ?? 'main',
|
|
});
|
|
scanDir = cacheDir;
|
|
} catch {
|
|
// Clone failed — fall through to manual prompt
|
|
}
|
|
}
|
|
if (scanDir) {
|
|
try {
|
|
const subpaths = await findDbtProjectSubpaths(scanDir);
|
|
if (subpaths.length === 1) {
|
|
const found = subpaths[0]!;
|
|
if (found) {
|
|
currentState.sourceSubpath = found;
|
|
prompts.log?.(`Found dbt_project.yml in ${found}/`);
|
|
} else {
|
|
delete currentState.sourceSubpath;
|
|
}
|
|
return 'next';
|
|
}
|
|
if (subpaths.length > 1) {
|
|
const selected = await prompts.select({
|
|
message: 'Multiple dbt projects found — which one should KTX use?',
|
|
options: [
|
|
...subpaths.map((p) => ({ value: p || '.', label: p || '(project root)' })),
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (selected === 'back') return 'back';
|
|
const subpath = selected === '.' ? '' : selected;
|
|
if (subpath) {
|
|
currentState.sourceSubpath = subpath;
|
|
} else {
|
|
delete currentState.sourceSubpath;
|
|
}
|
|
return 'next';
|
|
}
|
|
} catch {
|
|
// Directory unreadable — fall through to manual prompt
|
|
}
|
|
}
|
|
}
|
|
const subpath = await promptText(prompts, {
|
|
message: sourceSubpathPrompt(source),
|
|
placeholder: 'optional',
|
|
});
|
|
if (subpath === undefined) return 'back';
|
|
if (subpath) {
|
|
currentState.sourceSubpath = subpath;
|
|
} else {
|
|
delete currentState.sourceSubpath;
|
|
}
|
|
return 'next';
|
|
},
|
|
]
|
|
: []),
|
|
]);
|
|
}
|
|
|
|
if (source === 'metabase') {
|
|
return await runSourcePromptSteps(initialState, () => [
|
|
...connectionSteps,
|
|
async (state) => {
|
|
const sourceUrl = await promptText(prompts, { message: 'Metabase URL' });
|
|
if (sourceUrl === undefined) return 'back';
|
|
state.sourceUrl = sourceUrl;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const ref = await chooseSourceCredentialRef({
|
|
prompts,
|
|
projectDir: args.projectDir,
|
|
label: 'Metabase API key',
|
|
envName: 'METABASE_API_KEY',
|
|
secretFileName: `${state.sourceConnectionId ?? 'metabase-main'}-api-key`,
|
|
});
|
|
if (ref === 'back') return 'back';
|
|
state.sourceApiKeyRef = ref;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
|
|
projectDir: args.projectDir,
|
|
prompts,
|
|
});
|
|
if (sourceWarehouseConnectionId === 'back') return 'back';
|
|
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const databaseId = await chooseMetabaseDatabaseId({
|
|
state,
|
|
prompts,
|
|
deps: { discoverMetabaseDatabases: discoverMetabaseDatabaseList },
|
|
});
|
|
if (databaseId === 'back') return 'back';
|
|
state.metabaseDatabaseId = databaseId;
|
|
return 'next';
|
|
},
|
|
]);
|
|
}
|
|
|
|
if (source === 'looker') {
|
|
return await runSourcePromptSteps(initialState, () => [
|
|
...connectionSteps,
|
|
async (state) => {
|
|
const sourceUrl = await promptText(prompts, { message: 'Looker base URL' });
|
|
if (sourceUrl === undefined) return 'back';
|
|
state.sourceUrl = sourceUrl;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const sourceClientId = await promptText(prompts, { message: 'Looker client id' });
|
|
if (sourceClientId === undefined) return 'back';
|
|
state.sourceClientId = sourceClientId;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const ref = await chooseSourceCredentialRef({
|
|
prompts,
|
|
projectDir: args.projectDir,
|
|
label: 'Looker client secret',
|
|
envName: 'LOOKER_CLIENT_SECRET',
|
|
secretFileName: `${state.sourceConnectionId ?? 'looker-main'}-client-secret`,
|
|
});
|
|
if (ref === 'back') return 'back';
|
|
state.sourceClientSecretRef = ref;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const sourceWarehouseConnectionId = await chooseMappedWarehouseConnectionId({
|
|
projectDir: args.projectDir,
|
|
prompts,
|
|
});
|
|
if (sourceWarehouseConnectionId === 'back') return 'back';
|
|
state.sourceWarehouseConnectionId = sourceWarehouseConnectionId;
|
|
return 'next';
|
|
},
|
|
async (state) => {
|
|
const lookerConnectionName = await promptText(prompts, {
|
|
message: 'Looker connection name',
|
|
placeholder: 'optional',
|
|
});
|
|
if (lookerConnectionName === undefined) return 'back';
|
|
if (lookerConnectionName) {
|
|
state.sourceTarget = lookerConnectionName;
|
|
} else {
|
|
delete state.sourceTarget;
|
|
}
|
|
return 'next';
|
|
},
|
|
]);
|
|
}
|
|
|
|
return await runSourcePromptSteps(initialState, (state) => [
|
|
...connectionSteps,
|
|
async (currentState) => {
|
|
const ref = await chooseSourceCredentialRef({
|
|
prompts,
|
|
projectDir: args.projectDir,
|
|
label: 'Notion integration token',
|
|
envName: 'NOTION_TOKEN',
|
|
secretFileName: `${currentState.sourceConnectionId ?? 'notion-main'}-token`,
|
|
});
|
|
if (ref === 'back') return 'back';
|
|
currentState.sourceApiKeyRef = ref;
|
|
return 'next';
|
|
},
|
|
async (currentState) => {
|
|
const crawlMode = await prompts.select({
|
|
message: 'Which Notion pages should KTX ingest?',
|
|
options: [
|
|
{ value: 'selected_roots', label: 'Specific pages and their subpages (you\'ll paste page IDs)' },
|
|
{ value: 'all_accessible', label: 'All pages the integration can access' },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (crawlMode === 'back') return 'back';
|
|
currentState.notionCrawlMode = crawlMode === 'all_accessible' ? 'all_accessible' : 'selected_roots';
|
|
if (currentState.notionCrawlMode === 'all_accessible') {
|
|
delete currentState.notionRootPageIds;
|
|
}
|
|
return 'next';
|
|
},
|
|
...(state.notionCrawlMode === 'selected_roots'
|
|
? [
|
|
async (currentState: SourcePromptState) => {
|
|
const roots = await promptText(prompts, {
|
|
message: 'Notion page IDs to ingest (each page includes all its subpages)',
|
|
placeholder: 'page-id-1, page-id-2',
|
|
});
|
|
if (roots === undefined) return 'back';
|
|
currentState.notionRootPageIds = roots
|
|
.split(',')
|
|
.map((root) => root.trim())
|
|
.filter(Boolean);
|
|
return 'next';
|
|
},
|
|
]
|
|
: []),
|
|
]);
|
|
}
|
|
|
|
function existingConnectionIdsBySource(
|
|
connections: Record<string, KtxProjectConnectionConfig>,
|
|
source: KtxSetupSourceType,
|
|
): string[] {
|
|
return Object.entries(connections)
|
|
.filter(([, connection]) => String(connection.driver ?? '').toLowerCase() === source)
|
|
.map(([connectionId]) => connectionId)
|
|
.sort((left, right) => left.localeCompare(right));
|
|
}
|
|
|
|
function defaultConnectionIdForSource(
|
|
connections: Record<string, KtxProjectConnectionConfig>,
|
|
source: KtxSetupSourceType,
|
|
): string {
|
|
const base = `${source}-main`;
|
|
if (!connections[base]) {
|
|
return base;
|
|
}
|
|
let index = 2;
|
|
while (connections[`${base}-${index}`]) {
|
|
index += 1;
|
|
}
|
|
return `${base}-${index}`;
|
|
}
|
|
|
|
async function chooseInteractiveSourceConnection(input: {
|
|
args: KtxSetupSourcesArgs;
|
|
source: KtxSetupSourceType;
|
|
connections: Record<string, KtxProjectConnectionConfig>;
|
|
prompts: KtxSetupSourcesPromptAdapter;
|
|
testGitRepo?: KtxSetupSourcesDeps['testGitRepo'];
|
|
discoverMetabaseDatabases?: KtxSetupSourcesDeps['discoverMetabaseDatabases'];
|
|
}): Promise<InteractiveSourceConnectionChoice> {
|
|
const existingIds = existingConnectionIdsBySource(input.connections, input.source);
|
|
const defaultConnectionId = defaultConnectionIdForSource(input.connections, input.source);
|
|
const label = sourceLabel(input.source);
|
|
|
|
if (existingIds.length === 0) {
|
|
const sourceArgs = await promptForInteractiveSource(
|
|
input.args,
|
|
input.source,
|
|
input.prompts,
|
|
defaultConnectionId,
|
|
input.testGitRepo,
|
|
input.discoverMetabaseDatabases,
|
|
);
|
|
return sourceArgs === 'back' ? 'back' : { kind: 'new', args: sourceArgs };
|
|
}
|
|
|
|
while (true) {
|
|
const choice = await input.prompts.select({
|
|
message: `Configure ${label}`,
|
|
options: [
|
|
...existingIds.map((connectionId) => ({
|
|
value: `existing:${connectionId}`,
|
|
label: `Use existing ${label} connection: ${connectionId}`,
|
|
})),
|
|
{ value: 'new', label: `Add new ${label} connection` },
|
|
{ value: 'back', label: 'Back' },
|
|
],
|
|
});
|
|
if (choice === 'back') return 'back';
|
|
if (choice.startsWith('existing:')) {
|
|
const connectionId = choice.slice('existing:'.length);
|
|
const connection = input.connections[connectionId];
|
|
if (connection) {
|
|
return { kind: 'existing', connectionId, connection };
|
|
}
|
|
continue;
|
|
}
|
|
const sourceArgs = await promptForInteractiveSource(
|
|
input.args,
|
|
input.source,
|
|
input.prompts,
|
|
defaultConnectionId,
|
|
input.testGitRepo,
|
|
input.discoverMetabaseDatabases,
|
|
);
|
|
if (sourceArgs === 'back') {
|
|
continue;
|
|
}
|
|
return { kind: 'new', args: sourceArgs };
|
|
}
|
|
}
|
|
|
|
function buildConnection(source: KtxSetupSourceType, args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
|
|
if (source === 'dbt') {
|
|
return buildDbtConnection(args);
|
|
}
|
|
if (source === 'metricflow') {
|
|
return buildMetricflowConnection(args);
|
|
}
|
|
if (source === 'metabase') {
|
|
return buildMetabaseConnection(args);
|
|
}
|
|
if (source === 'looker') {
|
|
return buildLookerConnection(args);
|
|
}
|
|
if (source === 'lookml') {
|
|
return buildLookmlConnection(args);
|
|
}
|
|
return buildNotionConnection(args);
|
|
}
|
|
|
|
async function validateSource(
|
|
source: KtxSetupSourceType,
|
|
args: { projectDir: string; connectionId: string; connection: KtxProjectConnectionConfig },
|
|
deps: KtxSetupSourcesDeps,
|
|
): Promise<SourceValidationResult> {
|
|
if (source === 'dbt') {
|
|
return await (deps.validateDbt ?? defaultValidateDbt)(args.connection);
|
|
}
|
|
if (source === 'metricflow') {
|
|
return await (deps.validateMetricflow ?? defaultValidateMetricflow)(args.connection);
|
|
}
|
|
if (source === 'metabase') {
|
|
return deps.validateMetabase
|
|
? await deps.validateMetabase(args.projectDir, args.connectionId)
|
|
: { ok: true, detail: 'mapping validation runs after the connection is saved' };
|
|
}
|
|
if (source === 'looker') {
|
|
return await (deps.validateLooker ?? defaultValidateLooker)(args.projectDir, args.connectionId);
|
|
}
|
|
if (source === 'lookml') {
|
|
return await (deps.validateLookml ?? defaultValidateLookml)(args.connection);
|
|
}
|
|
return await (deps.validateNotion ?? defaultValidateNotion)(args.connection);
|
|
}
|
|
|
|
export async function runKtxSetupSourcesStep(
|
|
args: KtxSetupSourcesArgs,
|
|
io: KtxCliIo,
|
|
deps: KtxSetupSourcesDeps = {},
|
|
): Promise<KtxSetupSourcesResult> {
|
|
try {
|
|
if (args.skipSources) {
|
|
await markSourcesComplete(args.projectDir);
|
|
io.stdout.write('│ Context source setup skipped.\n');
|
|
return { status: 'skipped', projectDir: args.projectDir };
|
|
}
|
|
|
|
const prompts = deps.prompts ?? createPromptAdapter();
|
|
const project = await loadKtxProject({ projectDir: args.projectDir });
|
|
if (!hasPrimarySource(project.config)) {
|
|
const message = 'Connect a primary source before adding context sources.';
|
|
if (args.source) {
|
|
io.stderr.write(`${message}\n`);
|
|
return { status: 'failed', projectDir: args.projectDir };
|
|
}
|
|
if (args.inputMode !== 'disabled') {
|
|
io.stdout.write(`│ ${message}\n`);
|
|
return { status: 'skipped', projectDir: args.projectDir };
|
|
}
|
|
}
|
|
|
|
while (true) {
|
|
const selected = args.source
|
|
? [args.source]
|
|
: args.inputMode === 'disabled'
|
|
? []
|
|
: await prompts.multiselect({
|
|
message: withMultiselectNavigation('Which context sources should KTX ingest?'),
|
|
options: [...SOURCE_OPTIONS],
|
|
required: false,
|
|
});
|
|
if (selected.includes('back')) {
|
|
return { status: 'back', projectDir: args.projectDir };
|
|
}
|
|
if (selected.length === 0) {
|
|
if (args.inputMode === 'disabled') {
|
|
io.stderr.write('Missing context source selection: pass --source or --skip-sources.\n');
|
|
return { status: 'missing-input', projectDir: args.projectDir };
|
|
}
|
|
await markSourcesComplete(args.projectDir);
|
|
io.stdout.write('│ No context sources selected.\n');
|
|
return { status: 'skipped', projectDir: args.projectDir };
|
|
}
|
|
|
|
const readyConnectionIds: string[] = [];
|
|
let returnToSourceSelection = false;
|
|
for (const source of selected as KtxSetupSourceType[]) {
|
|
const sourceChoice = args.source
|
|
? ({ kind: 'new', args } as const)
|
|
: await chooseInteractiveSourceConnection({
|
|
args,
|
|
source,
|
|
connections: (await loadKtxProject({ projectDir: args.projectDir })).config.connections,
|
|
prompts,
|
|
testGitRepo: deps.testGitRepo,
|
|
discoverMetabaseDatabases: deps.discoverMetabaseDatabases,
|
|
});
|
|
if (sourceChoice === 'back') {
|
|
if (args.source) {
|
|
return { status: 'back', projectDir: args.projectDir };
|
|
}
|
|
returnToSourceSelection = true;
|
|
break;
|
|
}
|
|
const connectionId =
|
|
sourceChoice.kind === 'existing'
|
|
? sourceChoice.connectionId
|
|
: (sourceChoice.args.sourceConnectionId ?? `${source}-main`);
|
|
const connection =
|
|
sourceChoice.kind === 'existing' ? sourceChoice.connection : buildConnection(source, sourceChoice.args);
|
|
const rollback =
|
|
sourceChoice.kind === 'existing'
|
|
? undefined
|
|
: await writeSourceConnection(args.projectDir, connectionId, connection, sourceAdapter(source));
|
|
if (sourceChoice.kind === 'existing') {
|
|
await ensureSourceAdapterEnabled(args.projectDir, source);
|
|
}
|
|
const validation = await validateSource(source, { projectDir: args.projectDir, connectionId, connection }, deps);
|
|
|
|
if (!validation.ok) {
|
|
await rollback?.();
|
|
io.stderr.write(`${validation.message}\n`);
|
|
return { status: 'failed', projectDir: args.projectDir };
|
|
}
|
|
if (source === 'metabase' || source === 'looker') {
|
|
prompts.log?.(`Validating ${sourceLabel(source)} mapping…`);
|
|
const mappingCode = await (deps.runMapping ?? defaultRunMapping)(args.projectDir, connectionId, io);
|
|
if (mappingCode !== 0) {
|
|
await rollback?.();
|
|
return { status: 'failed', projectDir: args.projectDir };
|
|
}
|
|
}
|
|
if (args.runInitialSourceIngest) {
|
|
const ingestResult = await runInitialSourceIngestWithRecovery({
|
|
args,
|
|
connectionId,
|
|
io,
|
|
prompts,
|
|
deps,
|
|
});
|
|
if (ingestResult === 'failed') {
|
|
await rollback?.();
|
|
return { status: 'failed', projectDir: args.projectDir };
|
|
}
|
|
if (ingestResult === 'back') {
|
|
await rollback?.();
|
|
if (args.source) {
|
|
return { status: 'back', projectDir: args.projectDir };
|
|
}
|
|
returnToSourceSelection = true;
|
|
break;
|
|
}
|
|
} else {
|
|
io.stdout.write(`│ Context source ${connectionId} saved. It will be built during the context build step.\n`);
|
|
}
|
|
readyConnectionIds.push(connectionId);
|
|
}
|
|
|
|
if (returnToSourceSelection) {
|
|
continue;
|
|
}
|
|
|
|
if (readyConnectionIds.length > 0 && !args.source && args.inputMode !== 'disabled') {
|
|
const addMore = await prompts.select({
|
|
message: `${readyConnectionIds.length} context source${readyConnectionIds.length > 1 ? 's' : ''} configured (${readyConnectionIds.join(', ')}). Add another?`,
|
|
options: [
|
|
{ value: 'done', label: 'Done — continue to context build' },
|
|
{ value: 'add', label: 'Add another context source' },
|
|
],
|
|
});
|
|
if (addMore === 'add') {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
await markSourcesComplete(args.projectDir);
|
|
return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds };
|
|
}
|
|
} catch (error) {
|
|
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
|
|
return { status: 'failed', projectDir: args.projectDir };
|
|
}
|
|
}
|