diff --git a/packages/context/src/ingest/local-bundle-runtime.test.ts b/packages/context/src/ingest/local-bundle-runtime.test.ts index d8cd3907..47d40154 100644 --- a/packages/context/src/ingest/local-bundle-runtime.test.ts +++ b/packages/context/src/ingest/local-bundle-runtime.test.ts @@ -3,7 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { AgentRunnerService } from '../agent/index.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js'; -import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js'; @@ -12,6 +12,7 @@ type RuntimeWithConnectionDeps = { connections: { listEnabledConnections(ids: string[]): Promise>; getConnectionById(connectionId: string): Promise<{ id: string; name: string; connectionType: string } | null>; + executeQuery(connectionId: string, sql: string): Promise; }; }; }; @@ -113,6 +114,37 @@ describe('createLocalBundleIngestRuntime', () => { ]); }); + it('passes project connection config to local ingest query executors', async () => { + const agentRunner = new AgentRunnerService({ llmProvider: { getModel: () => ({}) as never } as any }); + const queryExecutor = { + execute: vi.fn(async () => ({ + headers: ['answer'], + rows: [[1]], + totalRows: 1, + command: 'SELECT', + rowCount: 1, + })), + }; + + const runtime = createLocalBundleIngestRuntime({ + project, + adapters: [new FakeSourceAdapter()], + agentRunner, + queryExecutor, + }); + const connections = (runtime.runner as unknown as RuntimeWithConnectionDeps).deps.connections; + + await expect(connections.executeQuery('warehouse', 'select 1')).resolves.toMatchObject({ + headers: ['answer'], + }); + expect(queryExecutor.execute).toHaveBeenCalledWith({ + connectionId: 'warehouse', + projectDir: project.projectDir, + connection: project.config.connections.warehouse, + sql: 'select 1', + }); + }); + it('accepts a debug LLM request file when constructing the default agent runner', async () => { await writeFile( join(project.projectDir, 'ktx.yaml'), diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 4e74335a..cffca376 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -6,7 +6,7 @@ import type { Tool } from 'ai'; import YAML from 'yaml'; import type { AgentRunnerService } from '../agent/index.js'; import { AgentRunnerService as DefaultAgentRunnerService } from '../agent/index.js'; -import { localConnectionInfoFromConfig } from '../connections/index.js'; +import { localConnectionInfoFromConfig, type KtxSqlQueryExecutorPort } from '../connections/index.js'; import type { KtxEmbeddingPort, KtxLogger } from '../core/index.js'; import { noopLogger, SessionWorktreeService } from '../core/index.js'; import type { KtxSemanticLayerComputePort } from '../daemon/index.js'; @@ -104,7 +104,7 @@ export interface CreateLocalBundleIngestRuntimeOptions { llmDebugRequestFile?: string; memoryModel?: string; semanticLayerCompute?: KtxSemanticLayerComputePort; - queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise }; + queryExecutor?: KtxSqlQueryExecutorPort; jobIdFactory?: () => string; logger?: KtxLogger; } @@ -170,9 +170,7 @@ class LocalAuthorResolver implements GitAuthorResolverPort { class LocalConnectionCatalog implements SlConnectionCatalogPort { constructor( private readonly project: KtxLocalProject, - private readonly queryExecutor?: { - execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise; - }, + private readonly queryExecutor?: KtxSqlQueryExecutorPort, ) {} async listEnabledConnections(ids: string[]): Promise { @@ -193,7 +191,12 @@ class LocalConnectionCatalog implements SlConnectionCatalogPort { if (!this.queryExecutor) { throw new Error('Local ingest has no query executor configured'); } - return this.queryExecutor.execute({ connectionId, sql }); + return this.queryExecutor.execute({ + connectionId, + projectDir: this.project.projectDir, + connection: this.project.config.connections[connectionId], + sql, + }); } } diff --git a/packages/context/src/ingest/local-ingest.ts b/packages/context/src/ingest/local-ingest.ts index bc6294c4..2ec13184 100644 --- a/packages/context/src/ingest/local-ingest.ts +++ b/packages/context/src/ingest/local-ingest.ts @@ -3,11 +3,11 @@ import { cp, mkdir, rm } from 'node:fs/promises'; import { isAbsolute, resolve } from 'node:path'; import type { KtxLlmProvider } from '@ktx/llm'; import type { AgentRunnerService } from '../agent/index.js'; +import type { KtxSqlQueryExecutorPort } from '../connections/index.js'; import type { KtxLogger } from '../core/index.js'; import type { KtxSemanticLayerComputePort } from '../daemon/index.js'; import type { KtxLocalProject } from '../project/index.js'; import { ktxLocalStateDbPath } from '../project/index.js'; -import type { KtxQueryResult } from '../sl/index.js'; import { planMetabaseFanoutChildren } from './adapters/metabase/fanout-planner.js'; import { LocalMetabaseSourceStateReader } from './adapters/metabase/local-source-state-store.js'; import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } from './local-adapters.js'; @@ -34,7 +34,7 @@ export interface RunLocalIngestOptions { llmDebugRequestFile?: string; memoryModel?: string; semanticLayerCompute?: KtxSemanticLayerComputePort; - queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise }; + queryExecutor?: KtxSqlQueryExecutorPort; logger?: KtxLogger; } @@ -172,7 +172,7 @@ async function runScheduledPullJob(options: { llmProvider?: KtxLlmProvider; memoryModel?: string; semanticLayerCompute?: KtxSemanticLayerComputePort; - queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise }; + queryExecutor?: KtxSqlQueryExecutorPort; logger?: KtxLogger; }): Promise { const runtime = createLocalBundleIngestRuntime(options);