fix(context): pass connection config to ingest query executors

This commit is contained in:
Andrey Avtomonov 2026-05-13 00:02:05 +02:00
parent af9fd77780
commit 6036019a7b
3 changed files with 45 additions and 10 deletions

View file

@ -3,7 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { AgentRunnerService } from '../agent/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
@ -12,6 +12,7 @@ type RuntimeWithConnectionDeps = {
connections: {
listEnabledConnections(ids: string[]): Promise<Array<{ id: string; name: string; connectionType: string }>>;
getConnectionById(connectionId: string): Promise<{ id: string; name: string; connectionType: string } | null>;
executeQuery(connectionId: string, sql: string): Promise<unknown>;
};
};
};
@ -113,6 +114,37 @@ describe('createLocalBundleIngestRuntime', () => {
]);
});
it('passes project connection config to local ingest query executors', async () => {
const agentRunner = new AgentRunnerService({ llmProvider: { getModel: () => ({}) as never } as any });
const queryExecutor = {
execute: vi.fn(async () => ({
headers: ['answer'],
rows: [[1]],
totalRows: 1,
command: 'SELECT',
rowCount: 1,
})),
};
const runtime = createLocalBundleIngestRuntime({
project,
adapters: [new FakeSourceAdapter()],
agentRunner,
queryExecutor,
});
const connections = (runtime.runner as unknown as RuntimeWithConnectionDeps).deps.connections;
await expect(connections.executeQuery('warehouse', 'select 1')).resolves.toMatchObject({
headers: ['answer'],
});
expect(queryExecutor.execute).toHaveBeenCalledWith({
connectionId: 'warehouse',
projectDir: project.projectDir,
connection: project.config.connections.warehouse,
sql: 'select 1',
});
});
it('accepts a debug LLM request file when constructing the default agent runner', async () => {
await writeFile(
join(project.projectDir, 'ktx.yaml'),

View file

@ -6,7 +6,7 @@ import type { Tool } from 'ai';
import YAML from 'yaml';
import type { AgentRunnerService } from '../agent/index.js';
import { AgentRunnerService as DefaultAgentRunnerService } from '../agent/index.js';
import { localConnectionInfoFromConfig } from '../connections/index.js';
import { localConnectionInfoFromConfig, type KtxSqlQueryExecutorPort } from '../connections/index.js';
import type { KtxEmbeddingPort, KtxLogger } from '../core/index.js';
import { noopLogger, SessionWorktreeService } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
@ -104,7 +104,7 @@ export interface CreateLocalBundleIngestRuntimeOptions {
llmDebugRequestFile?: string;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise<KtxQueryResult> };
queryExecutor?: KtxSqlQueryExecutorPort;
jobIdFactory?: () => string;
logger?: KtxLogger;
}
@ -170,9 +170,7 @@ class LocalAuthorResolver implements GitAuthorResolverPort {
class LocalConnectionCatalog implements SlConnectionCatalogPort {
constructor(
private readonly project: KtxLocalProject,
private readonly queryExecutor?: {
execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise<KtxQueryResult>;
},
private readonly queryExecutor?: KtxSqlQueryExecutorPort,
) {}
async listEnabledConnections(ids: string[]): Promise<KtxConnectionInfo[]> {
@ -193,7 +191,12 @@ class LocalConnectionCatalog implements SlConnectionCatalogPort {
if (!this.queryExecutor) {
throw new Error('Local ingest has no query executor configured');
}
return this.queryExecutor.execute({ connectionId, sql });
return this.queryExecutor.execute({
connectionId,
projectDir: this.project.projectDir,
connection: this.project.config.connections[connectionId],
sql,
});
}
}

View file

@ -3,11 +3,11 @@ import { cp, mkdir, rm } from 'node:fs/promises';
import { isAbsolute, resolve } from 'node:path';
import type { KtxLlmProvider } from '@ktx/llm';
import type { AgentRunnerService } from '../agent/index.js';
import type { KtxSqlQueryExecutorPort } from '../connections/index.js';
import type { KtxLogger } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import type { KtxLocalProject } from '../project/index.js';
import { ktxLocalStateDbPath } from '../project/index.js';
import type { KtxQueryResult } from '../sl/index.js';
import { planMetabaseFanoutChildren } from './adapters/metabase/fanout-planner.js';
import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js';
import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } from './local-adapters.js';
@ -34,7 +34,7 @@ export interface RunLocalIngestOptions {
llmDebugRequestFile?: string;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise<KtxQueryResult> };
queryExecutor?: KtxSqlQueryExecutorPort;
logger?: KtxLogger;
}
@ -172,7 +172,7 @@ async function runScheduledPullJob(options: {
llmProvider?: KtxLlmProvider;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise<KtxQueryResult> };
queryExecutor?: KtxSqlQueryExecutorPort;
logger?: KtxLogger;
}): Promise<LocalIngestResult> {
const runtime = createLocalBundleIngestRuntime(options);