feat: trim MCP query response payloads (#240)

This commit is contained in:
Andrey Avtomonov 2026-05-30 17:54:24 +02:00 committed by GitHub
parent cbbcf8e8bd
commit 25f639fba2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 235 additions and 1703 deletions

File diff suppressed because it is too large Load diff

View file

@ -12,6 +12,7 @@ import type {
KtxMcpToolHandlerContext,
KtxMcpToolResult,
KtxMcpUserContext,
KtxSemanticLayerQueryResponse,
NonArrayObject,
} from './types.js';
@ -60,7 +61,7 @@ const toolDescriptions = {
sl_read_source:
'Read a semantic-layer YAML source by connection id and source name. Example: sl_read_source({ connectionId: "warehouse", sourceName: "orders" }).',
sl_query:
'Execute a semantic-layer query and return rows, headers, generated SQL, and plan details. Example: sl_query({ connectionId: "warehouse", measures: ["orders.order_count"], dimensions: [{ field: "orders.created_at", granularity: "month" }] }).',
'Execute a semantic-layer query and return headers, rows, and total row count, plus correctness notes (e.g. compile-only or fan-out) when relevant. The generated SQL and full query plan are omitted by default; request them with include: ["sql"] and/or include: ["plan"]. Example: sl_query({ connectionId: "warehouse", measures: ["orders.order_count"], dimensions: [{ field: "orders.created_at", granularity: "month" }], include: ["sql"] }).',
sql_execution:
'Execute one parser-validated read-only SQL query against a configured KTX connection. Example: sql_execution({ connectionId: "warehouse", sql: "select count(*) from public.orders", maxRows: 100 }).',
memory_ingest:
@ -73,7 +74,7 @@ const connectionListSchema = z.object({});
const knowledgeSearchSchema = z.object({
query: z.string().min(1).describe('Natural-language wiki search query, e.g. "revenue recognition policy".'),
limit: z.number().int().min(1).max(50).default(10).describe('Maximum wiki pages to return. Defaults to 10.'),
limit: z.number().int().min(1).max(50).default(10).describe('Maximum wiki pages to return.'),
});
const knowledgeReadSchema = z.object({
@ -109,10 +110,7 @@ const slQueryOrderBySchema = z.object({
.describe(
'Field/measure/dimension id to order by, e.g. "orders.created_at", a dimension key like "mart_nrr_quarterly.quarter_label", or a measure alias.',
),
direction: z
.enum(['asc', 'desc'])
.default('asc')
.describe('Sort direction: "asc" or "desc". Defaults to "asc".'),
direction: z.enum(['asc', 'desc']).default('asc').describe('Sort direction for this field.'),
});
const slQuerySchema = z.object({
@ -136,8 +134,12 @@ const slQuerySchema = z.object({
.array(slQueryOrderBySchema)
.default([])
.describe('Sort clauses. Use {field, direction?} entries.'),
limit: z.number().int().min(0).default(1000).describe('Maximum rows to return. Defaults to 1000.'),
include_empty: z.boolean().default(true).describe('Whether to include empty dimension groups. Defaults to true.'),
limit: z.number().int().min(0).default(1000).describe('Maximum rows to return.'),
include_empty: z.boolean().default(true).describe('Whether to include empty dimension groups.'),
include: z
.array(z.enum(['plan', 'sql']))
.default([])
.describe('Extra detail to attach to the response: "sql" for the generated SQL, "plan" for the full query plan.'),
});
const entityDetailsTableRefSchema = z.object({
@ -184,13 +186,13 @@ const discoverDataSchema = z.object({
.optional()
.describe('Optional connection id. Pass it when user intent pins a specific warehouse.'),
kinds: z.array(discoverDataKindSchema.describe('Reference kind to include.')).optional().describe('Optional kind filter.'),
limit: z.number().int().min(1).max(50).default(15).optional().describe('Maximum refs to return. Defaults to 15.'),
limit: z.number().int().min(1).max(50).default(10).optional().describe('Maximum refs to return.'),
});
const sqlExecutionSchema = z.object({
connectionId: connectionIdSchema.describe('Connection id to execute against. Required for raw SQL.'),
sql: z.string().min(1).describe('Parser-validated read-only SQL, e.g. "select count(*) from public.orders".'),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional().describe('Maximum rows to return. Defaults to 1000.'),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional().describe('Maximum rows to return.'),
});
const memoryIngestSchema = z.object({
@ -266,10 +268,14 @@ const slReadSourceOutputSchema = z.object({
const slQueryOutputSchema = z.object({
connectionId: z.string().optional(),
dialect: z.string().optional(),
sql: z.string(),
headers: z.array(z.string()),
rows: z.array(z.array(z.unknown())),
totalRows: z.number(),
// Correctness signals hoisted out of `plan` so they survive default projection (e.g. compile-only
// status, fan-out warnings). Present only when there is something to report.
notes: z.array(z.string()).optional(),
// Opt-in detail, attached only when requested via the `include` input.
sql: z.string().optional(),
plan: unknownRecordSchema.optional(),
});
@ -411,12 +417,59 @@ const memoryIngestStatusOutputSchema = z.object({
/** @internal */
export function jsonToolResult<T extends NonArrayObject>(structuredContent: T): KtxMcpToolResult<T> {
// Compact (non-indented) JSON: this `content` text is the copy the model reads. Pretty-printing
// arrays-of-arrays (every `rows` payload) puts one scalar per line, inflating tabular results by
// a large constant factor. `structuredContent` carries the same data for structured-output clients.
return {
content: [{ type: 'text', text: JSON.stringify(structuredContent, null, 2) }],
content: [{ type: 'text', text: JSON.stringify(structuredContent) }],
structuredContent,
};
}
/**
* Pull the correctness-critical signals out of a query plan so they survive even when the caller
* did not opt into the full `plan`. Returns an empty list when there is nothing to flag.
*/
function slQueryNotes(plan: Record<string, unknown> | undefined): string[] {
if (!plan) {
return [];
}
const notes: string[] = [];
const execution = plan.execution;
if (
execution &&
typeof execution === 'object' &&
(execution as Record<string, unknown>).mode === 'compile_only'
) {
const reason = (execution as Record<string, unknown>).reason;
notes.push(typeof reason === 'string' ? reason : 'Compiled SQL only; no rows were executed.');
}
if (plan.has_fan_out === true) {
const description = typeof plan.fan_out_description === 'string' ? plan.fan_out_description.trim() : '';
notes.push(description.length > 0 ? description : 'Fan-out detected: measure totals may be inflated by joins.');
}
return notes;
}
/**
* Default sl_query response is the minimum the agent needs to read the result: connection, headers,
* rows, totals, plus any correctness notes. The generated `sql` and the full `plan` are attached only
* when explicitly requested via `include`, since both are large and echo information the caller already has.
*/
function projectSlQueryResult(result: KtxSemanticLayerQueryResponse, include: ('plan' | 'sql')[]) {
const notes = slQueryNotes(result.plan);
return {
...(result.connectionId !== undefined ? { connectionId: result.connectionId } : {}),
...(result.dialect !== undefined ? { dialect: result.dialect } : {}),
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
...(notes.length > 0 ? { notes } : {}),
...(include.includes('sql') ? { sql: result.sql } : {}),
...(include.includes('plan') && result.plan ? { plan: result.plan } : {}),
};
}
function jsonErrorToolResult(text: string): KtxMcpToolResult<Record<string, never>> {
return {
content: [{ type: 'text', text }],
@ -618,23 +671,22 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
slQuerySchema,
async (input, context) => {
const onProgress = mcpProgressCallback(context);
return jsonToolResult(
await semanticLayer.query(
{
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
},
const result = await semanticLayer.query(
{
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
},
onProgress ? { onProgress } : undefined,
),
},
onProgress ? { onProgress } : undefined,
);
return jsonToolResult(projectSlQueryResult(result, input.include));
},
);
}

View file

@ -110,7 +110,10 @@ interface KtxSemanticLayerReadResponse {
yaml: string;
}
interface KtxSemanticLayerQueryResponse {
/** @internal */
export interface KtxSemanticLayerQueryResponse {
connectionId?: string;
dialect?: string;
sql: string;
headers: string[];
rows: unknown[][];

View file

@ -167,7 +167,7 @@ async function wikiCandidates(
query: input.query,
userId: options.userId,
embeddingService: options.embeddingService ?? null,
limit: Math.max(input.limit ?? 15, 25),
limit: Math.max(input.limit ?? 10, 25),
});
const records: CandidateRecord[] = [];
for (const result of searchResults) {
@ -421,7 +421,8 @@ function hydrate(
}
return {
...ref,
score: maxScore > 0 ? Number((candidate.score / maxScore).toFixed(6)) : 0,
// 3 decimals is plenty for a relative-rank hint; 6 just spent bytes on noise.
score: maxScore > 0 ? Number((candidate.score / maxScore).toFixed(3)) : 0,
};
})
.filter((result): result is KtxDiscoverDataRef => result !== null);
@ -433,7 +434,7 @@ export function createKtxDiscoverDataService(
): { search(input: KtxDiscoverDataInput): Promise<KtxDiscoverDataResponse> } {
return {
async search(input) {
const limit = Math.max(1, Math.min(input.limit ?? 15, 50));
const limit = Math.max(1, Math.min(input.limit ?? 10, 50));
const query = input.query.trim();
if (!query) {
return [];

View file

@ -28,7 +28,12 @@ You have access to KTX MCP tools for data discovery, semantic-layer analysis, ra
- Read entity details before writing SQL against an unfamiliar table. Do not assume column names.
- Treat `sql_execution` as read-only. Writes are rejected by the server.
- Validate value mentions with `dictionary_search` instead of guessing case or spelling. Treat a `dictionary_search` miss as non-authoritative. The index is built from profile-sampled values, so a missing value may simply have been outside the sample. Follow up with `sql_execution` against the most plausible columns before concluding the value is absent.
- When `connection_list` shows multiple connections, pass an explicit `connectionId` to every tool that takes one and where user intent pins a specific warehouse. Required: `entity_details`, `sl_read_source`, and `sql_execution`. Required when user intent is warehouse-specific, including wording like "in our warehouse" or "this warehouse": `memory_ingest`; without `connectionId`, the memory agent cannot update the semantic layer and the knowledge lands as wiki-only. Pass `connectionId` when intent pins a warehouse, otherwise omit for unscoped discovery: `sl_query`, `discover_data`, and `dictionary_search`. Never pass `connectionId` to `connection_list`, `wiki_search`, `wiki_read`, or `memory_ingest_status`. If intent is ambiguous for a required-or-scoped tool, ask the user which warehouse before calling.
- `connectionId` scoping when `connection_list` shows multiple connections:
- Always pass it: `entity_details`, `sl_read_source`, `sql_execution`.
- Pass it when intent pins a warehouse, otherwise omit for unscoped discovery: `sl_query`, `discover_data`, `dictionary_search`.
- `memory_ingest`: pass it for warehouse-specific knowledge (e.g. "in our warehouse"); without it the memory lands as wiki-only and cannot update the semantic layer.
- Never pass it: `connection_list`, `wiki_search`, `wiki_read`, `memory_ingest_status`.
- If scoping is required but intent is ambiguous, ask which warehouse before calling.
- Show compact result tables for small outputs. For broad results, summarize the top findings and mention the applied limit.
- Ask a concise clarification only when the metric, date range, entity, or grain is genuinely ambiguous and cannot be inferred from context.
</rules>

View file

@ -65,7 +65,7 @@
},
"limit": {
"default": 10,
"description": "Maximum wiki pages to return. Defaults to 10.",
"description": "Maximum wiki pages to return.",
"type": "integer",
"minimum": 1,
"maximum": 50
@ -307,7 +307,7 @@
{
"name": "sl_query",
"title": "Semantic Layer Query",
"description": "Execute a semantic-layer query and return rows, headers, generated SQL, and plan details. Example: sl_query({ connectionId: \"warehouse\", measures: [\"orders.order_count\"], dimensions: [{ field: \"orders.created_at\", granularity: \"month\" }] }).",
"description": "Execute a semantic-layer query and return headers, rows, and total row count, plus correctness notes (e.g. compile-only or fan-out) when relevant. The generated SQL and full query plan are omitted by default; request them with include: [\"sql\"] and/or include: [\"plan\"]. Example: sl_query({ connectionId: \"warehouse\", measures: [\"orders.order_count\"], dimensions: [{ field: \"orders.created_at\", granularity: \"month\" }], include: [\"sql\"] }).",
"inputSchema": {
"type": "object",
"properties": {
@ -403,7 +403,7 @@
},
"direction": {
"default": "asc",
"description": "Sort direction: \"asc\" or \"desc\". Defaults to \"asc\".",
"description": "Sort direction for this field.",
"type": "string",
"enum": [
"asc",
@ -418,15 +418,27 @@
},
"limit": {
"default": 1000,
"description": "Maximum rows to return. Defaults to 1000.",
"description": "Maximum rows to return.",
"type": "integer",
"minimum": 0,
"maximum": 9007199254740991
},
"include_empty": {
"default": true,
"description": "Whether to include empty dimension groups. Defaults to true.",
"description": "Whether to include empty dimension groups.",
"type": "boolean"
},
"include": {
"default": [],
"description": "Extra detail to attach to the response: \"sql\" for the generated SQL, \"plan\" for the full query plan.",
"type": "array",
"items": {
"type": "string",
"enum": [
"plan",
"sql"
]
}
}
},
"required": [
@ -443,9 +455,6 @@
"dialect": {
"type": "string"
},
"sql": {
"type": "string"
},
"headers": {
"type": "array",
"items": {
@ -462,6 +471,15 @@
"totalRows": {
"type": "number"
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
},
"sql": {
"type": "string"
},
"plan": {
"type": "object",
"propertyNames": {
@ -471,7 +489,6 @@
}
},
"required": [
"sql",
"headers",
"rows",
"totalRows"
@ -1241,8 +1258,8 @@
}
},
"limit": {
"description": "Maximum refs to return. Defaults to 15.",
"default": 15,
"description": "Maximum refs to return.",
"default": 10,
"type": "integer",
"minimum": 1,
"maximum": 50
@ -1396,7 +1413,7 @@
"description": "Parser-validated read-only SQL, e.g. \"select count(*) from public.orders\"."
},
"maxRows": {
"description": "Maximum rows to return. Defaults to 1000.",
"description": "Maximum rows to return.",
"default": 1000,
"type": "integer",
"minimum": 1,

View file

@ -307,16 +307,12 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
headers: ['status', 'count'],
headerTypes: ['text', 'bigint'],
rows: [['paid', 42]],
rowCount: 1,
},
null,
2,
),
text: JSON.stringify({
headers: ['status', 'count'],
headerTypes: ['text', 'bigint'],
rows: [['paid', 42]],
rowCount: 1,
}),
},
],
structuredContent: {
@ -598,6 +594,92 @@ describe('createKtxMcpServer', () => {
);
});
it('sl_query default response omits plan and sql but keeps compile-only and fan-out notes', async () => {
const fake = makeFakeServer();
const semanticLayer: KtxSemanticLayerMcpPort = {
readSource: vi.fn(),
query: vi.fn<KtxSemanticLayerMcpPort['query']>().mockResolvedValue({
connectionId: 'warehouse',
dialect: 'postgres',
sql: 'select count(*) from public.orders',
headers: ['order_count'],
rows: [],
totalRows: 0,
plan: {
sources_used: ['orders'],
has_fan_out: true,
fan_out_description: 'orders fans out across line_items',
execution: { mode: 'compile_only', reason: 'No execution adapter configured.' },
},
}),
};
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { semanticLayer },
});
const result = await getTool(fake.tools, 'sl_query').handler({
connectionId: 'warehouse',
measures: ['orders.order_count'],
});
expect(result).toMatchObject({
structuredContent: {
connectionId: 'warehouse',
dialect: 'postgres',
headers: ['order_count'],
rows: [],
totalRows: 0,
notes: ['No execution adapter configured.', 'orders fans out across line_items'],
},
});
const structured = (result as { structuredContent: Record<string, unknown> }).structuredContent;
expect(structured.sql).toBeUndefined();
expect(structured.plan).toBeUndefined();
});
it('sl_query attaches sql and plan only when include requests them', async () => {
const fake = makeFakeServer();
const plan = { sources_used: ['orders'], execution: { mode: 'executed' } };
const semanticLayer: KtxSemanticLayerMcpPort = {
readSource: vi.fn(),
query: vi.fn<KtxSemanticLayerMcpPort['query']>().mockResolvedValue({
connectionId: 'warehouse',
dialect: 'postgres',
sql: 'select count(*) from public.orders',
headers: ['order_count'],
rows: [[3]],
totalRows: 1,
plan,
}),
};
createKtxMcpServer({
server: fake.server,
userContext: { userId: 'local-user' },
contextTools: { semanticLayer },
});
const result = await getTool(fake.tools, 'sl_query').handler({
connectionId: 'warehouse',
measures: ['orders.order_count'],
include: ['plan', 'sql'],
});
expect(result).toMatchObject({
structuredContent: {
sql: 'select count(*) from public.orders',
plan,
rows: [[3]],
totalRows: 1,
},
});
const structured = (result as { structuredContent: Record<string, unknown> }).structuredContent;
expect(structured.notes).toBeUndefined();
});
it('entity_details rejects sql-style schema table ref aliases', async () => {
const fake = makeFakeServer();
const entityDetails = makeAllContextTools().entityDetails!;
@ -798,7 +880,7 @@ describe('createKtxMcpServer', () => {
connectionId: '00000000-0000-4000-8000-000000000001',
}),
).resolves.toEqual({
content: [{ type: 'text', text: JSON.stringify({ runId: 'run-1' }, null, 2) }],
content: [{ type: 'text', text: JSON.stringify({ runId: 'run-1' }) }],
structuredContent: { runId: 'run-1' },
});
expect(ingest.ingest).toHaveBeenCalledWith({
@ -825,21 +907,17 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
runId: 'run-1',
status: 'done',
stage: 'done',
done: true,
captured: { wiki: ['revenue'], sl: [], xrefs: [] },
error: null,
commitHash: 'abc123',
skillsLoaded: ['wiki_capture'],
signalDetected: true,
},
null,
2,
),
text: JSON.stringify({
runId: 'run-1',
status: 'done',
stage: 'done',
done: true,
captured: { wiki: ['revenue'], sl: [], xrefs: [] },
error: null,
commitHash: 'abc123',
skillsLoaded: ['wiki_capture'],
signalDetected: true,
}),
},
],
structuredContent: {
@ -1047,19 +1125,15 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
connections: [
{
id: '00000000-0000-4000-8000-000000000001',
name: 'Warehouse',
connectionType: 'POSTGRES',
},
],
},
null,
2,
),
text: JSON.stringify({
connections: [
{
id: '00000000-0000-4000-8000-000000000001',
name: 'Warehouse',
connectionType: 'POSTGRES',
},
],
}),
},
],
structuredContent: {