refactor(mcp): remove admin ports from server factory

This commit is contained in:
Andrey Avtomonov 2026-05-16 02:13:10 +02:00
parent 337c083f05
commit 6392926bdc
4 changed files with 111 additions and 1483 deletions

View file

@ -1,5 +1,5 @@
import { createDefaultKtxMcpServer, createLocalProjectMcpContextPorts } from '@ktx/context/mcp';
import { createLocalProjectMemoryCapture } from '@ktx/context/memory';
import { createLocalProjectMemoryIngest } from '@ktx/context/memory';
import type { KtxLocalProject } from '@ktx/context/project';
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import type { KtxCliIo } from './cli-runtime.js';
@ -41,17 +41,13 @@ export async function createKtxMcpServerFactory(input: {
localScan: {
createConnector: async (connectionId) => createKtxCliScanConnector(input.project, connectionId),
},
localIngest: {
semanticLayerCompute,
queryExecutor,
},
});
let memoryCapture: ReturnType<typeof createLocalProjectMemoryCapture> | undefined;
let memoryIngest: ReturnType<typeof createLocalProjectMemoryIngest> | undefined;
try {
memoryCapture = createLocalProjectMemoryCapture(input.project, { semanticLayerCompute, queryExecutor });
memoryIngest = createLocalProjectMemoryIngest(input.project, { semanticLayerCompute, queryExecutor });
} catch (error) {
input.io?.stderr.write(`KTX MCP memory_capture disabled: ${error instanceof Error ? error.message : String(error)}\n`);
input.io?.stderr.write(`KTX MCP memory_ingest disabled: ${error instanceof Error ? error.message : String(error)}\n`);
}
return () =>
@ -59,7 +55,9 @@ export async function createKtxMcpServerFactory(input: {
name: 'ktx',
version: input.cliVersion,
userContext: { userId: 'local' },
contextTools,
memoryCapture,
contextTools: {
...contextTools,
...(memoryIngest ? { memoryIngest } : {}),
},
});
}

File diff suppressed because it is too large Load diff

View file

@ -1,65 +1,19 @@
import YAML from 'yaml';
import {
type KtxSqlQueryExecutorPort,
localConnectionInfoFromConfig,
localConnectionTypeForConfig,
} from '../connections/index.js';
import { type KtxSqlQueryExecutorPort, localConnectionInfoFromConfig } from '../connections/index.js';
import type { KtxEmbeddingPort } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import {
createDefaultLocalIngestAdapters,
getLocalIngestStatus,
type IngestReportSnapshot,
ingestReportToMemoryFlowReplay,
type LocalIngestMcpOptions,
runLocalIngest,
runLocalMetabaseIngest,
} from '../ingest/index.js';
import { createLocalKtxEmbeddingProviderFromConfig, KtxIngestEmbeddingPortAdapter } from '../llm/index.js';
import type { KtxLocalProject } from '../project/index.js';
import {
createKtxEntityDetailsService,
getLocalScanReport,
getLocalScanStatus,
type KtxConnectionDriver,
type KtxScanConnector,
type KtxScanReport,
type LocalScanMcpOptions,
runLocalScan,
} from '../scan/index.js';
import { createKtxEntityDetailsService, type KtxScanConnector, type LocalScanMcpOptions } from '../scan/index.js';
import { createKtxDiscoverDataService } from '../search/index.js';
import type { SqlAnalysisDialect, SqlAnalysisPort } from '../sql-analysis/index.js';
import {
compileLocalSlQuery,
createKtxDictionarySearchService,
type LocalSlSourceSearchResult,
type LocalSlSourceSummary,
listLocalSlSources,
searchLocalSlSources,
sourceDefinitionSchema,
sourceOverlaySchema,
} from '../sl/index.js';
import { readLocalKnowledgePage, searchLocalKnowledgePages, writeLocalKnowledgePage } from '../wiki/local-knowledge.js';
import type {
KtxConnectionTestResponse,
KtxIngestStatusResponse,
KtxMcpContextPorts,
KtxScanArtifactListResponse,
KtxScanArtifactReadResponse,
KtxScanArtifactSummary,
KtxScanArtifactType,
KtxSqlExecutionResponse,
} from './types.js';
const LOCAL_AUTHOR = 'ktx';
const LOCAL_AUTHOR_EMAIL = 'ktx@example.com';
const SL_SHAPE_WARNING = 'Local stdio validation checks YAML shape only; Python semantic validation is not configured.';
import { compileLocalSlQuery, createKtxDictionarySearchService } from '../sl/index.js';
import { readLocalKnowledgePage, searchLocalKnowledgePages } from '../wiki/local-knowledge.js';
import type { KtxMcpContextPorts, KtxSqlExecutionResponse } from './types.js';
interface CreateLocalProjectMcpContextPortsOptions {
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: KtxSqlQueryExecutorPort;
sqlAnalysis?: SqlAnalysisPort;
localIngest?: LocalIngestMcpOptions;
localScan?: LocalScanMcpOptions;
embeddingService?: KtxEmbeddingPort | null;
}
@ -115,279 +69,16 @@ function assertSafeSourceName(sourceName: string): string {
return assertSafePathToken('semantic-layer source name', sourceName);
}
function normalizeScanDriver(driver: string | undefined): KtxConnectionDriver {
const normalized = (driver ?? '').toLowerCase();
if (
normalized === 'postgres' ||
normalized === 'postgresql' ||
normalized === 'sqlite' ||
normalized === 'sqlite3' ||
normalized === 'mysql' ||
normalized === 'clickhouse' ||
normalized === 'sqlserver' ||
normalized === 'bigquery' ||
normalized === 'snowflake'
) {
return normalized === 'sqlite3' ? 'sqlite' : normalized;
}
return 'postgres';
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
if (connector?.cleanup) {
await connector.cleanup();
}
}
async function testLocalConnection(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
connectionId: string,
): Promise<KtxConnectionTestResponse | null> {
const safeConnectionId = assertSafeConnectionId(connectionId);
const connection = project.config.connections[safeConnectionId];
if (!connection) {
return null;
}
const connectionType = localConnectionTypeForConfig(safeConnectionId, connection);
const createConnector = options.localScan?.createConnector;
if (!createConnector) {
return {
id: safeConnectionId,
connectionType,
ok: true,
tableCount: null,
message: 'Connection is configured; no native scan connector is available for live testing.',
warnings: ['ktx serve was not configured with a local scan connector factory.'],
};
}
let connector: KtxScanConnector | null = null;
try {
connector = await createConnector(safeConnectionId);
const snapshot = await connector.introspect(
{
connectionId: safeConnectionId,
driver: normalizeScanDriver(connection.driver),
mode: 'structural',
dryRun: true,
detectRelationships: false,
},
{ runId: `connection-test-${safeConnectionId}` },
);
return {
id: safeConnectionId,
connectionType,
ok: true,
tableCount: snapshot.tables.length,
message: 'Connection test passed.',
warnings: [],
};
} catch (error) {
return {
id: safeConnectionId,
connectionType,
ok: false,
tableCount: null,
message: error instanceof Error ? error.message : String(error),
warnings: [],
};
} finally {
await cleanupConnector(connector);
}
}
function scanArtifactType(path: string, report: KtxScanReport): KtxScanArtifactType {
if (path === report.artifactPaths.reportPath) {
return 'report';
}
if (report.artifactPaths.manifestShards.includes(path)) {
return 'manifest_shard';
}
if (report.artifactPaths.enrichmentArtifacts.includes(path)) {
return 'enrichment_artifact';
}
return 'raw_source';
}
async function artifactSize(project: KtxLocalProject, path: string): Promise<number | undefined> {
try {
const result = await project.fileStore.readFile(path);
return typeof result.size === 'number' ? result.size : undefined;
} catch {
return undefined;
}
}
async function listArtifactsForReport(
project: KtxLocalProject,
runId: string,
report: KtxScanReport,
): Promise<KtxScanArtifactListResponse> {
const paths = new Set<string>();
if (report.artifactPaths.rawSourcesDir) {
const listed = await project.fileStore.listFiles(report.artifactPaths.rawSourcesDir);
for (const file of listed.files) {
paths.add(file);
}
}
if (report.artifactPaths.reportPath) {
paths.add(report.artifactPaths.reportPath);
}
for (const path of report.artifactPaths.manifestShards) {
paths.add(path);
}
for (const path of report.artifactPaths.enrichmentArtifacts) {
paths.add(path);
}
const artifacts: KtxScanArtifactSummary[] = [];
for (const path of [...paths].sort()) {
const size = await artifactSize(project, path);
artifacts.push({
path,
type: scanArtifactType(path, report),
...(size === undefined ? {} : { size }),
});
}
return { runId, artifacts };
}
async function readScanArtifact(
project: KtxLocalProject,
runId: string,
path: string,
): Promise<KtxScanArtifactReadResponse | null> {
const report = await getLocalScanReport(project, runId);
if (!report) {
return null;
}
const listed = await listArtifactsForReport(project, runId, report);
const artifact = listed.artifacts.find((candidate) => candidate.path === path);
if (!artifact) {
return null;
}
const result = await project.fileStore.readFile(path);
return {
runId,
path,
type: artifact.type,
...(typeof result.size === 'number' ? { size: result.size } : {}),
content: result.content,
};
}
function slPath(connectionId: string, sourceName: string): string {
return `semantic-layer/${assertSafeConnectionId(connectionId)}/${assertSafeSourceName(sourceName)}.yaml`;
}
function sourceNameFromPath(path: string): string {
return (
path
.split('/')
.at(-1)
?.replace(/\.ya?ml$/, '') ?? path
);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function parseYamlRecord(raw: string): Record<string, unknown> {
const parsed = YAML.parse(raw) as unknown;
if (!isRecord(parsed)) {
throw new Error('Semantic-layer source YAML must contain an object');
}
return parsed;
}
async function listSlPaths(project: KtxLocalProject, connectionId?: string): Promise<string[]> {
const root = connectionId ? `semantic-layer/${assertSafeConnectionId(connectionId)}` : 'semantic-layer';
const listed = await project.fileStore.listFiles(root);
return listed.files.filter((file) => file.endsWith('.yaml') || file.endsWith('.yml')).sort();
}
async function loadComputableSources(
project: KtxLocalProject,
connectionId: string,
): Promise<Record<string, unknown>[]> {
const paths = await listSlPaths(project, connectionId);
const sources: Record<string, unknown>[] = [];
for (const path of paths) {
const raw = await project.fileStore.readFile(path);
const source = parseYamlRecord(raw.content);
if (source.table || source.sql) {
sources.push(source);
}
}
return sources;
}
function validateSourceRecord(sourceName: string, source: Record<string, unknown>): string[] {
const namedSource = { ...source, name: typeof source.name === 'string' ? source.name : sourceName };
const definition = sourceDefinitionSchema.safeParse(namedSource);
if (definition.success) {
return [];
}
const overlay = sourceOverlaySchema.safeParse(namedSource);
if (overlay.success) {
return [];
}
return definition.error.issues.map((issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`);
}
function localIngestSourceDir(config: unknown): string | undefined {
if (!isRecord(config) || config.sourceDir === undefined) {
return undefined;
}
if (typeof config.sourceDir !== 'string' || config.sourceDir.trim().length === 0) {
throw new Error('Local ingest config sourceDir must be a non-empty string when provided');
}
return config.sourceDir;
}
function rawFileCountFromIngestReport(report: IngestReportSnapshot): number {
return new Set(report.body.workUnits.flatMap((workUnit) => workUnit.rawFiles)).size;
}
function hasSlSearchMetadata(
source: LocalSlSourceSummary | LocalSlSourceSearchResult,
): source is LocalSlSourceSearchResult {
return 'score' in source;
}
function statusFromIngestReport(report: IngestReportSnapshot): KtxIngestStatusResponse {
const failedWorkUnits = report.body.failedWorkUnits;
return {
runId: report.runId,
jobId: report.jobId,
reportId: report.id,
status: failedWorkUnits.length > 0 ? 'error' : 'done',
stage: 'done',
progress: 1,
errors: failedWorkUnits,
done: true,
adapter: report.sourceKey,
connectionId: report.connectionId,
sourceDir: null,
syncId: report.body.syncId,
startedAt: report.createdAt,
completedAt: report.createdAt,
previousRunId: null,
diffSummary: report.body.diffSummary,
workUnitCount: report.body.workUnits.length,
rawFileCount: rawFileCountFromIngestReport(report),
workUnits: report.body.workUnits.map((workUnit) => ({
unitKey: workUnit.unitKey,
rawFiles: [...workUnit.rawFiles],
peerFileIndex: [],
dependencyPaths: [],
})),
evictionDeletedRawPaths: [...report.body.evictionInputs],
};
}
async function executeValidatedReadOnlySql(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
@ -453,9 +144,6 @@ export function createLocalProjectMcpContextPorts(
)
.sort((a, b) => a.id.localeCompare(b.id));
},
async test(input) {
return testLocalConnection(project, options, input.connectionId);
},
},
knowledge: {
async search(input) {
@ -495,58 +183,8 @@ export function createLocalProjectMcpContextPorts(
}
: null;
},
async write(input) {
const existing = await readLocalKnowledgePage(project, {
key: input.key,
userId: input.userId,
});
await writeLocalKnowledgePage(project, {
key: input.key,
scope: 'GLOBAL',
userId: input.userId,
summary: input.summary,
content: input.content,
tags: input.tags,
refs: input.refs,
slRefs: input.slRefs,
source: input.source,
intent: input.intent,
tables: input.tables,
representativeSql: input.representativeSql,
usage: input.usage,
fingerprints: input.fingerprints,
});
return { success: true, key: input.key, action: existing ? 'updated' : 'created' };
},
},
semanticLayer: {
async listSources(input) {
const listed: Array<LocalSlSourceSummary | LocalSlSourceSearchResult> = input.query
? await searchLocalSlSources(project, {
connectionId: input.connectionId,
query: input.query,
embeddingService,
})
: await listLocalSlSources(project, { connectionId: input.connectionId });
const sources = listed.map((source) => ({
connectionId: source.connectionId,
connectionName: source.connectionId,
name: source.name,
description: source.description,
columnCount: source.columnCount,
measureCount: source.measureCount,
joinCount: source.joinCount,
...(hasSlSearchMetadata(source) && source.frequencyTier ? { frequencyTier: source.frequencyTier } : {}),
...(hasSlSearchMetadata(source) && source.snippet ? { snippet: source.snippet } : {}),
...(hasSlSearchMetadata(source) ? { score: source.score } : {}),
...(hasSlSearchMetadata(source) && source.matchReasons ? { matchReasons: source.matchReasons } : {}),
...(hasSlSearchMetadata(source) && source.dictionaryMatches
? { dictionaryMatches: source.dictionaryMatches }
: {}),
...(hasSlSearchMetadata(source) && source.lanes ? { lanes: source.lanes } : {}),
}));
return { sources, totalSources: sources.length };
},
async readSource(input) {
const path = slPath(input.connectionId, input.sourceName);
try {
@ -556,71 +194,9 @@ export function createLocalProjectMcpContextPorts(
return null;
}
},
async writeSource(input) {
const path = slPath(input.connectionId, input.sourceName);
if (input.delete) {
const deleted = await project.fileStore.deleteFile(
path,
LOCAL_AUTHOR,
LOCAL_AUTHOR_EMAIL,
`Remove semantic-layer source: ${input.sourceName}`,
);
return { success: Boolean(deleted), sourceName: input.sourceName };
}
const yaml =
input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' });
parseYamlRecord(yaml);
await project.fileStore.writeFile(
path,
`${yaml.trimEnd()}\n`,
LOCAL_AUTHOR,
LOCAL_AUTHOR_EMAIL,
`Update semantic-layer source: ${input.sourceName}`,
);
return { success: true, sourceName: input.sourceName, yaml: `${yaml.trimEnd()}\n` };
},
async validate(input) {
if (options.semanticLayerCompute) {
const connectionId = assertSafeConnectionId(input.connectionId);
const result = await options.semanticLayerCompute.validateSources({
sources: await loadComputableSources(project, connectionId),
dialect: dialectForDriver(project.config.connections[connectionId]?.driver),
recentlyTouched: input.names,
});
return {
success: result.valid,
errors: result.errors,
warnings: result.warnings,
};
}
const names = new Set(input.names ?? []);
const paths = await listSlPaths(project, input.connectionId);
const errors: string[] = [];
for (const path of paths) {
const sourceName = sourceNameFromPath(path);
if (names.size > 0 && !names.has(sourceName)) {
continue;
}
try {
const raw = await project.fileStore.readFile(path);
errors.push(...validateSourceRecord(sourceName, parseYamlRecord(raw.content)));
} catch (error) {
errors.push(`${sourceName}: ${error instanceof Error ? error.message : String(error)}`);
}
}
return {
success: errors.length === 0,
errors,
warnings: [SL_SHAPE_WARNING],
};
},
async query(input) {
if (!options.semanticLayerCompute) {
throw new Error(
'sl_query requires a semantic-layer query adapter. Local stdio MCP exposes file-backed SL CRUD only.',
);
throw new Error('sl_query requires a semantic-layer query adapter.');
}
return compileLocalSlQuery(project, {
connectionId: input.connectionId,
@ -657,111 +233,5 @@ export function createLocalProjectMcpContextPorts(
};
}
if (options.localIngest) {
ports.ingest = {
async trigger(input) {
const sourceDir = localIngestSourceDir(input.config);
if (input.adapter === 'metabase' && !sourceDir) {
const result = await (options.localIngest?.runLocalMetabaseIngest ?? runLocalMetabaseIngest)({
project,
adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project),
metabaseConnectionId: input.connectionId,
trigger: input.trigger,
jobIdFactory: options.localIngest?.jobIdFactory,
pullConfigOptions: options.localIngest?.pullConfigOptions,
agentRunner: options.localIngest?.agentRunner,
llmProvider: options.localIngest?.llmProvider,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,
logger: options.localIngest?.logger,
});
return {
runId: `metabase-fanout:${result.metabaseConnectionId}`,
jobId: undefined,
reportId: undefined,
fanout: {
status: result.status,
children: result.children.map((child) => ({
runId: child.report.runId,
jobId: child.report.jobId,
reportId: child.report.id,
targetConnectionId: child.targetConnectionId,
metabaseDatabaseId: child.metabaseDatabaseId,
})),
},
};
}
const executeLocalIngest = options.localIngest?.runLocalIngest ?? runLocalIngest;
const result = await executeLocalIngest({
project,
adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project),
adapter: input.adapter,
connectionId: input.connectionId,
sourceDir,
pullConfigOptions: options.localIngest?.pullConfigOptions,
trigger: input.trigger,
jobId: options.localIngest?.jobIdFactory?.(),
agentRunner: options.localIngest?.agentRunner,
llmProvider: options.localIngest?.llmProvider,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,
logger: options.localIngest?.logger,
});
return {
runId: result.report.runId,
jobId: result.report.jobId,
reportId: result.report.id,
};
},
async status(input) {
const report = await getLocalIngestStatus(project, input.runId);
return report ? statusFromIngestReport(report) : null;
},
async report(input) {
return getLocalIngestStatus(project, input.runId);
},
async replay(input) {
const report = await getLocalIngestStatus(project, input.runId);
return report ? ingestReportToMemoryFlowReplay(report) : null;
},
};
}
if (options.localScan) {
ports.scan = {
async trigger(input) {
return runLocalScan({
project,
connectionId: input.connectionId,
mode: input.mode,
detectRelationships: input.detectRelationships,
dryRun: input.dryRun,
trigger: 'mcp',
adapters: options.localScan?.adapters,
databaseIntrospectionUrl: options.localScan?.databaseIntrospectionUrl,
createConnector: options.localScan?.createConnector,
jobId: options.localScan?.jobIdFactory?.(),
now: options.localScan?.now,
});
},
async status(input) {
return getLocalScanStatus(project, input.runId);
},
async report(input) {
return getLocalScanReport(project, input.runId);
},
async listArtifacts(input) {
const report = await getLocalScanReport(project, input.runId);
return report ? listArtifactsForReport(project, input.runId, report) : null;
},
async readArtifact(input) {
return readScanArtifact(project, input.runId, input.path);
},
};
}
return ports;
}

View file

@ -475,21 +475,17 @@ describe('createKtxMcpServer', () => {
});
it('runs MCP memory_ingest against a local project memory port', async () => {
const tempDir = await mkdtemp(join(tmpdir(), 'ktx-mcp-local-memory-'));
try {
const project = await initKtxProject({ projectDir: tempDir });
let receivedInput: MemoryAgentInput | undefined;
const agentRunner = {
runLoop: async ({
input,
toolSet,
}: {
input: MemoryAgentInput;
toolSet: Record<string, { execute: (input: unknown, options?: { toolCallId?: string }) => Promise<unknown> }>;
}) => {
receivedInput = input;
await toolSet.load_skill.execute({ name: 'wiki_capture' });
await toolSet.wiki_write.execute(
const tempDir = await mkdtemp(join(tmpdir(), 'ktx-mcp-local-memory-'));
try {
const project = await initKtxProject({ projectDir: tempDir });
const agentRunner = {
runLoop: async ({
toolSet,
}: {
toolSet: Record<string, { execute: (input: unknown, options?: { toolCallId?: string }) => Promise<unknown> }>;
}) => {
await toolSet.load_skill.execute({ name: 'wiki_capture' });
await toolSet.wiki_write.execute(
{
key: 'arr',
summary: 'ARR definition',
@ -504,6 +500,7 @@ describe('createKtxMcpServer', () => {
agentRunner: agentRunner as never,
runIdFactory: () => 'memory-run-mcp',
});
const ingestSpy = vi.spyOn(memoryIngest, 'ingest');
const fake = makeFakeServer();
createKtxMcpServer({
@ -520,7 +517,7 @@ describe('createKtxMcpServer', () => {
structuredContent: { runId: 'memory-run-mcp' },
});
await memoryIngest.waitForRun('memory-run-mcp');
expect(receivedInput).toMatchObject({
expect(ingestSpy).toHaveBeenCalledWith({
userId: 'local',
chatId: expect.stringMatching(/^mcp-/),
userMessage: 'Ingest external knowledge into KTX memory.',