Merge origin/main into copy-claude-code-backend-spec

This commit is contained in:
Andrey Avtomonov 2026-05-16 11:46:18 +02:00
commit 7287e4907b
59 changed files with 8221 additions and 3159 deletions

View file

@ -2,7 +2,7 @@ import { request as httpRequest } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { URL } from 'node:url';
import { spawn } from 'node:child_process';
import type { SemanticLayerQueryInput, SemanticLayerSource } from '../sl/index.js';
import type { ResolvedSemanticLayerSource, SemanticLayerQueryInput } from '../sl/types.js';
export interface KtxSemanticLayerComputeQueryResult {
sql: string;
@ -54,13 +54,21 @@ export interface KtxSemanticLayerSourceGenerationResult {
}
export interface KtxSemanticLayerComputePort {
/**
* Callers must pass sources sanitized through toResolvedWire. The Python
* daemon rejects authoring-only fields such as usage and inherits_columns_from.
*/
query(input: {
sources: Array<Record<string, unknown> | SemanticLayerSource>;
sources: ResolvedSemanticLayerSource[];
query: SemanticLayerQueryInput;
dialect: string;
}): Promise<KtxSemanticLayerComputeQueryResult>;
/**
* Callers must pass sources sanitized through toResolvedWire. The Python
* daemon rejects authoring-only fields such as usage and inherits_columns_from.
*/
validateSources(input: {
sources: Array<Record<string, unknown> | SemanticLayerSource>;
sources: ResolvedSemanticLayerSource[];
dialect: string;
recentlyTouched?: string[];
}): Promise<KtxSemanticLayerComputeValidationResult>;

View file

@ -191,6 +191,36 @@ describe('local KTX embedding config', () => {
expect(resolveLocalKtxEmbeddingConfig(config, {})).toBeNull();
});
it('returns null when backend is openai but no apiKey is resolvable from env', () => {
const config: KtxProjectEmbeddingConfig = {
backend: 'openai',
model: 'text-embedding-3-small',
dimensions: 1536,
openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret
};
expect(resolveLocalKtxEmbeddingConfig(config, {})).toBeNull();
});
it('resolves openai embedding config from env', () => {
const config: KtxProjectEmbeddingConfig = {
backend: 'openai',
model: 'text-embedding-3-small',
dimensions: 1536,
openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret
};
expect(
resolveLocalKtxEmbeddingConfig(config, { OPENAI_API_KEY: 'sk-test' }), // pragma: allowlist secret
).toEqual({
backend: 'openai',
model: 'text-embedding-3-small',
dimensions: 1536,
openai: { apiKey: 'sk-test' }, // pragma: allowlist secret
batchSize: undefined,
});
});
it('constructs deterministic embeddings from the default project config', () => {
const createKtxEmbeddingProvider = vi.fn(() => ({}) as never);
const provider = createLocalKtxEmbeddingProviderFromConfig(

View file

@ -177,11 +177,23 @@ export function resolveLocalKtxEmbeddingConfig(
batchSize: config.batchSize,
};
}
if (config.backend === 'openai') {
const openai = resolvedProviderConfig(config.openai, env);
if (!openai?.apiKey) {
return null;
}
return {
backend: config.backend,
model: config.model ?? 'deterministic',
dimensions: config.dimensions,
openai,
batchSize: config.batchSize,
};
}
return {
backend: config.backend,
model: config.model ?? 'deterministic',
dimensions: config.dimensions,
...(resolvedProviderConfig(config.openai, env) ? { openai: resolvedProviderConfig(config.openai, env) } : {}),
...(config.sentenceTransformers
? {
sentenceTransformers: {

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -8,29 +8,18 @@ export type {
KtxDiscoverDataMcpPort,
KtxDictionarySearchMcpPort,
KtxEntityDetailsMcpPort,
KtxIngestDiffSummary,
KtxIngestMcpPort,
KtxIngestStatusResponse,
KtxIngestTriggerKind,
KtxIngestTriggerResponse,
KtxIngestWorkUnitSummary,
KtxKnowledgeMcpPort,
KtxKnowledgePage,
KtxKnowledgeSearchResponse,
KtxKnowledgeSearchResult,
KtxKnowledgeWriteResponse,
KtxMcpContextPorts,
KtxMcpServerDeps,
KtxMcpServerLike,
KtxMcpTextContent,
KtxMcpToolResult,
KtxMcpUserContext,
KtxSemanticLayerListResponse,
KtxSemanticLayerMcpPort,
KtxSemanticLayerQueryResponse,
KtxSemanticLayerReadResponse,
KtxSemanticLayerSourceSummary,
KtxSemanticLayerValidationResponse,
KtxSemanticLayerWriteResponse,
MemoryCapturePort,
MemoryIngestPort,
} from './types.js';

File diff suppressed because it is too large Load diff

View file

@ -1,65 +1,19 @@
import YAML from 'yaml';
import {
type KtxSqlQueryExecutorPort,
localConnectionInfoFromConfig,
localConnectionTypeForConfig,
} from '../connections/index.js';
import { type KtxSqlQueryExecutorPort, localConnectionInfoFromConfig } from '../connections/index.js';
import type { KtxEmbeddingPort } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import {
createDefaultLocalIngestAdapters,
getLocalIngestStatus,
type IngestReportSnapshot,
ingestReportToMemoryFlowReplay,
type LocalIngestMcpOptions,
runLocalIngest,
runLocalMetabaseIngest,
} from '../ingest/index.js';
import { createLocalKtxEmbeddingProviderFromConfig, KtxIngestEmbeddingPortAdapter } from '../llm/index.js';
import type { KtxLocalProject } from '../project/index.js';
import {
createKtxEntityDetailsService,
getLocalScanReport,
getLocalScanStatus,
type KtxConnectionDriver,
type KtxScanConnector,
type KtxScanReport,
type LocalScanMcpOptions,
runLocalScan,
} from '../scan/index.js';
import { createKtxEntityDetailsService, type KtxScanConnector, type LocalScanMcpOptions } from '../scan/index.js';
import { createKtxDiscoverDataService } from '../search/index.js';
import type { SqlAnalysisDialect, SqlAnalysisPort } from '../sql-analysis/index.js';
import {
compileLocalSlQuery,
createKtxDictionarySearchService,
type LocalSlSourceSearchResult,
type LocalSlSourceSummary,
listLocalSlSources,
searchLocalSlSources,
sourceDefinitionSchema,
sourceOverlaySchema,
} from '../sl/index.js';
import { readLocalKnowledgePage, searchLocalKnowledgePages, writeLocalKnowledgePage } from '../wiki/local-knowledge.js';
import type {
KtxConnectionTestResponse,
KtxIngestStatusResponse,
KtxMcpContextPorts,
KtxScanArtifactListResponse,
KtxScanArtifactReadResponse,
KtxScanArtifactSummary,
KtxScanArtifactType,
KtxSqlExecutionResponse,
} from './types.js';
const LOCAL_AUTHOR = 'ktx';
const LOCAL_AUTHOR_EMAIL = 'ktx@example.com';
const SL_SHAPE_WARNING = 'Local stdio validation checks YAML shape only; Python semantic validation is not configured.';
import { compileLocalSlQuery, createKtxDictionarySearchService } from '../sl/index.js';
import { readLocalKnowledgePage, searchLocalKnowledgePages } from '../wiki/local-knowledge.js';
import type { KtxMcpContextPorts, KtxMcpProgressCallback, KtxSqlExecutionResponse } from './types.js';
interface CreateLocalProjectMcpContextPortsOptions {
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: KtxSqlQueryExecutorPort;
sqlAnalysis?: SqlAnalysisPort;
localIngest?: LocalIngestMcpOptions;
localScan?: LocalScanMcpOptions;
embeddingService?: KtxEmbeddingPort | null;
}
@ -115,284 +69,23 @@ function assertSafeSourceName(sourceName: string): string {
return assertSafePathToken('semantic-layer source name', sourceName);
}
function normalizeScanDriver(driver: string | undefined): KtxConnectionDriver {
const normalized = (driver ?? '').toLowerCase();
if (
normalized === 'postgres' ||
normalized === 'postgresql' ||
normalized === 'sqlite' ||
normalized === 'sqlite3' ||
normalized === 'mysql' ||
normalized === 'clickhouse' ||
normalized === 'sqlserver' ||
normalized === 'bigquery' ||
normalized === 'snowflake'
) {
return normalized === 'sqlite3' ? 'sqlite' : normalized;
}
return 'postgres';
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
if (connector?.cleanup) {
await connector.cleanup();
}
}
async function testLocalConnection(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
connectionId: string,
): Promise<KtxConnectionTestResponse | null> {
const safeConnectionId = assertSafeConnectionId(connectionId);
const connection = project.config.connections[safeConnectionId];
if (!connection) {
return null;
}
const connectionType = localConnectionTypeForConfig(safeConnectionId, connection);
const createConnector = options.localScan?.createConnector;
if (!createConnector) {
return {
id: safeConnectionId,
connectionType,
ok: true,
tableCount: null,
message: 'Connection is configured; no native scan connector is available for live testing.',
warnings: ['ktx serve was not configured with a local scan connector factory.'],
};
}
let connector: KtxScanConnector | null = null;
try {
connector = await createConnector(safeConnectionId);
const snapshot = await connector.introspect(
{
connectionId: safeConnectionId,
driver: normalizeScanDriver(connection.driver),
mode: 'structural',
dryRun: true,
detectRelationships: false,
},
{ runId: `connection-test-${safeConnectionId}` },
);
return {
id: safeConnectionId,
connectionType,
ok: true,
tableCount: snapshot.tables.length,
message: 'Connection test passed.',
warnings: [],
};
} catch (error) {
return {
id: safeConnectionId,
connectionType,
ok: false,
tableCount: null,
message: error instanceof Error ? error.message : String(error),
warnings: [],
};
} finally {
await cleanupConnector(connector);
}
}
function scanArtifactType(path: string, report: KtxScanReport): KtxScanArtifactType {
if (path === report.artifactPaths.reportPath) {
return 'report';
}
if (report.artifactPaths.manifestShards.includes(path)) {
return 'manifest_shard';
}
if (report.artifactPaths.enrichmentArtifacts.includes(path)) {
return 'enrichment_artifact';
}
return 'raw_source';
}
async function artifactSize(project: KtxLocalProject, path: string): Promise<number | undefined> {
try {
const result = await project.fileStore.readFile(path);
return typeof result.size === 'number' ? result.size : undefined;
} catch {
return undefined;
}
}
async function listArtifactsForReport(
project: KtxLocalProject,
runId: string,
report: KtxScanReport,
): Promise<KtxScanArtifactListResponse> {
const paths = new Set<string>();
if (report.artifactPaths.rawSourcesDir) {
const listed = await project.fileStore.listFiles(report.artifactPaths.rawSourcesDir);
for (const file of listed.files) {
paths.add(file);
}
}
if (report.artifactPaths.reportPath) {
paths.add(report.artifactPaths.reportPath);
}
for (const path of report.artifactPaths.manifestShards) {
paths.add(path);
}
for (const path of report.artifactPaths.enrichmentArtifacts) {
paths.add(path);
}
const artifacts: KtxScanArtifactSummary[] = [];
for (const path of [...paths].sort()) {
const size = await artifactSize(project, path);
artifacts.push({
path,
type: scanArtifactType(path, report),
...(size === undefined ? {} : { size }),
});
}
return { runId, artifacts };
}
async function readScanArtifact(
project: KtxLocalProject,
runId: string,
path: string,
): Promise<KtxScanArtifactReadResponse | null> {
const report = await getLocalScanReport(project, runId);
if (!report) {
return null;
}
const listed = await listArtifactsForReport(project, runId, report);
const artifact = listed.artifacts.find((candidate) => candidate.path === path);
if (!artifact) {
return null;
}
const result = await project.fileStore.readFile(path);
return {
runId,
path,
type: artifact.type,
...(typeof result.size === 'number' ? { size: result.size } : {}),
content: result.content,
};
}
function slPath(connectionId: string, sourceName: string): string {
return `semantic-layer/${assertSafeConnectionId(connectionId)}/${assertSafeSourceName(sourceName)}.yaml`;
}
function sourceNameFromPath(path: string): string {
return (
path
.split('/')
.at(-1)
?.replace(/\.ya?ml$/, '') ?? path
);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function parseYamlRecord(raw: string): Record<string, unknown> {
const parsed = YAML.parse(raw) as unknown;
if (!isRecord(parsed)) {
throw new Error('Semantic-layer source YAML must contain an object');
}
return parsed;
}
async function listSlPaths(project: KtxLocalProject, connectionId?: string): Promise<string[]> {
const root = connectionId ? `semantic-layer/${assertSafeConnectionId(connectionId)}` : 'semantic-layer';
const listed = await project.fileStore.listFiles(root);
return listed.files.filter((file) => file.endsWith('.yaml') || file.endsWith('.yml')).sort();
}
async function loadComputableSources(
project: KtxLocalProject,
connectionId: string,
): Promise<Record<string, unknown>[]> {
const paths = await listSlPaths(project, connectionId);
const sources: Record<string, unknown>[] = [];
for (const path of paths) {
const raw = await project.fileStore.readFile(path);
const source = parseYamlRecord(raw.content);
if (source.table || source.sql) {
sources.push(source);
}
}
return sources;
}
function validateSourceRecord(sourceName: string, source: Record<string, unknown>): string[] {
const namedSource = { ...source, name: typeof source.name === 'string' ? source.name : sourceName };
const definition = sourceDefinitionSchema.safeParse(namedSource);
if (definition.success) {
return [];
}
const overlay = sourceOverlaySchema.safeParse(namedSource);
if (overlay.success) {
return [];
}
return definition.error.issues.map((issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`);
}
function localIngestSourceDir(config: unknown): string | undefined {
if (!isRecord(config) || config.sourceDir === undefined) {
return undefined;
}
if (typeof config.sourceDir !== 'string' || config.sourceDir.trim().length === 0) {
throw new Error('Local ingest config sourceDir must be a non-empty string when provided');
}
return config.sourceDir;
}
function rawFileCountFromIngestReport(report: IngestReportSnapshot): number {
return new Set(report.body.workUnits.flatMap((workUnit) => workUnit.rawFiles)).size;
}
function hasSlSearchMetadata(
source: LocalSlSourceSummary | LocalSlSourceSearchResult,
): source is LocalSlSourceSearchResult {
return 'score' in source;
}
function statusFromIngestReport(report: IngestReportSnapshot): KtxIngestStatusResponse {
const failedWorkUnits = report.body.failedWorkUnits;
return {
runId: report.runId,
jobId: report.jobId,
reportId: report.id,
status: failedWorkUnits.length > 0 ? 'error' : 'done',
stage: 'done',
progress: 1,
errors: failedWorkUnits,
done: true,
adapter: report.sourceKey,
connectionId: report.connectionId,
sourceDir: null,
syncId: report.body.syncId,
startedAt: report.createdAt,
completedAt: report.createdAt,
previousRunId: null,
diffSummary: report.body.diffSummary,
workUnitCount: report.body.workUnits.length,
rawFileCount: rawFileCountFromIngestReport(report),
workUnits: report.body.workUnits.map((workUnit) => ({
unitKey: workUnit.unitKey,
rawFiles: [...workUnit.rawFiles],
peerFileIndex: [],
dependencyPaths: [],
})),
evictionDeletedRawPaths: [...report.body.evictionInputs],
};
}
async function executeValidatedReadOnlySql(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
input: { connectionId: string; sql: string; maxRows: number },
onProgress?: KtxMcpProgressCallback,
): Promise<KtxSqlExecutionResponse> {
await onProgress?.({ progress: 0, message: 'Validating SQL' });
const connectionId = assertSafeConnectionId(input.connectionId);
const connection = project.config.connections[connectionId];
if (!connection) {
@ -416,6 +109,7 @@ async function executeValidatedReadOnlySql(
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(`Connection "${connectionId}" does not support read-only SQL execution.`);
}
await onProgress?.({ progress: 0.3, message: 'Executing' });
const result = await connector.executeReadOnly(
{
connectionId,
@ -424,12 +118,14 @@ async function executeValidatedReadOnlySql(
},
{ runId: 'mcp-sql-execution' },
);
return {
const response = {
headers: result.headers,
...(result.headerTypes ? { headerTypes: result.headerTypes } : {}),
rows: result.rows,
rowCount: result.rowCount ?? result.rows.length,
};
await onProgress?.({ progress: 1, message: `Fetched ${response.rowCount} rows` });
return response;
} finally {
await cleanupConnector(connector);
}
@ -453,9 +149,6 @@ export function createLocalProjectMcpContextPorts(
)
.sort((a, b) => a.id.localeCompare(b.id));
},
async test(input) {
return testLocalConnection(project, options, input.connectionId);
},
},
knowledge: {
async search(input) {
@ -495,58 +188,8 @@ export function createLocalProjectMcpContextPorts(
}
: null;
},
async write(input) {
const existing = await readLocalKnowledgePage(project, {
key: input.key,
userId: input.userId,
});
await writeLocalKnowledgePage(project, {
key: input.key,
scope: 'GLOBAL',
userId: input.userId,
summary: input.summary,
content: input.content,
tags: input.tags,
refs: input.refs,
slRefs: input.slRefs,
source: input.source,
intent: input.intent,
tables: input.tables,
representativeSql: input.representativeSql,
usage: input.usage,
fingerprints: input.fingerprints,
});
return { success: true, key: input.key, action: existing ? 'updated' : 'created' };
},
},
semanticLayer: {
async listSources(input) {
const listed: Array<LocalSlSourceSummary | LocalSlSourceSearchResult> = input.query
? await searchLocalSlSources(project, {
connectionId: input.connectionId,
query: input.query,
embeddingService,
})
: await listLocalSlSources(project, { connectionId: input.connectionId });
const sources = listed.map((source) => ({
connectionId: source.connectionId,
connectionName: source.connectionId,
name: source.name,
description: source.description,
columnCount: source.columnCount,
measureCount: source.measureCount,
joinCount: source.joinCount,
...(hasSlSearchMetadata(source) && source.frequencyTier ? { frequencyTier: source.frequencyTier } : {}),
...(hasSlSearchMetadata(source) && source.snippet ? { snippet: source.snippet } : {}),
...(hasSlSearchMetadata(source) ? { score: source.score } : {}),
...(hasSlSearchMetadata(source) && source.matchReasons ? { matchReasons: source.matchReasons } : {}),
...(hasSlSearchMetadata(source) && source.dictionaryMatches
? { dictionaryMatches: source.dictionaryMatches }
: {}),
...(hasSlSearchMetadata(source) && source.lanes ? { lanes: source.lanes } : {}),
}));
return { sources, totalSources: sources.length };
},
async readSource(input) {
const path = slPath(input.connectionId, input.sourceName);
try {
@ -556,71 +199,9 @@ export function createLocalProjectMcpContextPorts(
return null;
}
},
async writeSource(input) {
const path = slPath(input.connectionId, input.sourceName);
if (input.delete) {
const deleted = await project.fileStore.deleteFile(
path,
LOCAL_AUTHOR,
LOCAL_AUTHOR_EMAIL,
`Remove semantic-layer source: ${input.sourceName}`,
);
return { success: Boolean(deleted), sourceName: input.sourceName };
}
const yaml =
input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' });
parseYamlRecord(yaml);
await project.fileStore.writeFile(
path,
`${yaml.trimEnd()}\n`,
LOCAL_AUTHOR,
LOCAL_AUTHOR_EMAIL,
`Update semantic-layer source: ${input.sourceName}`,
);
return { success: true, sourceName: input.sourceName, yaml: `${yaml.trimEnd()}\n` };
},
async validate(input) {
if (options.semanticLayerCompute) {
const connectionId = assertSafeConnectionId(input.connectionId);
const result = await options.semanticLayerCompute.validateSources({
sources: await loadComputableSources(project, connectionId),
dialect: dialectForDriver(project.config.connections[connectionId]?.driver),
recentlyTouched: input.names,
});
return {
success: result.valid,
errors: result.errors,
warnings: result.warnings,
};
}
const names = new Set(input.names ?? []);
const paths = await listSlPaths(project, input.connectionId);
const errors: string[] = [];
for (const path of paths) {
const sourceName = sourceNameFromPath(path);
if (names.size > 0 && !names.has(sourceName)) {
continue;
}
try {
const raw = await project.fileStore.readFile(path);
errors.push(...validateSourceRecord(sourceName, parseYamlRecord(raw.content)));
} catch (error) {
errors.push(`${sourceName}: ${error instanceof Error ? error.message : String(error)}`);
}
}
return {
success: errors.length === 0,
errors,
warnings: [SL_SHAPE_WARNING],
};
},
async query(input) {
async query(input, executionOptions) {
if (!options.semanticLayerCompute) {
throw new Error(
'sl_query requires a semantic-layer query adapter. Local stdio MCP exposes file-backed SL CRUD only.',
);
throw new Error('sl_query requires a semantic-layer query adapter.');
}
return compileLocalSlQuery(project, {
connectionId: input.connectionId,
@ -629,6 +210,7 @@ export function createLocalProjectMcpContextPorts(
execute: Boolean(options.queryExecutor),
maxRows: input.query.limit,
queryExecutor: options.queryExecutor,
onProgress: executionOptions?.onProgress,
});
},
},
@ -651,114 +233,8 @@ export function createLocalProjectMcpContextPorts(
if (options.sqlAnalysis && options.localScan?.createConnector) {
ports.sqlExecution = {
async execute(input) {
return executeValidatedReadOnlySql(project, options, input);
},
};
}
if (options.localIngest) {
ports.ingest = {
async trigger(input) {
const sourceDir = localIngestSourceDir(input.config);
if (input.adapter === 'metabase' && !sourceDir) {
const result = await (options.localIngest?.runLocalMetabaseIngest ?? runLocalMetabaseIngest)({
project,
adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project),
metabaseConnectionId: input.connectionId,
trigger: input.trigger,
jobIdFactory: options.localIngest?.jobIdFactory,
pullConfigOptions: options.localIngest?.pullConfigOptions,
agentRunner: options.localIngest?.agentRunner,
llmRuntime: options.localIngest?.llmRuntime,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,
logger: options.localIngest?.logger,
});
return {
runId: `metabase-fanout:${result.metabaseConnectionId}`,
jobId: undefined,
reportId: undefined,
fanout: {
status: result.status,
children: result.children.map((child) => ({
runId: child.report.runId,
jobId: child.report.jobId,
reportId: child.report.id,
targetConnectionId: child.targetConnectionId,
metabaseDatabaseId: child.metabaseDatabaseId,
})),
},
};
}
const executeLocalIngest = options.localIngest?.runLocalIngest ?? runLocalIngest;
const result = await executeLocalIngest({
project,
adapters: options.localIngest?.adapters ?? createDefaultLocalIngestAdapters(project),
adapter: input.adapter,
connectionId: input.connectionId,
sourceDir,
pullConfigOptions: options.localIngest?.pullConfigOptions,
trigger: input.trigger,
jobId: options.localIngest?.jobIdFactory?.(),
agentRunner: options.localIngest?.agentRunner,
llmRuntime: options.localIngest?.llmRuntime,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,
logger: options.localIngest?.logger,
});
return {
runId: result.report.runId,
jobId: result.report.jobId,
reportId: result.report.id,
};
},
async status(input) {
const report = await getLocalIngestStatus(project, input.runId);
return report ? statusFromIngestReport(report) : null;
},
async report(input) {
return getLocalIngestStatus(project, input.runId);
},
async replay(input) {
const report = await getLocalIngestStatus(project, input.runId);
return report ? ingestReportToMemoryFlowReplay(report) : null;
},
};
}
if (options.localScan) {
ports.scan = {
async trigger(input) {
return runLocalScan({
project,
connectionId: input.connectionId,
mode: input.mode,
detectRelationships: input.detectRelationships,
dryRun: input.dryRun,
trigger: 'mcp',
adapters: options.localScan?.adapters,
databaseIntrospectionUrl: options.localScan?.databaseIntrospectionUrl,
createConnector: options.localScan?.createConnector,
jobId: options.localScan?.jobIdFactory?.(),
now: options.localScan?.now,
});
},
async status(input) {
return getLocalScanStatus(project, input.runId);
},
async report(input) {
return getLocalScanReport(project, input.runId);
},
async listArtifacts(input) {
const report = await getLocalScanReport(project, input.runId);
return report ? listArtifactsForReport(project, input.runId, report) : null;
},
async readArtifact(input) {
return readScanArtifact(project, input.runId, input.path);
async execute(input, executionOptions) {
return executeValidatedReadOnlySql(project, options, input, executionOptions?.onProgress);
},
};
}

File diff suppressed because it is too large Load diff

View file

@ -1,71 +1,8 @@
import { randomUUID } from 'node:crypto';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';
import type { MemoryAgentInput } from '../memory/index.js';
import { jsonErrorToolResult, jsonToolResult, registerKtxContextTools } from './context-tools.js';
import type { KtxMcpServerDeps, KtxMcpServerLike, MemoryCapturePort } from './types.js';
const memoryCaptureInputSchema = {
userMessage: z.string().min(1).describe('The user message that may contain durable knowledge.'),
assistantMessage: z.string().optional().describe('The assistant response that concluded the exchange.'),
connectionId: z.string().min(1).optional().describe('Optional connection id for semantic-layer capture.'),
};
const memoryCaptureStatusInputSchema = {
runId: z.string().min(1).describe('The memory capture run id returned by memory_capture.'),
};
function registerMemoryCaptureTools(deps: {
server: KtxMcpServerLike;
memoryCapture: MemoryCapturePort;
userContext: KtxMcpServerDeps['userContext'];
}): void {
deps.server.registerTool(
'memory_capture',
{
title: 'Memory Capture',
description:
'Capture durable knowledge and semantic-layer updates from the final user/assistant exchange. Returns a run id for polling.',
inputSchema: memoryCaptureInputSchema,
},
async (input) => {
const captureInput: MemoryAgentInput = {
userId: deps.userContext.userId,
chatId: `mcp-${randomUUID()}`,
userMessage: String(input.userMessage),
assistantMessage: typeof input.assistantMessage === 'string' ? input.assistantMessage : undefined,
connectionId: typeof input.connectionId === 'string' ? input.connectionId : undefined,
sourceType: 'external_ingest',
};
const result = await deps.memoryCapture.capture(captureInput);
return jsonToolResult(result);
},
);
deps.server.registerTool(
'memory_capture_status',
{
title: 'Memory Capture Status',
description: 'Read the current or final status for a memory capture run.',
inputSchema: memoryCaptureStatusInputSchema,
},
async (input) => {
const runId = String(input.runId);
const status = await deps.memoryCapture.status(runId);
return status ? jsonToolResult(status) : jsonErrorToolResult(`Memory capture run "${runId}" was not found.`);
},
);
}
import { registerKtxContextTools } from './context-tools.js';
import type { KtxMcpServerDeps, KtxMcpServerLike } from './types.js';
export function createKtxMcpServer(deps: KtxMcpServerDeps): KtxMcpServerDeps['server'] {
if (deps.memoryCapture) {
registerMemoryCaptureTools({
server: deps.server,
memoryCapture: deps.memoryCapture,
userContext: deps.userContext,
});
}
if (deps.contextTools) {
registerKtxContextTools({
server: deps.server,
@ -86,7 +23,6 @@ export function createDefaultKtxMcpServer(
});
createKtxMcpServer({
server: server as KtxMcpServerLike,
memoryCapture: deps.memoryCapture,
userContext: deps.userContext,
contextTools: deps.contextTools,
});

View file

@ -1,16 +1,7 @@
import type { IngestReportSnapshot, MemoryFlowReplayInput, TableUsageOutput } from '../ingest/index.js';
import type { MemoryCaptureService } from '../memory/index.js';
import type { MemoryIngestService } from '../memory/index.js';
import type { KtxEntityDetailsInput, KtxEntityDetailsResponse } from '../scan/entity-details.js';
import type { KtxScanMode, KtxScanReport } from '../scan/index.js';
import type { KtxDiscoverDataInput, KtxDiscoverDataResponse } from '../search/index.js';
import type {
KtxDictionarySearchInput,
KtxDictionarySearchResponse,
SemanticLayerQueryInput,
SlDictionaryMatch,
SlSearchLaneSummary,
SlSearchMatchReason,
} from '../sl/index.js';
import type { KtxDictionarySearchInput, KtxDictionarySearchResponse, SemanticLayerQueryInput } from '../sl/index.js';
import type { WikiSearchLaneSummary, WikiSearchMatchReason } from '../wiki/index.js';
export interface KtxMcpTextContent {
@ -18,15 +9,38 @@ export interface KtxMcpTextContent {
text: string;
}
export interface KtxMcpToolResult<T extends object = object> {
export type NonArrayObject = object & { length?: never };
export interface KtxMcpToolResult<T extends NonArrayObject = NonArrayObject> {
content: KtxMcpTextContent[];
structuredContent?: T;
isError?: true;
}
export interface MemoryCapturePort {
capture: MemoryCaptureService['capture'];
status: MemoryCaptureService['status'];
interface KtxMcpProgressEvent {
progress: number;
total?: number;
message: string;
}
export type KtxMcpProgressCallback = (event: KtxMcpProgressEvent) => void | Promise<void>;
export interface KtxMcpToolHandlerContext {
_meta?: { progressToken?: string | number; [key: string]: unknown };
sendNotification?: (notification: {
method: 'notifications/progress';
params: {
progressToken: string | number;
progress: number;
total?: number;
message?: string;
};
}) => Promise<void>;
}
export interface MemoryIngestPort {
ingest: MemoryIngestService['ingest'];
status: MemoryIngestService['status'];
}
export interface KtxMcpUserContext {
@ -40,8 +54,10 @@ export interface KtxMcpServerLike {
title?: string;
description?: string;
inputSchema: unknown;
outputSchema?: unknown;
annotations?: Record<string, unknown>;
},
handler: (input: Record<string, unknown>) => Promise<unknown>,
handler: (input: Record<string, unknown>, context?: KtxMcpToolHandlerContext) => Promise<unknown>,
): void;
}
@ -51,18 +67,8 @@ export interface KtxConnectionSummary {
connectionType: string;
}
export interface KtxConnectionTestResponse {
id: string;
connectionType: string;
ok: boolean;
tableCount: number | null;
message: string;
warnings: string[];
}
export interface KtxConnectionsMcpPort {
list(): Promise<KtxConnectionSummary[]>;
test?(input: { connectionId: string }): Promise<KtxConnectionTestResponse | null>;
}
export interface KtxKnowledgeSearchResult {
@ -90,62 +96,9 @@ export interface KtxKnowledgePage {
slRefs?: string[];
}
interface KtxHistoricSqlKnowledgeUsage {
executions: number;
distinct_users: number;
first_seen: string;
last_seen: string;
p50_runtime_ms: number | null;
p95_runtime_ms: number | null;
error_rate: number;
rows_produced?: number;
}
export interface KtxKnowledgeWriteResponse {
success: boolean;
key: string;
action: 'created' | 'updated';
}
export interface KtxKnowledgeMcpPort {
search(input: { userId: string; query: string; limit: number }): Promise<KtxKnowledgeSearchResponse>;
read(input: { userId: string; key: string }): Promise<KtxKnowledgePage | null>;
write(input: {
userId: string;
key: string;
summary: string;
content: string;
tags?: string[];
refs?: string[];
slRefs?: string[];
source?: string;
intent?: string;
tables?: string[];
representativeSql?: string;
usage?: KtxHistoricSqlKnowledgeUsage;
fingerprints?: string[];
}): Promise<KtxKnowledgeWriteResponse>;
}
export interface KtxSemanticLayerSourceSummary {
connectionId: string;
connectionName: string;
name: string;
description?: string;
columnCount: number;
measureCount: number;
joinCount: number;
frequencyTier?: TableUsageOutput['frequencyTier'];
snippet?: string;
score?: number;
matchReasons?: SlSearchMatchReason[];
dictionaryMatches?: SlDictionaryMatch[];
lanes?: SlSearchLaneSummary[];
}
export interface KtxSemanticLayerListResponse {
sources: KtxSemanticLayerSourceSummary[];
totalSources: number;
}
export interface KtxSemanticLayerReadResponse {
@ -153,21 +106,6 @@ export interface KtxSemanticLayerReadResponse {
yaml: string;
}
export interface KtxSemanticLayerWriteResponse {
success: boolean;
sourceName: string;
yaml?: string;
errors?: string[];
warnings?: string[];
commitHash?: string;
}
export interface KtxSemanticLayerValidationResponse {
success: boolean;
errors: string[];
warnings: string[];
}
export interface KtxSemanticLayerQueryResponse {
sql: string;
headers: string[];
@ -177,143 +115,11 @@ export interface KtxSemanticLayerQueryResponse {
}
export interface KtxSemanticLayerMcpPort {
listSources(input: { connectionId?: string; query?: string }): Promise<KtxSemanticLayerListResponse>;
readSource(input: { connectionId: string; sourceName: string }): Promise<KtxSemanticLayerReadResponse | null>;
writeSource(input: {
connectionId: string;
sourceName: string;
yaml?: string;
source?: Record<string, unknown>;
delete?: boolean;
}): Promise<KtxSemanticLayerWriteResponse>;
validate(input: { connectionId: string; names?: string[] }): Promise<KtxSemanticLayerValidationResponse>;
query(input: { connectionId?: string; query: SemanticLayerQueryInput }): Promise<KtxSemanticLayerQueryResponse>;
}
export type KtxIngestTriggerKind = 'upload' | 'scheduled_pull' | 'manual_resync';
interface KtxIngestTriggerFanoutChild {
runId: string;
jobId: string;
reportId: string;
targetConnectionId: string;
metabaseDatabaseId: number;
}
export interface KtxIngestTriggerResponse {
runId: string;
jobId?: string;
reportId?: string;
fanout?: {
status: 'all_succeeded' | 'partial_failure' | 'all_failed';
children: KtxIngestTriggerFanoutChild[];
};
}
export interface KtxIngestDiffSummary {
added: number;
modified: number;
deleted: number;
unchanged: number;
}
export interface KtxIngestWorkUnitSummary {
unitKey: string;
rawFiles: string[];
peerFileIndex: string[];
dependencyPaths: string[];
}
export interface KtxIngestStatusResponse {
runId: string;
jobId?: string;
reportId?: string;
status: string;
stage?: string;
progress?: number;
errors?: string[];
done: boolean;
adapter?: string;
connectionId?: string;
sourceDir?: string | null;
syncId?: string;
startedAt?: string;
completedAt?: string;
previousRunId?: string | null;
diffSummary?: KtxIngestDiffSummary;
workUnitCount?: number;
rawFileCount?: number;
workUnits?: KtxIngestWorkUnitSummary[];
evictionDeletedRawPaths?: string[];
}
export interface KtxIngestMcpPort {
trigger(input: {
adapter: string;
connectionId: string;
config?: unknown;
trigger: KtxIngestTriggerKind;
}): Promise<KtxIngestTriggerResponse>;
status(input: { runId: string }): Promise<KtxIngestStatusResponse | null>;
report?(input: { runId: string }): Promise<IngestReportSnapshot | null>;
replay?(input: { runId: string }): Promise<MemoryFlowReplayInput | null>;
}
interface KtxScanTriggerResponse {
runId: string;
status: 'done';
done: true;
connectionId: string;
mode: KtxScanMode;
dryRun: boolean;
syncId: string;
report: KtxScanReport;
}
interface KtxScanStatusResponse {
runId: string;
status: string;
done: boolean;
connectionId: string;
mode: KtxScanMode;
dryRun: boolean;
syncId: string;
progress: number;
startedAt: string;
completedAt: string;
reportPath: string | null;
warnings: KtxScanReport['warnings'];
}
export type KtxScanArtifactType = 'report' | 'raw_source' | 'manifest_shard' | 'enrichment_artifact';
export interface KtxScanArtifactSummary {
path: string;
type: KtxScanArtifactType;
size?: number;
}
export interface KtxScanArtifactListResponse {
runId: string;
artifacts: KtxScanArtifactSummary[];
}
export interface KtxScanArtifactReadResponse extends KtxScanArtifactSummary {
runId: string;
content: string;
}
export interface KtxScanMcpPort {
trigger(input: {
connectionId: string;
mode?: KtxScanMode;
detectRelationships: boolean;
dryRun: boolean;
}): Promise<KtxScanTriggerResponse>;
status(input: { runId: string }): Promise<KtxScanStatusResponse | null>;
report(input: { runId: string }): Promise<KtxScanReport | null>;
listArtifacts?(input: { runId: string }): Promise<KtxScanArtifactListResponse | null>;
readArtifact?(input: { runId: string; path: string }): Promise<KtxScanArtifactReadResponse | null>;
query(
input: { connectionId?: string; query: SemanticLayerQueryInput },
options?: { onProgress?: KtxMcpProgressCallback },
): Promise<KtxSemanticLayerQueryResponse>;
}
export interface KtxEntityDetailsMcpPort {
@ -336,7 +142,10 @@ export interface KtxSqlExecutionResponse {
}
export interface KtxSqlExecutionMcpPort {
execute(input: { connectionId: string; sql: string; maxRows: number }): Promise<KtxSqlExecutionResponse>;
execute(
input: { connectionId: string; sql: string; maxRows: number },
options?: { onProgress?: KtxMcpProgressCallback },
): Promise<KtxSqlExecutionResponse>;
}
export interface KtxMcpContextPorts {
@ -347,13 +156,11 @@ export interface KtxMcpContextPorts {
dictionarySearch?: KtxDictionarySearchMcpPort;
discover?: KtxDiscoverDataMcpPort;
sqlExecution?: KtxSqlExecutionMcpPort;
ingest?: KtxIngestMcpPort;
scan?: KtxScanMcpPort;
memoryIngest?: MemoryIngestPort;
}
export interface KtxMcpServerDeps {
server: KtxMcpServerLike;
memoryCapture?: MemoryCapturePort;
userContext: KtxMcpUserContext;
contextTools?: KtxMcpContextPorts;
}

View file

@ -8,13 +8,13 @@ export {
stepBudgetFor,
} from './capture-signals.js';
export { MemoryAgentService } from './memory-agent.service.js';
export { createLocalProjectMemoryCapture, type CreateLocalProjectMemoryCaptureOptions } from './local-memory.js';
export { createLocalProjectMemoryIngest, type CreateLocalProjectMemoryIngestOptions } from './local-memory.js';
export { LocalMemoryRunStore, type LocalMemoryRunStoreOptions } from './local-memory-runs.js';
export {
MemoryCaptureService,
type MemoryCaptureServiceDeps,
type MemoryCaptureStartResult,
type MemoryCaptureStatus,
MemoryIngestService,
type MemoryIngestServiceDeps,
type MemoryIngestStartResult,
type MemoryIngestStatus,
type MemoryRunRecord,
type MemoryRunStatus,
type MemoryRunStorePort,

View file

@ -3,7 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { initKtxProject } from '../project/index.js';
import { createLocalProjectMemoryCapture } from './local-memory.js';
import { createLocalProjectMemoryIngest } from './local-memory.js';
import { LocalMemoryRunStore } from './local-memory-runs.js';
vi.mock('ai', () => ({
@ -77,7 +77,7 @@ describe('LocalMemoryRunStore', () => {
});
});
describe('createLocalProjectMemoryCapture', () => {
describe('createLocalProjectMemoryIngest', () => {
let tempDir: string;
beforeEach(async () => {
@ -110,13 +110,13 @@ describe('createLocalProjectMemoryCapture', () => {
},
};
const capture = createLocalProjectMemoryCapture(project, {
const ingest = createLocalProjectMemoryIngest(project, {
agentRunner: agentRunner as never,
runIdFactory: () => 'memory-run-1',
});
await expect(
capture.capture({
ingest.ingest({
userId: 'local-user',
chatId: 'chat-1',
userMessage: 'define revenue as paid order value net of refunds',
@ -124,12 +124,12 @@ describe('createLocalProjectMemoryCapture', () => {
sourceType: 'external_ingest',
}),
).resolves.toEqual({ runId: 'memory-run-1' });
await capture.waitForRun('memory-run-1');
await ingest.waitForRun('memory-run-1');
await expect(access(join(project.projectDir, '.ktx/db.sqlite'))).resolves.toBeUndefined();
await expectPathMissing(join(project.projectDir, '.ktx/memory-runs/memory-run-1.json'));
await expect(capture.status('memory-run-1')).resolves.toMatchObject({
await expect(ingest.status('memory-run-1')).resolves.toMatchObject({
runId: 'memory-run-1',
status: 'done',
done: true,
@ -172,12 +172,12 @@ describe('createLocalProjectMemoryCapture', () => {
},
};
const capture = createLocalProjectMemoryCapture(project, {
const ingest = createLocalProjectMemoryIngest(project, {
agentRunner: agentRunner as never,
runIdFactory: () => 'memory-run-2',
});
await capture.capture({
await ingest.ingest({
userId: 'local-user',
chatId: 'chat-2',
userMessage: 'going forward define orders count as count of public orders',
@ -185,12 +185,12 @@ describe('createLocalProjectMemoryCapture', () => {
connectionId: 'warehouse',
sourceType: 'external_ingest',
});
await capture.waitForRun('memory-run-2');
await ingest.waitForRun('memory-run-2');
await expect(access(join(project.projectDir, '.ktx/db.sqlite'))).resolves.toBeUndefined();
await expectPathMissing(join(project.projectDir, '.ktx/memory-runs/memory-run-2.json'));
await expect(capture.status('memory-run-2')).resolves.toMatchObject({
await expect(ingest.status('memory-run-2')).resolves.toMatchObject({
runId: 'memory-run-2',
status: 'done',
captured: { wiki: [], sl: ['orders'], xrefs: [] },

View file

@ -51,7 +51,7 @@ import {
} from '../wiki/index.js';
import { LocalMemoryRunStore } from './local-memory-runs.js';
import { MemoryAgentService } from './memory-agent.service.js';
import { MemoryCaptureService } from './memory-runs.js';
import { MemoryIngestService } from './memory-runs.js';
import type {
MemoryConnectionPort,
MemoryFileStorePort,
@ -64,9 +64,9 @@ import type {
const promptsDir = fileURLToPath(new URL('../../prompts', import.meta.url));
const skillsDir = fileURLToPath(new URL('../../skills', import.meta.url));
const LOCAL_AUTHOR = { name: 'KTX Local', email: 'local@ktx.local' };
const LOCAL_SHAPE_WARNING = 'Local memory capture validates semantic-layer YAML shape only.';
const LOCAL_SHAPE_WARNING = 'Local memory ingest validates semantic-layer YAML shape only.';
export interface CreateLocalProjectMemoryCaptureOptions {
export interface CreateLocalProjectMemoryIngestOptions {
llmRuntime?: KtxLlmRuntimePort;
agentRunner?: AgentRunnerPort;
memoryModel?: string;
@ -76,10 +76,10 @@ export interface CreateLocalProjectMemoryCaptureOptions {
logger?: KtxLogger;
}
export function createLocalProjectMemoryCapture(
export function createLocalProjectMemoryIngest(
project: KtxLocalProject,
options: CreateLocalProjectMemoryCaptureOptions = {},
): MemoryCaptureService {
options: CreateLocalProjectMemoryIngestOptions = {},
): MemoryIngestService {
const logger = options.logger ?? noopLogger;
const rootFileStore = new LocalMemoryFileStore(project.fileStore);
const embedding = new NoopEmbeddingPort();
@ -139,7 +139,7 @@ export function createLocalProjectMemoryCapture(
toolsetFactory,
logger,
});
return new MemoryCaptureService({
return new MemoryIngestService({
memoryAgent,
runs: new LocalMemoryRunStore({ projectDir: project.projectDir, idFactory: options.runIdFactory }),
});
@ -147,7 +147,7 @@ export function createLocalProjectMemoryCapture(
function requireLlmRuntime(runtime: KtxLlmRuntimePort | null | undefined): KtxLlmRuntimePort {
if (!runtime) {
throw new Error('createLocalProjectMemoryCapture requires llm.provider.backend or an injected agentRunner');
throw new Error('createLocalProjectMemoryIngest requires llm.provider.backend or an injected agentRunner');
}
return runtime;
}

View file

@ -1,6 +1,6 @@
import { describe, expect, it, vi } from 'vitest';
import type { MemoryAgentInput, MemoryAgentResult, MemoryAgentService } from './index.js';
import { MemoryCaptureService, type MemoryRunStorePort } from './memory-runs.js';
import { MemoryIngestService, type MemoryRunStorePort } from './memory-runs.js';
class InMemoryRunStore implements MemoryRunStorePort {
readonly rows = new Map<
@ -74,32 +74,32 @@ function deferred<T>() {
}
function buildService(): {
capture: MemoryCaptureService;
ingest: MemoryIngestService;
store: InMemoryRunStore;
ingest: ReturnType<typeof vi.fn>;
memoryAgentIngest: ReturnType<typeof vi.fn>;
run: ReturnType<typeof deferred<MemoryAgentResult>>;
} {
const store = new InMemoryRunStore();
const run = deferred<MemoryAgentResult>();
const ingest = vi.fn<MemoryAgentService['ingest']>().mockReturnValue(run.promise);
const memoryAgent = { ingest };
const memoryAgentIngest = vi.fn<MemoryAgentService['ingest']>().mockReturnValue(run.promise);
const memoryAgent = { ingest: memoryAgentIngest };
return {
capture: new MemoryCaptureService({ memoryAgent, runs: store }),
ingest: new MemoryIngestService({ memoryAgent, runs: store }),
store,
ingest,
memoryAgentIngest,
run,
};
}
describe('MemoryCaptureService', () => {
it('creates a run, executes memory capture, and stores a done summary', async () => {
describe('MemoryIngestService', () => {
it('creates a run, executes memory ingest, and stores a done summary', async () => {
const result: MemoryAgentResult = {
signalDetected: true,
actions: [{ target: 'wiki', type: 'created', key: 'revenue', detail: 'captured revenue definition' }],
skillsLoaded: ['wiki_capture'],
commitHash: 'abc123',
};
const { capture, store, ingest, run } = buildService();
const { ingest, store, memoryAgentIngest, run } = buildService();
const input: MemoryAgentInput = {
userId: 'user-1',
@ -109,21 +109,21 @@ describe('MemoryCaptureService', () => {
connectionId: '00000000-0000-0000-0000-000000000001',
};
const started = await capture.capture(input);
const started = await ingest.ingest(input);
expect(started.runId).toBe('run-1');
expect(ingest).toHaveBeenCalledWith(input);
await expect(capture.status(started.runId)).resolves.toMatchObject({
expect(memoryAgentIngest).toHaveBeenCalledWith(input);
await expect(ingest.status(started.runId)).resolves.toMatchObject({
runId: 'run-1',
status: 'running',
stage: 'capturing',
stage: 'ingesting',
done: false,
});
run.resolve(result);
await capture.waitForRun(started.runId);
await ingest.waitForRun(started.runId);
const status = await capture.status(started.runId);
const status = await ingest.status(started.runId);
expect(status).toEqual({
runId: 'run-1',
stage: 'done',
@ -142,10 +142,10 @@ describe('MemoryCaptureService', () => {
expect(store.rows.get('run-1')?.inputHash).toHaveLength(64);
});
it('stores no-signal captures as done with empty captured arrays', async () => {
const { capture, run } = buildService();
it('stores no-signal ingests as done with empty captured arrays', async () => {
const { ingest, run } = buildService();
const started = await capture.capture({
const started = await ingest.ingest({
userId: 'user-1',
chatId: 'chat-2',
userMessage: 'Thanks.',
@ -157,9 +157,9 @@ describe('MemoryCaptureService', () => {
skillsLoaded: [],
commitHash: null,
});
await capture.waitForRun(started.runId);
await ingest.waitForRun(started.runId);
await expect(capture.status(started.runId)).resolves.toMatchObject({
await expect(ingest.status(started.runId)).resolves.toMatchObject({
done: true,
status: 'done',
captured: { wiki: [], sl: [], xrefs: [] },
@ -172,16 +172,16 @@ describe('MemoryCaptureService', () => {
const memoryAgent = {
ingest: vi.fn<MemoryAgentService['ingest']>().mockRejectedValue(new Error('LLM provider missing')),
};
const capture = new MemoryCaptureService({ memoryAgent, runs: store });
const ingest = new MemoryIngestService({ memoryAgent, runs: store });
const started = await capture.capture({
const started = await ingest.ingest({
userId: 'user-1',
chatId: 'chat-3',
userMessage: 'Remember this.',
});
await capture.waitForRun(started.runId);
await ingest.waitForRun(started.runId);
await expect(capture.status(started.runId)).resolves.toMatchObject({
await expect(ingest.status(started.runId)).resolves.toMatchObject({
done: true,
status: 'error',
stage: 'error',
@ -191,8 +191,8 @@ describe('MemoryCaptureService', () => {
});
it('returns null for an unknown run id', async () => {
const { capture } = buildService();
const { ingest } = buildService();
await expect(capture.status('missing')).resolves.toBeNull();
await expect(ingest.status('missing')).resolves.toBeNull();
});
});

View file

@ -21,16 +21,16 @@ export interface MemoryRunStorePort {
findById(id: string): Promise<MemoryRunRecord | null>;
}
export interface MemoryCaptureServiceDeps {
export interface MemoryIngestServiceDeps {
memoryAgent: Pick<MemoryAgentService, 'ingest'>;
runs: MemoryRunStorePort;
}
export interface MemoryCaptureStartResult {
export interface MemoryIngestStartResult {
runId: string;
}
export interface MemoryCaptureStatus {
export interface MemoryIngestStatus {
runId: string;
status: MemoryRunStatus;
stage: string;
@ -55,7 +55,7 @@ function inputHash(input: MemoryAgentInput): string {
return createHash('sha256').update(stableInput).digest('hex');
}
function capturedKeys(actions: MemoryAction[]): MemoryCaptureStatus['captured'] {
function capturedKeys(actions: MemoryAction[]): MemoryIngestStatus['captured'] {
const wiki = new Set<string>();
const sl = new Set<string>();
const xrefs = new Set<string>();
@ -78,20 +78,20 @@ function capturedKeys(actions: MemoryAction[]): MemoryCaptureStatus['captured']
};
}
export class MemoryCaptureService {
export class MemoryIngestService {
private readonly inFlight = new Map<string, Promise<void>>();
constructor(private readonly deps: MemoryCaptureServiceDeps) {}
constructor(private readonly deps: MemoryIngestServiceDeps) {}
async capture(input: MemoryAgentInput): Promise<MemoryCaptureStartResult> {
async ingest(input: MemoryAgentInput): Promise<MemoryIngestStartResult> {
const row = await this.deps.runs.createRunning({
inputHash: inputHash(input),
chatId: input.chatId,
});
await this.deps.runs.markRunning(row.id, 'capturing');
await this.deps.runs.markRunning(row.id, 'ingesting');
const run = this.runCapture(row.id, input);
const run = this.runIngest(row.id, input);
this.inFlight.set(row.id, run);
run.finally(() => this.inFlight.delete(row.id)).catch(() => undefined);
@ -102,7 +102,7 @@ export class MemoryCaptureService {
await this.inFlight.get(runId);
}
private async runCapture(runId: string, input: MemoryAgentInput): Promise<void> {
private async runIngest(runId: string, input: MemoryAgentInput): Promise<void> {
try {
const outputSummary = await this.deps.memoryAgent.ingest(input);
await this.deps.runs.markDone(runId, outputSummary);
@ -111,7 +111,7 @@ export class MemoryCaptureService {
}
}
async status(runId: string): Promise<MemoryCaptureStatus | null> {
async status(runId: string): Promise<MemoryIngestStatus | null> {
const row = await this.deps.runs.findById(runId);
if (!row) {
return null;

View file

@ -182,6 +182,46 @@ grain: []
});
});
it('strips authoring-only fields (usage, inherits_columns_from) before sending sources to the daemon', async () => {
await project.fileStore.writeFile(
'semantic-layer/warehouse/_schema/public.yaml',
`tables:
invoices:
table: public.invoices
columns:
- name: invoice_id
type: number
pk: true
- name: amount
type: number
usage:
narrative: Activation policy windows table for invoice analytics.
frequencyTier: mid
commonFilters:
- amount
commonGroupBys: []
commonJoins: []
staleSince: null
`,
'ktx',
'ktx@example.com',
'Add manifest shard with usage',
);
await compileLocalSlQuery(project, {
connectionId: 'warehouse',
query: { measures: ['sum(invoices.amount)'], dimensions: [] },
compute,
});
const lastCall = (compute.query as ReturnType<typeof vi.fn>).mock.calls.at(-1)?.[0];
const invoices = lastCall?.sources.find((s: Record<string, unknown>) => s.name === 'invoices');
expect(invoices).toBeDefined();
expect(invoices).not.toHaveProperty('usage');
expect(invoices).not.toHaveProperty('inherits_columns_from');
expect(invoices).not.toHaveProperty('source_type');
});
it('resolves the only configured connection when connectionId is omitted', async () => {
await compileLocalSlQuery(project, {
query: { measures: ['orders.order_count'], dimensions: [] },
@ -236,6 +276,43 @@ grain: []
});
});
it('emits progress while compiling and executing a local semantic-layer query', async () => {
const progress: Array<{ progress: number; message: string }> = [];
const queryExecutor = {
execute: vi.fn(async () => ({
headers: ['status', 'order_count'],
rows: [['paid', 2]],
totalRows: 1,
command: 'SELECT',
rowCount: 1,
})),
};
const result = await compileLocalSlQuery(project, {
connectionId: 'warehouse',
query: {
measures: ['orders.order_count'],
dimensions: ['orders.status'],
limit: 25,
},
compute,
execute: true,
maxRows: 10,
queryExecutor,
onProgress: (event) => {
progress.push({ progress: event.progress, message: event.message });
},
});
expect(result.totalRows).toBe(1);
expect(progress).toEqual([
{ progress: 0, message: 'Compiling query' },
{ progress: 0.3, message: 'Generating SQL' },
{ progress: 0.6, message: 'Executing' },
{ progress: 1, message: 'Fetched 1 rows' },
]);
});
it('requires a query executor for executed mode', async () => {
await expect(
compileLocalSlQuery(project, {

View file

@ -1,7 +1,9 @@
import type { KtxSqlQueryExecutorPort } from '../connections/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import type { KtxMcpProgressCallback } from '../mcp/types.js';
import type { KtxLocalProject } from '../project/index.js';
import { loadLocalSlSourceRecords } from './local-sl.js';
import { toResolvedWire } from './semantic-layer.service.js';
import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput } from './types.js';
const COMPILE_ONLY_REASON =
@ -14,6 +16,7 @@ export interface CompileLocalSlQueryOptions {
execute?: boolean;
maxRows?: number;
queryExecutor?: KtxSqlQueryExecutorPort;
onProgress?: KtxMcpProgressCallback;
}
export interface CompileLocalSlQueryResult extends SemanticLayerQueryExecutionResult {
@ -75,10 +78,10 @@ function resolveLocalConnectionId(project: KtxLocalProject, requested: string |
async function loadComputableSources(
project: KtxLocalProject,
connectionId: string,
): Promise<Record<string, unknown>[]> {
): Promise<ReturnType<typeof toResolvedWire>[]> {
return (await loadLocalSlSourceRecords(project, { connectionId: assertSafeConnectionId(connectionId) }))
.map((record) => ({ ...record.source }))
.filter((source) => source.table || source.sql);
.filter((record) => record.source.table || record.source.sql)
.map((record) => toResolvedWire(record.source));
}
function headersFromColumns(columns: Array<Record<string, unknown>>): string[] {
@ -91,15 +94,20 @@ export async function compileLocalSlQuery(
project: KtxLocalProject,
options: CompileLocalSlQueryOptions,
): Promise<CompileLocalSlQueryResult> {
await options.onProgress?.({ progress: 0, message: 'Compiling query' });
const connectionId = resolveLocalConnectionId(project, options.connectionId);
const dialect = dialectForDriver(project.config.connections[connectionId]?.driver);
const sources = await loadComputableSources(project, connectionId);
await options.onProgress?.({ progress: 0.3, message: 'Generating SQL' });
const response = await options.compute.query({
sources: await loadComputableSources(project, connectionId),
sources,
dialect,
query: options.query,
});
if (!options.execute) {
await options.onProgress?.({ progress: 1, message: 'Fetched 0 rows' });
return {
connectionId,
dialect: response.dialect,
@ -122,6 +130,7 @@ export async function compileLocalSlQuery(
}
const maxRows = options.maxRows ?? options.query.limit;
await options.onProgress?.({ progress: 0.6, message: 'Executing' });
const execution = await options.queryExecutor.execute({
connectionId,
projectDir: project.projectDir,
@ -129,6 +138,7 @@ export async function compileLocalSlQuery(
sql: response.sql,
maxRows,
});
await options.onProgress?.({ progress: 1, message: `Fetched ${execution.totalRows} rows` });
return {
connectionId,