From 807f86d761e74c903492c8a2d9229dab75e8400b Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 14 May 2026 17:57:41 +0200 Subject: [PATCH] feat(context): execute MCP SQL through validated connector path --- .../src/mcp/local-project-ports.test.ts | 100 +++++++++++++++++- .../context/src/mcp/local-project-ports.ts | 62 +++++++++++ 2 files changed, 159 insertions(+), 3 deletions(-) diff --git a/packages/context/src/mcp/local-project-ports.test.ts b/packages/context/src/mcp/local-project-ports.test.ts index fab2f076..190e926c 100644 --- a/packages/context/src/mcp/local-project-ports.test.ts +++ b/packages/context/src/mcp/local-project-ports.test.ts @@ -5,7 +5,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { AgentRunnerService } from '../agent/index.js'; import { FakeSourceAdapter, type MemoryFlowReplayInput } from '../ingest/index.js'; import { initKtxProject } from '../project/index.js'; -import { createKtxConnectorCapabilities, type KtxScanConnector, type KtxSchemaSnapshot } from '../scan/index.js'; +import { + createKtxConnectorCapabilities, + type KtxQueryResult, + type KtxScanConnector, + type KtxSchemaSnapshot, +} from '../scan/index.js'; import { writeLocalSlSource } from '../sl/index.js'; import { createLocalProjectMcpContextPorts } from './local-project-ports.js'; @@ -60,12 +65,13 @@ describe('createLocalProjectMcpContextPorts', () => { }; } - function testConnector(snapshot = testSnapshot()): KtxScanConnector { + function testConnector(snapshot = testSnapshot(), queryResult?: KtxQueryResult): KtxScanConnector { return { id: `test:${snapshot.connectionId}`, driver: snapshot.driver, - capabilities: createKtxConnectorCapabilities(), + capabilities: createKtxConnectorCapabilities({ readOnlySql: queryResult !== undefined }), introspect: vi.fn(async () => snapshot), + executeReadOnly: queryResult === undefined ? undefined : vi.fn(async () => queryResult), cleanup: vi.fn(async () => {}), }; } @@ -119,6 +125,94 @@ describe('createLocalProjectMcpContextPorts', () => { expect(connector.cleanup).toHaveBeenCalled(); }); + it('executes MCP SQL only after parser-backed validation passes', async () => { + const project = await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' }); + project.config.connections.warehouse = { + driver: 'postgres', + url: 'env:DATABASE_URL', + }; + const connector = testConnector(testSnapshot(), { + headers: ['id'], + headerTypes: ['integer'], + rows: [[1]], + totalRows: 1, + rowCount: 1, + }); + const createConnector = vi.fn(async () => connector); + const sqlAnalysis = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(), + validateReadOnly: vi.fn(async () => ({ ok: true, error: null })), + }; + const ports = createLocalProjectMcpContextPorts(project, { + sqlAnalysis, + localScan: { + createConnector, + }, + }); + + await expect( + ports.sqlExecution?.execute({ + connectionId: 'warehouse', + sql: 'select id from public.orders', + maxRows: 5, + }), + ).resolves.toEqual({ + headers: ['id'], + headerTypes: ['integer'], + rows: [[1]], + rowCount: 1, + }); + expect(sqlAnalysis.validateReadOnly).toHaveBeenCalledWith('select id from public.orders', 'postgres'); + expect(createConnector).toHaveBeenCalledWith('warehouse'); + expect(connector.executeReadOnly).toHaveBeenCalledWith( + { + connectionId: 'warehouse', + sql: 'select id from public.orders', + maxRows: 5, + }, + { runId: 'mcp-sql-execution' }, + ); + expect(connector.cleanup).toHaveBeenCalled(); + }); + + it('rejects MCP SQL before connector execution when parser validation fails', async () => { + const project = await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' }); + project.config.connections.warehouse = { + driver: 'postgres', + url: 'env:DATABASE_URL', + }; + const connector = testConnector(testSnapshot(), { + headers: ['id'], + rows: [[1]], + totalRows: 1, + rowCount: 1, + }); + const sqlAnalysis = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(), + validateReadOnly: vi.fn(async () => ({ + ok: false, + error: 'SQL contains read/write operation: Insert', + })), + }; + const ports = createLocalProjectMcpContextPorts(project, { + sqlAnalysis, + localScan: { + createConnector: vi.fn(async () => connector), + }, + }); + + await expect( + ports.sqlExecution?.execute({ + connectionId: 'warehouse', + sql: 'with x as (insert into t values (1) returning *) select * from x', + maxRows: 1000, + }), + ).rejects.toThrow('SQL contains read/write operation: Insert'); + expect(connector.executeReadOnly).not.toHaveBeenCalled(); + }); + it('triggers canonical bundle ingest and reads status, report, and replay through MCP ports', async () => { const project = await initKtxProject({ projectDir: tempDir, projectName: 'warehouse' }); project.config.connections.warehouse = { diff --git a/packages/context/src/mcp/local-project-ports.ts b/packages/context/src/mcp/local-project-ports.ts index 0c325453..a3d42f34 100644 --- a/packages/context/src/mcp/local-project-ports.ts +++ b/packages/context/src/mcp/local-project-ports.ts @@ -26,6 +26,7 @@ import { type LocalScanMcpOptions, runLocalScan, } from '../scan/index.js'; +import type { SqlAnalysisDialect, SqlAnalysisPort } from '../sql-analysis/index.js'; import { compileLocalSlQuery, type LocalSlSourceSearchResult, @@ -44,6 +45,7 @@ import type { KtxScanArtifactReadResponse, KtxScanArtifactSummary, KtxScanArtifactType, + KtxSqlExecutionResponse, } from './types.js'; const LOCAL_AUTHOR = 'ktx'; @@ -53,6 +55,7 @@ const SL_SHAPE_WARNING = 'Local stdio validation checks YAML shape only; Python interface CreateLocalProjectMcpContextPortsOptions { semanticLayerCompute?: KtxSemanticLayerComputePort; queryExecutor?: KtxSqlQueryExecutorPort; + sqlAnalysis?: SqlAnalysisPort; localIngest?: LocalIngestMcpOptions; localScan?: LocalScanMcpOptions; embeddingService?: KtxEmbeddingPort | null; @@ -77,6 +80,10 @@ function dialectForDriver(driver: string | undefined): string { return map[normalized] ?? 'postgres'; } +function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDialect { + return dialectForDriver(driver) as SqlAnalysisDialect; +} + function assertSafePathToken(kind: string, value: string): string { if ( value.trim().length === 0 || @@ -378,6 +385,53 @@ function statusFromIngestReport(report: IngestReportSnapshot): KtxIngestStatusRe }; } +async function executeValidatedReadOnlySql( + project: KtxLocalProject, + options: CreateLocalProjectMcpContextPortsOptions, + input: { connectionId: string; sql: string; maxRows: number }, +): Promise { + const connectionId = assertSafeConnectionId(input.connectionId); + const connection = project.config.connections[connectionId]; + if (!connection) { + throw new Error(`Connection "${connectionId}" is not configured in ktx.yaml`); + } + if (!options.sqlAnalysis) { + throw new Error('sql_execution requires parser-backed SQL validation.'); + } + const validation = await options.sqlAnalysis.validateReadOnly(input.sql, sqlAnalysisDialectForDriver(connection.driver)); + if (!validation.ok) { + throw new Error(validation.error ?? 'SQL is not read-only.'); + } + const createConnector = options.localScan?.createConnector; + if (!createConnector) { + throw new Error('sql_execution requires a local scan connector factory.'); + } + + let connector: KtxScanConnector | null = null; + try { + connector = await createConnector(connectionId); + if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { + throw new Error(`Connection "${connectionId}" does not support read-only SQL execution.`); + } + const result = await connector.executeReadOnly( + { + connectionId, + sql: input.sql, + maxRows: input.maxRows, + }, + { runId: 'mcp-sql-execution' }, + ); + return { + headers: result.headers, + ...(result.headerTypes ? { headerTypes: result.headerTypes } : {}), + rows: result.rows, + rowCount: result.rowCount ?? result.rows.length, + }; + } finally { + await cleanupConnector(connector); + } +} + export function createLocalProjectMcpContextPorts( project: KtxLocalProject, options: CreateLocalProjectMcpContextPortsOptions = {}, @@ -577,6 +631,14 @@ export function createLocalProjectMcpContextPorts( }, }; + if (options.sqlAnalysis && options.localScan?.createConnector) { + ports.sqlExecution = { + async execute(input) { + return executeValidatedReadOnlySql(project, options, input); + }, + }; + } + if (options.localIngest) { ports.ingest = { async trigger(input) {