feat: trim MCP query response payloads

This commit is contained in:
Andrey Avtomonov 2026-05-30 16:21:21 +02:00
parent 8ebc4ce107
commit 133a2f700a
7 changed files with 235 additions and 1703 deletions

File diff suppressed because it is too large Load diff

View file

@ -12,6 +12,7 @@ import type {
KtxMcpToolHandlerContext,
KtxMcpToolResult,
KtxMcpUserContext,
KtxSemanticLayerQueryResponse,
NonArrayObject,
} from './types.js';
@ -60,7 +61,7 @@ const toolDescriptions = {
sl_read_source:
'Read a semantic-layer YAML source by connection id and source name. Example: sl_read_source({ connectionId: "warehouse", sourceName: "orders" }).',
sl_query:
'Execute a semantic-layer query and return rows, headers, generated SQL, and plan details. Example: sl_query({ connectionId: "warehouse", measures: ["orders.order_count"], dimensions: [{ field: "orders.created_at", granularity: "month" }] }).',
'Execute a semantic-layer query and return headers, rows, and total row count, plus correctness notes (e.g. compile-only or fan-out) when relevant. The generated SQL and full query plan are omitted by default; request them with include: ["sql"] and/or include: ["plan"]. Example: sl_query({ connectionId: "warehouse", measures: ["orders.order_count"], dimensions: [{ field: "orders.created_at", granularity: "month" }], include: ["sql"] }).',
sql_execution:
'Execute one parser-validated read-only SQL query against a configured KTX connection. Example: sql_execution({ connectionId: "warehouse", sql: "select count(*) from public.orders", maxRows: 100 }).',
memory_ingest:
@ -73,7 +74,7 @@ const connectionListSchema = z.object({});
const knowledgeSearchSchema = z.object({
query: z.string().min(1).describe('Natural-language wiki search query, e.g. "revenue recognition policy".'),
limit: z.number().int().min(1).max(50).default(10).describe('Maximum wiki pages to return. Defaults to 10.'),
limit: z.number().int().min(1).max(50).default(10).describe('Maximum wiki pages to return.'),
});
const knowledgeReadSchema = z.object({
@ -109,10 +110,7 @@ const slQueryOrderBySchema = z.object({
.describe(
'Field/measure/dimension id to order by, e.g. "orders.created_at", a dimension key like "mart_nrr_quarterly.quarter_label", or a measure alias.',
),
direction: z
.enum(['asc', 'desc'])
.default('asc')
.describe('Sort direction: "asc" or "desc". Defaults to "asc".'),
direction: z.enum(['asc', 'desc']).default('asc').describe('Sort direction for this field.'),
});
const slQuerySchema = z.object({
@ -136,8 +134,12 @@ const slQuerySchema = z.object({
.array(slQueryOrderBySchema)
.default([])
.describe('Sort clauses. Use {field, direction?} entries.'),
limit: z.number().int().min(0).default(1000).describe('Maximum rows to return. Defaults to 1000.'),
include_empty: z.boolean().default(true).describe('Whether to include empty dimension groups. Defaults to true.'),
limit: z.number().int().min(0).default(1000).describe('Maximum rows to return.'),
include_empty: z.boolean().default(true).describe('Whether to include empty dimension groups.'),
include: z
.array(z.enum(['plan', 'sql']))
.default([])
.describe('Extra detail to attach to the response: "sql" for the generated SQL, "plan" for the full query plan.'),
});
const entityDetailsTableRefSchema = z.object({
@ -184,13 +186,13 @@ const discoverDataSchema = z.object({
.optional()
.describe('Optional connection id. Pass it when user intent pins a specific warehouse.'),
kinds: z.array(discoverDataKindSchema.describe('Reference kind to include.')).optional().describe('Optional kind filter.'),
limit: z.number().int().min(1).max(50).default(15).optional().describe('Maximum refs to return. Defaults to 15.'),
limit: z.number().int().min(1).max(50).default(10).optional().describe('Maximum refs to return.'),
});
const sqlExecutionSchema = z.object({
connectionId: connectionIdSchema.describe('Connection id to execute against. Required for raw SQL.'),
sql: z.string().min(1).describe('Parser-validated read-only SQL, e.g. "select count(*) from public.orders".'),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional().describe('Maximum rows to return. Defaults to 1000.'),
maxRows: z.number().int().min(1).max(10_000).default(1000).optional().describe('Maximum rows to return.'),
});
const memoryIngestSchema = z.object({
@ -266,10 +268,14 @@ const slReadSourceOutputSchema = z.object({
const slQueryOutputSchema = z.object({
connectionId: z.string().optional(),
dialect: z.string().optional(),
sql: z.string(),
headers: z.array(z.string()),
rows: z.array(z.array(z.unknown())),
totalRows: z.number(),
// Correctness signals hoisted out of `plan` so they survive default projection (e.g. compile-only
// status, fan-out warnings). Present only when there is something to report.
notes: z.array(z.string()).optional(),
// Opt-in detail, attached only when requested via the `include` input.
sql: z.string().optional(),
plan: unknownRecordSchema.optional(),
});
@ -411,12 +417,59 @@ const memoryIngestStatusOutputSchema = z.object({
/** @internal */
export function jsonToolResult<T extends NonArrayObject>(structuredContent: T): KtxMcpToolResult<T> {
// Compact (non-indented) JSON: this `content` text is the copy the model reads. Pretty-printing
// arrays-of-arrays (every `rows` payload) puts one scalar per line, inflating tabular results by
// a large constant factor. `structuredContent` carries the same data for structured-output clients.
return {
content: [{ type: 'text', text: JSON.stringify(structuredContent, null, 2) }],
content: [{ type: 'text', text: JSON.stringify(structuredContent) }],
structuredContent,
};
}
/**
* Pull the correctness-critical signals out of a query plan so they survive even when the caller
* did not opt into the full `plan`. Returns an empty list when there is nothing to flag.
*/
function slQueryNotes(plan: Record<string, unknown> | undefined): string[] {
if (!plan) {
return [];
}
const notes: string[] = [];
const execution = plan.execution;
if (
execution &&
typeof execution === 'object' &&
(execution as Record<string, unknown>).mode === 'compile_only'
) {
const reason = (execution as Record<string, unknown>).reason;
notes.push(typeof reason === 'string' ? reason : 'Compiled SQL only; no rows were executed.');
}
if (plan.has_fan_out === true) {
const description = typeof plan.fan_out_description === 'string' ? plan.fan_out_description.trim() : '';
notes.push(description.length > 0 ? description : 'Fan-out detected: measure totals may be inflated by joins.');
}
return notes;
}
/**
* Default sl_query response is the minimum the agent needs to read the result: connection, headers,
* rows, totals, plus any correctness notes. The generated `sql` and the full `plan` are attached only
* when explicitly requested via `include`, since both are large and echo information the caller already has.
*/
function projectSlQueryResult(result: KtxSemanticLayerQueryResponse, include: ('plan' | 'sql')[]) {
const notes = slQueryNotes(result.plan);
return {
...(result.connectionId !== undefined ? { connectionId: result.connectionId } : {}),
...(result.dialect !== undefined ? { dialect: result.dialect } : {}),
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
...(notes.length > 0 ? { notes } : {}),
...(include.includes('sql') ? { sql: result.sql } : {}),
...(include.includes('plan') && result.plan ? { plan: result.plan } : {}),
};
}
function jsonErrorToolResult(text: string): KtxMcpToolResult<Record<string, never>> {
return {
content: [{ type: 'text', text }],
@ -618,23 +671,22 @@ export function registerKtxContextTools(deps: RegisterKtxContextToolsDeps): void
slQuerySchema,
async (input, context) => {
const onProgress = mcpProgressCallback(context);
return jsonToolResult(
await semanticLayer.query(
{
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
},
const result = await semanticLayer.query(
{
connectionId: input.connectionId,
query: {
measures: input.measures,
dimensions: input.dimensions,
filters: input.filters,
segments: input.segments,
order_by: input.order_by,
limit: input.limit,
include_empty: input.include_empty,
},
onProgress ? { onProgress } : undefined,
),
},
onProgress ? { onProgress } : undefined,
);
return jsonToolResult(projectSlQueryResult(result, input.include));
},
);
}

View file

@ -110,7 +110,10 @@ interface KtxSemanticLayerReadResponse {
yaml: string;
}
interface KtxSemanticLayerQueryResponse {
/** @internal */
export interface KtxSemanticLayerQueryResponse {
connectionId?: string;
dialect?: string;
sql: string;
headers: string[];
rows: unknown[][];

View file

@ -167,7 +167,7 @@ async function wikiCandidates(
query: input.query,
userId: options.userId,
embeddingService: options.embeddingService ?? null,
limit: Math.max(input.limit ?? 15, 25),
limit: Math.max(input.limit ?? 10, 25),
});
const records: CandidateRecord[] = [];
for (const result of searchResults) {
@ -421,7 +421,8 @@ function hydrate(
}
return {
...ref,
score: maxScore > 0 ? Number((candidate.score / maxScore).toFixed(6)) : 0,
// 3 decimals is plenty for a relative-rank hint; 6 just spent bytes on noise.
score: maxScore > 0 ? Number((candidate.score / maxScore).toFixed(3)) : 0,
};
})
.filter((result): result is KtxDiscoverDataRef => result !== null);
@ -433,7 +434,7 @@ export function createKtxDiscoverDataService(
): { search(input: KtxDiscoverDataInput): Promise<KtxDiscoverDataResponse> } {
return {
async search(input) {
const limit = Math.max(1, Math.min(input.limit ?? 15, 50));
const limit = Math.max(1, Math.min(input.limit ?? 10, 50));
const query = input.query.trim();
if (!query) {
return [];

View file

@ -28,7 +28,12 @@ You have access to KTX MCP tools for data discovery, semantic-layer analysis, ra
- Read entity details before writing SQL against an unfamiliar table. Do not assume column names.
- Treat `sql_execution` as read-only. Writes are rejected by the server.
- Validate value mentions with `dictionary_search` instead of guessing case or spelling. Treat a `dictionary_search` miss as non-authoritative. The index is built from profile-sampled values, so a missing value may simply have been outside the sample. Follow up with `sql_execution` against the most plausible columns before concluding the value is absent.
- When `connection_list` shows multiple connections, pass an explicit `connectionId` to every tool that takes one and where user intent pins a specific warehouse. Required: `entity_details`, `sl_read_source`, and `sql_execution`. Required when user intent is warehouse-specific, including wording like "in our warehouse" or "this warehouse": `memory_ingest`; without `connectionId`, the memory agent cannot update the semantic layer and the knowledge lands as wiki-only. Pass `connectionId` when intent pins a warehouse, otherwise omit for unscoped discovery: `sl_query`, `discover_data`, and `dictionary_search`. Never pass `connectionId` to `connection_list`, `wiki_search`, `wiki_read`, or `memory_ingest_status`. If intent is ambiguous for a required-or-scoped tool, ask the user which warehouse before calling.
- `connectionId` scoping when `connection_list` shows multiple connections:
- Always pass it: `entity_details`, `sl_read_source`, `sql_execution`.
- Pass it when intent pins a warehouse, otherwise omit for unscoped discovery: `sl_query`, `discover_data`, `dictionary_search`.
- `memory_ingest`: pass it for warehouse-specific knowledge (e.g. "in our warehouse"); without it the memory lands as wiki-only and cannot update the semantic layer.
- Never pass it: `connection_list`, `wiki_search`, `wiki_read`, `memory_ingest_status`.
- If scoping is required but intent is ambiguous, ask which warehouse before calling.
- Show compact result tables for small outputs. For broad results, summarize the top findings and mention the applied limit.
- Ask a concise clarification only when the metric, date range, entity, or grain is genuinely ambiguous and cannot be inferred from context.
</rules>