refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime

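Make embedding provider resolution injectable in the ingest and scan command
entrypoints through a new resolveEmbeddingProvider dependency (defaulting to
resolveProjectEmbeddingProvider), and pass deps.runtimeIo to it when provided,
falling back to the command IO, so tests can stub both. Also teach
resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime
feature when ktx.yaml selects sentence-transformers.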
This commit is contained in:
Andrey Avtomonov 2026-05-21 14:31:04 +02:00
parent e096bcf728
commit 8e5c8097d5
14 changed files with 203 additions and 26 deletions

View file

@ -359,4 +359,3 @@ export interface IngestBundleRunnerDeps {
curatorPagination?: CuratorPaginationPort;
logger?: KtxLogger;
}

View file

@ -83,4 +83,3 @@ export function createRuntimeToolDescriptorFromAiTool(name: string, aiSdkTool: T
},
};
}

View file

@ -207,4 +207,3 @@ export const connectionConfigSchema = z.discriminatedUnion('driver', [
dbtConnectionSchema,
metricflowConnectionSchema,
]);

View file

@ -108,4 +108,3 @@ export interface SlSearchLaneSummary {
weight: number;
reason?: string;
}

View file

@ -70,4 +70,3 @@ export interface KnowledgeGitDiffPort {
): Promise<Array<{ status: string; path: string }>>;
getFileAtCommit(path: string, sha: string): Promise<string>;
}

View file

@ -49,4 +49,3 @@ export interface WikiSearchLaneSummary {
weight: number;
reason?: string;
}

View file

@ -1415,6 +1415,52 @@ describe('runKtxIngest', () => {
);
});
// Checks that `runKtxIngest` resolves the embedding provider through the injected
// `resolveEmbeddingProvider` dependency and hands it `deps.runtimeIo` instead of the command IO.
it('uses runtime IO when resolving managed embedding runtime', async () => {
  // Arrange: an initialised project with a warehouse connection configured.
  const projectDir = join(tempDir, 'managed-embedding-ingest-project');
  await initKtxProject({ projectDir });
  await writeWarehouseConfig(projectDir);

  // One no-op adapter registered under the `fake` source name.
  const fakeAdapters: SourceAdapter[] = [
    {
      source: 'fake',
      skillNames: [],
      detect: async () => true,
      chunk: async () => ({ workUnits: [] }),
    },
  ];
  const adapterFactory = vi.fn(() => fakeAdapters as never);
  const localIngestStub = vi.fn(async (input: RunLocalIngestOptions) => {
    const jobId = input.jobId ?? 'local-job-1';
    return completedLocalBundleRun(input, jobId);
  });
  const embeddingResolverStub = vi.fn(async () => ({ kind: 'disabled' as const }));

  // Two distinct IO objects so the assertion can tell which one reached the resolver.
  const commandIo = makeIo();
  const managedRuntimeIo = makeIo({ isTTY: true });

  const ingestArgs = {
    command: 'run',
    projectDir,
    connectionId: 'warehouse',
    adapter: 'fake',
    cliVersion: '0.2.0',
    runtimeInstallPolicy: 'auto',
    outputMode: 'plain',
  } satisfies KtxIngestArgs;

  // Act
  const pendingExitCode = runKtxIngest(ingestArgs, commandIo.io, {
    createAdapters: adapterFactory,
    runLocalIngest: localIngestStub,
    jobIdFactory: () => 'local-job-1',
    runtimeIo: managedRuntimeIo.io,
    resolveEmbeddingProvider: embeddingResolverStub,
  });

  // Assert: the run succeeds and the resolver received the runtime IO plus the install policy.
  await expect(pendingExitCode).resolves.toBe(0);
  const expectedResolverOptions = expect.objectContaining({
    installPolicy: 'auto',
    io: managedRuntimeIo.io,
  });
  expect(embeddingResolverStub).toHaveBeenCalledWith(expect.anything(), expectedResolverOptions);
});
it('passes the target connection id when constructing local historic-sql adapters', async () => {
const projectDir = join(tempDir, 'historic-sql-project');
await writeWarehouseConfig(projectDir);

View file

@ -72,6 +72,7 @@ export interface KtxIngestDeps {
now?: () => Date;
createAdapters?: typeof createKtxCliLocalIngestAdapters;
createQueryExecutor?: (project: KtxLocalProject) => KtxSqlQueryExecutorPort;
resolveEmbeddingProvider?: typeof resolveProjectEmbeddingProvider;
runLocalIngest?: typeof runLocalIngest;
runLocalMetabaseIngest?: typeof runLocalMetabaseIngest;
readReportFile?: typeof readIngestReportSnapshotFile;
@ -675,11 +676,12 @@ export async function runKtxIngest(
const project = await loadKtxProject({ projectDir: args.projectDir });
const env = deps.env ?? process.env;
if (args.command === 'run') {
const resolution = await resolveProjectEmbeddingProvider(project, {
const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider;
const resolution = await resolveEmbeddingProvider(project, {
mode: 'ensure',
installPolicy: args.runtimeInstallPolicy ?? 'never',
cliVersion: args.cliVersion ?? getKtxCliPackageInfo().version,
io,
io: deps.runtimeIo ?? io,
});
const embeddingProvider =
resolution.kind === 'disabled' || resolution.kind === 'managed-unavailable' ? null : resolution.provider;

View file

@ -801,6 +801,63 @@ describe('runKtxPublicIngest', () => {
);
});
// Checks that a foreground `runKtxPublicIngest` run asks `ensureRuntime` for the
// `local-embeddings` feature when the project config selects sentence-transformers embeddings,
// and that the context build is still invoked.
it('preflights foreground managed embeddings runtime before starting the context-build view', async () => {
  const interactiveIo = makeIo({ isTTY: true, interactive: true });
  const defaults = buildDefaultKtxProjectConfig();

  // Default config plus a postgres connection and a sentence-transformers embeddings backend.
  const project: KtxPublicIngestProject = {
    projectDir: '/tmp/project',
    config: {
      ...defaults,
      connections: {
        warehouse: { driver: 'postgres' },
      },
      ingest: {
        ...defaults.ingest,
        embeddings: {
          backend: 'sentence-transformers',
          model: 'all-MiniLM-L6-v2',
          dimensions: 384,
        },
      },
    },
  };

  // Stubs: the runtime "install" yields an empty placeholder and the context build succeeds.
  const ensureRuntimeStub = vi.fn(
    async (): Promise<ManagedPythonCommandRuntime> => ({}) as ManagedPythonCommandRuntime,
  );
  const contextBuildStub = vi.fn(async () => ({ exitCode: 0 }));

  const pendingExitCode = runKtxPublicIngest(
    {
      command: 'run',
      projectDir: '/tmp/project',
      targetConnectionId: 'warehouse',
      all: false,
      json: false,
      inputMode: 'auto',
      queryHistory: 'default',
      cliVersion: '0.2.0',
      runtimeInstallPolicy: 'prompt',
    },
    interactiveIo.io,
    {
      loadProject: vi.fn(async () => project),
      ensureRuntime: ensureRuntimeStub,
      runContextBuild: contextBuildStub,
    },
  );
  await expect(pendingExitCode).resolves.toBe(0);

  // The preflight must carry the CLI version and install policy, and target local embeddings.
  const expectedRuntimeRequest = expect.objectContaining({
    cliVersion: '0.2.0',
    installPolicy: 'prompt',
    feature: 'local-embeddings',
  });
  expect(ensureRuntimeStub).toHaveBeenCalledWith(expectedRuntimeRequest);
  expect(contextBuildStub).toHaveBeenCalled();
});
it('runs all independent targets and reports partial failures', async () => {
const io = makeIo();
const project = projectWithConnections({

View file

@ -874,7 +874,10 @@ export async function runKtxPublicIngest(
const project = await loadProject({ projectDir: args.projectDir });
if (shouldUseForegroundContextBuildView(args, io)) {
const plan = buildPublicIngestPlan(project, args);
const requirements = resolvePublicIngestRuntimeRequirements(plan, { env: deps.env ?? process.env });
const requirements = resolvePublicIngestRuntimeRequirements(plan, {
config: project.config,
env: deps.env ?? process.env,
});
const ensureRuntime = deps.ensureRuntime ?? ensureManagedPythonCommandRuntime;
for (const feature of requirements.features) {
try {

View file

@ -60,21 +60,36 @@ describe('runtime requirement detection', () => {
});
it('detects foreground ingest runtime needs from selected query-history targets', () => {
const config: KtxProjectConfig = {
...buildDefaultKtxProjectConfig(),
ingest: {
...buildDefaultKtxProjectConfig().ingest,
embeddings: {
backend: 'sentence-transformers' as const,
model: 'all-MiniLM-L6-v2',
dimensions: 384,
},
},
};
expect(
resolvePublicIngestRuntimeRequirements({
projectDir: '/tmp/project',
warnings: [],
targets: [
{
connectionId: 'warehouse',
driver: 'postgres',
operation: 'database-ingest',
debugCommand: 'ktx ingest warehouse --debug',
steps: ['database-schema', 'query-history'],
queryHistory: { enabled: true },
},
],
}).features,
).toEqual(['core']);
resolvePublicIngestRuntimeRequirements(
{
projectDir: '/tmp/project',
warnings: [],
targets: [
{
connectionId: 'warehouse',
driver: 'postgres',
operation: 'database-ingest',
debugCommand: 'ktx ingest warehouse --debug',
steps: ['database-schema', 'query-history'],
queryHistory: { enabled: true },
},
],
},
{ config },
).features,
).toEqual(['core', 'local-embeddings']);
});
});

View file

@ -25,6 +25,7 @@ export interface KtxProjectRuntimeRequirementOptions {
}
/**
 * Optional inputs for public-ingest runtime requirement detection.
 * NOTE(review): presumably the options parameter of `resolvePublicIngestRuntimeRequirements` — confirm against its signature.
 */
export interface KtxPublicIngestRuntimeRequirementOptions {
/** Project configuration; when present, its `ingest.embeddings` settings are checked to decide whether the `local-embeddings` runtime feature is required. */
config?: KtxProjectConfig;
/** Environment variable map. NOTE(review): how `env` is consumed is not visible in this view — confirm. */
env?: NodeJS.ProcessEnv | Record<string, string | undefined>;
}
@ -149,5 +150,13 @@ export function resolvePublicIngestRuntimeRequirements(
}
}
if (options.config && requiresManagedLocalEmbeddings(options.config.ingest.embeddings)) {
requirements.push({
feature: 'local-embeddings',
reason: 'local-embeddings',
detail: 'Local sentence-transformers embeddings use the managed Python runtime.',
});
}
return uniqueRequirements(requirements);
}

View file

@ -428,6 +428,55 @@ describe('runKtxScan', () => {
});
});
// Checks that `runKtxScan` resolves the embedding provider through the injected
// `resolveEmbeddingProvider` dependency and hands it `deps.runtimeIo` instead of the command IO.
it('uses runtime IO when resolving managed embedding runtime', async () => {
  await initKtxProject({ projectDir: tempDir });

  // Canned result of a completed structural scan, returned by the stubbed scan runner.
  const completedScan: LocalScanRunResult = {
    runId: 'scan-run-1',
    status: 'done',
    done: true,
    connectionId: 'warehouse',
    mode: 'structural',
    dryRun: false,
    syncId: 'sync-1',
    report,
  };
  const scanRunnerStub = vi.fn(
    async (_input: RunLocalScanOptions): Promise<LocalScanRunResult> => completedScan,
  );
  const embeddingResolverStub = vi.fn(async () => ({ kind: 'disabled' as const }));

  // Two distinct IO objects so the assertion can tell which one reached the resolver.
  const commandIo = makeIo();
  const managedRuntimeIo = makeIo({ isTTY: true });

  const pendingExitCode = runKtxScan(
    {
      command: 'run',
      projectDir: tempDir,
      connectionId: 'warehouse',
      mode: 'structural',
      detectRelationships: false,
      dryRun: false,
      cliVersion: '0.2.0',
      runtimeInstallPolicy: 'auto',
    },
    commandIo.io,
    {
      runLocalScan: scanRunnerStub,
      createLocalIngestAdapters: noLocalIngestAdapters,
      runtimeIo: managedRuntimeIo.io,
      resolveEmbeddingProvider: embeddingResolverStub,
    },
  );
  await expect(pendingExitCode).resolves.toBe(0);

  // The resolver must have been given the runtime IO together with the requested install policy.
  const expectedResolverOptions = expect.objectContaining({
    installPolicy: 'auto',
    io: managedRuntimeIo.io,
  });
  expect(embeddingResolverStub).toHaveBeenCalledWith(expect.anything(), expectedResolverOptions);
});
it('explains warnings, capability gaps, and relationships in human scan summaries', async () => {
await initKtxProject({ projectDir: tempDir });
const runLocalScan = vi.fn(

View file

@ -26,6 +26,7 @@ export interface KtxScanArgs {
export interface KtxScanDeps {
runLocalScan?: typeof runLocalScan;
createLocalIngestAdapters?: typeof createKtxCliLocalIngestAdapters;
resolveEmbeddingProvider?: typeof resolveProjectEmbeddingProvider;
progress?: KtxProgressPort;
runtimeIo?: KtxCliIo;
}
@ -312,11 +313,12 @@ export function createCliScanProgress(
export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps: KtxScanDeps = {}): Promise<number> {
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
const resolution = await resolveProjectEmbeddingProvider(project, {
const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider;
const resolution = await resolveEmbeddingProvider(project, {
mode: 'ensure',
installPolicy: args.runtimeInstallPolicy ?? 'never',
cliVersion: args.cliVersion ?? getKtxCliPackageInfo().version,
io,
io: deps.runtimeIo ?? io,
});
const embeddingProvider =
resolution.kind === 'disabled' || resolution.kind === 'managed-unavailable' ? null : resolution.provider;