feat(cli): preflight deep public ingest readiness

This commit is contained in:
Andrey Avtomonov 2026-05-13 18:22:19 +02:00
parent 27ba226a57
commit f2b1ecbb61
3 changed files with 223 additions and 37 deletions

View file

@ -0,0 +1,77 @@
import type { KtxProjectConfig, KtxProjectConnectionConfig } from '@ktx/context/project';
export type KtxDatabaseContextDepth = 'fast' | 'deep';
export const KTX_DATABASE_DRIVER_IDS = new Set([
'sqlite',
'postgres',
'postgresql',
'mysql',
'clickhouse',
'sqlserver',
'bigquery',
'snowflake',
]);
export function normalizeConnectionDriver(connection: KtxProjectConnectionConfig): string {
return String(connection.driver ?? '')
.trim()
.toLowerCase();
}
export function isDatabaseDriver(driver: string): boolean {
return KTX_DATABASE_DRIVER_IDS.has(driver.trim().toLowerCase());
}
export function connectionContextRecord(connection: KtxProjectConnectionConfig): Record<string, unknown> {
const context = connection.context;
return typeof context === 'object' && context !== null && !Array.isArray(context)
? (context as Record<string, unknown>)
: {};
}
export function databaseContextDepth(connection: KtxProjectConnectionConfig): KtxDatabaseContextDepth | undefined {
const depth = connectionContextRecord(connection).depth;
return depth === 'fast' || depth === 'deep' ? depth : undefined;
}
export function withDatabaseContextDepth(
connection: KtxProjectConnectionConfig,
depth: KtxDatabaseContextDepth,
): KtxProjectConnectionConfig {
return {
...connection,
context: {
...connectionContextRecord(connection),
depth,
},
};
}
export function deepReadinessGaps(config: KtxProjectConfig): string[] {
const gaps: string[] = [];
if (config.llm.provider.backend === 'none' || !config.llm.models.default) {
gaps.push('model configuration');
}
if (config.scan.enrichment.mode !== 'llm') {
gaps.push('scan enrichment mode');
}
const embeddings = config.scan.enrichment.embeddings;
if (
!embeddings ||
embeddings.backend === 'none' ||
embeddings.backend === 'deterministic' ||
!embeddings.model ||
embeddings.dimensions <= 0
) {
gaps.push('scan embeddings');
}
return gaps;
}
export function recommendedDatabaseContextDepth(config: KtxProjectConfig): KtxDatabaseContextDepth {
return deepReadinessGaps(config).length === 0 ? 'deep' : 'fast';
}

View file

@ -34,6 +34,40 @@ function projectWithConnections(connections: KtxProjectConfig['connections']): K
};
}
function deepReadyProject(
connections: KtxProjectConfig['connections'],
relationshipsEnabled = true,
): KtxPublicIngestProject {
const config = buildDefaultKtxProjectConfig('warehouse');
return {
projectDir: '/tmp/project',
config: {
...config,
connections,
llm: {
...config.llm,
provider: { backend: 'gateway', gateway: { api_key: 'env:KTX_GATEWAY_API_KEY' } },
models: { default: 'gpt-test' },
},
scan: {
...config.scan,
enrichment: {
mode: 'llm',
embeddings: {
backend: 'openai',
model: 'text-embedding-3-small',
dimensions: 1536,
},
},
relationships: {
...config.scan.relationships,
enabled: relationshipsEnabled,
},
},
},
};
}
describe('buildPublicIngestPlan', () => {
it('plans warehouse connections as scan targets and source connections as source ingest targets', () => {
const project = projectWithConnections({
@ -52,6 +86,7 @@ describe('buildPublicIngestPlan', () => {
debugCommand: 'ktx ingest warehouse --debug',
steps: ['database-schema'],
databaseDepth: 'fast',
detectRelationships: false,
queryHistory: { enabled: false },
},
{
@ -158,12 +193,50 @@ describe('buildPublicIngestPlan', () => {
});
expect(plan.warnings).toEqual(['--query-history is not supported for sqlite; running schema ingest for local.']);
});
it('records a preflight failure for deep database ingest when readiness config is missing', () => {
const project = projectWithConnections({
warehouse: { driver: 'postgres', context: { depth: 'deep' } },
});
const plan = buildPublicIngestPlan(project, {
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
queryHistory: 'default',
});
expect(plan.targets[0]).toMatchObject({
connectionId: 'warehouse',
databaseDepth: 'deep',
preflightFailure:
'warehouse requires deep ingest readiness: model configuration, scan enrichment mode, scan embeddings. Run ktx setup or rerun with --fast.',
});
});
it('honors scan.relationships.enabled when planning deep database ingest', () => {
const plan = buildPublicIngestPlan(
deepReadyProject({ warehouse: { driver: 'postgres', context: { depth: 'deep' } } }, false),
{
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
queryHistory: 'default',
},
);
expect(plan.targets[0]).toMatchObject({
connectionId: 'warehouse',
databaseDepth: 'deep',
detectRelationships: false,
});
});
});
describe('runKtxPublicIngest', () => {
it('maps fast and deep database targets to scan internals', async () => {
const io = makeIo();
const project = projectWithConnections({
const project = deepReadyProject({
fast: { driver: 'postgres' },
deep: { driver: 'postgres', context: { depth: 'deep' } },
});
@ -191,7 +264,7 @@ describe('runKtxPublicIngest', () => {
it('runs query history after schema ingest with current-run window override', async () => {
const io = makeIo();
const project = projectWithConnections({
const project = deepReadyProject({
warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, windowDays: 90 } } },
});
const runScan = vi.fn(async () => 0);
@ -278,9 +351,34 @@ describe('runKtxPublicIngest', () => {
expect(io.stdout()).toContain('Debug: ktx ingest warehouse --debug');
});
it('fails deep-readiness targets before work starts while continuing independent --all targets', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres', context: { depth: 'deep' } },
docs: { driver: 'notion' },
});
const runScan = vi.fn(async () => 0);
const runIngest = vi.fn(async () => 0);
await expect(
runKtxPublicIngest(
{ command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled' },
io.io,
{ loadProject: vi.fn(async () => project), runScan, runIngest },
),
).resolves.toBe(1);
expect(runScan).not.toHaveBeenCalled();
expect(runIngest).toHaveBeenCalledWith(
expect.objectContaining({ command: 'run', connectionId: 'docs', adapter: 'notion' }),
expect.anything(),
);
expect(io.stdout()).toContain('warehouse requires deep ingest readiness');
});
it('can request enriched relationship scans for setup-managed context builds', async () => {
const io = makeIo();
const project = projectWithConnections({ warehouse: { driver: 'postgres' } });
const project = deepReadyProject({ warehouse: { driver: 'postgres' } });
const runScan = vi.fn(async () => 0);
await expect(

View file

@ -2,6 +2,13 @@ import { type KtxLocalProject, type KtxProjectConnectionConfig, loadKtxProject }
import type { KtxProgressPort } from '@ktx/context/scan';
import type { KtxCliIo } from './index.js';
import type { KtxIngestArgs, KtxIngestDeps, KtxIngestProgressUpdate } from './ingest.js';
import {
type KtxDatabaseContextDepth,
databaseContextDepth,
deepReadinessGaps,
isDatabaseDriver,
normalizeConnectionDriver,
} from './ingest-depth.js';
import type { KtxScanArgs, KtxScanDeps } from './scan.js';
import { profileMark } from './startup-profile.js';
@ -10,7 +17,7 @@ profileMark('module:public-ingest');
type KtxPublicIngestStepName = 'database-schema' | 'query-history' | 'source-ingest' | 'memory-update';
type KtxPublicIngestStepStatus = 'done' | 'skipped' | 'failed' | 'not-run';
type KtxPublicIngestInputMode = 'auto' | 'disabled';
type KtxPublicIngestDepth = 'fast' | 'deep';
type KtxPublicIngestDepth = KtxDatabaseContextDepth;
type KtxPublicIngestQueryHistoryFlag = 'default' | 'enabled' | 'disabled';
type HistoricSqlDialect = 'postgres' | 'bigquery' | 'snowflake';
@ -45,6 +52,8 @@ export interface KtxPublicIngestPlanTarget {
debugCommand: string;
steps: KtxPublicIngestStepName[];
databaseDepth?: KtxPublicIngestDepth;
detectRelationships?: boolean;
preflightFailure?: string;
queryHistory?: {
enabled: boolean;
dialect?: HistoricSqlDialect;
@ -92,17 +101,6 @@ const sourceAdapterByDriver = new Map<string, string>([
['lookml', 'lookml'],
]);
const warehouseDrivers = new Set([
'sqlite',
'postgres',
'postgresql',
'mysql',
'clickhouse',
'sqlserver',
'bigquery',
'snowflake',
]);
const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>([
['postgres', 'postgres'],
['postgresql', 'postgres'],
@ -110,24 +108,8 @@ const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>([
['snowflake', 'snowflake'],
]);
function normalizedDriver(connection: KtxProjectConnectionConfig): string {
return String(connection.driver ?? '')
.trim()
.toLowerCase();
}
function connectionContext(connection: KtxProjectConnectionConfig): Record<string, unknown> {
const value = connection.context;
return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record<string, unknown>) : {};
}
function storedDepth(connection: KtxProjectConnectionConfig): KtxPublicIngestDepth | undefined {
const value = connectionContext(connection).depth;
return value === 'fast' || value === 'deep' ? value : undefined;
}
function storedQueryHistory(connection: KtxProjectConnectionConfig): Record<string, unknown> {
const value = connectionContext(connection).queryHistory;
const value = connection.context?.queryHistory;
return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record<string, unknown>) : {};
}
@ -163,7 +145,8 @@ function resolveDatabaseTargetOptions(input: {
const explicitQueryHistory = input.args.queryHistory ?? 'default';
const storedEnabled = storedQh.enabled === true;
const requestedQh = explicitQueryHistory === 'enabled' || (explicitQueryHistory === 'default' && storedEnabled);
let depth = input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? storedDepth(input.connection) ?? 'fast';
let depth =
input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? databaseContextDepth(input.connection) ?? 'fast';
const queryHistory = {
enabled: false,
...(input.args.queryHistoryWindowDays !== undefined
@ -219,6 +202,7 @@ function resolveDatabaseTargetOptions(input: {
function targetForConnection(
connectionId: string,
connection: KtxProjectConnectionConfig,
projectConfig: KtxPublicIngestProject['config'],
args: {
depth?: KtxPublicIngestDepth;
queryHistory?: KtxPublicIngestQueryHistoryFlag;
@ -227,7 +211,7 @@ function targetForConnection(
},
warnings: string[],
): KtxPublicIngestPlanTarget {
const driver = normalizedDriver(connection);
const driver = normalizeConnectionDriver(connection);
const adapter = sourceAdapterByDriver.get(driver);
const sourceDir = sourceDirForConnection(connection);
if (adapter) {
@ -248,13 +232,22 @@ function targetForConnection(
};
}
if (warehouseDrivers.has(driver)) {
if (isDatabaseDriver(driver)) {
const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings });
const gaps = options.databaseDepth === 'deep' ? deepReadinessGaps(projectConfig) : [];
return {
connectionId,
driver,
operation: 'database-ingest',
debugCommand: `ktx ingest ${connectionId} --debug`,
detectRelationships: options.databaseDepth === 'deep' && projectConfig.scan.relationships.enabled,
...(gaps.length > 0
? {
preflightFailure: `${connectionId} requires deep ingest readiness: ${gaps.join(
', ',
)}. Run ktx setup or rerun with --fast.`,
}
: {}),
...options,
};
}
@ -289,7 +282,9 @@ export function buildPublicIngestPlan(
}
const warnings: string[] = [];
const targets = selected.map(([connectionId, connection]) => targetForConnection(connectionId, connection, args, warnings));
const targets = selected.map(([connectionId, connection]) =>
targetForConnection(connectionId, connection, project.config, args, warnings),
);
return {
projectDir: args.projectDir,
targets: [
@ -411,6 +406,22 @@ export async function executePublicIngestTarget(
io: KtxCliIo,
deps: KtxPublicIngestDeps,
): Promise<KtxPublicIngestTargetResult> {
if (target.preflightFailure) {
return {
connectionId: target.connectionId,
driver: target.driver,
steps: defaultSteps(target).map((step) =>
step.operation === 'database-schema'
? {
...step,
status: 'failed',
detail: target.preflightFailure,
}
: step,
),
};
}
if (target.operation === 'database-ingest') {
const { runKtxScan } = await import('./scan.js');
const scanArgs: KtxScanArgs = {
@ -418,7 +429,7 @@ export async function executePublicIngestTarget(
projectDir: args.projectDir,
connectionId: target.connectionId,
mode: target.databaseDepth === 'deep' ? 'enriched' : 'structural',
detectRelationships: target.databaseDepth === 'deep' ? true : false,
detectRelationships: target.detectRelationships === true,
dryRun: false,
};
const runScan = deps.runScan ?? runKtxScan;