From 8e5c8097d503f1177f04b40e13d7c6646c28d72f Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Thu, 21 May 2026 14:31:04 +0200 Subject: [PATCH] refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime Make resolveProjectEmbeddingProvider injectable in the ingest and scan command entrypoints so tests can stub it, pass the already-injectable runtimeIo (falling back to the command io) to embedding provider resolution, and teach resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime feature when ktx.yaml selects sentence-transformers. --- packages/cli/src/context/ingest/ports.ts | 1 - packages/cli/src/context/llm/runtime-tools.ts | 1 - .../cli/src/context/project/driver-schemas.ts | 1 - packages/cli/src/context/sl/types.ts | 1 - packages/cli/src/context/wiki/ports.ts | 1 - packages/cli/src/context/wiki/types.ts | 1 - packages/cli/src/ingest.test.ts | 46 +++++++++++++++ packages/cli/src/ingest.ts | 6 +- packages/cli/src/public-ingest.test.ts | 57 +++++++++++++++++++ packages/cli/src/public-ingest.ts | 5 +- packages/cli/src/runtime-requirements.test.ts | 45 ++++++++++----- packages/cli/src/runtime-requirements.ts | 9 +++ packages/cli/src/scan.test.ts | 49 ++++++++++++++++ packages/cli/src/scan.ts | 6 +- 14 files changed, 203 insertions(+), 26 deletions(-) diff --git a/packages/cli/src/context/ingest/ports.ts b/packages/cli/src/context/ingest/ports.ts index 54d28fe6..76e9d765 100644 --- a/packages/cli/src/context/ingest/ports.ts +++ b/packages/cli/src/context/ingest/ports.ts @@ -359,4 +359,3 @@ export interface IngestBundleRunnerDeps { curatorPagination?: CuratorPaginationPort; logger?: KtxLogger; } - diff --git a/packages/cli/src/context/llm/runtime-tools.ts b/packages/cli/src/context/llm/runtime-tools.ts index 0c83c8d4..4a52bdd1 100644 --- a/packages/cli/src/context/llm/runtime-tools.ts +++ b/packages/cli/src/context/llm/runtime-tools.ts @@ -83,4 +83,3 @@ export function createRuntimeToolDescriptorFromAiTool(name: string, aiSdkTool: T }, }; } - diff --git 
a/packages/cli/src/context/project/driver-schemas.ts b/packages/cli/src/context/project/driver-schemas.ts index 2e690295..a3b71bff 100644 --- a/packages/cli/src/context/project/driver-schemas.ts +++ b/packages/cli/src/context/project/driver-schemas.ts @@ -207,4 +207,3 @@ export const connectionConfigSchema = z.discriminatedUnion('driver', [ dbtConnectionSchema, metricflowConnectionSchema, ]); - diff --git a/packages/cli/src/context/sl/types.ts b/packages/cli/src/context/sl/types.ts index f58b57c5..cc9575b7 100644 --- a/packages/cli/src/context/sl/types.ts +++ b/packages/cli/src/context/sl/types.ts @@ -108,4 +108,3 @@ export interface SlSearchLaneSummary { weight: number; reason?: string; } - diff --git a/packages/cli/src/context/wiki/ports.ts b/packages/cli/src/context/wiki/ports.ts index 6b026966..aa5d2fe1 100644 --- a/packages/cli/src/context/wiki/ports.ts +++ b/packages/cli/src/context/wiki/ports.ts @@ -70,4 +70,3 @@ export interface KnowledgeGitDiffPort { ): Promise>; getFileAtCommit(path: string, sha: string): Promise; } - diff --git a/packages/cli/src/context/wiki/types.ts b/packages/cli/src/context/wiki/types.ts index e5ff3312..bd54d130 100644 --- a/packages/cli/src/context/wiki/types.ts +++ b/packages/cli/src/context/wiki/types.ts @@ -49,4 +49,3 @@ export interface WikiSearchLaneSummary { weight: number; reason?: string; } - diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index d5b1689d..33eecfba 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -1415,6 +1415,52 @@ describe('runKtxIngest', () => { ); }); + it('uses runtime IO when resolving managed embedding runtime', async () => { + const projectDir = join(tempDir, 'managed-embedding-ingest-project'); + await initKtxProject({ projectDir }); + await writeWarehouseConfig(projectDir); + const createdAdapters: SourceAdapter[] = [ + { source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, + ]; + const 
createAdapters = vi.fn(() => createdAdapters as never); + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => + completedLocalBundleRun(input, input.jobId ?? 'local-job-1'), + ); + const resolveEmbeddingProvider = vi.fn(async () => ({ kind: 'disabled' as const })); + const io = makeIo(); + const runtimeIo = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'fake', + cliVersion: '0.2.0', + runtimeInstallPolicy: 'auto', + outputMode: 'plain', + } satisfies KtxIngestArgs, + io.io, + { + createAdapters, + runLocalIngest: runLocal, + jobIdFactory: () => 'local-job-1', + runtimeIo: runtimeIo.io, + resolveEmbeddingProvider, + }, + ), + ).resolves.toBe(0); + + expect(resolveEmbeddingProvider).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + installPolicy: 'auto', + io: runtimeIo.io, + }), + ); + }); + it('passes the target connection id when constructing local historic-sql adapters', async () => { const projectDir = join(tempDir, 'historic-sql-project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 0c84be72..3615f401 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -72,6 +72,7 @@ export interface KtxIngestDeps { now?: () => Date; createAdapters?: typeof createKtxCliLocalIngestAdapters; createQueryExecutor?: (project: KtxLocalProject) => KtxSqlQueryExecutorPort; + resolveEmbeddingProvider?: typeof resolveProjectEmbeddingProvider; runLocalIngest?: typeof runLocalIngest; runLocalMetabaseIngest?: typeof runLocalMetabaseIngest; readReportFile?: typeof readIngestReportSnapshotFile; @@ -675,11 +676,12 @@ export async function runKtxIngest( const project = await loadKtxProject({ projectDir: args.projectDir }); const env = deps.env ?? 
process.env; if (args.command === 'run') { - const resolution = await resolveProjectEmbeddingProvider(project, { + const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider; + const resolution = await resolveEmbeddingProvider(project, { mode: 'ensure', installPolicy: args.runtimeInstallPolicy ?? 'never', cliVersion: args.cliVersion ?? getKtxCliPackageInfo().version, - io, + io: deps.runtimeIo ?? io, }); const embeddingProvider = resolution.kind === 'disabled' || resolution.kind === 'managed-unavailable' ? null : resolution.provider; diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index ced91a92..ac2560cd 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -801,6 +801,63 @@ describe('runKtxPublicIngest', () => { ); }); + it('preflights foreground managed embeddings runtime before starting the context-build view', async () => { + const io = makeIo({ isTTY: true, interactive: true }); + const config = buildDefaultKtxProjectConfig(); + const project: KtxPublicIngestProject = { + projectDir: '/tmp/project', + config: { + ...config, + connections: { + warehouse: { driver: 'postgres' }, + }, + ingest: { + ...config.ingest, + embeddings: { + backend: 'sentence-transformers', + model: 'all-MiniLM-L6-v2', + dimensions: 384, + }, + }, + }, + }; + const ensureRuntime = vi.fn(async (): Promise => { + return {} as ManagedPythonCommandRuntime; + }); + const runContextBuild = vi.fn(async () => ({ exitCode: 0 })); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: false, + inputMode: 'auto', + queryHistory: 'default', + cliVersion: '0.2.0', + runtimeInstallPolicy: 'prompt', + }, + io.io, + { + loadProject: vi.fn(async () => project), + ensureRuntime, + runContextBuild, + }, + ), + ).resolves.toBe(0); + + expect(ensureRuntime).toHaveBeenCalledWith( + 
expect.objectContaining({ + cliVersion: '0.2.0', + installPolicy: 'prompt', + feature: 'local-embeddings', + }), + ); + expect(runContextBuild).toHaveBeenCalled(); + }); + it('runs all independent targets and reports partial failures', async () => { const io = makeIo(); const project = projectWithConnections({ diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 1a9c6674..60b9622c 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -874,7 +874,10 @@ export async function runKtxPublicIngest( const project = await loadProject({ projectDir: args.projectDir }); if (shouldUseForegroundContextBuildView(args, io)) { const plan = buildPublicIngestPlan(project, args); - const requirements = resolvePublicIngestRuntimeRequirements(plan, { env: deps.env ?? process.env }); + const requirements = resolvePublicIngestRuntimeRequirements(plan, { + config: project.config, + env: deps.env ?? process.env, + }); const ensureRuntime = deps.ensureRuntime ?? 
ensureManagedPythonCommandRuntime; for (const feature of requirements.features) { try { diff --git a/packages/cli/src/runtime-requirements.test.ts b/packages/cli/src/runtime-requirements.test.ts index 8eec1116..5f8831cf 100644 --- a/packages/cli/src/runtime-requirements.test.ts +++ b/packages/cli/src/runtime-requirements.test.ts @@ -60,21 +60,36 @@ describe('runtime requirement detection', () => { }); it('detects foreground ingest runtime needs from selected query-history targets', () => { + const config: KtxProjectConfig = { + ...buildDefaultKtxProjectConfig(), + ingest: { + ...buildDefaultKtxProjectConfig().ingest, + embeddings: { + backend: 'sentence-transformers' as const, + model: 'all-MiniLM-L6-v2', + dimensions: 384, + }, + }, + }; + expect( - resolvePublicIngestRuntimeRequirements({ - projectDir: '/tmp/project', - warnings: [], - targets: [ - { - connectionId: 'warehouse', - driver: 'postgres', - operation: 'database-ingest', - debugCommand: 'ktx ingest warehouse --debug', - steps: ['database-schema', 'query-history'], - queryHistory: { enabled: true }, - }, - ], - }).features, - ).toEqual(['core']); + resolvePublicIngestRuntimeRequirements( + { + projectDir: '/tmp/project', + warnings: [], + targets: [ + { + connectionId: 'warehouse', + driver: 'postgres', + operation: 'database-ingest', + debugCommand: 'ktx ingest warehouse --debug', + steps: ['database-schema', 'query-history'], + queryHistory: { enabled: true }, + }, + ], + }, + { config }, + ).features, + ).toEqual(['core', 'local-embeddings']); }); }); diff --git a/packages/cli/src/runtime-requirements.ts b/packages/cli/src/runtime-requirements.ts index df1c2a56..31ad1be0 100644 --- a/packages/cli/src/runtime-requirements.ts +++ b/packages/cli/src/runtime-requirements.ts @@ -25,6 +25,7 @@ export interface KtxProjectRuntimeRequirementOptions { } export interface KtxPublicIngestRuntimeRequirementOptions { + config?: KtxProjectConfig; env?: NodeJS.ProcessEnv | Record; } @@ -149,5 +150,13 @@ export 
function resolvePublicIngestRuntimeRequirements( } } + if (options.config && requiresManagedLocalEmbeddings(options.config.ingest.embeddings)) { + requirements.push({ + feature: 'local-embeddings', + reason: 'local-embeddings', + detail: 'Local sentence-transformers embeddings use the managed Python runtime.', + }); + } + return uniqueRequirements(requirements); } diff --git a/packages/cli/src/scan.test.ts b/packages/cli/src/scan.test.ts index fafecd97..0d2bcdc9 100644 --- a/packages/cli/src/scan.test.ts +++ b/packages/cli/src/scan.test.ts @@ -428,6 +428,55 @@ describe('runKtxScan', () => { }); }); + it('uses runtime IO when resolving managed embedding runtime', async () => { + await initKtxProject({ projectDir: tempDir }); + const runLocalScan = vi.fn( + async (_input: RunLocalScanOptions): Promise => ({ + runId: 'scan-run-1', + status: 'done', + done: true, + connectionId: 'warehouse', + mode: 'structural', + dryRun: false, + syncId: 'sync-1', + report, + }), + ); + const resolveEmbeddingProvider = vi.fn(async () => ({ kind: 'disabled' as const })); + const io = makeIo(); + const runtimeIo = makeIo({ isTTY: true }); + + await expect( + runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'structural', + detectRelationships: false, + dryRun: false, + cliVersion: '0.2.0', + runtimeInstallPolicy: 'auto', + }, + io.io, + { + runLocalScan, + createLocalIngestAdapters: noLocalIngestAdapters, + runtimeIo: runtimeIo.io, + resolveEmbeddingProvider, + }, + ), + ).resolves.toBe(0); + + expect(resolveEmbeddingProvider).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + installPolicy: 'auto', + io: runtimeIo.io, + }), + ); + }); + it('explains warnings, capability gaps, and relationships in human scan summaries', async () => { await initKtxProject({ projectDir: tempDir }); const runLocalScan = vi.fn( diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index d5c4ea7c..a92aaa62 100644 --- 
a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -26,6 +26,7 @@ export interface KtxScanArgs { export interface KtxScanDeps { runLocalScan?: typeof runLocalScan; createLocalIngestAdapters?: typeof createKtxCliLocalIngestAdapters; + resolveEmbeddingProvider?: typeof resolveProjectEmbeddingProvider; progress?: KtxProgressPort; runtimeIo?: KtxCliIo; } @@ -312,11 +313,12 @@ export function createCliScanProgress( export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps: KtxScanDeps = {}): Promise { try { const project = await loadKtxProject({ projectDir: args.projectDir }); - const resolution = await resolveProjectEmbeddingProvider(project, { + const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider; + const resolution = await resolveEmbeddingProvider(project, { mode: 'ensure', installPolicy: args.runtimeInstallPolicy ?? 'never', cliVersion: args.cliVersion ?? getKtxCliPackageInfo().version, - io, + io: deps.runtimeIo ?? io, }); const embeddingProvider = resolution.kind === 'disabled' || resolution.kind === 'managed-unavailable' ? null : resolution.provider;