feat(context): polish mcp tool metadata

This commit is contained in:
Andrey Avtomonov 2026-05-16 02:27:03 +02:00
parent d7a494f187
commit 9a0f239753
4 changed files with 2433 additions and 129 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,16 @@
import { randomUUID } from 'node:crypto';
import type { ToolAnnotations } from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
import type { MemoryAgentInput } from '../memory/index.js';
import type { KtxMcpContextPorts, KtxMcpServerLike, KtxMcpToolResult, KtxMcpUserContext } from './types.js';
import type {
KtxMcpContextPorts,
KtxMcpProgressCallback,
KtxMcpServerLike,
KtxMcpToolHandlerContext,
KtxMcpToolResult,
KtxMcpUserContext,
NonArrayObject,
} from './types.js';
export interface RegisterKtxContextToolsDeps {
server: KtxMcpServerLike;
@ -10,38 +19,94 @@ export interface RegisterKtxContextToolsDeps {
}
const connectionIdSchema = z.string().min(1);
const unknownRecordSchema = z.record(z.string(), z.unknown());
const tableRefSchema = z.object({
catalog: z.string().nullable(),
db: z.string().nullable(),
name: z.string(),
});
const toolAnnotations = {
connection_list: { title: 'Connection List', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
discover_data: { title: 'Discover Data', readOnlyHint: true, openWorldHint: false },
wiki_search: { title: 'Wiki Search', readOnlyHint: true, openWorldHint: false },
wiki_read: { title: 'Wiki Read', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
entity_details: { title: 'Entity Details', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
dictionary_search: { title: 'Dictionary Search', readOnlyHint: true, openWorldHint: false },
sl_read_source: { title: 'Semantic Layer Read Source', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
sl_query: { title: 'Semantic Layer Query', readOnlyHint: true, openWorldHint: false },
sql_execution: { title: 'SQL Execution', readOnlyHint: true, openWorldHint: false },
memory_ingest: { title: 'Memory Ingest', destructiveHint: true, openWorldHint: false },
memory_ingest_status: { title: 'Memory Ingest Status', readOnlyHint: true, openWorldHint: false },
} satisfies Record<string, ToolAnnotations>;
const toolDescriptions = {
connection_list:
'List configured read-only data connections available to this KTX project. Use this before connection-scoped tools when the project may have multiple warehouses.',
discover_data:
'Search across KTX wiki pages, semantic-layer sources, measures, dimensions, raw tables, and columns. Example: discover_data({ query: "monthly orders by customer", connectionId: "warehouse", kinds: ["sl_source", "table"] }).',
wiki_search:
'Search KTX wiki pages for reusable business context. Example: wiki_search({ query: "revenue recognition", limit: 5 }).',
wiki_read: 'Read a KTX wiki page by key returned from wiki_search. Example: wiki_read({ key: "global/revenue" }).',
entity_details:
'Read table and column metadata from the latest live-database scan snapshot. Example: entity_details({ connectionId: "warehouse", entities: [{ table: { schema: "public", table: "orders" }, columns: ["id"] }] }).',
dictionary_search:
'Search profile-sampled warehouse values to locate likely source columns for business values. Example: dictionary_search({ values: ["Acme Corp"], connectionId: "warehouse" }).',
sl_read_source:
'Read a semantic-layer YAML source by connection id and source name. Example: sl_read_source({ connectionId: "warehouse", sourceName: "orders" }).',
sl_query:
'Execute a semantic-layer query and return rows, headers, generated SQL, and plan details. Example: sl_query({ connectionId: "warehouse", measures: ["orders.order_count"], dimensions: [{ dimension: "orders.created_at", granularity: "month" }] }).',
sql_execution:
'Execute one parser-validated read-only SQL query against a configured KTX connection. Example: sql_execution({ connectionId: "warehouse", sql: "select count(*) from public.orders", maxRows: 100 }).',
memory_ingest:
'Ingest free-form markdown knowledge into durable KTX memory. Use this for business rules, metric definitions, schema gotchas, recurring findings, or explicit user requests to remember something. Example: memory_ingest({ connectionId: "warehouse", content: "ARR is reported in cents in this warehouse." }).',
memory_ingest_status:
'Read the current or final status for a memory ingest run. Example: memory_ingest_status({ runId: "memory-run-1" }).',
} satisfies Record<string, string>;
const connectionListSchema = z.object({});
const knowledgeSearchSchema = z.object({
query: z.string().min(1),
limit: z.number().int().min(1).max(50).default(10),
query: z.string().min(1).describe('Natural-language wiki search query, e.g. "revenue recognition policy".'),
limit: z.number().int().min(1).max(50).default(10).describe('Maximum wiki pages to return. Defaults to 10.'),
});
const knowledgeReadSchema = z.object({
key: z.string().min(1),
key: z.string().min(1).describe('Wiki page key returned by wiki_search, e.g. "global/revenue".'),
});
const slReadSourceSchema = z.object({
connectionId: connectionIdSchema,
sourceName: z.string().min(1),
connectionId: connectionIdSchema.describe('Connection id that owns the semantic-layer source.'),
sourceName: z.string().min(1).describe('Semantic-layer source name without ".yaml", e.g. "orders".'),
});
const slQueryMeasureSchema = z.union([
z.string(),
z.string().describe('Semantic-layer measure key, e.g. "orders.order_count".'),
z.object({
expr: z.string().min(1),
name: z.string().min(1),
expr: z.string().min(1).describe('Ad hoc aggregate expression, e.g. "sum(orders.amount)".'),
name: z.string().min(1).describe('Alias for the ad hoc measure, e.g. "gross_revenue".'),
}),
]);
const slQueryDimensionSchema = z.union([
z.string(),
const slQueryDimensionSchema = z.preprocess(
(value) => {
if (typeof value === 'string') return { field: value };
if (value && typeof value === 'object' && !Array.isArray(value)) {
const obj = { ...(value as Record<string, unknown>) };
if (!('field' in obj) && typeof obj.dimension === 'string') obj.field = obj.dimension;
return obj;
}
return value;
},
z.object({
field: z.string().min(1),
granularity: z.string().min(1).optional(),
field: z.string().min(1).describe('Dimension to group by, e.g. "orders.created_at" or "orders.status".'),
granularity: z
.string()
.min(1)
.optional()
.describe('Time grain for time dimensions: day, week, month, quarter, or year.'),
}),
]);
);
const slQueryOrderBySchema = z.preprocess(
(value) => {
@ -75,53 +140,93 @@ const slQueryOrderBySchema = z.preprocess(
);
const slQuerySchema = z.object({
connectionId: connectionIdSchema.optional(),
measures: z.array(slQueryMeasureSchema).min(1),
dimensions: z.array(slQueryDimensionSchema).default([]),
filters: z.array(z.string()).default([]),
segments: z.array(z.string()).default([]),
order_by: z.array(slQueryOrderBySchema).default([]),
limit: z.number().int().min(0).default(1000),
include_empty: z.boolean().default(true),
connectionId: connectionIdSchema
.optional()
.describe('Connection id to query. Omit only when the project has exactly one configured connection.'),
measures: z.array(slQueryMeasureSchema).min(1).describe('Measures to select. Use semantic-layer keys when available.'),
dimensions: z
.array(slQueryDimensionSchema)
.default([])
.describe('Dimensions to group by. Strings and {dimension, granularity} are accepted.'),
filters: z
.array(z.string().describe('Semantic-layer filter expression, e.g. "orders.status = paid".'))
.default([])
.describe('Semantic-layer filter expressions to apply.'),
segments: z
.array(z.string().describe('Semantic-layer segment key to apply.'))
.default([])
.describe('Semantic-layer segment keys to apply.'),
order_by: z
.array(slQueryOrderBySchema)
.default([])
.describe('Sort clauses. Strings and Cube-style {id, desc} are accepted.'),
limit: z.number().int().min(0).default(1000).describe('Maximum rows to return. Defaults to 1000.'),
include_empty: z.boolean().default(true).describe('Whether to include empty dimension groups. Defaults to true.'),
});
const entityDetailsTableRefSchema = z.object({
catalog: z.string().nullable(),
db: z.string().nullable(),
name: z.string().min(1),
});
const entityDetailsTableRefSchema = z.preprocess(
(value) => {
if (value && typeof value === 'object' && !Array.isArray(value)) {
const obj = { ...(value as Record<string, unknown>) };
if (!('db' in obj) && typeof obj.schema === 'string') obj.db = obj.schema;
if (!('name' in obj) && typeof obj.table === 'string') obj.name = obj.table;
if (!('catalog' in obj)) obj.catalog = null;
return obj;
}
return value;
},
z.object({
catalog: z.string().nullable().describe('Catalog/project/database. Use null when not applicable.'),
db: z.string().nullable().describe('Schema/database/dataset. Use null when not applicable.'),
name: z.string().min(1).describe('Table name.'),
}),
);
const entityDetailsSchema = z.object({
connectionId: connectionIdSchema,
connectionId: connectionIdSchema.describe('Connection id whose latest scan snapshot should be read.'),
entities: z
.array(
z.object({
table: z.union([z.string().min(1), entityDetailsTableRefSchema]),
columns: z.array(z.string().min(1)).optional(),
table: z
.union([z.string().min(1), entityDetailsTableRefSchema])
.describe('Table display string or object ref. {schema, table} is accepted as an alias for {db, name}.'),
columns: z
.array(z.string().min(1).describe('Column name to inspect.'))
.optional()
.describe('Optional column filter.'),
}),
)
.min(1)
.max(20),
.max(20)
.describe('Tables or columns to inspect. Maximum 20 entities.'),
});
const dictionarySearchSchema = z.object({
values: z.array(z.string().min(1)).min(1).max(20),
connectionId: connectionIdSchema.optional(),
values: z
.array(z.string().min(1).describe('Business value to locate, e.g. "Acme Corp" or "enterprise".'))
.min(1)
.max(20)
.describe('Values to search for in sampled warehouse dictionaries.'),
connectionId: connectionIdSchema
.optional()
.describe('Optional connection id. Pass it when user intent pins a specific warehouse.'),
});
const discoverDataKindSchema = z.enum(['wiki', 'sl_source', 'sl_measure', 'sl_dimension', 'table', 'column']);
const discoverDataSchema = z.object({
query: z.string().min(1),
connectionId: connectionIdSchema.optional(),
kinds: z.array(discoverDataKindSchema).optional(),
limit: z.number().int().min(1).max(50).default(15).optional(),
query: z.string().min(1).describe('Natural-language discovery query, e.g. "monthly orders by customer".'),
connectionId: connectionIdSchema
.optional()
.describe('Optional connection id. Pass it when user intent pins a specific warehouse.'),
kinds: z.array(discoverDataKindSchema.describe('Reference kind to include.')).optional().describe('Optional kind filter.'),
limit: z.number().int().min(1).max(50).default(15).optional().describe('Maximum refs to return. Defaults to 15.'),
});
const sqlExecutionSchema = z.object({
connectionId: connectionIdSchema,
sql: z.string().min(1),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional(),
connectionId: connectionIdSchema.describe('Connection id to execute against. Required for raw SQL.'),
sql: z.string().min(1).describe('Parser-validated read-only SQL, e.g. "select count(*) from public.orders".'),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional().describe('Maximum rows to return. Defaults to 1000.'),
});
const memoryIngestSchema = z.object({
@ -142,7 +247,205 @@ const memoryIngestStatusSchema = z.object({
runId: z.string().min(1).describe('The memory ingest run id returned by memory_ingest.'),
});
export function jsonToolResult<T extends object>(structuredContent: T): KtxMcpToolResult<T> {
const connectionListOutputSchema = z.object({
connections: z.array(
z.object({
id: z.string(),
name: z.string(),
connectionType: z.string(),
}),
),
});
const wikiSearchOutputSchema = z.object({
results: z.array(
z.object({
key: z.string(),
path: z.string(),
scope: z.enum(['GLOBAL', 'USER']),
summary: z.string(),
score: z.number(),
matchReasons: z.array(z.string()).optional(),
lanes: z
.array(
z.object({
lane: z.string(),
status: z.string(),
requestedCandidatePoolLimit: z.number(),
effectiveCandidatePoolLimit: z.number(),
returnedCandidateCount: z.number(),
weight: z.number(),
reason: z.string().optional(),
}),
)
.optional(),
}),
),
totalFound: z.number(),
});
const wikiReadOutputSchema = z.object({
key: z.string(),
summary: z.string(),
content: z.string(),
scope: z.enum(['GLOBAL', 'USER']),
tags: z.array(z.string()).optional(),
refs: z.array(z.string()).optional(),
slRefs: z.array(z.string()).optional(),
});
const slReadSourceOutputSchema = z.object({
sourceName: z.string(),
yaml: z.string(),
});
const slQueryOutputSchema = z.object({
connectionId: z.string().optional(),
dialect: z.string().optional(),
sql: z.string(),
headers: z.array(z.string()),
rows: z.array(z.array(z.unknown())),
totalRows: z.number(),
plan: unknownRecordSchema.optional(),
});
const entityDetailsSnapshotOutputSchema = z.object({
syncId: z.string(),
extractedAt: z.string(),
scanRunId: z.string().nullable(),
});
const entityDetailsColumnOutputSchema = z.object({
name: z.string(),
nativeType: z.string(),
normalizedType: z.string(),
dimensionType: z.enum(['time', 'string', 'number', 'boolean']),
nullable: z.boolean(),
primaryKey: z.boolean(),
comment: z.string().nullable(),
});
const entityDetailsForeignKeyOutputSchema = z.object({
fromColumn: z.string(),
toCatalog: z.string().nullable(),
toDb: z.string().nullable(),
toTable: z.string(),
toColumn: z.string(),
constraintName: z.string().nullable(),
});
const entityDetailsOutputSchema = z.object({
results: z.array(
z.union([
z.object({
ok: z.literal(true),
connectionId: z.string(),
tableRef: tableRefSchema,
display: z.string(),
kind: z.enum(['table', 'view', 'external', 'event_stream']),
comment: z.string().nullable(),
estimatedRows: z.number().nullable(),
columns: z.array(entityDetailsColumnOutputSchema),
foreignKeys: z.array(entityDetailsForeignKeyOutputSchema),
snapshot: entityDetailsSnapshotOutputSchema,
}),
z.object({
ok: z.literal(false),
connectionId: z.string(),
table: z.union([z.string(), tableRefSchema]),
snapshot: entityDetailsSnapshotOutputSchema.optional(),
error: z.object({
code: z.enum(['scan_missing', 'table_not_found', 'ambiguous_table', 'column_not_found']),
message: z.string(),
candidates: z
.union([z.array(z.object({ tableRef: tableRefSchema, display: z.string() })), z.array(z.string())])
.optional(),
}),
}),
]),
),
});
const dictionarySearchOutputSchema = z.object({
searched: z.array(
z.object({
connectionId: z.string(),
coverage: z.object({
sampledRows: z.number().nullable(),
valuesPerColumn: z.number().nullable(),
profiledColumns: z.number(),
syncId: z.string().nullable(),
profiledAt: z.string().nullable(),
}),
status: z.enum(['ready', 'no_profile_artifact', 'no_candidate_columns']),
}),
),
results: z.array(
z.object({
value: z.string(),
matches: z.array(
z.object({
connectionId: z.string(),
sourceName: z.string(),
columnName: z.string(),
matchedValue: z.string(),
cardinality: z.number().nullable(),
}),
),
misses: z.array(
z.object({
connectionId: z.string(),
reason: z.enum(['no_profile_artifact', 'no_candidate_columns', 'value_not_in_sample']),
}),
),
}),
),
});
const discoverDataOutputSchema = z.object({
refs: z.array(
z.object({
kind: discoverDataKindSchema,
id: z.string(),
score: z.number(),
summary: z.string().nullable(),
snippet: z.string().nullable(),
matchedOn: z.enum(['name', 'display', 'description', 'comment', 'expr', 'sample_value', 'body']),
connectionId: z.string().optional(),
tableRef: tableRefSchema.optional(),
columnName: z.string().optional(),
}),
),
});
const sqlExecutionOutputSchema = z.object({
headers: z.array(z.string()),
headerTypes: z.array(z.string()).optional(),
rows: z.array(z.array(z.unknown())),
rowCount: z.number(),
});
const memoryIngestOutputSchema = z.object({
runId: z.string(),
});
const memoryIngestStatusOutputSchema = z.object({
runId: z.string(),
status: z.enum(['running', 'done', 'error']),
stage: z.string(),
done: z.boolean(),
captured: z.object({
wiki: z.array(z.string()),
sl: z.array(z.string()),
xrefs: z.array(z.string()),
}),
error: z.string().nullable(),
commitHash: z.string().nullable(),
skillsLoaded: z.array(z.string()),
signalDetected: z.boolean(),
});
export function jsonToolResult<T extends NonArrayObject>(structuredContent: T): KtxMcpToolResult<T> {
return {
content: [{ type: 'text', text: JSON.stringify(structuredContent, null, 2) }],
structuredContent,
@ -156,14 +459,53 @@ export function jsonErrorToolResult(text: string): KtxMcpToolResult<Record<strin
};
}
function formatToolError(error: unknown): string {
if (error instanceof z.ZodError) {
return error.issues
.map((issue) => `${issue.path.length > 0 ? issue.path.join('.') : '<root>'}: ${issue.message}`)
.join('\n');
}
return error instanceof Error ? error.message : String(error);
}
function mcpProgressCallback(context?: KtxMcpToolHandlerContext): KtxMcpProgressCallback | undefined {
const progressToken = context?._meta?.progressToken;
if (progressToken === undefined || !context?.sendNotification) {
return undefined;
}
return async (event) => {
await context.sendNotification?.({
method: 'notifications/progress',
params: {
progressToken,
progress: event.progress,
...(event.total !== undefined ? { total: event.total } : {}),
message: event.message,
},
});
};
}
function registerParsedTool<TSchema extends z.ZodType>(
server: KtxMcpServerLike,
name: string,
config: { title: string; description: string; inputSchema: unknown },
config: {
title: string;
description: string;
inputSchema: unknown;
outputSchema: unknown;
annotations: ToolAnnotations;
},
schema: TSchema,
handler: (input: z.infer<TSchema>) => Promise<KtxMcpToolResult>,
handler: (input: z.infer<TSchema>, context?: KtxMcpToolHandlerContext) => Promise<KtxMcpToolResult>,
): void {
server.registerTool(name, config, async (input) => handler(schema.parse(input)));
server.registerTool(name, config, async (input, context) => {
try {
return await handler(schema.parse(input), context);
} catch (error) {
return jsonErrorToolResult(formatToolError(error));
}
});
}
export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void {
@ -175,9 +517,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'connection_list',
{
title: 'Connection List',
description: 'List configured read-only data connections available to the KTX project.',
title: toolAnnotations.connection_list.title!,
description: toolDescriptions.connection_list,
inputSchema: connectionListSchema.shape,
outputSchema: connectionListOutputSchema,
annotations: toolAnnotations.connection_list,
},
connectionListSchema,
async () => jsonToolResult({ connections: await connections.list() }),
@ -190,9 +534,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'wiki_search',
{
title: 'Wiki Search',
description: 'Search KTX wiki pages and return ranked summaries.',
title: toolAnnotations.wiki_search.title!,
description: toolDescriptions.wiki_search,
inputSchema: knowledgeSearchSchema.shape,
outputSchema: wikiSearchOutputSchema,
annotations: toolAnnotations.wiki_search,
},
knowledgeSearchSchema,
async (input) =>
@ -209,9 +555,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'wiki_read',
{
title: 'Wiki Read',
description: 'Read a KTX wiki page by key.',
title: toolAnnotations.wiki_read.title!,
description: toolDescriptions.wiki_read,
inputSchema: knowledgeReadSchema.shape,
outputSchema: wikiReadOutputSchema,
annotations: toolAnnotations.wiki_read,
},
knowledgeReadSchema,
async (input) => {
@ -227,9 +575,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'sl_read_source',
{
title: 'Semantic Layer Read Source',
description: 'Read a semantic-layer YAML source by connection id and source name.',
title: toolAnnotations.sl_read_source.title!,
description: toolDescriptions.sl_read_source,
inputSchema: slReadSourceSchema.shape,
outputSchema: slReadSourceOutputSchema,
annotations: toolAnnotations.sl_read_source,
},
slReadSourceSchema,
async (input) => {
@ -244,29 +594,33 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'sl_query',
{
title: 'Semantic Layer Query',
description:
'Execute a semantic-layer query and return rows, headers, SQL, and the query plan. ' +
'order_by items use the shape {"field": "orders.created_at", "direction": "asc"|"desc"}; ' +
'a bare string is treated as field with direction "asc".',
title: toolAnnotations.sl_query.title!,
description: toolDescriptions.sl_query,
inputSchema: slQuerySchema.shape,
outputSchema: slQueryOutputSchema,
annotations: toolAnnotations.sl_query,
},
slQuerySchema,
async (input) =>
jsonToolResult(
await semanticLayer.query({
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
async (input, context) => {
const onProgress = mcpProgressCallback(context);
return jsonToolResult(
await semanticLayer.query(
{
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
},
},
}),
),
onProgress ? { onProgress } : undefined,
),
);
},
);
}
@ -276,9 +630,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'entity_details',
{
title: 'Entity Details',
description: 'Read raw table and column metadata from the latest KTX live-database scan snapshot.',
title: toolAnnotations.entity_details.title!,
description: toolDescriptions.entity_details,
inputSchema: entityDetailsSchema.shape,
outputSchema: entityDetailsOutputSchema,
annotations: toolAnnotations.entity_details,
},
entityDetailsSchema,
async (input) => jsonToolResult(await entityDetails.read(input)),
@ -291,10 +647,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'dictionary_search',
{
title: 'Dictionary Search',
description:
'Search profile-sampled warehouse values and report matching connection/source/column locations plus non-authoritative miss reasons.',
title: toolAnnotations.dictionary_search.title!,
description: toolDescriptions.dictionary_search,
inputSchema: dictionarySearchSchema.shape,
outputSchema: dictionarySearchOutputSchema,
annotations: toolAnnotations.dictionary_search,
},
dictionarySearchSchema,
async (input) => jsonToolResult(await dictionarySearch.search(input)),
@ -307,10 +664,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'discover_data',
{
title: 'Discover Data',
description:
'Search across KTX wiki pages, semantic-layer sources/measures/dimensions, and raw warehouse schema refs.',
title: toolAnnotations.discover_data.title!,
description: toolDescriptions.discover_data,
inputSchema: discoverDataSchema.shape,
outputSchema: discoverDataOutputSchema,
annotations: toolAnnotations.discover_data,
},
discoverDataSchema,
async (input) => jsonToolResult({ refs: await discover.search(input) }),
@ -323,24 +681,25 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'sql_execution',
{
title: 'SQL Execution',
description:
'Execute one parser-validated read-only SQL query against a configured KTX connection and return structured rows.',
title: toolAnnotations.sql_execution.title!,
description: toolDescriptions.sql_execution,
inputSchema: sqlExecutionSchema.shape,
outputSchema: sqlExecutionOutputSchema,
annotations: toolAnnotations.sql_execution,
},
sqlExecutionSchema,
async (input) => {
try {
return jsonToolResult(
await sqlExecution.execute({
async (input, context) => {
const onProgress = mcpProgressCallback(context);
return jsonToolResult(
await sqlExecution.execute(
{
connectionId: input.connectionId,
sql: input.sql,
maxRows: input.maxRows ?? 1000,
}),
);
} catch (error) {
return jsonErrorToolResult(error instanceof Error ? error.message : String(error));
}
},
onProgress ? { onProgress } : undefined,
),
);
},
);
}
@ -351,10 +710,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'memory_ingest',
{
title: 'Memory Ingest',
description:
'Ingest free-form markdown knowledge into KTX durable memory. Use this for business rules, metric definitions, schema gotchas, recurring findings, or explicit user requests to remember something.',
title: toolAnnotations.memory_ingest.title!,
description: toolDescriptions.memory_ingest,
inputSchema: memoryIngestSchema.shape,
outputSchema: memoryIngestOutputSchema,
annotations: toolAnnotations.memory_ingest,
},
memoryIngestSchema,
async (input) => {
@ -374,9 +734,11 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
server,
'memory_ingest_status',
{
title: 'Memory Ingest Status',
description: 'Read the current or final status for a memory ingest run.',
title: toolAnnotations.memory_ingest_status.title!,
description: toolDescriptions.memory_ingest_status,
inputSchema: memoryIngestStatusSchema.shape,
outputSchema: memoryIngestStatusOutputSchema,
annotations: toolAnnotations.memory_ingest_status,
},
memoryIngestStatusSchema,
async (input) => {

View file

@ -1,6 +1,8 @@
import { access, mkdtemp, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { InMemoryTransport } from '@modelcontextprotocol/sdk/inMemory.js';
import { describe, expect, it, vi } from 'vitest';
import {
createLocalProjectMemoryIngest,
@ -8,13 +10,15 @@ import {
type MemoryAgentInput,
} from '../memory/index.js';
import { initKtxProject } from '../project/index.js';
import { createKtxMcpServer } from './server.js';
import { jsonToolResult } from './context-tools.js';
import { createDefaultKtxMcpServer, createKtxMcpServer } from './server.js';
import type {
KtxDiscoverDataMcpPort,
KtxDictionarySearchMcpPort,
KtxEntityDetailsMcpPort,
KtxKnowledgeMcpPort,
KtxMcpContextPorts,
KtxMcpToolHandlerContext,
KtxSemanticLayerMcpPort,
KtxSqlExecutionMcpPort,
KtxSqlExecutionResponse,
@ -23,8 +27,14 @@ import type {
type RegisteredTool = {
name: string;
config: { title?: string; description?: string; inputSchema: unknown };
handler: (input: Record<string, unknown>) => Promise<unknown>;
config: {
title?: string;
description?: string;
inputSchema: unknown;
outputSchema?: unknown;
annotations?: Record<string, unknown>;
};
handler: (input: Record<string, unknown>, context?: KtxMcpToolHandlerContext) => Promise<unknown>;
};
function makeFakeServer() {
@ -47,7 +57,153 @@ function getTool(tools: RegisteredTool[], name: string): RegisteredTool {
return found;
}
const retainedToolNames = [
'connection_list',
'dictionary_search',
'discover_data',
'entity_details',
'memory_ingest',
'memory_ingest_status',
'sl_query',
'sl_read_source',
'sql_execution',
'wiki_read',
'wiki_search',
] as const;
function makeAllContextTools(): KtxMcpContextPorts {
return {
connections: {
list: vi.fn().mockResolvedValue([{ id: 'warehouse', name: 'Warehouse', connectionType: 'POSTGRES' }]),
},
knowledge: {
search: vi.fn<KtxKnowledgeMcpPort['search']>().mockResolvedValue({ results: [], totalFound: 0 }),
read: vi.fn<KtxKnowledgeMcpPort['read']>().mockResolvedValue({
key: 'revenue',
summary: 'Paid order value',
content: '# Revenue',
scope: 'GLOBAL',
tags: ['finance'],
refs: [],
slRefs: ['orders'],
}),
},
semanticLayer: {
readSource: vi.fn<KtxSemanticLayerMcpPort['readSource']>().mockResolvedValue({
sourceName: 'orders',
yaml: 'name: orders\n',
}),
query: vi.fn<KtxSemanticLayerMcpPort['query']>().mockResolvedValue({
sql: 'select 1',
headers: ['count'],
rows: [[1]],
totalRows: 1,
plan: { sources: ['orders'] },
}),
},
entityDetails: {
read: vi.fn<KtxEntityDetailsMcpPort['read']>().mockResolvedValue({ results: [] }),
},
dictionarySearch: {
search: vi.fn<KtxDictionarySearchMcpPort['search']>().mockResolvedValue({ searched: [], results: [] }),
},
discover: {
search: vi.fn<KtxDiscoverDataMcpPort['search']>().mockResolvedValue([]),
},
sqlExecution: {
execute: vi.fn<KtxSqlExecutionMcpPort['execute']>().mockResolvedValue({
headers: ['count'],
headerTypes: ['integer'],
rows: [[1]],
rowCount: 1,
}),
},
memoryIngest: {
ingest: vi.fn<MemoryIngestPort['ingest']>().mockResolvedValue({ runId: 'run-1' }),
status: vi.fn<MemoryIngestPort['status']>().mockResolvedValue({
runId: 'run-1',
status: 'done',
stage: 'done',
done: true,
captured: { wiki: [], sl: [], xrefs: [] },
error: null,
commitHash: null,
skillsLoaded: [],
signalDetected: false,
}),
},
};
}
async function listToolsThroughSdk(contextTools: KtxMcpContextPorts) {
const server = createDefaultKtxMcpServer({
name: 'ktx-test',
version: '0.0.0-test',
userContext: { userId: 'mcp-user' },
contextTools,
});
const client = new Client({ name: 'ktx-test-client', version: '0.0.0-test' });
const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
await Promise.all([server.connect(serverTransport), client.connect(clientTransport)]);
try {
return await client.listTools();
} finally {
await client.close();
await server.close();
}
}
describe('createKtxMcpServer', () => {
it('registers annotations and output schemas for every retained tool', async () => {
const fake = makeFakeServer();
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'mcp-user' },
contextTools: makeAllContextTools(),
});
expect(fake.tools.map((tool) => tool.name).sort()).toEqual([...retainedToolNames].sort());
const expectedAnnotations: Record<string, Record<string, unknown>> = {
connection_list: { title: 'Connection List', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
discover_data: { title: 'Discover Data', readOnlyHint: true, openWorldHint: false },
wiki_search: { title: 'Wiki Search', readOnlyHint: true, openWorldHint: false },
wiki_read: { title: 'Wiki Read', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
entity_details: { title: 'Entity Details', readOnlyHint: true, idempotentHint: true, openWorldHint: false },
dictionary_search: { title: 'Dictionary Search', readOnlyHint: true, openWorldHint: false },
sl_read_source: {
title: 'Semantic Layer Read Source',
readOnlyHint: true,
idempotentHint: true,
openWorldHint: false,
},
sl_query: { title: 'Semantic Layer Query', readOnlyHint: true, openWorldHint: false },
sql_execution: { title: 'SQL Execution', readOnlyHint: true, openWorldHint: false },
memory_ingest: { title: 'Memory Ingest', destructiveHint: true, openWorldHint: false },
memory_ingest_status: { title: 'Memory Ingest Status', readOnlyHint: true, openWorldHint: false },
};
for (const toolName of retainedToolNames) {
const tool = getTool(fake.tools, toolName);
expect(tool.config.title).toBe(expectedAnnotations[toolName]?.title);
expect(tool.config.annotations).toEqual(expectedAnnotations[toolName]);
expect(tool.config.outputSchema).toBeDefined();
const inputShape = tool.config.inputSchema as Record<string, { description?: string }>;
for (const inputSchema of Object.values(inputShape)) {
expect(inputSchema.description).toEqual(expect.any(String));
}
}
});
it('exposes annotations and output schemas through the SDK tools/list response', async () => {
const result = await listToolsThroughSdk(makeAllContextTools());
const toolNames = result.tools.map((tool) => tool.name).sort();
expect(toolNames).toEqual([...retainedToolNames].sort());
await expect(result.tools).toMatchFileSnapshot('__snapshots__/mcp-tools-list.json');
});
it('registers context tools without memory capture tools when memory capture is omitted', async () => {
const fake = makeFakeServer();
@ -121,11 +277,14 @@ describe('createKtxMcpServer', () => {
rowCount: 1,
},
});
expect(sqlExecution.execute).toHaveBeenCalledWith({
connectionId: 'warehouse',
sql: 'select status, count(*) from public.orders group by status',
maxRows: 50,
});
expect(sqlExecution.execute).toHaveBeenCalledWith(
{
connectionId: 'warehouse',
sql: 'select status, count(*) from public.orders group by status',
maxRows: 50,
},
undefined,
);
});
it('registers entity_details when the host provides an entity-details port', async () => {
@ -287,17 +446,131 @@ describe('createKtxMcpServer', () => {
],
});
expect(semanticLayer.query).toHaveBeenCalledWith({
connectionId: 'warehouse',
query: expect.objectContaining({
order_by: [
{ field: 'orders.total', direction: 'desc' },
{ field: 'orders.quarter_label', direction: 'asc' },
{ field: 'orders.created_at', direction: 'desc' },
{ field: 'orders.segment', direction: 'asc' },
],
}),
expect(semanticLayer.query).toHaveBeenCalledWith(
{
connectionId: 'warehouse',
query: expect.objectContaining({
order_by: [
{ field: 'orders.total', direction: 'desc' },
{ field: 'orders.quarter_label', direction: 'asc' },
{ field: 'orders.created_at', direction: 'desc' },
{ field: 'orders.segment', direction: 'asc' },
],
}),
},
undefined,
);
});
it('sl_query normalizes cube-style dimensions to field dimensions', async () => {
const fake = makeFakeServer();
const semanticLayer = makeAllContextTools().semanticLayer!;
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { semanticLayer },
});
await getTool(fake.tools, 'sl_query').handler({
connectionId: 'warehouse',
measures: ['orders.count'],
dimensions: [{ dimension: 'orders.created_at', granularity: 'month' }, 'orders.status'],
});
expect(semanticLayer.query).toHaveBeenCalledWith(
{
connectionId: 'warehouse',
query: expect.objectContaining({
dimensions: [{ field: 'orders.created_at', granularity: 'month' }, { field: 'orders.status' }],
}),
},
undefined,
);
});
it('entity_details normalizes sql-style schema table refs', async () => {
const fake = makeFakeServer();
const entityDetails = makeAllContextTools().entityDetails!;
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { entityDetails },
});
await getTool(fake.tools, 'entity_details').handler({
connectionId: 'warehouse',
entities: [{ table: { schema: 'public', table: 'orders' }, columns: ['id'] }],
});
expect(entityDetails.read).toHaveBeenCalledWith({
connectionId: 'warehouse',
entities: [{ table: { catalog: null, db: 'public', name: 'orders' }, columns: ['id'] }],
});
});
it('wraps handler exceptions in-band for non-sql tools', async () => {
const fake = makeFakeServer();
const knowledge: KtxKnowledgeMcpPort = {
search: vi.fn<KtxKnowledgeMcpPort['search']>().mockRejectedValue(new Error('wiki index unavailable')),
read: vi.fn(),
};
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { knowledge },
});
await expect(getTool(fake.tools, 'wiki_search').handler({ query: 'revenue' })).resolves.toEqual({
content: [{ type: 'text', text: 'wiki index unavailable' }],
isError: true,
});
});
it('wires sql_execution progress to MCP notifications when a progress token is present', async () => {
const fake = makeFakeServer();
const notifications: unknown[] = [];
const sqlExecution: KtxSqlExecutionMcpPort = {
execute: vi.fn<KtxSqlExecutionMcpPort['execute']>().mockImplementation(async (_input, options) => {
await options?.onProgress?.({ progress: 0, message: 'Validating SQL' });
await options?.onProgress?.({ progress: 0.3, message: 'Executing' });
await options?.onProgress?.({ progress: 1, message: 'Fetched 1 rows' });
return { headers: ['count'], rows: [[1]], rowCount: 1 };
}),
};
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { sqlExecution },
});
await getTool(fake.tools, 'sql_execution').handler(
{ connectionId: 'warehouse', sql: 'select 1' },
{
_meta: { progressToken: 'progress-1' },
sendNotification: async (notification) => {
notifications.push(notification);
},
},
);
expect(notifications).toEqual([
{
method: 'notifications/progress',
params: { progressToken: 'progress-1', progress: 0, message: 'Validating SQL' },
},
{
method: 'notifications/progress',
params: { progressToken: 'progress-1', progress: 0.3, message: 'Executing' },
},
{
method: 'notifications/progress',
params: { progressToken: 'progress-1', progress: 1, message: 'Fetched 1 rows' },
},
]);
});
it('registers discover_data when the host provides a discover port', async () => {
@ -700,17 +973,29 @@ describe('createKtxMcpServer', () => {
filters: ['orders.status = paid'],
limit: 25,
});
expect(contextTools.semanticLayer?.query).toHaveBeenCalledWith({
connectionId: '00000000-0000-4000-8000-000000000001',
query: {
measures: ['orders.count'],
dimensions: ['orders.created_at'],
filters: ['orders.status = paid'],
segments: [],
order_by: [],
limit: 25,
include_empty: true,
expect(contextTools.semanticLayer?.query).toHaveBeenCalledWith(
{
connectionId: '00000000-0000-4000-8000-000000000001',
query: {
measures: ['orders.count'],
dimensions: [{ field: 'orders.created_at' }],
filters: ['orders.status = paid'],
segments: [],
order_by: [],
limit: 25,
include_empty: true,
},
},
});
undefined,
);
});
it('keeps jsonToolResult typed to non-array objects', () => {
expect(jsonToolResult({ ok: true }).structuredContent).toEqual({ ok: true });
if (false) {
// @ts-expect-error bare arrays are not valid MCP structuredContent objects in KTX
jsonToolResult([]);
}
});
});

View file

@ -9,12 +9,35 @@ export interface KtxMcpTextContent {
text: string;
}
export interface KtxMcpToolResult<T extends object = object> {
export type NonArrayObject = object & { length?: never };
export interface KtxMcpToolResult<T extends NonArrayObject = NonArrayObject> {
content: KtxMcpTextContent[];
structuredContent?: T;
isError?: true;
}
export interface KtxMcpProgressEvent {
progress: number;
total?: number;
message: string;
}
export type KtxMcpProgressCallback = (event: KtxMcpProgressEvent) => void | Promise<void>;
export interface KtxMcpToolHandlerContext {
_meta?: { progressToken?: string | number; [key: string]: unknown };
sendNotification?: (notification: {
method: 'notifications/progress';
params: {
progressToken: string | number;
progress: number;
total?: number;
message?: string;
};
}) => Promise<void>;
}
export interface MemoryIngestPort {
ingest: MemoryIngestService['ingest'];
status: MemoryIngestService['status'];
@ -31,8 +54,10 @@ export interface KtxMcpServerLike {
title?: string;
description?: string;
inputSchema: unknown;
outputSchema?: unknown;
annotations?: Record<string, unknown>;
},
handler: (input: Record<string, unknown>) => Promise<unknown>,
handler: (input: Record<string, unknown>, context?: KtxMcpToolHandlerContext) => Promise<unknown>,
): void;
}
@ -91,7 +116,10 @@ export interface KtxSemanticLayerQueryResponse {
export interface KtxSemanticLayerMcpPort {
readSource(input: { connectionId: string; sourceName: string }): Promise<KtxSemanticLayerReadResponse | null>;
query(input: { connectionId?: string; query: SemanticLayerQueryInput }): Promise<KtxSemanticLayerQueryResponse>;
query(
input: { connectionId?: string; query: SemanticLayerQueryInput },
options?: { onProgress?: KtxMcpProgressCallback },
): Promise<KtxSemanticLayerQueryResponse>;
}
export interface KtxEntityDetailsMcpPort {
@ -114,7 +142,10 @@ export interface KtxSqlExecutionResponse {
}
export interface KtxSqlExecutionMcpPort {
execute(input: { connectionId: string; sql: string; maxRows: number }): Promise<KtxSqlExecutionResponse>;
execute(
input: { connectionId: string; sql: string; maxRows: number },
options?: { onProgress?: KtxMcpProgressCallback },
): Promise<KtxSqlExecutionResponse>;
}
export interface KtxMcpContextPorts {