ktx/packages/context/src/daemon/semantic-layer-compute.ts

313 lines
10 KiB
TypeScript
Raw Normal View History

2026-05-10 23:12:26 +02:00
import { request as httpRequest } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { URL } from 'node:url';
import { spawn } from 'node:child_process';
feat(setup): add Claude Desktop target and MCP-first agent setup (#114) * feat(setup): add Claude Desktop target and MCP-first agent setup Adds `ktx mcp stdio` and a `claude-desktop` setup target that generates a local plugin ZIP wiring the analytics skill and a stdio MCP config. Replaces the CLI-only agent install mode with MCP+analytics (default) and an optional admin CLI skill, renames the research skill to analytics, and lets interactive setup pick project vs global scope when every target supports it. Extracts a shared MCP server factory used by both HTTP and stdio entrypoints. * Add MCP agent client setup support * Polish setup output formatting * Add MCP tool polish design spec Design for slimming the MCP-registered surface from 25 to 11 tools, introducing memory_ingest, applying the per-tool polish kit (annotations, outputSchema, .describe(), in-band error wrapping, union-drift fixes, type-narrowed jsonToolResult), emitting progress notifications on sql_execution + sl_query, and refining the ktx-analytics SKILL.md to match. * Refine MCP tool polish design spec after adversarial review iteration 1 * Refine MCP tool polish design spec after adversarial review iteration 2 * Refine MCP tool polish design spec after adversarial review iteration 3 * refactor(context): rename memory capture service to ingest * feat(mcp): slim research tool surface * refactor(mcp): remove admin ports from server factory * refactor(cli): rename text ingest memory port * docs: update analytics skill for memory ingest * chore: verify mcp surface rename * Add MCP tool polish v1 surface change plan * feat(context): polish mcp tool metadata * fix(context): enforce resolved semantic layer compute sources * feat(context): emit mcp query progress stages * fix(context): keep mcp progress event internal * Add MCP tool polish v1 metadata & progress plan * Fix CI snapshot and docs checks
2026-05-16 11:39:55 +02:00
import type { ResolvedSemanticLayerSource, SemanticLayerQueryInput } from '../sl/types.js';
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
/** Outcome of a `query` call on a semantic-layer compute port. */
export interface KtxSemanticLayerComputeQueryResult {
  /** SQL text taken from the daemon response. */
  sql: string;
  /** Dialect reported alongside the SQL. */
  dialect: string;
  /** Column entries from the daemon response; the shape of each record is not constrained here. */
  columns: Array<Record<string, unknown>>;
  /** Plan object from the daemon response; its shape is not constrained here. */
  plan: Record<string, unknown>;
}
2026-05-10 23:51:24 +02:00
/** Outcome of a `validateSources` call on a semantic-layer compute port. */
export interface KtxSemanticLayerComputeValidationResult {
  /** Whether the daemon reported the sources as valid. */
  valid: boolean;
  /** Error messages reported by the daemon. */
  errors: string[];
  /** Warning messages reported by the daemon. */
  warnings: string[];
  /** Warning messages grouped under a string key (presumably the source name; confirm against the daemon). */
  perSourceWarnings: Record<string, string[]>;
}
2026-05-10 23:51:24 +02:00
/** Description of one table column supplied to source generation. */
export interface KtxSemanticLayerSourceGenerationColumnInput {
  /** Column name. */
  name: string;
  /** Column type, passed to the daemon as a string. */
  type: string;
  /** Sent to the daemon as `primary_key` when provided. */
  primaryKey?: boolean;
  /** Sent to the daemon as `nullable` when provided. */
  nullable?: boolean;
  /** Optional column comment; an explicit `null` is forwarded as-is. */
  comment?: string | null;
}
2026-05-10 23:51:24 +02:00
/** Description of one table supplied to source generation. */
export interface KtxSemanticLayerSourceGenerationTableInput {
  /** Table name. */
  name: string;
  /** Optional catalog qualifier; an explicit `null` is forwarded as-is. */
  catalog?: string | null;
  /** Optional database qualifier; an explicit `null` is forwarded as-is. */
  db?: string | null;
  /** Optional table comment; an explicit `null` is forwarded as-is. */
  comment?: string | null;
  /** Columns belonging to this table. */
  columns: KtxSemanticLayerSourceGenerationColumnInput[];
}
2026-05-10 23:51:24 +02:00
/** A relationship between a column of one table and a column of another, supplied to source generation. */
export interface KtxSemanticLayerSourceGenerationLinkInput {
  /** Table on the "from" side of the link. */
  fromTable: string;
  /** Column on the "from" side of the link. */
  fromColumn: string;
  /** Table on the "to" side of the link. */
  toTable: string;
  /** Column on the "to" side of the link. */
  toColumn: string;
  /** Relationship kind, passed to the daemon as a free-form string. */
  relationshipType: string;
}
2026-05-10 23:51:24 +02:00
/** Input for `generateSources`: the tables, the links between them, and an optional dialect. */
export interface KtxSemanticLayerSourceGenerationInput {
  /** Tables to generate sources from. */
  tables: KtxSemanticLayerSourceGenerationTableInput[];
  /** Relationships between the listed tables. */
  links: KtxSemanticLayerSourceGenerationLinkInput[];
  /** SQL dialect name; may be omitted. */
  dialect?: string;
}
2026-05-10 23:51:24 +02:00
/** Outcome of a `generateSources` call on a semantic-layer compute port. */
export interface KtxSemanticLayerSourceGenerationResult {
  /** Generated source records; the shape of each record is not constrained here. */
  sources: Array<Record<string, unknown>>;
  /** Number of generated sources. */
  sourceCount: number;
}
2026-05-10 23:51:24 +02:00
/**
 * Port exposing the daemon-backed semantic-layer operations:
 * `query`, `validateSources`, and `generateSources`.
 */
export interface KtxSemanticLayerComputePort {
  /**
   * Runs a semantic-layer query against the given sources for a dialect.
   *
   * Callers must pass sources sanitized through toResolvedWire. The Python
   * daemon rejects authoring-only fields such as usage and inherits_columns_from.
   */
  query(input: {
    sources: ResolvedSemanticLayerSource[];
    query: SemanticLayerQueryInput;
    dialect: string;
  }): Promise<KtxSemanticLayerComputeQueryResult>;

  /**
   * Validates the given sources for a dialect.
   *
   * Callers must pass sources sanitized through toResolvedWire. The Python
   * daemon rejects authoring-only fields such as usage and inherits_columns_from.
   */
  validateSources(input: {
    sources: ResolvedSemanticLayerSource[];
    dialect: string;
    recentlyTouched?: string[];
  }): Promise<KtxSemanticLayerComputeValidationResult>;

  /** Generates sources from table, column, and link descriptions. */
  generateSources(input: KtxSemanticLayerSourceGenerationInput): Promise<KtxSemanticLayerSourceGenerationResult>;
}
2026-05-10 23:51:24 +02:00
/** Subcommand names accepted by a `KtxDaemonJsonRunner`. */
export type KtxDaemonCommand =
  | 'semantic-query'
  | 'semantic-validate'
  | 'semantic-generate-sources';
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
/**
 * Runs one daemon subcommand with a JSON payload and resolves with the JSON
 * object the daemon produced.
 */
export type KtxDaemonJsonRunner = (
  subcommand: KtxDaemonCommand,
  payload: Record<string, unknown>,
) => Promise<Record<string, unknown>>;
2026-05-10 23:51:24 +02:00
/**
 * Sends a JSON payload to a daemon HTTP path and resolves with the JSON object
 * from the response.
 */
export type KtxDaemonHttpJsonRunner = (
  path: string,
  payload: Record<string, unknown>,
) => Promise<Record<string, unknown>>;
2026-05-10 23:12:26 +02:00
/** Options for the process-backed compute port; every field is optional. */
export interface PythonSemanticLayerComputeOptions {
  /** Executable to spawn. */
  command?: string;
  /** Arguments placed before the subcommand name on the command line. */
  args?: string[];
  /** Working directory for the spawned process. */
  cwd?: string;
  /** Extra environment variables for the spawned process. */
  env?: NodeJS.ProcessEnv;
  /** Custom runner used instead of spawning a process. */
  runJson?: KtxDaemonJsonRunner;
}
/** Options for the HTTP-backed compute port. */
export interface HttpSemanticLayerComputeOptions {
  /** Base URL of the daemon's HTTP endpoint. */
  baseUrl: string;
  /** Custom requester used instead of the built-in HTTP POST client. */
  requestJson?: KtxDaemonHttpJsonRunner;
}
/**
 * Parses daemon output and guarantees the result is a JSON object.
 *
 * @param raw - Text expected to hold a single JSON document.
 * @param subcommand - Label included in error messages to identify the call.
 * @returns The parsed object.
 * @throws SyntaxError when `raw` is not valid JSON (for example, empty output).
 * @throws Error when the JSON value is `null`, a primitive, or an array.
 */
function parseJsonObject(raw: string, subcommand: string): Record<string, unknown> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (error) {
    // A bare JSON.parse message ("Unexpected end of JSON input") does not say
    // which daemon call produced the bad output, so rethrow with that context.
    // The error stays a SyntaxError so existing `instanceof` checks keep working.
    const detail = error instanceof Error ? error.message : String(error);
    throw new SyntaxError(`ktx-daemon ${subcommand} returned invalid JSON: ${detail}`);
  }
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new Error(`ktx-daemon ${subcommand} returned non-object JSON`);
  }
  return parsed as Record<string, unknown>;
}
/**
 * Builds a runner that executes one daemon subcommand per call in a child process.
 *
 * Each call spawns `command [...args, subcommand]`, writes the JSON payload
 * followed by a newline to the child's stdin, and expects a single JSON object
 * on stdout.
 *
 * @param options - Executable and base arguments, plus an optional working
 *   directory and extra environment variables (merged over `process.env`).
 * @returns A runner whose promise resolves with the parsed stdout object. It
 *   rejects when the process cannot be spawned, exits with a non-zero code or
 *   is killed by a signal, or prints something that is not a JSON object.
 */
function runProcessJson(
  options: Required<Pick<PythonSemanticLayerComputeOptions, 'command' | 'args'>> &
    Pick<PythonSemanticLayerComputeOptions, 'cwd' | 'env'>,
): KtxDaemonJsonRunner {
  return async (subcommand: KtxDaemonCommand, payload: Record<string, unknown>): Promise<Record<string, unknown>> =>
    new Promise((resolve, reject) => {
      const child = spawn(options.command, [...options.args, subcommand], {
        cwd: options.cwd,
        env: { ...process.env, ...options.env },
        stdio: ['pipe', 'pipe', 'pipe'],
      });
      const stdout: Buffer[] = [];
      const stderr: Buffer[] = [];
      child.stdout.on('data', (chunk: Buffer) => stdout.push(chunk));
      child.stderr.on('data', (chunk: Buffer) => stderr.push(chunk));
      child.on('error', reject);
      // A child that exits (or never starts) before reading its input makes the
      // stdin write fail, typically with EPIPE. Without a listener that stream
      // error would surface as an uncaught exception. The outcome is reported
      // by the 'error' / 'close' handlers instead, so it is ignored here.
      child.stdin.on('error', () => {});
      child.on('close', (code, signal) => {
        const stdoutText = Buffer.concat(stdout).toString('utf8').trim();
        const stderrText = Buffer.concat(stderr).toString('utf8').trim();
        if (code !== 0) {
          // `code` is null when the child was terminated by a signal; name the
          // signal rather than reporting "exit code null".
          const exitDetail = code === null && signal ? `signal ${signal}` : `exit code ${code}`;
          reject(new Error(`ktx-daemon ${subcommand} failed: ${stderrText || exitDetail}`));
          return;
        }
        try {
          resolve(parseJsonObject(stdoutText, subcommand));
        } catch (error) {
          reject(error);
        }
      });
      child.stdin.end(`${JSON.stringify(payload)}\n`);
    });
}
/**
 * Ensures a base URL ends with a slash.
 *
 * @param baseUrl - Base URL with or without a trailing slash.
 * @returns `baseUrl` unchanged when it already ends with `/`, otherwise with `/` appended.
 */
function normalizedBaseUrl(baseUrl: string): string {
  if (baseUrl.endsWith('/')) {
    return baseUrl;
  }
  return baseUrl + '/';
}
2026-05-10 23:51:24 +02:00
/**
 * Builds a requester that POSTs JSON to paths beneath `baseUrl`.
 *
 * @param baseUrl - Base URL of the daemon; `http:` and `https:` are both supported.
 * @returns A requester whose promise resolves with the parsed JSON object of a
 *   2xx response. It rejects on a request error, a response stream error
 *   (such as a connection dropped mid-body), a non-2xx status, or a body that
 *   is not a JSON object.
 */
function postJson(baseUrl: string): KtxDaemonHttpJsonRunner {
  return async (path, payload) =>
    new Promise((resolve, reject) => {
      // Drop the leading slash so `path` resolves beneath the base URL's own
      // path instead of replacing it.
      const target = new URL(path.replace(/^\//, ''), normalizedBaseUrl(baseUrl));
      const body = JSON.stringify(payload);
      const client = target.protocol === 'https:' ? httpsRequest : httpRequest;
      const request = client(
        target,
        {
          method: 'POST',
          headers: {
            accept: 'application/json',
            'content-type': 'application/json',
            'content-length': Buffer.byteLength(body),
          },
        },
        (response) => {
          const chunks: Buffer[] = [];
          response.on('data', (chunk: Buffer) => chunks.push(chunk));
          // When the connection closes before the body is complete, 'end' never
          // fires; the failure arrives as 'error' on the response. Without this
          // listener the promise would never settle.
          response.on('error', reject);
          response.on('end', () => {
            const text = Buffer.concat(chunks).toString('utf8');
            const statusCode = response.statusCode ?? 0;
            if (statusCode < 200 || statusCode >= 300) {
              reject(new Error(`ktx-daemon HTTP ${path} failed with ${statusCode}: ${text}`));
              return;
            }
            try {
              resolve(parseJsonObject(text, path));
            } catch (error) {
              reject(error);
            }
          });
        },
      );
      request.on('error', reject);
      request.end(body);
    });
}
/**
 * Extracts the string elements of an untyped value.
 *
 * @param value - Any value, typically a field of a parsed JSON response.
 * @returns The string elements of `value` in order, or an empty array when `value` is not an array.
 */
function stringArray(value: unknown): string[] {
  if (!Array.isArray(value)) {
    return [];
  }
  const strings: string[] = [];
  for (const entry of value) {
    if (typeof entry === 'string') {
      strings.push(entry);
    }
  }
  return strings;
}
/**
 * Treats an untyped value as a string-keyed record when it is a non-array object.
 *
 * @param value - Any value, typically a field of a parsed JSON response.
 * @returns `value` itself when it is a non-null, non-array object; otherwise a new empty object.
 */
function recordValue(value: unknown): Record<string, unknown> {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    return {};
  }
  return value as Record<string, unknown>;
}
/**
 * Extracts the plain-object elements of an untyped value.
 *
 * @param value - Any value, typically a field of a parsed JSON response.
 * @returns The elements of `value` that are non-null, non-array objects, in
 *   order; an empty array when `value` is not an array.
 */
function recordArray(value: unknown): Array<Record<string, unknown>> {
  if (!Array.isArray(value)) {
    return [];
  }
  const records: Array<Record<string, unknown>> = [];
  for (const entry of value) {
    const isRecord = entry !== null && typeof entry === 'object' && !Array.isArray(entry);
    if (isRecord) {
      records.push(entry as Record<string, unknown>);
    }
  }
  return records;
}
2026-05-10 23:51:24 +02:00
/**
 * Converts a source-generation input into the snake_case payload sent to the daemon.
 *
 * Optional table and column fields are included only when they are not
 * `undefined`, so an explicit `null` is forwarded. The dialect falls back to
 * `'postgres'` when the input does not specify one.
 *
 * @param input - Tables, links, and optional dialect.
 * @returns The payload with `tables`, `links`, and `dialect` keys.
 */
function sourceGenerationPayload(input: KtxSemanticLayerSourceGenerationInput): Record<string, unknown> {
  const tables = input.tables.map((table) => {
    const columns = table.columns.map((column) => {
      const columnPayload: Record<string, unknown> = { name: column.name, type: column.type };
      if (column.primaryKey !== undefined) {
        columnPayload.primary_key = column.primaryKey;
      }
      if (column.nullable !== undefined) {
        columnPayload.nullable = column.nullable;
      }
      if (column.comment !== undefined) {
        columnPayload.comment = column.comment;
      }
      return columnPayload;
    });

    // Keys are assigned in the same order as the wire format: name, optional
    // qualifiers, then columns.
    const tablePayload: Record<string, unknown> = { name: table.name };
    if (table.catalog !== undefined) {
      tablePayload.catalog = table.catalog;
    }
    if (table.db !== undefined) {
      tablePayload.db = table.db;
    }
    if (table.comment !== undefined) {
      tablePayload.comment = table.comment;
    }
    tablePayload.columns = columns;
    return tablePayload;
  });

  const links = input.links.map((link) => {
    return {
      from_table: link.fromTable,
      from_column: link.fromColumn,
      to_table: link.toTable,
      to_column: link.toColumn,
      relationship_type: link.relationshipType,
    };
  });

  const dialect = input.dialect ?? 'postgres';
  return { tables, links, dialect };
}
2026-05-10 23:51:24 +02:00
/**
 * Converts a raw daemon response into a source-generation result.
 *
 * @param raw - Parsed JSON object returned by the daemon.
 * @returns The object entries of `raw.sources`, plus `raw.source_count` when
 *   it is a number or otherwise the number of extracted sources.
 */
function sourceGenerationResult(raw: Record<string, unknown>): KtxSemanticLayerSourceGenerationResult {
  const sources = recordArray(raw.sources);
  const reportedCount = raw.source_count;
  const sourceCount = typeof reportedCount === 'number' ? reportedCount : sources.length;
  return { sources, sourceCount };
}
/**
 * Creates a compute port that talks to the daemon by running it as a child process.
 *
 * @param options - Optional overrides. `command` defaults to `'python'` and
 *   `args` to `['-m', 'ktx_daemon']`. When `runJson` is supplied it is used
 *   instead of spawning a process.
 * @returns A port whose methods send snake_case payloads to the runner and
 *   coerce the untyped responses into the declared result shapes, substituting
 *   empty values for fields that are missing or of the wrong type.
 */
export function createPythonSemanticLayerComputePort(
  options: PythonSemanticLayerComputeOptions = {},
): KtxSemanticLayerComputePort {
  const command = options.command ?? 'python';
  const args = options.args ?? ['-m', 'ktx_daemon'];
  const runJson = options.runJson ?? runProcessJson({ command, args, cwd: options.cwd, env: options.env });
  return {
    async query(input) {
      const raw = await runJson('semantic-query', {
        sources: input.sources,
        dialect: input.dialect,
        query: input.query,
      });
      return {
        sql: typeof raw.sql === 'string' ? raw.sql : '',
        // Fall back to the requested dialect when the daemon does not echo one.
        dialect: typeof raw.dialect === 'string' ? raw.dialect : input.dialect,
        columns: recordArray(raw.columns),
        plan: recordValue(raw.plan),
      };
    },
    async validateSources(input) {
      const raw = await runJson('semantic-validate', {
        sources: input.sources,
        dialect: input.dialect,
        recently_touched: input.recentlyTouched,
      });
      return {
        valid: raw.valid === true,
        errors: stringArray(raw.errors),
        warnings: stringArray(raw.warnings),
        // The response is untyped JSON, so each value is coerced to a string
        // array instead of being cast; that keeps the declared type true at
        // runtime. Object.fromEntries defines own properties, so a "__proto__"
        // key cannot alter the result's prototype.
        perSourceWarnings: Object.fromEntries(
          Object.entries(recordValue(raw.per_source_warnings)).map(
            ([source, warnings]): [string, string[]] => [source, stringArray(warnings)],
          ),
        ),
      };
    },
    async generateSources(input) {
      const raw = await runJson('semantic-generate-sources', sourceGenerationPayload(input));
      return sourceGenerationResult(raw);
    },
  };
}
/**
 * Creates a compute port that talks to the daemon over HTTP.
 *
 * @param options - `baseUrl` of the daemon. When `requestJson` is supplied it
 *   is used instead of the built-in HTTP POST client.
 * @returns A port whose methods POST snake_case payloads to
 *   `/semantic-layer/query`, `/semantic-layer/validate`, and
 *   `/semantic-layer/generate-sources`, and coerce the untyped responses into
 *   the declared result shapes, substituting empty values for fields that are
 *   missing or of the wrong type.
 */
export function createHttpSemanticLayerComputePort(
  options: HttpSemanticLayerComputeOptions,
): KtxSemanticLayerComputePort {
  const requestJson = options.requestJson ?? postJson(options.baseUrl);
  return {
    async query(input) {
      const raw = await requestJson('/semantic-layer/query', {
        sources: input.sources,
        dialect: input.dialect,
        query: input.query,
      });
      return {
        sql: typeof raw.sql === 'string' ? raw.sql : '',
        // Fall back to the requested dialect when the daemon does not echo one.
        dialect: typeof raw.dialect === 'string' ? raw.dialect : input.dialect,
        columns: recordArray(raw.columns),
        plan: recordValue(raw.plan),
      };
    },
    async validateSources(input) {
      const raw = await requestJson('/semantic-layer/validate', {
        sources: input.sources,
        dialect: input.dialect,
        recently_touched: input.recentlyTouched,
      });
      return {
        valid: raw.valid === true,
        errors: stringArray(raw.errors),
        warnings: stringArray(raw.warnings),
        // The response is untyped JSON, so each value is coerced to a string
        // array instead of being cast; that keeps the declared type true at
        // runtime. Object.fromEntries defines own properties, so a "__proto__"
        // key cannot alter the result's prototype.
        perSourceWarnings: Object.fromEntries(
          Object.entries(recordValue(raw.per_source_warnings)).map(
            ([source, warnings]): [string, string[]] => [source, stringArray(warnings)],
          ),
        ),
      };
    },
    async generateSources(input) {
      const raw = await requestJson('/semantic-layer/generate-sources', sourceGenerationPayload(input));
      return sourceGenerationResult(raw);
    },
  };
}