feat(mcp):added MCP server (#97)

* docs(specs): design research-agent MCP tools and ktx mcp daemon

Adds the 2026-05-14 design spec for exposing four new MCP tools
(discover_data, entity_details, dictionary_search, sql_execution),
shipping a ktx-research skill, and introducing an HTTP-only ktx mcp
daemon so external agents can use KTX as a research-capable context
layer.

* Refine research-agent MCP tools spec after adversarial review iteration 1

* Refine research-agent MCP tools spec after adversarial review iteration 2

* Refine research-agent MCP tools spec after adversarial review iteration 3

* Refine spec: drop connectionName compat carve-out and ground summary/snippet provenance per kind

* feat(daemon): validate read-only SQL with sqlglot

* feat(context): expose read-only SQL validation port

* feat(context): register MCP sql execution tool

* feat(context): execute MCP SQL through validated connector path

* test(context): update SQL analysis port fixtures

* docs: add research-agent MCP sql execution foundation plan

* feat(context): add scan-backed entity details service

* feat(context): register MCP entity details tool

* feat(context): expose local MCP entity details

* test(context): align entity details scan fixtures

* docs: add research-agent MCP entity_details plan

* feat(context): add dictionary search service

* feat(context): register MCP dictionary search tool

* feat(context): expose local MCP dictionary search

* docs: add research-agent MCP dictionary_search plan

* feat: add MCP discover data service

* feat: expose discover data MCP tool

* feat: wire local discover data MCP port

* docs: add research-agent MCP discover_data plan

* feat(cli): add mcp http security helpers

* feat(cli): host mcp over streamable http

* feat(cli): manage mcp daemon lifecycle

* feat(cli): add ktx mcp commands

* fix(cli): stabilize mcp daemon verification

* docs: add research-agent MCP http daemon plan

* feat(cli): install KTX research skill

* feat(cli): configure MCP clients in setup agents

* feat(cli): support Claude local MCP setup scope

* docs: add research-agent MCP setup-agents plan

* refactor(context): use connectionId in warehouse verification tools

* docs(context): update ingest verification prompts for connectionId

* docs: add research-agent MCP ingest contract convergence plan

* chore: build runtime artifacts in conductor setup

---------

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-05-15 02:35:09 +02:00 committed by GitHub
parent c7b64379bf
commit b759a4a286
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
78 changed files with 13689 additions and 190 deletions

View file

@ -18,6 +18,9 @@ const sqlAnalysis: SqlAnalysisPort = {
async analyzeBatch() {
return new Map();
},
async validateReadOnly() {
return { ok: true };
},
};
const reader: HistoricSqlReader = {
@ -79,6 +82,9 @@ describe('HistoricSqlSourceAdapter', () => {
],
]);
},
async validateReadOnly() {
return { ok: true };
},
};
const adapter = new HistoricSqlSourceAdapter({
sqlAnalysis: batchSqlAnalysis,

View file

@ -159,6 +159,7 @@ function acceptanceSqlAnalysis(): SqlAnalysisPort {
);
},
),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
}

View file

@ -83,6 +83,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
],
['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
@ -207,6 +208,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
},
],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
@ -283,6 +285,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
},
],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({
@ -403,6 +406,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
},
],
])),
validateReadOnly: vi.fn(async () => ({ ok: true })),
};
await stageHistoricSqlAggregatedSnapshot({

View file

@ -94,11 +94,15 @@ describe('ingest runtime assets', () => {
it('packages identifier verification prompt assets', async () => {
const shared = await readFile(join(skillsDir, '_shared', 'identifier-verification.md'), 'utf-8');
const legacyConnectionPrefix = ['connection', 'Name'].join('');
expect(shared).toContain('## Identifier Verification Protocol');
expect(shared).toContain('discover_data');
expect(shared).toContain('entity_details');
expect(shared).toContain('sql_execution');
expect(shared).toContain('sql_execution({connectionName, sql: "SELECT DISTINCT');
expect(shared).toContain('sql_execution({connectionName, sql: "SELECT 1 FROM');
expect(shared).toContain('sql_execution({connectionId, sql: "SELECT DISTINCT');
expect(shared).toContain('sql_execution({connectionId, sql: "SELECT 1 FROM');
expect(shared).not.toContain(`entity_details({${legacyConnectionPrefix}`);
expect(shared).not.toContain(`sql_execution({${legacyConnectionPrefix}`);
});
});

View file

@ -97,6 +97,9 @@ describe('local ingest adapters', () => {
async analyzeBatch() {
return new Map();
},
async validateReadOnly() {
return { ok: true };
},
};
const adapters = createDefaultLocalIngestAdapters(project, {
historicSql: {
@ -140,6 +143,9 @@ describe('local ingest adapters', () => {
async analyzeBatch() {
return new Map();
},
async validateReadOnly() {
return { ok: true };
},
},
reader,
queryClient,
@ -166,6 +172,9 @@ describe('local ingest adapters', () => {
async analyzeBatch() {
return new Map();
},
async validateReadOnly() {
return { ok: true };
},
},
postgresQueryClient: {
async executeQuery() {
@ -258,6 +267,9 @@ describe('local ingest adapters', () => {
async analyzeBatch() {
return new Map();
},
async validateReadOnly() {
return { ok: true };
},
},
postgresQueryClient: {
async executeQuery() {

View file

@ -1,7 +1,7 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { BaseTool, ToolContext } from '../../../tools/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import type { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('DiscoverDataTool', () => {
const wikiSearchTool = { call: vi.fn() } as unknown as BaseTool & { call: ReturnType<typeof vi.fn> };
@ -36,7 +36,7 @@ describe('DiscoverDataTool', () => {
catalog.searchByName.mockResolvedValue([
{
kind: 'table',
connectionName: 'warehouse',
connectionId: 'warehouse',
ref: { catalog: null, db: 'public', name: 'orders' },
display: 'public.orders',
matchedOn: 'name',
@ -45,28 +45,28 @@ describe('DiscoverDataTool', () => {
});
it('groups wiki, semantic layer, and raw schema hits with routing hints', async () => {
const result = await tool.call({ query: 'orders', connectionName: 'warehouse', limit: 5 }, context);
const result = await tool.call({ query: 'orders', connectionId: 'warehouse', limit: 5 }, context);
expect(result.markdown).toContain('## Wiki Pages');
expect(result.markdown).toContain('use `wiki_read(blockKey)` for full content');
expect(result.markdown).toContain('## Semantic Layer Sources');
expect(result.markdown).toContain('use `sl_read_source(sourceName)` for the YAML');
expect(result.markdown).toContain('## Raw Warehouse Schema');
expect(result.markdown).toContain('use `entity_details({connectionName, targets: [{display}]})`');
expect(result.markdown).toContain('use `entity_details({connectionId, targets: [{display}]})`');
expect(result.structured.raw?.hits).toHaveLength(1);
});
it('includes connectionName on raw schema hits so entity_details can follow up', async () => {
it('includes connectionId on raw schema hits so entity_details can follow up', async () => {
const multiConnectionContext: ToolContext = {
...context,
session: { allowedConnectionNames: new Set(['warehouse', 'analytics']) } as any,
};
catalog.searchByName.mockImplementation(async (connectionName: string, query: string) => [
catalog.searchByName.mockImplementation(async (connectionId: string, query: string) => [
{
kind: 'table',
connectionName,
ref: { catalog: null, db: 'public', name: `${connectionName}_${query}` },
display: `public.${connectionName}_${query}`,
connectionId,
ref: { catalog: null, db: 'public', name: `${connectionId}_${query}` },
display: `public.${connectionId}_${query}`,
matchedOn: 'name',
},
]);
@ -75,16 +75,16 @@ describe('DiscoverDataTool', () => {
expect(catalog.searchByName).toHaveBeenCalledWith('analytics', 'orders', 10);
expect(catalog.searchByName).toHaveBeenCalledWith('warehouse', 'orders', 10);
expect(result.markdown).toContain('connectionName=analytics');
expect(result.markdown).toContain('connectionName=warehouse');
expect(result.markdown).toContain('connectionId=analytics');
expect(result.markdown).toContain('connectionId=warehouse');
expect(result.markdown).toContain(
'entity_details({connectionName: "analytics", targets: [{display: "public.analytics_orders"}]})',
'entity_details({connectionId: "analytics", targets: [{display: "public.analytics_orders"}]})',
);
expect(result.structured.raw?.hits.map((hit) => hit.connectionName)).toEqual(['analytics', 'warehouse']);
expect(result.structured.raw?.hits.map((hit) => hit.connectionId)).toEqual(['analytics', 'warehouse']);
});
it('refuses explicit out-of-scope connection names', async () => {
const result = await tool.call({ query: 'orders', connectionName: 'billing' }, context);
const result = await tool.call({ query: 'orders', connectionId: 'billing' }, context);
expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
expect(result.structured).toEqual({ wiki: null, sl: null, raw: null });
@ -99,7 +99,7 @@ describe('DiscoverDataTool', () => {
structured: { sourceName: 'orders' },
});
const result = await tool.call({ sourceName: 'orders', connectionName: 'warehouse' }, context);
const result = await tool.call({ sourceName: 'orders', connectionId: 'warehouse' }, context);
expect(slDiscoverTool.call).toHaveBeenCalledWith({ sourceName: 'orders', connectionId: 'warehouse' }, context);
expect(wikiSearchTool.call).not.toHaveBeenCalled();
@ -112,8 +112,20 @@ describe('DiscoverDataTool', () => {
slDiscoverTool.call.mockResolvedValueOnce({ markdown: '', structured: { totalSources: 0, sources: [] } });
catalog.searchByName.mockResolvedValueOnce([]);
const result = await tool.call({ query: 'customer source', connectionName: 'warehouse' }, context);
const result = await tool.call({ query: 'customer source', connectionId: 'warehouse' }, context);
expect(result.markdown).toContain('No matches for "customer source" across wiki, semantic layer, or raw warehouse schema.');
});
it('uses connectionId as the optional connection filter', () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(tool.parseInput({ query: 'orders', connectionId: 'warehouse', limit: 5 })).toEqual({
query: 'orders',
connectionId: 'warehouse',
limit: 5,
});
expect(() => tool.parseInput({ query: 'orders', [legacyConnectionField]: 'warehouse', limit: 5 })).toThrow();
});
});

View file

@ -1,13 +1,13 @@
import { z } from 'zod';
import { WarehouseCatalogService, type RawSchemaHit } from '../../../scan/warehouse-catalog.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import { WarehouseCatalogService, type RawSchemaHit } from './warehouse-catalog.service.js';
const discoverDataInputSchema = z.object({
query: z.string().optional(),
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/).optional(),
limit: z.number().int().positive().max(50).optional().default(10),
sourceName: z.string().optional(),
});
}).strict();
type DiscoverDataInput = z.input<typeof discoverDataInputSchema>;
@ -62,16 +62,16 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
async call(input: DiscoverDataInput, context: ToolContext): Promise<ToolOutput<DiscoverDataStructured>> {
const allowed = allowedConnectionNames(context);
if (input.connectionName && allowed && !allowed.has(input.connectionName)) {
if (input.connectionId && allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: { wiki: null, sl: null, raw: null },
};
}
if (input.sourceName) {
const sl = await this.deps.slDiscoverTool.call(
{ sourceName: input.sourceName, connectionId: input.connectionName },
{ sourceName: input.sourceName, connectionId: input.connectionId },
context,
);
return { markdown: sl.markdown, structured: { wiki: null, sl: sl.structured, raw: null } };
@ -93,7 +93,7 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
}
const slResult = await this.deps.slDiscoverTool.call(
{ query: query || undefined, connectionId: input.connectionName },
{ query: query || undefined, connectionId: input.connectionId },
context,
);
if (totalSources(slResult.structured) > 0) {
@ -107,23 +107,23 @@ export class DiscoverDataTool extends BaseTool<typeof discoverDataInputSchema> {
}
const catalog = this.deps.catalogFactory(context);
const connections = input.connectionName ? [input.connectionName] : [...(allowed ?? [])].sort();
const connections = input.connectionId ? [input.connectionId] : [...(allowed ?? [])].sort();
const rawHits: RawSchemaHit[] = [];
for (const connectionName of connections) {
rawHits.push(...(await catalog.searchByName(connectionName, query, limit)));
for (const connectionId of connections) {
rawHits.push(...(await catalog.searchByName(connectionId, query, limit)));
}
if (rawHits.length > 0) {
parts.push(
'## Raw Warehouse Schema',
'> use `entity_details({connectionName, targets: [{display}]})` for full DDL + sample values',
'> use `entity_details({connectionId, targets: [{display}]})` for full DDL + sample values',
);
parts.push(
rawHits
.slice(0, limit)
.map(
(hit) =>
`- ${hit.kind}: ${hit.display} [connectionName=${hit.connectionName}] (matched on ${hit.matchedOn}) - ` +
`follow up with \`entity_details({connectionName: "${hit.connectionName}", targets: [{display: "${hit.display}"}]})\``,
`- ${hit.kind}: ${hit.display} [connectionId=${hit.connectionId}] (matched on ${hit.matchedOn}) - ` +
`follow up with \`entity_details({connectionId: "${hit.connectionId}", targets: [{display: "${hit.display}"}]})\``,
)
.join('\n'),
);

View file

@ -3,9 +3,9 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { ToolContext } from '../../../tools/index.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('EntityDetailsTool', () => {
let tempDir: string;
@ -32,11 +32,11 @@ describe('EntityDetailsTool', () => {
await rm(tempDir, { recursive: true, force: true });
});
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-1') {
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
async function seedLiveDatabaseScan(connectionId = 'warehouse', syncId = 'sync-1') {
const root = `raw-sources/${connectionId}/live-database/${syncId}`;
await project.fileStore.writeFile(
`${root}/connection.json`,
JSON.stringify({ connectionId: connectionName, driver: 'postgres', extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
JSON.stringify({ connectionId, driver: 'postgres', extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
'ktx',
'ktx@example.com',
'seed connection',
@ -84,7 +84,7 @@ describe('EntityDetailsTool', () => {
`${root}/enrichment/relationship-profile.json`,
JSON.stringify(
{
connectionId: connectionName,
connectionId,
driver: 'postgres',
tables: [{ table: { catalog: null, db: 'public', name: 'orders' }, rowCount: 12 }],
columns: {
@ -109,7 +109,7 @@ describe('EntityDetailsTool', () => {
}
it('returns scoped table detail for a display target', async () => {
const result = await tool.call({ connectionName: 'warehouse', targets: [{ display: 'public.orders' }] }, context);
const result = await tool.call({ connectionId: 'warehouse', targets: [{ display: 'public.orders' }] }, context);
expect(result.markdown).toContain('### public.orders');
expect(result.markdown).toContain('- status (text, nullable=false)');
@ -120,7 +120,7 @@ describe('EntityDetailsTool', () => {
it('resolves display targets that include a column name', async () => {
const result = await tool.call(
{ connectionName: 'warehouse', targets: [{ display: 'public.orders.status' }] },
{ connectionId: 'warehouse', targets: [{ display: 'public.orders.status' }] },
context,
);
@ -133,7 +133,7 @@ describe('EntityDetailsTool', () => {
it('reports missing explicit columns instead of returning an empty column list', async () => {
const result = await tool.call(
{ connectionName: 'warehouse', targets: [{ display: 'public.orders.plan_tier' }] },
{ connectionId: 'warehouse', targets: [{ display: 'public.orders.plan_tier' }] },
context,
);
@ -146,7 +146,7 @@ describe('EntityDetailsTool', () => {
it('reports missing structured table targets in model-visible markdown', async () => {
const result = await tool.call(
{
connectionName: 'warehouse',
connectionId: 'warehouse',
targets: [{ catalog: null, db: 'public', name: 'orderz' }],
},
context,
@ -161,7 +161,7 @@ describe('EntityDetailsTool', () => {
it('reports missing structured column targets in model-visible markdown', async () => {
const result = await tool.call(
{
connectionName: 'warehouse',
connectionId: 'warehouse',
targets: [{ catalog: null, db: 'public', name: 'orders', column: 'plan_tier' }],
},
context,
@ -175,7 +175,7 @@ describe('EntityDetailsTool', () => {
it('returns a no-scan state distinct from not found', async () => {
const result = await tool.call(
{ connectionName: 'empty', targets: [{ display: 'public.orders' }] },
{ connectionId: 'empty', targets: [{ display: 'public.orders' }] },
{ ...context, session: { ...context.session!, allowedConnectionNames: new Set(['empty']) } },
);
@ -184,9 +184,30 @@ describe('EntityDetailsTool', () => {
});
it('refuses out-of-scope connections', async () => {
const result = await tool.call({ connectionName: 'billing', targets: [{ display: 'public.orders' }] }, context);
const result = await tool.call({ connectionId: 'billing', targets: [{ display: 'public.orders' }] }, context);
expect(result.markdown).toContain('Connection "billing" is not available to this ingest stage.');
expect(result.structured.scanAvailable).toBe(false);
});
it('uses connectionId as the public input field', async () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(
tool.parseInput({
connectionId: 'warehouse',
targets: [{ display: 'public.orders' }],
}),
).toEqual({
connectionId: 'warehouse',
targets: [{ display: 'public.orders' }],
});
expect(() =>
tool.parseInput({
[legacyConnectionField]: 'warehouse',
targets: [{ display: 'public.orders' }],
}),
).toThrow();
});
});

View file

@ -1,7 +1,7 @@
import { z } from 'zod';
import type { KtxTableRef } from '../../../scan/types.js';
import { WarehouseCatalogService, type TableDetail } from '../../../scan/warehouse-catalog.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
import { WarehouseCatalogService, type TableDetail } from './warehouse-catalog.service.js';
const targetSchema = z.union([
z.object({ display: z.string().min(1) }),
@ -14,9 +14,9 @@ const targetSchema = z.union([
]);
const entityDetailsInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
targets: z.array(targetSchema).min(1).max(50),
});
}).strict();
type EntityDetailsInput = z.infer<typeof entityDetailsInputSchema>;
type EntityDetailsTarget = EntityDetailsInput['targets'][number];
@ -47,14 +47,14 @@ function appendMissingTargetMarkdown(parts: string[], target: EntityDetailsTarge
async function resolveTarget(
catalog: WarehouseCatalogService,
connectionName: string,
connectionId: string,
target: EntityDetailsTarget,
): Promise<{ resolved: (KtxTableRef & { column?: string }) | null; candidates: KtxTableRef[] }> {
if ('display' in target) {
return catalog.resolveDisplayTarget(connectionName, target.display);
return catalog.resolveDisplayTarget(connectionId, target.display);
}
const candidateResolution = await catalog.resolveDisplayTarget(connectionName, targetLabel(target));
const candidateResolution = await catalog.resolveDisplayTarget(connectionId, targetLabel(target));
return {
resolved: {
catalog: target.catalog,
@ -107,18 +107,18 @@ export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema>
async call(input: EntityDetailsInput, context: ToolContext): Promise<ToolOutput<EntityDetailsStructured>> {
const allowed = allowedConnectionNames(context);
if (allowed && !allowed.has(input.connectionName)) {
if (allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
const catalog = this.catalogFactory(context);
const scanAvailable = await catalog.hasScan(input.connectionName);
const scanAvailable = await catalog.hasScan(input.connectionId);
if (!scanAvailable) {
return {
markdown: `No live-database scan available for connection "${input.connectionName}"; run \`ktx scan\` first.`,
markdown: `No live-database scan available for connection "${input.connectionId}"; run \`ktx scan\` first.`,
structured: { resolved: [], missing: [], scanAvailable: false },
};
}
@ -128,13 +128,13 @@ export class EntityDetailsTool extends BaseTool<typeof entityDetailsInputSchema>
const missing: EntityDetailsStructured['missing'] = [];
for (const target of input.targets) {
const resolution = await resolveTarget(catalog, input.connectionName, target);
const resolution = await resolveTarget(catalog, input.connectionId, target);
if (!resolution.resolved) {
missing.push({ target, candidates: resolution.candidates });
appendMissingTargetMarkdown(parts, target, resolution.candidates);
continue;
}
const detail = await catalog.getTable({ connectionName: input.connectionName, ...resolution.resolved });
const detail = await catalog.getTable({ connectionId: input.connectionId, ...resolution.resolved });
if (!detail) {
missing.push({ target, candidates: resolution.candidates });
appendMissingTargetMarkdown(parts, target, resolution.candidates);

View file

@ -1,10 +1,10 @@
import type { KtxFileStorePort } from '../../../core/index.js';
import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { WarehouseCatalogService } from '../../../scan/warehouse-catalog.js';
import type { BaseTool, ToolContext } from '../../../tools/index.js';
import { DiscoverDataTool } from './discover-data.tool.js';
import { EntityDetailsTool } from './entity-details.tool.js';
import { SqlExecutionTool } from './sql-execution.tool.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
export function createWarehouseVerificationTools(deps: {
connections: SlConnectionCatalogPort;

View file

@ -19,7 +19,7 @@ describe('SqlExecutionTool', () => {
connections.executeQuery.mockResolvedValue({ headers: ['status'], rows: [['paid']], totalRows: 1 });
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
{ connectionId: 'warehouse', sql: 'select status from public.orders', rowLimit: 5 },
context,
);
@ -34,7 +34,7 @@ describe('SqlExecutionTool', () => {
it.each(['insert into x values (1)', 'drop table x', 'vacuum'])('rejects mutating SQL: %s', async (sql) => {
connections.executeQuery.mockClear();
const result = await tool.call({ connectionName: 'warehouse', sql }, context);
const result = await tool.call({ connectionId: 'warehouse', sql }, context);
expect(result.markdown).toContain('Only read-only SELECT/WITH queries can be executed locally.');
expect(connections.executeQuery).not.toHaveBeenCalled();
@ -44,11 +44,35 @@ describe('SqlExecutionTool', () => {
connections.executeQuery.mockRejectedValue(new Error('relation "orbit_analytics.customer" does not exist'));
const result = await tool.call(
{ connectionName: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
{ connectionId: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
context,
);
expect(result.markdown).toContain('relation "orbit_analytics.customer" does not exist');
expect(result.structured.error).toContain('relation "orbit_analytics.customer" does not exist');
});
it('uses connectionId as the public input field', () => {
const legacyConnectionField = ['connection', 'Name'].join('');
expect(
tool.parseInput({
connectionId: 'warehouse',
sql: 'select 1',
rowLimit: 5,
}),
).toEqual({
connectionId: 'warehouse',
sql: 'select 1',
rowLimit: 5,
});
expect(() =>
tool.parseInput({
[legacyConnectionField]: 'warehouse',
sql: 'select 1',
rowLimit: 5,
}),
).toThrow();
});
});

View file

@ -4,10 +4,10 @@ import type { SlConnectionCatalogPort } from '../../../sl/index.js';
import { BaseTool, type ToolContext, type ToolOutput } from '../../../tools/index.js';
const sqlExecutionInputSchema = z.object({
connectionName: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
connectionId: z.string().regex(/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/),
sql: z.string().min(1),
rowLimit: z.number().int().positive().max(1000).optional().default(100),
});
}).strict();
type SqlExecutionInput = z.input<typeof sqlExecutionInputSchema>;
@ -54,9 +54,9 @@ export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
async call(input: SqlExecutionInput, context: ToolContext): Promise<ToolOutput<SqlExecutionStructured>> {
const allowed = context.session?.allowedConnectionNames;
if (allowed && !allowed.has(input.connectionName)) {
if (allowed && !allowed.has(input.connectionId)) {
return {
markdown: `Connection "${input.connectionName}" is not available to this ingest stage.`,
markdown: `Connection "${input.connectionId}" is not available to this ingest stage.`,
structured: {
headers: [],
rows: [],
@ -83,7 +83,7 @@ export class SqlExecutionTool extends BaseTool<typeof sqlExecutionInputSchema> {
}
try {
const result = await this.connections.executeQuery(input.connectionName, wrappedSql);
const result = await this.connections.executeQuery(input.connectionId, wrappedSql);
const headers = result.headers ?? [];
const rows = result.rows ?? [];
const rowCount = result.totalRows ?? rows.length;

View file

@ -1,196 +0,0 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../project/index.js';
import { WarehouseCatalogService } from './warehouse-catalog.service.js';
describe('WarehouseCatalogService', () => {
let tempDir: string;
let project: KtxLocalProject;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-warehouse-catalog-'));
project = await initKtxProject({ projectDir: join(tempDir, 'project') });
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
async function seedLiveDatabaseScan(connectionName = 'warehouse', syncId = 'sync-2', driver = 'postgres') {
const root = `raw-sources/${connectionName}/live-database/${syncId}`;
const tableRef = {
catalog: driver === 'bigquery' ? 'analytics' : null,
db: driver === 'sqlite' ? null : 'public',
name: 'orders',
};
await project.fileStore.writeFile(
`${root}/connection.json`,
JSON.stringify({ connectionId: connectionName, driver, extractedAt: '2026-05-12T00:00:00.000Z' }, null, 2),
'ktx',
'ktx@example.com',
'seed connection',
);
await project.fileStore.writeFile(
`${root}/tables/orders.json`,
JSON.stringify(
{
catalog: tableRef.catalog,
db: tableRef.db,
name: tableRef.name,
kind: 'table',
comment: 'Customer orders',
estimatedRows: 12,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
{
name: 'status',
nativeType: 'text',
normalizedType: 'text',
dimensionType: 'string',
nullable: false,
primaryKey: false,
comment: 'Order status',
},
],
foreignKeys: [],
},
null,
2,
),
'ktx',
'ktx@example.com',
'seed orders',
);
await project.fileStore.writeFile(
`${root}/enrichment/relationship-profile.json`,
JSON.stringify(
{
connectionId: connectionName,
driver,
sqlAvailable: true,
queryCount: 3,
tables: [{ table: { catalog: tableRef.catalog, db: tableRef.db, name: tableRef.name }, rowCount: 12 }],
columns: {
'orders.status': {
table: { catalog: tableRef.catalog, db: tableRef.db, name: tableRef.name },
column: 'status',
nativeType: 'text',
normalizedType: 'text',
rowCount: 12,
nullCount: 0,
distinctCount: 2,
uniquenessRatio: 0.1667,
nullRate: 0,
sampleValues: ['paid', 'refunded'],
minTextLength: 4,
maxTextLength: 8,
},
},
warnings: [],
},
null,
2,
),
'ktx',
'ktx@example.com',
'seed profile',
);
}
it('finds the latest sync and merges table schema with relationship profile values', async () => {
await seedLiveDatabaseScan('warehouse', 'sync-1');
await seedLiveDatabaseScan('warehouse', 'sync-2');
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getLatestSyncId('warehouse')).resolves.toBe('sync-2');
const detail = await catalog.getTable({ connectionName: 'warehouse', catalog: null, db: 'public', name: 'orders' });
expect(detail).toMatchObject({
connectionName: 'warehouse',
display: 'public.orders',
rowCount: 12,
columns: [
{ name: 'id', nativeType: 'integer', primaryKey: true },
{ name: 'status', nativeType: 'text', sampleValues: ['paid', 'refunded'], distinctCount: 2 },
],
});
});
it('returns scanAvailable=false when no live-database scan exists', async () => {
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.getTable({ connectionName: 'missing', catalog: null, db: 'public', name: 'orders' })).resolves.toBeNull();
await expect(catalog.hasScan('missing')).resolves.toBe(false);
});
it('resolves postgres display strings and returns closest candidates for missing tables', async () => {
await seedLiveDatabaseScan();
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
resolved: { catalog: null, db: 'public', name: 'orders' },
candidates: [],
dialect: 'postgres',
});
await expect(catalog.resolveDisplay('warehouse', 'public.orderz')).resolves.toMatchObject({
resolved: null,
candidates: [{ name: 'orders' }],
});
});
it('treats two-part BigQuery identifiers as ambiguous instead of guessing', async () => {
await seedLiveDatabaseScan('warehouse', 'sync-bigquery', 'bigquery');
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplay('warehouse', 'public.orders')).resolves.toMatchObject({
resolved: null,
dialect: 'bigquery',
});
});
it('resolves postgres column display strings without treating the column as a table', async () => {
await seedLiveDatabaseScan();
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplayTarget('warehouse', 'public.orders.status')).resolves.toMatchObject({
resolved: { catalog: null, db: 'public', name: 'orders', column: 'status' },
candidates: [],
dialect: 'postgres',
});
});
it('resolves BigQuery column display strings with four parts', async () => {
await seedLiveDatabaseScan('warehouse', 'sync-bigquery', 'bigquery');
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.resolveDisplayTarget('warehouse', 'analytics.public.orders.status')).resolves.toMatchObject({
resolved: { catalog: 'analytics', db: 'public', name: 'orders', column: 'status' },
candidates: [],
dialect: 'bigquery',
});
});
it('searches table names, column names, comments, and descriptions', async () => {
await seedLiveDatabaseScan();
const catalog = new WarehouseCatalogService({ fileStore: project.fileStore });
await expect(catalog.searchByName('warehouse', 'status', 10)).resolves.toEqual(
expect.arrayContaining([
expect.objectContaining({
kind: 'column',
ref: expect.objectContaining({ db: 'public', name: 'orders', column: 'status' }),
matchedOn: 'name',
}),
]),
);
});
});

View file

@ -1,448 +0,0 @@
import { getDialectForDriver } from '../../../connections/index.js';
import type { KtxFileStorePort } from '../../../core/index.js';
import type {
KtxConnectionDriver,
KtxSchemaColumn,
KtxSchemaForeignKey,
KtxSchemaTable,
KtxTableRef,
} from '../../../scan/types.js';
type CatalogDriver = KtxConnectionDriver | 'sqlite3';
export interface WarehouseCatalogServiceDeps {
fileStore: KtxFileStorePort;
}
interface WarehouseColumnDetail extends KtxSchemaColumn {
descriptions: Record<string, string>;
rowCount: number | null;
nullCount: number | null;
distinctCount: number | null;
nullRate: number | null;
sampleValues: string[];
}
export interface TableDetail {
connectionName: string;
catalog: string | null;
db: string | null;
name: string;
display: string;
kind: string;
comment: string | null;
description: string | null;
rowCount: number | null;
columns: WarehouseColumnDetail[];
foreignKeys: KtxSchemaForeignKey[];
}
export type RawSchemaHit =
| {
kind: 'table';
connectionName: string;
ref: KtxTableRef;
display: string;
matchedOn: 'name' | 'db' | 'comment' | 'description';
}
| {
kind: 'column';
connectionName: string;
ref: KtxTableRef & { column: string };
display: string;
matchedOn: 'name' | 'comment' | 'description';
};
export interface DisplayTargetResolution {
resolved: (KtxTableRef & { column?: string }) | null;
candidates: KtxTableRef[];
dialect: string;
}
interface ConnectionArtifact {
driver?: CatalogDriver;
}
interface RelationshipProfileColumn {
table?: KtxTableRef;
column?: string;
rowCount?: number;
nullCount?: number;
distinctCount?: number;
nullRate?: number;
sampleValues?: unknown[];
}
interface RelationshipProfileArtifact {
driver?: CatalogDriver;
tables?: Array<{ table?: KtxTableRef; rowCount?: number }>;
columns?: Record<string, RelationshipProfileColumn>;
}
interface ConnectionCatalog {
connectionName: string;
syncId: string;
driver: CatalogDriver;
tables: KtxSchemaTable[];
profile: RelationshipProfileArtifact | null;
}
type TableWithDescriptions = KtxSchemaTable & {
descriptions?: Record<string, string>;
columns: Array<KtxSchemaColumn & { descriptions?: Record<string, string> }>;
};
function normalize(value: string | null | undefined): string {
return (value ?? '').toLowerCase();
}
function refsEqual(left: KtxTableRef, right: KtxTableRef): boolean {
return (
normalize(left.catalog) === normalize(right.catalog) &&
normalize(left.db) === normalize(right.db) &&
normalize(left.name) === normalize(right.name)
);
}
function refKey(ref: KtxTableRef): string {
return [ref.catalog, ref.db, ref.name].map((part) => normalize(part)).join('.');
}
function columnKey(ref: KtxTableRef, column: string): string {
return `${refKey(ref)}.${normalize(column)}`;
}
function readJson<T>(content: string): T {
return JSON.parse(content) as T;
}
function cleanIdentifierPart(part: string): string {
return part.trim().replace(/^["'`\[]|["'`\]]$/g, '');
}
function splitDisplay(display: string): string[] {
return display
.trim()
.split('.')
.map(cleanIdentifierPart)
.filter(Boolean);
}
function formatDisplay(driver: CatalogDriver, table: KtxTableRef): string {
if (driver === 'sqlite' || driver === 'sqlite3') {
return table.name;
}
return [table.catalog, table.db, table.name].filter((part): part is string => Boolean(part)).join('.');
}
function parseDisplay(driver: CatalogDriver, display: string): KtxTableRef | null {
const parts = splitDisplay(display);
if (driver === 'sqlite' || driver === 'sqlite3') {
return parts.length === 1 ? { catalog: null, db: null, name: parts[0]! } : null;
}
if (driver === 'bigquery' || driver === 'snowflake' || driver === 'sqlserver') {
if (parts.length !== 3) {
return null;
}
return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! };
}
if (parts.length === 2) {
return { catalog: null, db: parts[0]!, name: parts[1]! };
}
if (parts.length === 3) {
return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! };
}
return parts.length === 1 ? { catalog: null, db: null, name: parts[0]! } : null;
}
function expectedDisplayPartCount(driver: CatalogDriver): number {
if (driver === 'sqlite' || driver === 'sqlite3') {
return 1;
}
if (driver === 'bigquery' || driver === 'snowflake' || driver === 'sqlserver') {
return 3;
}
return 2;
}
function parseColumnDisplay(driver: CatalogDriver, display: string): (KtxTableRef & { column: string }) | null {
const parts = splitDisplay(display);
const tablePartCount = expectedDisplayPartCount(driver);
if (parts.length !== tablePartCount + 1) {
return null;
}
const column = parts.at(-1);
if (!column) {
return null;
}
const table = parseDisplay(driver, parts.slice(0, -1).join('.'));
return table ? { ...table, column } : null;
}
function bestCandidates(tables: KtxSchemaTable[], display: string, limit = 5): KtxTableRef[] {
const needle = normalize(splitDisplay(display).at(-1) ?? display);
return tables
.map((table) => {
const name = normalize(table.name);
let score = 0;
if (name === needle) {
score = 100;
} else if (name.includes(needle) || needle.includes(name)) {
score = 80;
} else {
const samePrefix = [...name].filter((char, index) => needle[index] === char).length;
score = samePrefix / Math.max(name.length, needle.length, 1);
}
return { table, score };
})
.filter((entry) => entry.score > 0)
.sort((left, right) => right.score - left.score || left.table.name.localeCompare(right.table.name))
.slice(0, limit)
.map(({ table }) => ({ catalog: table.catalog, db: table.db, name: table.name }));
}
function firstDescription(descriptions: Record<string, string> | undefined): string | null {
return Object.values(descriptions ?? {}).find((value) => value.trim().length > 0) ?? null;
}
function matchedOnTable(table: TableWithDescriptions, query: string): RawSchemaHit['matchedOn'] | null {
const q = normalize(query);
if (!q) {
return null;
}
if (normalize(table.name).includes(q)) {
return 'name';
}
if (normalize(table.db).includes(q)) {
return 'db';
}
if (normalize(table.comment).includes(q)) {
return 'comment';
}
if (normalize(firstDescription(table.descriptions)).includes(q)) {
return 'description';
}
return null;
}
function matchedOnColumn(
column: KtxSchemaColumn & { descriptions?: Record<string, string> },
query: string,
): 'name' | 'comment' | 'description' | null {
const q = normalize(query);
if (!q) {
return null;
}
if (normalize(column.name).includes(q)) {
return 'name';
}
if (normalize(column.comment).includes(q)) {
return 'comment';
}
if (normalize(firstDescription(column.descriptions)).includes(q)) {
return 'description';
}
return null;
}
export class WarehouseCatalogService {
private readonly catalogs = new Map<string, Promise<ConnectionCatalog | null>>();
constructor(private readonly deps: WarehouseCatalogServiceDeps) {}
async hasScan(connectionName: string): Promise<boolean> {
return (await this.loadCatalog(connectionName)) !== null;
}
async getLatestSyncId(connectionName: string): Promise<string | null> {
return (await this.loadCatalog(connectionName))?.syncId ?? null;
}
async listTables(connectionName: string): Promise<KtxTableRef[]> {
const catalog = await this.loadCatalog(connectionName);
return catalog?.tables.map((table) => ({ catalog: table.catalog, db: table.db, name: table.name })) ?? [];
}
async getTable(ref: { connectionName: string } & KtxTableRef): Promise<TableDetail | null> {
const catalog = await this.loadCatalog(ref.connectionName);
if (!catalog) {
return null;
}
const table = catalog.tables.find((candidate) => refsEqual(candidate, ref)) as TableWithDescriptions | undefined;
if (!table) {
return null;
}
const profileTables = catalog.profile?.tables ?? [];
const profileTable = profileTables.find((candidate) => candidate.table && refsEqual(candidate.table, table));
const profileColumns = catalog.profile?.columns ?? {};
return {
connectionName: ref.connectionName,
catalog: table.catalog,
db: table.db,
name: table.name,
display: formatDisplay(catalog.driver, table),
kind: table.kind,
comment: table.comment,
description: firstDescription(table.descriptions),
rowCount: profileTable?.rowCount ?? table.estimatedRows ?? null,
columns: table.columns.map((rawColumn) => {
const column = rawColumn as KtxSchemaColumn & { descriptions?: Record<string, string> };
const profileColumn =
profileColumns[columnKey(table, column.name)] ??
Object.entries(profileColumns).find(
([key, value]) =>
normalize(key) === `${normalize(table.name)}.${normalize(column.name)}` ||
(value.table && refsEqual(value.table, table) && normalize(value.column) === normalize(column.name)),
)?.[1];
return {
...column,
descriptions: column.descriptions ?? {},
rowCount: profileColumn?.rowCount ?? null,
nullCount: profileColumn?.nullCount ?? null,
distinctCount: profileColumn?.distinctCount ?? null,
nullRate: profileColumn?.nullRate ?? null,
sampleValues: (profileColumn?.sampleValues ?? []).map((value) => String(value)),
};
}),
foreignKeys: table.foreignKeys,
};
}
async resolveDisplay(
connectionName: string,
display: string,
): Promise<{
resolved: KtxTableRef | null;
candidates: KtxTableRef[];
dialect: string;
}> {
const catalog = await this.loadCatalog(connectionName);
if (!catalog) {
return { resolved: null, candidates: [], dialect: 'unknown' };
}
const dialect = getDialectForDriver(catalog.driver).type;
const parsed = parseDisplay(catalog.driver, display);
if (!parsed) {
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
}
const table = catalog.tables.find((candidate) => refsEqual(candidate, parsed));
if (!table) {
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
}
return { resolved: { catalog: table.catalog, db: table.db, name: table.name }, candidates: [], dialect };
}
async resolveDisplayTarget(connectionName: string, display: string): Promise<DisplayTargetResolution> {
const catalog = await this.loadCatalog(connectionName);
if (!catalog) {
return { resolved: null, candidates: [], dialect: 'unknown' };
}
const dialect = getDialectForDriver(catalog.driver).type;
const tableResolution = await this.resolveDisplay(connectionName, display);
if (tableResolution.resolved) {
return tableResolution;
}
const parsedColumn = parseColumnDisplay(catalog.driver, display);
if (!parsedColumn) {
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
}
const table = catalog.tables.find((candidate) => refsEqual(candidate, parsedColumn));
if (!table) {
return { resolved: null, candidates: bestCandidates(catalog.tables, display), dialect };
}
return {
resolved: {
catalog: table.catalog,
db: table.db,
name: table.name,
column: parsedColumn.column,
},
candidates: [],
dialect,
};
}
async searchByName(connectionName: string, query: string, limit: number): Promise<RawSchemaHit[]> {
const catalog = await this.loadCatalog(connectionName);
if (!catalog) {
return [];
}
const hits: RawSchemaHit[] = [];
for (const table of catalog.tables as TableWithDescriptions[]) {
const tableMatch = matchedOnTable(table, query);
if (tableMatch) {
hits.push({
kind: 'table',
connectionName,
ref: { catalog: table.catalog, db: table.db, name: table.name },
display: formatDisplay(catalog.driver, table),
matchedOn: tableMatch,
});
}
for (const column of table.columns) {
const columnMatch = matchedOnColumn(column, query);
if (!columnMatch) {
continue;
}
hits.push({
kind: 'column',
connectionName,
ref: { catalog: table.catalog, db: table.db, name: table.name, column: column.name },
display: `${formatDisplay(catalog.driver, table)}.${column.name}`,
matchedOn: columnMatch,
});
}
}
return hits.slice(0, Math.max(0, limit));
}
private loadCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
const existing = this.catalogs.get(connectionName);
if (existing) {
return existing;
}
const pending = this.readCatalog(connectionName);
this.catalogs.set(connectionName, pending);
return pending;
}
private async readCatalog(connectionName: string): Promise<ConnectionCatalog | null> {
const root = `raw-sources/${connectionName}/live-database`;
const listed = await this.deps.fileStore.listFiles(root);
const connectionFiles = listed.files.filter((file) => file.endsWith('/connection.json')).sort();
const latestConnectionPath = connectionFiles.at(-1);
if (!latestConnectionPath) {
return null;
}
const latestRoot = latestConnectionPath.slice(0, -'/connection.json'.length);
const syncId = latestRoot.split('/').at(-1) ?? '';
const connection = readJson<ConnectionArtifact>((await this.deps.fileStore.readFile(latestConnectionPath)).content);
const tablesListing = await this.deps.fileStore.listFiles(`${latestRoot}/tables`);
const tables: KtxSchemaTable[] = [];
for (const tablePath of tablesListing.files.filter((file) => file.endsWith('.json')).sort()) {
tables.push(readJson<KtxSchemaTable>((await this.deps.fileStore.readFile(tablePath)).content));
}
let profile: RelationshipProfileArtifact | null = null;
try {
profile = readJson<RelationshipProfileArtifact>(
(await this.deps.fileStore.readFile(`${latestRoot}/enrichment/relationship-profile.json`)).content,
);
} catch {
profile = null;
}
return {
connectionName,
syncId,
driver: connection.driver ?? profile?.driver ?? 'postgres',
tables,
profile,
};
}
}