diff --git a/packages/cli/src/mcp-server-factory.ts b/packages/cli/src/mcp-server-factory.ts index 2a0c40ee..5209f9b8 100644 --- a/packages/cli/src/mcp-server-factory.ts +++ b/packages/cli/src/mcp-server-factory.ts @@ -1,5 +1,5 @@ import { createDefaultKtxMcpServer, createLocalProjectMcpContextPorts } from '@ktx/context/mcp'; -import { createLocalProjectMemoryCapture } from '@ktx/context/memory'; +import { createLocalProjectMemoryIngest } from '@ktx/context/memory'; import type { KtxLocalProject } from '@ktx/context/project'; import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import type { KtxCliIo } from './cli-runtime.js'; @@ -41,17 +41,13 @@ export async function createKtxMcpServerFactory(input: { localScan: { createConnector: async (connectionId) => createKtxCliScanConnector(input.project, connectionId), }, - localIngest: { - semanticLayerCompute, - queryExecutor, - }, }); - let memoryCapture: ReturnType | undefined; + let memoryIngest: ReturnType | undefined; try { - memoryCapture = createLocalProjectMemoryCapture(input.project, { semanticLayerCompute, queryExecutor }); + memoryIngest = createLocalProjectMemoryIngest(input.project, { semanticLayerCompute, queryExecutor }); } catch (error) { - input.io?.stderr.write(`KTX MCP memory_capture disabled: ${error instanceof Error ? error.message : String(error)}\n`); + input.io?.stderr.write(`KTX MCP memory_ingest disabled: ${error instanceof Error ? error.message : String(error)}\n`); } return () => @@ -59,7 +55,9 @@ export async function createKtxMcpServerFactory(input: { name: 'ktx', version: input.cliVersion, userContext: { userId: 'local' }, - contextTools, - memoryCapture, + contextTools: { + ...contextTools, + ...(memoryIngest ? { memoryIngest } : {}), + }, }); } diff --git a/packages/context/src/mcp/local-project-ports.test.ts b/packages/context/src/mcp/local-project-ports.test.ts index 0c000831..4e5762ff 100644 --- a/packages/context/src/mcp/local-project-ports.test.ts +++ b/packages/context/src/mcp/local-project-ports.test.ts @@ -1,9 +1,7 @@ -import { access, mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { AgentRunnerService } from '../agent/index.js'; -import { FakeSourceAdapter, type MemoryFlowReplayInput } from '../ingest/index.js'; import { initKtxProject } from '../project/index.js'; import { createKtxConnectorCapabilities, @@ -14,14 +12,6 @@ import { import { writeLocalSlSource } from '../sl/index.js'; import { createLocalProjectMcpContextPorts } from './local-project-ports.js'; -class TestAgentRunner extends AgentRunnerService { - override runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' as const }); - - constructor() { - super({ llmProvider: { getModel: () => ({}) as never } as never }); - } -} - describe('createLocalProjectMcpContextPorts', () => { let tempDir: string; @@ -178,7 +168,7 @@ describe('createLocalProjectMcpContextPorts', () => { ); } - it('lists local project connections from ktx.yaml', async () => { + it('lists local project connections and exposes only retained research ports', async () => { const project = await initKtxProject({ projectDir: tempDir }); project.config.connections.warehouse = { driver: 'postgres', @@ -186,48 +176,23 @@ describe('createLocalProjectMcpContextPorts', () => { }; const ports = createLocalProjectMcpContextPorts(project); + expect(Object.keys(ports).sort()).toEqual([ + 'connections', + 'dictionarySearch', + 'discover', + 'entityDetails', + 'knowledge', + 'semanticLayer', + ]); + expect(Object.keys(ports.connections ?? {}).sort()).toEqual(['list']); + expect(Object.keys(ports.knowledge ?? {}).sort()).toEqual(['read', 'search']); + expect(Object.keys(ports.semanticLayer ?? {}).sort()).toEqual(['query', 'readSource']); await expect(ports.connections?.list()).resolves.toEqual([ { id: 'warehouse', name: 'warehouse', connectionType: 'POSTGRESQL' }, ]); }); - it('tests a local project connection through the native scan connector factory', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { - driver: 'postgres', - url: 'env:DATABASE_URL', - }; - const connector = testConnector(); - const createConnector = vi.fn(async () => connector); - const ports = createLocalProjectMcpContextPorts(project, { - localScan: { - createConnector, - }, - }); - - await expect(ports.connections?.test?.({ connectionId: 'warehouse' })).resolves.toEqual({ - id: 'warehouse', - connectionType: 'POSTGRESQL', - ok: true, - tableCount: 1, - message: 'Connection test passed.', - warnings: [], - }); - expect(createConnector).toHaveBeenCalledWith('warehouse'); - expect(connector.introspect).toHaveBeenCalledWith( - { - connectionId: 'warehouse', - driver: 'postgres', - mode: 'structural', - dryRun: true, - detectRelationships: false, - }, - { runId: 'connection-test-warehouse' }, - ); - expect(connector.cleanup).toHaveBeenCalled(); - }); - - it('executes MCP SQL only after parser-backed validation passes', async () => { + it('adds sql_execution when parser validation and a SQL-capable connector are configured', async () => { const project = await initKtxProject({ projectDir: tempDir }); project.config.connections.warehouse = { driver: 'postgres', @@ -253,6 +218,7 @@ describe('createLocalProjectMcpContextPorts', () => { }, }); + expect(Object.keys(ports).sort()).toContain('sqlExecution'); await expect( ports.sqlExecution?.execute({ connectionId: 'warehouse', @@ -603,241 +569,30 @@ describe('createLocalProjectMcpContextPorts', () => { ); }); - it('triggers canonical bundle ingest and reads status, report, and replay through MCP ports', async () => { + it('reads and searches seeded global wiki pages', async () => { const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { - driver: 'postgres', - }; - project.config.ingest.adapters = ['fake']; - project.config.ingest.embeddings = { - backend: 'deterministic', - dimensions: 8, - batchSize: 64, - }; - project.config.llm = { - provider: { backend: 'none' }, - models: {}, - }; - - const sourceDir = join(tempDir, 'source'); - await mkdir(join(sourceDir, 'orders'), { recursive: true }); - await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); - - const agentRunner = new TestAgentRunner(); - const ports = createLocalProjectMcpContextPorts(project, { - localIngest: { - adapters: [new FakeSourceAdapter()], - jobIdFactory: () => 'mcp-full-1', - agentRunner, - }, - }); - - const trigger = await ports.ingest?.trigger({ - adapter: 'fake', - connectionId: 'warehouse', - trigger: 'manual_resync', - config: { sourceDir }, - }); - - expect(trigger).toMatchObject({ - runId: expect.any(String), - jobId: 'mcp-full-1', - reportId: expect.any(String), - }); - expect(trigger?.runId).not.toBe('mcp-full-1'); - expect(agentRunner.runLoop).toHaveBeenCalledTimes(1); - - await expect(ports.ingest?.status({ runId: trigger?.jobId ?? '' })).resolves.toMatchObject({ - runId: trigger?.runId, - jobId: 'mcp-full-1', - reportId: trigger?.reportId, - status: 'done', - stage: 'done', - progress: 1, - done: true, - adapter: 'fake', - connectionId: 'warehouse', - sourceDir: null, - diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, - rawFileCount: 1, - workUnitCount: 1, - workUnits: [ - { - unitKey: 'fake-orders', - rawFiles: ['orders/orders.json'], - peerFileIndex: [], - dependencyPaths: [], - }, - ], - evictionDeletedRawPaths: [], - errors: [], - }); - - await expect(ports.ingest?.report?.({ runId: trigger?.reportId ?? '' })).resolves.toMatchObject({ - id: trigger?.reportId, - runId: trigger?.runId, - jobId: 'mcp-full-1', - connectionId: 'warehouse', - sourceKey: 'fake', - }); - - const replay = (await ports.ingest?.replay?.({ runId: trigger?.runId ?? '' })) as MemoryFlowReplayInput | null; - expect(replay).toMatchObject({ - runId: trigger?.runId, - reportId: trigger?.reportId, - reportPath: trigger?.reportId, - status: 'done', - adapter: 'fake', - connectionId: 'warehouse', - syncId: expect.stringContaining('mcp-full-1'), - }); - expect(replay?.events).toEqual( - expect.arrayContaining([ - { type: 'work_unit_finished', unitKey: 'fake-orders', status: 'success' }, - { type: 'report_created', runId: trigger?.runId, reportPath: trigger?.reportId }, - ]), + await project.fileStore.writeFile( + 'wiki/global/revenue.md', + [ + '---', + 'summary: Revenue definition', + 'tags: [finance]', + 'refs: [docs/revenue.md]', + 'sl_refs: [warehouse.orders]', + 'usage_mode: auto', + '---', + '', + '# Revenue', + '', + 'Revenue is net of refunds.', + '', + ].join('\n'), + 'ktx', + 'ktx@example.com', + 'Seed wiki', ); - }); - - it('returns child run metadata for local Metabase fan-out triggers', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections = { - 'prod-metabase': { - driver: 'metabase', - api_url: 'https://metabase.example.com', - }, - warehouse_a: { driver: 'postgres', url: 'postgres://localhost/a' }, - warehouse_b: { driver: 'postgres', url: 'postgres://localhost/b' }, - }; - project.config.ingest.adapters = ['metabase']; - const reportA = { - id: 'report-a', - runId: 'run-a', - jobId: 'child-a', - connectionId: 'warehouse_a', - sourceKey: 'metabase', - createdAt: '2026-05-04T12:00:00.000Z', - body: { - syncId: 'sync-a', - diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, - commitSha: null, - workUnits: [], - failedWorkUnits: [], - reconciliationSkipped: false, - conflictsResolved: [], - evictionsApplied: [], - unmappedFallbacks: [], - evictionInputs: [], - unresolvedCards: [], - supersededBy: null, - overrideOf: null, - provenanceRows: [], - toolTranscripts: [], - }, - }; - const reportB = { - ...reportA, - id: 'report-b', - runId: 'run-b', - jobId: 'child-b', - connectionId: 'warehouse_b', - body: { ...reportA.body, syncId: 'sync-b' }, - }; - - const ports = createLocalProjectMcpContextPorts(project, { - localIngest: { - runLocalMetabaseIngest: async () => ({ - metabaseConnectionId: 'prod-metabase', - status: 'all_succeeded', - totals: { workUnits: 2, failedWorkUnits: 0 }, - children: [ - { - jobId: 'child-a', - metabaseConnectionId: 'prod-metabase', - metabaseDatabaseId: 1, - targetConnectionId: 'warehouse_a', - result: { - jobId: 'child-a', - runId: 'run-a', - syncId: 'sync-a', - diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, - workUnitCount: 0, - failedWorkUnits: [], - artifactsWritten: 0, - commitSha: null, - }, - report: reportA, - }, - { - jobId: 'child-b', - metabaseConnectionId: 'prod-metabase', - metabaseDatabaseId: 2, - targetConnectionId: 'warehouse_b', - result: { - jobId: 'child-b', - runId: 'run-b', - syncId: 'sync-b', - diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, - workUnitCount: 0, - failedWorkUnits: [], - artifactsWritten: 0, - commitSha: null, - }, - report: reportB, - }, - ], - }), - }, - }); - - await expect( - ports.ingest?.trigger({ - adapter: 'metabase', - connectionId: 'prod-metabase', - trigger: 'manual_resync', - }), - ).resolves.toEqual({ - runId: 'metabase-fanout:prod-metabase', - jobId: undefined, - reportId: undefined, - fanout: { - status: 'all_succeeded', - children: [ - { - runId: 'run-a', - jobId: 'child-a', - reportId: 'report-a', - targetConnectionId: 'warehouse_a', - metabaseDatabaseId: 1, - }, - { - runId: 'run-b', - jobId: 'child-b', - reportId: 'report-b', - targetConnectionId: 'warehouse_b', - metabaseDatabaseId: 2, - }, - ], - }, - }); - }); - - it('writes, reads, and searches global wiki pages', async () => { - const project = await initKtxProject({ projectDir: tempDir }); const ports = createLocalProjectMcpContextPorts(project); - await expect( - ports.knowledge?.write({ - userId: 'local-user', - key: 'revenue', - summary: 'Revenue definition', - content: '# Revenue\n\nRevenue is net of refunds.', - tags: ['finance'], - refs: ['docs/revenue.md'], - slRefs: ['warehouse.orders'], - }), - ).resolves.toMatchObject({ success: true, key: 'revenue', action: 'created' }); - await expect(ports.knowledge?.read({ userId: 'local-user', key: 'revenue' })).resolves.toMatchObject({ key: 'revenue', scope: 'GLOBAL', @@ -863,231 +618,32 @@ describe('createLocalProjectMcpContextPorts', () => { totalFound: 1, }); expect(search?.results[0]?.score).toBeGreaterThan(0); - await expect(access(join(project.projectDir, '.ktx', 'db.sqlite'))).resolves.toBeUndefined(); }); - it('writes, lists, reads, and validates semantic-layer sources', async () => { + it('reads seeded semantic-layer sources', async () => { const project = await initKtxProject({ projectDir: tempDir }); + await writeLocalSlSource(project, { + connectionId: 'warehouse', + sourceName: 'orders', + yaml: [ + 'name: orders', + 'table: public.orders', + 'grain:', + ' - id', + 'columns:', + ' - name: id', + ' type: number', + '', + ].join('\n'), + }); const ports = createLocalProjectMcpContextPorts(project); - await expect( - ports.semanticLayer?.writeSource({ - connectionId: 'warehouse', - sourceName: 'orders', - source: { - name: 'orders', - table: 'public.orders', - grain: ['id'], - columns: [{ name: 'id', type: 'number' }], - joins: [], - measures: [{ name: 'order_count', expr: 'count(*)' }], - }, - }), - ).resolves.toMatchObject({ success: true, sourceName: 'orders' }); - - await expect(ports.semanticLayer?.listSources({ connectionId: 'warehouse' })).resolves.toEqual({ - sources: [ - { - connectionId: 'warehouse', - connectionName: 'warehouse', - name: 'orders', - columnCount: 1, - measureCount: 1, - joinCount: 0, - }, - ], - totalSources: 1, - }); - - await expect( - ports.semanticLayer?.listSources({ connectionId: 'warehouse', query: 'order_count' }), - ).resolves.toEqual({ - sources: [ - expect.objectContaining({ - connectionId: 'warehouse', - connectionName: 'warehouse', - name: 'orders', - columnCount: 1, - measureCount: 1, - joinCount: 0, - score: expect.any(Number), - matchReasons: expect.arrayContaining(['lexical']), - }), - ], - totalSources: 1, - }); - await expect(access(join(project.projectDir, '.ktx/db.sqlite'))).resolves.toBeUndefined(); - await expect( ports.semanticLayer?.readSource({ connectionId: 'warehouse', sourceName: 'orders' }), ).resolves.toMatchObject({ sourceName: 'orders', yaml: expect.stringContaining('name: orders'), }); - - await expect(ports.semanticLayer?.validate({ connectionId: 'warehouse' })).resolves.toEqual({ - success: true, - errors: [], - warnings: ['Local stdio validation checks YAML shape only; Python semantic validation is not configured.'], - }); - }); - - it('returns semantic-layer hybrid search metadata through local project ports', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - await writeLocalSlSource(project, { - connectionId: 'warehouse', - sourceName: 'orders', - yaml: [ - 'name: orders', - 'table: public.orders', - 'grain:', - ' - order_id', - 'columns:', - ' - name: order_id', - ' type: string', - ' - name: status', - ' type: string', - '', - ].join('\n'), - }); - await project.fileStore.writeFile( - 'raw-sources/warehouse/live-database/sync-1/enrichment/relationship-profile.json', - `${JSON.stringify( - { - connectionId: 'warehouse', - driver: 'postgres', - sqlAvailable: true, - queryCount: 2, - tables: [], - columns: { - 'orders.status': { - table: { catalog: null, db: 'public', name: 'orders' }, - column: 'status', - nativeType: 'text', - normalizedType: 'string', - rowCount: 10, - nullCount: 0, - distinctCount: 2, - uniquenessRatio: 0.2, - nullRate: 0, - sampleValues: ['paid', 'refunded'], - minTextLength: 4, - maxTextLength: 8, - }, - }, - warnings: [], - }, - null, - 2, - )}\n`, - 'ktx', - 'ktx@example.com', - 'Seed dictionary profile', - ); - - const ports = createLocalProjectMcpContextPorts(project); - await expect(ports.semanticLayer?.listSources({ connectionId: 'warehouse', query: 'paid' })).resolves.toEqual({ - sources: [ - expect.objectContaining({ - connectionId: 'warehouse', - connectionName: 'warehouse', - name: 'orders', - score: expect.any(Number), - matchReasons: expect.arrayContaining(['dictionary']), - dictionaryMatches: [{ column: 'status', values: ['paid'] }], - }), - ], - totalSources: 1, - }); - }); - - it('returns historic SQL usage frequency and snippet through semantic-layer list search', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - await project.fileStore.writeFile( - 'semantic-layer/warehouse/_schema/public.yaml', - `tables: - orders: - table: public.orders - usage: - narrative: Analysts inspect paid order lifecycle by customer segment. - frequencyTier: high - commonFilters: - - status - commonGroupBys: - - customer_segment - commonJoins: - - table: public.customers - on: - - customer_id - columns: - - name: order_id - type: string - - name: status - type: string -`, - 'ktx', - 'ktx@example.com', - 'Seed usage-backed manifest shard', - ); - - const ports = createLocalProjectMcpContextPorts(project); - await expect( - ports.semanticLayer?.listSources({ connectionId: 'warehouse', query: 'paid order lifecycle' }), - ).resolves.toEqual({ - sources: [ - expect.objectContaining({ - connectionId: 'warehouse', - connectionName: 'warehouse', - name: 'orders', - frequencyTier: 'high', - snippet: expect.stringContaining(''), - score: expect.any(Number), - matchReasons: expect.arrayContaining(['lexical']), - }), - ], - totalSources: 1, - }); - }); - - it('uses configured local embeddings for semantic-layer search when available', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.ingest.embeddings = { backend: 'none', dimensions: 2 }; - await writeLocalSlSource(project, { - connectionId: 'warehouse', - sourceName: 'orders', - yaml: [ - 'name: orders', - 'descriptions:', - ' user: Revenue facts', - 'table: public.orders', - 'grain:', - ' - order_id', - 'columns:', - ' - name: order_id', - ' type: string', - '', - ].join('\n'), - }); - - const ports = createLocalProjectMcpContextPorts(project, { - embeddingService: { - maxBatchSize: 8, - async computeEmbedding(text: string) { - return text.includes('cash collection') ? [1, 0] : [0, 1]; - }, - async computeEmbeddingsBulk(texts: string[]) { - return texts.map((text) => (text.includes('Revenue facts') ? [1, 0] : [0, 1])); - }, - }, - }); - - const result = await ports.semanticLayer?.listSources({ connectionId: 'warehouse', query: 'cash collection' }); - - expect(result?.sources[0]).toMatchObject({ - name: 'orders', - matchReasons: expect.arrayContaining(['semantic']), - lanes: expect.arrayContaining([expect.objectContaining({ lane: 'semantic', status: 'available' })]), - }); }); it('rejects path traversal keys before touching the project directory', async () => { @@ -1109,36 +665,35 @@ describe('createLocalProjectMcpContextPorts', () => { ).rejects.toThrow('Unsafe semantic-layer source name'); }); - it('uses semantic compute for validation and compile-only sl_query when supplied', async () => { + it('uses semantic compute for compile-only sl_query when supplied', async () => { const project = await initKtxProject({ projectDir: tempDir }); project.config.connections.warehouse = { driver: 'postgres', url: 'env:DATABASE_URL', }; - const shapeOnlyPorts = createLocalProjectMcpContextPorts(project); - await shapeOnlyPorts.semanticLayer?.writeSource({ + await writeLocalSlSource(project, { connectionId: 'warehouse', sourceName: 'orders', - source: { - name: 'orders', - table: 'public.orders', - grain: ['id'], - columns: [ - { name: 'id', type: 'number' }, - { name: 'status', type: 'string' }, - ], - joins: [], - measures: [{ name: 'order_count', expr: 'count(*)' }], - }, + yaml: [ + 'name: orders', + 'table: public.orders', + 'grain:', + ' - id', + 'columns:', + ' - name: id', + ' type: number', + ' - name: status', + ' type: string', + 'joins: []', + 'measures:', + ' - name: order_count', + ' expr: count(*)', + '', + ].join('\n'), }); const semanticLayerCompute = { - validateSources: vi.fn(async () => ({ - valid: true, - errors: [], - warnings: ['python validation ran'], - perSourceWarnings: {}, - })), + validateSources: vi.fn(), query: vi.fn(async () => ({ sql: 'select status, count(*) as order_count from public.orders group by status', dialect: 'postgres', @@ -1149,29 +704,6 @@ describe('createLocalProjectMcpContextPorts', () => { }; const ports = createLocalProjectMcpContextPorts(project, { semanticLayerCompute }); - await expect(ports.semanticLayer?.validate({ connectionId: 'warehouse', names: ['orders'] })).resolves.toEqual({ - success: true, - errors: [], - warnings: ['python validation ran'], - }); - expect(semanticLayerCompute.validateSources).toHaveBeenCalledWith({ - sources: [ - { - name: 'orders', - table: 'public.orders', - grain: ['id'], - columns: [ - { name: 'id', type: 'number' }, - { name: 'status', type: 'string' }, - ], - joins: [], - measures: [{ name: 'order_count', expr: 'count(*)' }], - }, - ], - dialect: 'postgres', - recentlyTouched: ['orders'], - }); - await expect( ports.semanticLayer?.query({ connectionId: 'warehouse', @@ -1201,18 +733,23 @@ describe('createLocalProjectMcpContextPorts', () => { driver: 'postgres', url: 'env:DATABASE_URL', }; - const shapeOnlyPorts = createLocalProjectMcpContextPorts(project); - await shapeOnlyPorts.semanticLayer?.writeSource({ + await writeLocalSlSource(project, { connectionId: 'warehouse', sourceName: 'orders', - source: { - name: 'orders', - table: 'public.orders', - grain: ['id'], - columns: [{ name: 'id', type: 'number' }], - joins: [], - measures: [{ name: 'order_count', expr: 'count(*)' }], - }, + yaml: [ + 'name: orders', + 'table: public.orders', + 'grain:', + ' - id', + 'columns:', + ' - name: id', + ' type: number', + 'joins: []', + 'measures:', + ' - name: order_count', + ' expr: count(*)', + '', + ].join('\n'), }); const compute = { validateSources: vi.fn(), @@ -1252,378 +789,4 @@ describe('createLocalProjectMcpContextPorts', () => { }), ); }); - - it('exposes detailed local ingest trigger and status ports when local ingest is enabled', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { driver: 'postgres' }; - project.config.ingest.adapters = ['fake']; - project.config.ingest.embeddings = { - backend: 'deterministic', - dimensions: 8, - batchSize: 64, - }; - project.config.llm = { - provider: { backend: 'none' }, - models: {}, - }; - const sourceDir = join(project.projectDir, 'upload'); - await mkdir(join(sourceDir, 'orders'), { recursive: true }); - await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); - - let nextJob = 0; - const agentRunner = new TestAgentRunner(); - const ports = createLocalProjectMcpContextPorts(project, { - localIngest: { - adapters: [new FakeSourceAdapter()], - jobIdFactory: () => `mcp-local-run-${++nextJob}`, - agentRunner, - }, - }); - - const firstTrigger = await ports.ingest?.trigger({ - adapter: 'fake', - connectionId: 'warehouse', - trigger: 'manual_resync', - config: { sourceDir }, - }); - - expect(firstTrigger).toMatchObject({ - runId: expect.any(String), - jobId: 'mcp-local-run-1', - reportId: expect.any(String), - }); - expect(firstTrigger?.runId).not.toBe('mcp-local-run-1'); - - await expect(ports.ingest?.status({ runId: 'mcp-local-run-1' })).resolves.toMatchObject({ - runId: firstTrigger?.runId, - jobId: 'mcp-local-run-1', - reportId: firstTrigger?.reportId, - status: 'done', - stage: 'done', - done: true, - progress: 1, - adapter: 'fake', - connectionId: 'warehouse', - sourceDir: null, - syncId: expect.stringContaining('mcp-local-run-1'), - startedAt: expect.any(String), - completedAt: expect.any(String), - previousRunId: null, - diffSummary: { - added: 1, - modified: 0, - deleted: 0, - unchanged: 0, - }, - rawFileCount: 1, - workUnitCount: 1, - workUnits: [ - { - unitKey: 'fake-orders', - rawFiles: ['orders/orders.json'], - peerFileIndex: [], - dependencyPaths: [], - }, - ], - evictionDeletedRawPaths: [], - errors: [], - }); - - const secondTrigger = await ports.ingest?.trigger({ - adapter: 'fake', - connectionId: 'warehouse', - trigger: 'manual_resync', - config: { sourceDir }, - }); - - expect(secondTrigger).toMatchObject({ - runId: expect.any(String), - jobId: 'mcp-local-run-2', - reportId: expect.any(String), - }); - expect(secondTrigger?.runId).not.toBe('mcp-local-run-2'); - - await expect(ports.ingest?.status({ runId: 'mcp-local-run-2' })).resolves.toMatchObject({ - runId: secondTrigger?.runId, - jobId: 'mcp-local-run-2', - reportId: secondTrigger?.reportId, - status: 'done', - stage: 'done', - done: true, - progress: 1, - adapter: 'fake', - connectionId: 'warehouse', - sourceDir: null, - syncId: expect.stringContaining('mcp-local-run-2'), - startedAt: expect.any(String), - completedAt: expect.any(String), - previousRunId: null, - diffSummary: { - added: 0, - modified: 0, - deleted: 0, - unchanged: 1, - }, - rawFileCount: 0, - workUnitCount: 0, - workUnits: [], - evictionDeletedRawPaths: [], - errors: [], - }); - expect(agentRunner.runLoop).toHaveBeenCalledTimes(1); - }); - - it('passes local ingest pull-config options into runLocalIngest', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { driver: 'postgres' }; - project.config.ingest.adapters = ['looker']; - const runLocalIngest = vi.fn(async () => ({ - result: { ok: true }, - report: { - id: 'report-1', - runId: 'run-1', - jobId: 'job-1', - sourceKey: 'looker', - connectionId: 'warehouse', - body: { - syncId: 'sync-1', - workUnits: [], - failedWorkUnits: [], - diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 }, - provenanceRows: [], - }, - }, - }) as never); - const ports = createLocalProjectMcpContextPorts(project, { - localIngest: { - adapters: [ - { source: 'looker', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, - ], - pullConfigOptions: { - looker: { - daemonBaseUrl: 'http://127.0.0.1:61234', - }, - }, - runLocalIngest, - }, - }); - - await expect( - ports.ingest?.trigger({ - adapter: 'looker', - connectionId: 'warehouse', - trigger: 'manual_resync', - config: {}, - }), - ).resolves.toMatchObject({ - runId: 'run-1', - jobId: 'job-1', - reportId: 'report-1', - }); - - expect(runLocalIngest).toHaveBeenCalledWith( - expect.objectContaining({ - pullConfigOptions: { - looker: { - daemonBaseUrl: 'http://127.0.0.1:61234', - }, - }, - }), - ); - }); - - it('triggers fetch-capable local ingest without sourceDir config', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { - driver: 'postgres', - url: 'postgres://localhost:5432/warehouse', - }; - project.config.ingest.adapters = ['live-database']; - project.config.llm = { - provider: { backend: 'none' }, - models: {}, - }; - const agentRunner = new TestAgentRunner(); - const ports = createLocalProjectMcpContextPorts(project, { - localIngest: { - adapters: [ - { - source: 'live-database', - skillNames: ['live_database_ingest'], - async fetch(_pullConfig, stagedDir) { - await mkdir(join(stagedDir, 'tables'), { recursive: true }); - await writeFile(join(stagedDir, 'connection.json'), '{"connectionId":"warehouse"}\n', 'utf-8'); - await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); - await writeFile( - join(stagedDir, 'tables', 'orders.json'), - '{"name":"orders","db":"public","columns":[]}\n', - 'utf-8', - ); - }, - async detect() { - return true; - }, - async chunk() { - return { - workUnits: [ - { - unitKey: 'live-database-public-orders', - rawFiles: ['tables/orders.json'], - dependencyPaths: ['connection.json', 'foreign-keys.json'], - peerFileIndex: [], - }, - ], - }; - }, - }, - ], - jobIdFactory: () => 'local-live-db-mcp', - agentRunner, - }, - }); - - const result = await ports.ingest?.trigger({ - adapter: 'live-database', - connectionId: 'warehouse', - trigger: 'manual_resync', - config: {}, - }); - - expect(result).toMatchObject({ - runId: expect.any(String), - jobId: 'local-live-db-mcp', - reportId: expect.any(String), - }); - expect(result?.runId).not.toBe('local-live-db-mcp'); - await expect(ports.ingest?.status({ runId: 'local-live-db-mcp' })).resolves.toMatchObject({ - runId: result?.runId, - jobId: 'local-live-db-mcp', - reportId: result?.reportId, - adapter: 'live-database', - sourceDir: null, - rawFileCount: 1, - workUnitCount: 1, - }); - expect(agentRunner.runLoop).toHaveBeenCalledTimes(1); - }); - - it('lists and reads only artifacts that belong to a local scan report', async () => { - const project = await initKtxProject({ projectDir: tempDir }); - project.config.connections.warehouse = { - driver: 'postgres', - url: 'env:DATABASE_URL', - }; - project.config.ingest.adapters = ['live-database']; - const ports = createLocalProjectMcpContextPorts(project, { - localScan: { - adapters: [ - { - source: 'live-database', - skillNames: ['live_database_ingest'], - async fetch(_pullConfig, stagedDir) { - await mkdir(join(stagedDir, 'tables'), { recursive: true }); - await writeFile(join(stagedDir, 'connection.json'), '{"connectionId":"warehouse"}\n', 'utf-8'); - await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); - await writeFile( - join(stagedDir, 'tables', 'orders.json'), - '{"name":"orders","db":"public","columns":[]}\n', - 'utf-8', - ); - }, - async detect() { - return true; - }, - async chunk() { - return { - workUnits: [ - { - unitKey: 'live-database-public-orders', - rawFiles: ['tables/orders.json'], - dependencyPaths: ['connection.json', 'foreign-keys.json'], - peerFileIndex: [], - }, - ], - }; - }, - }, - ], - jobIdFactory: () => 'local-scan-artifacts', - now: () => new Date('2026-04-29T12:00:00.000Z'), - }, - }); - - const trigger = await ports.scan?.trigger({ - connectionId: 'warehouse', - mode: 'structural', - detectRelationships: false, - dryRun: false, - }); - - expect(trigger?.runId).toBe('local-scan-artifacts'); - const syncId = '2026-04-29-120000-local-scan-artifacts'; - await expect(ports.scan?.listArtifacts?.({ runId: 'local-scan-artifacts' })).resolves.toEqual({ - runId: 'local-scan-artifacts', - artifacts: [ - { - path: `raw-sources/warehouse/live-database/${syncId}/connection.json`, - type: 'raw_source', - size: 29, - }, - { - path: `raw-sources/warehouse/live-database/${syncId}/foreign-keys.json`, - type: 'raw_source', - size: 19, - }, - { - path: `raw-sources/warehouse/live-database/${syncId}/scan-report.json`, - type: 'report', - size: expect.any(Number), - }, - { - path: `raw-sources/warehouse/live-database/${syncId}/tables/orders.json`, - type: 'raw_source', - size: 45, - }, - { - path: 'semantic-layer/warehouse/_schema/public.yaml', - type: 'manifest_shard', - size: expect.any(Number), - }, - ], - }); - - await expect( - ports.scan?.readArtifact?.({ - runId: 'local-scan-artifacts', - path: `raw-sources/warehouse/live-database/${syncId}/tables/orders.json`, - }), - ).resolves.toEqual({ - runId: 'local-scan-artifacts', - path: `raw-sources/warehouse/live-database/${syncId}/tables/orders.json`, - type: 'raw_source', - size: 45, - content: '{"name":"orders","db":"public","columns":[]}\n', - }); - - await expect( - ports.scan?.readArtifact?.({ - runId: 'local-scan-artifacts', - path: 'semantic-layer/warehouse/_schema/public.yaml', - }), - ).resolves.toMatchObject({ - runId: 'local-scan-artifacts', - path: 'semantic-layer/warehouse/_schema/public.yaml', - type: 'manifest_shard', - content: expect.stringContaining('orders:'), - }); - - await expect( - ports.scan?.readArtifact?.({ - runId: 'local-scan-artifacts', - path: 'ktx.yaml', - }), - ).resolves.toBeNull(); - await expect(ports.scan?.listArtifacts?.({ runId: 'missing' })).resolves.toBeNull(); - await expect(readFile(join(project.projectDir, 'ktx.yaml'), 'utf-8')).resolves.not.toContain('project:'); - }); }); diff --git a/packages/context/src/mcp/local-project-ports.ts b/packages/context/src/mcp/local-project-ports.ts index 8088f27a..45871326 100644 --- a/packages/context/src/mcp/local-project-ports.ts +++ b/packages/context/src/mcp/local-project-ports.ts @@ -1,65 +1,19 @@ -import YAML from 'yaml'; -import { - type KtxSqlQueryExecutorPort, - localConnectionInfoFromConfig, - localConnectionTypeForConfig, -} from '../connections/index.js'; +import { type KtxSqlQueryExecutorPort, localConnectionInfoFromConfig } from '../connections/index.js'; import type { KtxEmbeddingPort } from '../core/index.js'; import type { KtxSemanticLayerComputePort } from '../daemon/index.js'; -import { - createDefaultLocalIngestAdapters, - getLocalIngestStatus, - type IngestReportSnapshot, - ingestReportToMemoryFlowReplay, - type LocalIngestMcpOptions, - runLocalIngest, - runLocalMetabaseIngest, -} from '../ingest/index.js'; import { createLocalKtxEmbeddingProviderFromConfig, KtxIngestEmbeddingPortAdapter } from '../llm/index.js'; import type { KtxLocalProject } from '../project/index.js'; -import { - createKtxEntityDetailsService, - getLocalScanReport, - getLocalScanStatus, - type KtxConnectionDriver, - type KtxScanConnector, - type KtxScanReport, - type LocalScanMcpOptions, - runLocalScan, -} from '../scan/index.js'; +import { createKtxEntityDetailsService, type KtxScanConnector, type LocalScanMcpOptions } from '../scan/index.js'; import { createKtxDiscoverDataService } from '../search/index.js'; import type { SqlAnalysisDialect, SqlAnalysisPort } from '../sql-analysis/index.js'; -import { - compileLocalSlQuery, - createKtxDictionarySearchService, - type LocalSlSourceSearchResult, - type LocalSlSourceSummary, - listLocalSlSources, - searchLocalSlSources, - sourceDefinitionSchema, - sourceOverlaySchema, -} from '../sl/index.js'; -import { readLocalKnowledgePage, searchLocalKnowledgePages, writeLocalKnowledgePage } from '../wiki/local-knowledge.js'; -import type { - KtxConnectionTestResponse, - KtxIngestStatusResponse, - KtxMcpContextPorts, - KtxScanArtifactListResponse, - KtxScanArtifactReadResponse, - KtxScanArtifactSummary, - KtxScanArtifactType, - KtxSqlExecutionResponse, -} from './types.js'; - -const LOCAL_AUTHOR = 'ktx'; -const LOCAL_AUTHOR_EMAIL = 'ktx@example.com'; -const SL_SHAPE_WARNING = 'Local stdio validation checks YAML shape only; Python semantic validation is not configured.'; +import { compileLocalSlQuery, createKtxDictionarySearchService } from '../sl/index.js'; +import { readLocalKnowledgePage, searchLocalKnowledgePages } from '../wiki/local-knowledge.js'; +import type { KtxMcpContextPorts, KtxSqlExecutionResponse } from './types.js'; interface CreateLocalProjectMcpContextPortsOptions { semanticLayerCompute?: KtxSemanticLayerComputePort; queryExecutor?: KtxSqlQueryExecutorPort; sqlAnalysis?: SqlAnalysisPort; - localIngest?: LocalIngestMcpOptions; localScan?: LocalScanMcpOptions; embeddingService?: KtxEmbeddingPort | null; } @@ -115,279 +69,16 @@ function assertSafeSourceName(sourceName: string): string { return assertSafePathToken('semantic-layer source name', sourceName); } -function normalizeScanDriver(driver: string | undefined): KtxConnectionDriver { - const normalized = (driver ?? '').toLowerCase(); - if ( - normalized === 'postgres' || - normalized === 'postgresql' || - normalized === 'sqlite' || - normalized === 'sqlite3' || - normalized === 'mysql' || - normalized === 'clickhouse' || - normalized === 'sqlserver' || - normalized === 'bigquery' || - normalized === 'snowflake' - ) { - return normalized === 'sqlite3' ? 'sqlite' : normalized; - } - return 'postgres'; -} - async function cleanupConnector(connector: KtxScanConnector | null): Promise { if (connector?.cleanup) { await connector.cleanup(); } } -async function testLocalConnection( - project: KtxLocalProject, - options: CreateLocalProjectMcpContextPortsOptions, - connectionId: string, -): Promise { - const safeConnectionId = assertSafeConnectionId(connectionId); - const connection = project.config.connections[safeConnectionId]; - if (!connection) { - return null; - } - const connectionType = localConnectionTypeForConfig(safeConnectionId, connection); - const createConnector = options.localScan?.createConnector; - if (!createConnector) { - return { - id: safeConnectionId, - connectionType, - ok: true, - tableCount: null, - message: 'Connection is configured; no native scan connector is available for live testing.', - warnings: ['ktx serve was not configured with a local scan connector factory.'], - }; - } - - let connector: KtxScanConnector | null = null; - try { - connector = await createConnector(safeConnectionId); - const snapshot = await connector.introspect( - { - connectionId: safeConnectionId, - driver: normalizeScanDriver(connection.driver), - mode: 'structural', - dryRun: true, - detectRelationships: false, - }, - { runId: `connection-test-${safeConnectionId}` }, - ); - return { - id: safeConnectionId, - connectionType, - ok: true, - tableCount: snapshot.tables.length, - message: 'Connection test passed.', - warnings: [], - }; - } catch (error) { - return { - id: safeConnectionId, - connectionType, - ok: false, - tableCount: null, - message: error instanceof Error ? error.message : String(error), - warnings: [], - }; - } finally { - await cleanupConnector(connector); - } -} - -function scanArtifactType(path: string, report: KtxScanReport): KtxScanArtifactType { - if (path === report.artifactPaths.reportPath) { - return 'report'; - } - if (report.artifactPaths.manifestShards.includes(path)) { - return 'manifest_shard'; - } - if (report.artifactPaths.enrichmentArtifacts.includes(path)) { - return 'enrichment_artifact'; - } - return 'raw_source'; -} - -async function artifactSize(project: KtxLocalProject, path: string): Promise { - try { - const result = await project.fileStore.readFile(path); - return typeof result.size === 'number' ? result.size : undefined; - } catch { - return undefined; - } -} - -async function listArtifactsForReport( - project: KtxLocalProject, - runId: string, - report: KtxScanReport, -): Promise { - const paths = new Set(); - if (report.artifactPaths.rawSourcesDir) { - const listed = await project.fileStore.listFiles(report.artifactPaths.rawSourcesDir); - for (const file of listed.files) { - paths.add(file); - } - } - if (report.artifactPaths.reportPath) { - paths.add(report.artifactPaths.reportPath); - } - for (const path of report.artifactPaths.manifestShards) { - paths.add(path); - } - for (const path of report.artifactPaths.enrichmentArtifacts) { - paths.add(path); - } - - const artifacts: KtxScanArtifactSummary[] = []; - for (const path of [...paths].sort()) { - const size = await artifactSize(project, path); - artifacts.push({ - path, - type: scanArtifactType(path, report), - ...(size === undefined ? {} : { size }), - }); - } - return { runId, artifacts }; -} - -async function readScanArtifact( - project: KtxLocalProject, - runId: string, - path: string, -): Promise { - const report = await getLocalScanReport(project, runId); - if (!report) { - return null; - } - const listed = await listArtifactsForReport(project, runId, report); - const artifact = listed.artifacts.find((candidate) => candidate.path === path); - if (!artifact) { - return null; - } - const result = await project.fileStore.readFile(path); - return { - runId, - path, - type: artifact.type, - ...(typeof result.size === 'number' ? { size: result.size } : {}), - content: result.content, - }; -} - function slPath(connectionId: string, sourceName: string): string { return `semantic-layer/${assertSafeConnectionId(connectionId)}/${assertSafeSourceName(sourceName)}.yaml`; } -function sourceNameFromPath(path: string): string { - return ( - path - .split('/') - .at(-1) - ?.replace(/\.ya?ml$/, '') ?? path - ); -} - -function isRecord(value: unknown): value is Record { - return typeof value === 'object' && value !== null && !Array.isArray(value); -} - -function parseYamlRecord(raw: string): Record { - const parsed = YAML.parse(raw) as unknown; - if (!isRecord(parsed)) { - throw new Error('Semantic-layer source YAML must contain an object'); - } - return parsed; -} - -async function listSlPaths(project: KtxLocalProject, connectionId?: string): Promise { - const root = connectionId ? `semantic-layer/${assertSafeConnectionId(connectionId)}` : 'semantic-layer'; - const listed = await project.fileStore.listFiles(root); - return listed.files.filter((file) => file.endsWith('.yaml') || file.endsWith('.yml')).sort(); -} - -async function loadComputableSources( - project: KtxLocalProject, - connectionId: string, -): Promise[]> { - const paths = await listSlPaths(project, connectionId); - const sources: Record[] = []; - for (const path of paths) { - const raw = await project.fileStore.readFile(path); - const source = parseYamlRecord(raw.content); - if (source.table || source.sql) { - sources.push(source); - } - } - return sources; -} - -function validateSourceRecord(sourceName: string, source: Record): string[] { - const namedSource = { ...source, name: typeof source.name === 'string' ? source.name : sourceName }; - const definition = sourceDefinitionSchema.safeParse(namedSource); - if (definition.success) { - return []; - } - const overlay = sourceOverlaySchema.safeParse(namedSource); - if (overlay.success) { - return []; - } - return definition.error.issues.map((issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`); -} - -function localIngestSourceDir(config: unknown): string | undefined { - if (!isRecord(config) || config.sourceDir === undefined) { - return undefined; - } - if (typeof config.sourceDir !== 'string' || config.sourceDir.trim().length === 0) { - throw new Error('Local ingest config sourceDir must be a non-empty string when provided'); - } - return config.sourceDir; -} - -function rawFileCountFromIngestReport(report: IngestReportSnapshot): number { - return new Set(report.body.workUnits.flatMap((workUnit) => workUnit.rawFiles)).size; -} - -function hasSlSearchMetadata( - source: LocalSlSourceSummary | LocalSlSourceSearchResult, -): source is LocalSlSourceSearchResult { - return 'score' in source; -} - -function statusFromIngestReport(report: IngestReportSnapshot): KtxIngestStatusResponse { - const failedWorkUnits = report.body.failedWorkUnits; - return { - runId: report.runId, - jobId: report.jobId, - reportId: report.id, - status: failedWorkUnits.length > 0 ? 'error' : 'done', - stage: 'done', - progress: 1, - errors: failedWorkUnits, - done: true, - adapter: report.sourceKey, - connectionId: report.connectionId, - sourceDir: null, - syncId: report.body.syncId, - startedAt: report.createdAt, - completedAt: report.createdAt, - previousRunId: null, - diffSummary: report.body.diffSummary, - workUnitCount: report.body.workUnits.length, - rawFileCount: rawFileCountFromIngestReport(report), - workUnits: report.body.workUnits.map((workUnit) => ({ - unitKey: workUnit.unitKey, - rawFiles: [...workUnit.rawFiles], - peerFileIndex: [], - dependencyPaths: [], - })), - evictionDeletedRawPaths: [...report.body.evictionInputs], - }; -} - async function executeValidatedReadOnlySql( project: KtxLocalProject, options: CreateLocalProjectMcpContextPortsOptions, @@ -453,9 +144,6 @@ export function createLocalProjectMcpContextPorts( ) .sort((a, b) => a.id.localeCompare(b.id)); }, - async test(input) { - return testLocalConnection(project, options, input.connectionId); - }, }, knowledge: { async search(input) { @@ -495,58 +183,8 @@ export function createLocalProjectMcpContextPorts( } : null; }, - async write(input) { - const existing = await readLocalKnowledgePage(project, { - key: input.key, - userId: input.userId, - }); - await writeLocalKnowledgePage(project, { - key: input.key, - scope: 'GLOBAL', - userId: input.userId, - summary: input.summary, - content: input.content, - tags: input.tags, - refs: input.refs, - slRefs: input.slRefs, - source: input.source, - intent: input.intent, - tables: input.tables, - representativeSql: input.representativeSql, - usage: input.usage, - fingerprints: input.fingerprints, - }); - return { success: true, key: input.key, action: existing ? 'updated' : 'created' }; - }, }, semanticLayer: { - async listSources(input) { - const listed: Array = input.query - ? await searchLocalSlSources(project, { - connectionId: input.connectionId, - query: input.query, - embeddingService, - }) - : await listLocalSlSources(project, { connectionId: input.connectionId }); - const sources = listed.map((source) => ({ - connectionId: source.connectionId, - connectionName: source.connectionId, - name: source.name, - description: source.description, - columnCount: source.columnCount, - measureCount: source.measureCount, - joinCount: source.joinCount, - ...(hasSlSearchMetadata(source) && source.frequencyTier ? { frequencyTier: source.frequencyTier } : {}), - ...(hasSlSearchMetadata(source) && source.snippet ? { snippet: source.snippet } : {}), - ...(hasSlSearchMetadata(source) ? { score: source.score } : {}), - ...(hasSlSearchMetadata(source) && source.matchReasons ? { matchReasons: source.matchReasons } : {}), - ...(hasSlSearchMetadata(source) && source.dictionaryMatches - ? { dictionaryMatches: source.dictionaryMatches } - : {}), - ...(hasSlSearchMetadata(source) && source.lanes ? { lanes: source.lanes } : {}), - })); - return { sources, totalSources: sources.length }; - }, async readSource(input) { const path = slPath(input.connectionId, input.sourceName); try { @@ -556,71 +194,9 @@ export function createLocalProjectMcpContextPorts( return null; } }, - async writeSource(input) { - const path = slPath(input.connectionId, input.sourceName); - if (input.delete) { - const deleted = await project.fileStore.deleteFile( - path, - LOCAL_AUTHOR, - LOCAL_AUTHOR_EMAIL, - `Remove semantic-layer source: ${input.sourceName}`, - ); - return { success: Boolean(deleted), sourceName: input.sourceName }; - } - - const yaml = - input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' }); - parseYamlRecord(yaml); - await project.fileStore.writeFile( - path, - `${yaml.trimEnd()}\n`, - LOCAL_AUTHOR, - LOCAL_AUTHOR_EMAIL, - `Update semantic-layer source: ${input.sourceName}`, - ); - return { success: true, sourceName: input.sourceName, yaml: `${yaml.trimEnd()}\n` }; - }, - async validate(input) { - if (options.semanticLayerCompute) { - const connectionId = assertSafeConnectionId(input.connectionId); - const result = await options.semanticLayerCompute.validateSources({ - sources: await loadComputableSources(project, connectionId), - dialect: dialectForDriver(project.config.connections[connectionId]?.driver), - recentlyTouched: input.names, - }); - return { - success: result.valid, - errors: result.errors, - warnings: result.warnings, - }; - } - - const names = new Set(input.names ?? []); - const paths = await listSlPaths(project, input.connectionId); - const errors: string[] = []; - for (const path of paths) { - const sourceName = sourceNameFromPath(path); - if (names.size > 0 && !names.has(sourceName)) { - continue; - } - try { - const raw = await project.fileStore.readFile(path); - errors.push(...validateSourceRecord(sourceName, parseYamlRecord(raw.content))); - } catch (error) { - errors.push(`${sourceName}: ${error instanceof Error ? error.message : String(error)}`); - } - } - return { - success: errors.length === 0, - errors, - warnings: [SL_SHAPE_WARNING], - }; - }, async query(input) { if (!options.semanticLayerCompute) { - throw new Error( - 'sl_query requires a semantic-layer query adapter. Local stdio MCP exposes file-backed SL CRUD only.', - ); + throw new Error('sl_query requires a semantic-layer query adapter.'); } return compileLocalSlQuery(project, { connectionId: input.connectionId, @@ -657,111 +233,5 @@ export function createLocalProjectMcpContextPorts( }; } - if (options.localIngest) { - ports.ingest = { - async trigger(input) { - const sourceDir = localIngestSourceDir(input.config); - if (input.adapter === 'metabase' && !sourceDir) { - const result = await (options.localIngest?.runLocalMetabaseIngest ?? runLocalMetabaseIngest)({ - project, - adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project), - metabaseConnectionId: input.connectionId, - trigger: input.trigger, - jobIdFactory: options.localIngest?.jobIdFactory, - pullConfigOptions: options.localIngest?.pullConfigOptions, - agentRunner: options.localIngest?.agentRunner, - llmProvider: options.localIngest?.llmProvider, - memoryModel: options.localIngest?.memoryModel, - semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute, - queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor, - logger: options.localIngest?.logger, - }); - return { - runId: `metabase-fanout:${result.metabaseConnectionId}`, - jobId: undefined, - reportId: undefined, - fanout: { - status: result.status, - children: result.children.map((child) => ({ - runId: child.report.runId, - jobId: child.report.jobId, - reportId: child.report.id, - targetConnectionId: child.targetConnectionId, - metabaseDatabaseId: child.metabaseDatabaseId, - })), - }, - }; - } - - const executeLocalIngest = options.localIngest?.runLocalIngest ?? runLocalIngest; - const result = await executeLocalIngest({ - project, - adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project), - adapter: input.adapter, - connectionId: input.connectionId, - sourceDir, - pullConfigOptions: options.localIngest?.pullConfigOptions, - trigger: input.trigger, - jobId: options.localIngest?.jobIdFactory?.(), - agentRunner: options.localIngest?.agentRunner, - llmProvider: options.localIngest?.llmProvider, - memoryModel: options.localIngest?.memoryModel, - semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute, - queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor, - logger: options.localIngest?.logger, - }); - return { - runId: result.report.runId, - jobId: result.report.jobId, - reportId: result.report.id, - }; - }, - async status(input) { - const report = await getLocalIngestStatus(project, input.runId); - return report ? statusFromIngestReport(report) : null; - }, - async report(input) { - return getLocalIngestStatus(project, input.runId); - }, - async replay(input) { - const report = await getLocalIngestStatus(project, input.runId); - return report ? ingestReportToMemoryFlowReplay(report) : null; - }, - }; - } - - if (options.localScan) { - ports.scan = { - async trigger(input) { - return runLocalScan({ - project, - connectionId: input.connectionId, - mode: input.mode, - detectRelationships: input.detectRelationships, - dryRun: input.dryRun, - trigger: 'mcp', - adapters: options.localScan?.adapters, - databaseIntrospectionUrl: options.localScan?.databaseIntrospectionUrl, - createConnector: options.localScan?.createConnector, - jobId: options.localScan?.jobIdFactory?.(), - now: options.localScan?.now, - }); - }, - async status(input) { - return getLocalScanStatus(project, input.runId); - }, - async report(input) { - return getLocalScanReport(project, input.runId); - }, - async listArtifacts(input) { - const report = await getLocalScanReport(project, input.runId); - return report ? listArtifactsForReport(project, input.runId, report) : null; - }, - async readArtifact(input) { - return readScanArtifact(project, input.runId, input.path); - }, - }; - } - return ports; } diff --git a/packages/context/src/mcp/server.test.ts b/packages/context/src/mcp/server.test.ts index 608b637b..7b2719b2 100644 --- a/packages/context/src/mcp/server.test.ts +++ b/packages/context/src/mcp/server.test.ts @@ -475,21 +475,17 @@ describe('createKtxMcpServer', () => { }); it('runs MCP memory_ingest against a local project memory port', async () => { - const tempDir = await mkdtemp(join(tmpdir(), 'ktx-mcp-local-memory-')); - try { - const project = await initKtxProject({ projectDir: tempDir }); - let receivedInput: MemoryAgentInput | undefined; - const agentRunner = { - runLoop: async ({ - input, - toolSet, - }: { - input: MemoryAgentInput; - toolSet: Record Promise }>; - }) => { - receivedInput = input; - await toolSet.load_skill.execute({ name: 'wiki_capture' }); - await toolSet.wiki_write.execute( + const tempDir = await mkdtemp(join(tmpdir(), 'ktx-mcp-local-memory-')); + try { + const project = await initKtxProject({ projectDir: tempDir }); + const agentRunner = { + runLoop: async ({ + toolSet, + }: { + toolSet: Record Promise }>; + }) => { + await toolSet.load_skill.execute({ name: 'wiki_capture' }); + await toolSet.wiki_write.execute( { key: 'arr', summary: 'ARR definition', @@ -504,6 +500,7 @@ describe('createKtxMcpServer', () => { agentRunner: agentRunner as never, runIdFactory: () => 'memory-run-mcp', }); + const ingestSpy = vi.spyOn(memoryIngest, 'ingest'); const fake = makeFakeServer(); createKtxMcpServer({ @@ -520,7 +517,7 @@ describe('createKtxMcpServer', () => { structuredContent: { runId: 'memory-run-mcp' }, }); await memoryIngest.waitForRun('memory-run-mcp'); - expect(receivedInput).toMatchObject({ + expect(ingestSpy).toHaveBeenCalledWith({ userId: 'local', chatId: expect.stringMatching(/^mcp-/), userMessage: 'Ingest external knowledge into KTX memory.',