fix: surface silent failures and drop unused dead-code paths (#193)

Address overengineering audit findings across cli/context/connector packages:

- F1 Snowflake `query`: drop bare catch that flattened all errors to empty result
- F2 memory-agent: treat LLM `stopReason === 'error'` as crash (skip squash-merge)
- F3 WikiSearchTool: description honest about token-only fallback vs sqlite-fts5 hybrid
- F5 Scan enrichment provider resolution: return discriminated status and surface
  distinct `llm_unavailable` / `embedding_unavailable` warnings per failure mode
- F6 Relationship validation budget: drop dead `tableCount === undefined → 'all'`
  branch; update tests to pass `tableCount` like production
- F8 `ktx sql`: use canonical `resolveOutputMode` (now honors KTX_OUTPUT/CI/TTY)
- F9 MCP stdio server: default `protocolIo.stderr` to `process.stderr` so
  memory_ingest startup failures are visible
- F13/F14 Scan/setup JSON readers: distinguish ENOENT from corruption instead of
  silently treating both as missing
- F15 `createKtxCliScanConnector`: throw config-shape error when driver matches
  but type guard rejects, instead of "no native connector"
- F16 ContextEvidenceSearchTool: surface `embedding_unhealthy:<reason>` instead
  of silently dropping the semantic lane
- F17 PromptService: default partials to `[]` (removes stale `clinical_policy`
  reference from a prior product)
- F20 `contextBuildCommands`: drop unused `runId` parameter

Dead-code removal:

- F4 Delete `AgentRunnerService` (duplicated `RuntimeAgentRunner`, only test-used);
  migrate tests to exercise `AiSdkKtxLlmRuntime.runAgentLoop` directly
- F7 Delete `KtxScanOrchestrator` and its test (no production callers; the
  inline pipeline in `runLocalScan` is the single source of truth)
- F18 Delete `generateKtxText`/`generateKtxObject` pass-through helpers; inline
  the single `runtime.generateObject` call at its caller

Plus a clarifying comment on the SQLite `resolveStringReference` `file:` carve-out
(load-bearing for SQLite URI form, not a bug).
This commit is contained in:
Andrey Avtomonov 2026-05-21 02:38:18 +02:00 committed by GitHub
parent 7737ccaf1a
commit 0958bc03dc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 186 additions and 820 deletions

View file

@ -19,47 +19,60 @@ export async function createKtxCliScanConnector(
}
if (driver === 'sqlite' || driver === 'sqlite3') {
const { KtxSqliteScanConnector, isKtxSqliteConnectionConfig } = await import('@ktx/connector-sqlite');
if (isKtxSqliteConnectionConfig(connection)) {
return new KtxSqliteScanConnector({ connectionId, connection, projectDir: project.projectDir });
if (!isKtxSqliteConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSqliteScanConnector({ connectionId, connection, projectDir: project.projectDir });
}
if (driver === 'postgres' || driver === 'postgresql') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('@ktx/connector-postgres');
if (isKtxPostgresConnectionConfig(connection)) {
return new KtxPostgresScanConnector({ connectionId, connection });
if (!isKtxPostgresConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxPostgresScanConnector({ connectionId, connection });
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('@ktx/connector-mysql');
if (isKtxMysqlConnectionConfig(connection)) {
return new KtxMysqlScanConnector({ connectionId, connection });
if (!isKtxMysqlConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxMysqlScanConnector({ connectionId, connection });
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('@ktx/connector-clickhouse');
if (isKtxClickHouseConnectionConfig(connection)) {
return new KtxClickHouseScanConnector({ connectionId, connection });
if (!isKtxClickHouseConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxClickHouseScanConnector({ connectionId, connection });
}
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('@ktx/connector-sqlserver');
if (isKtxSqlServerConnectionConfig(connection)) {
return new KtxSqlServerScanConnector({ connectionId, connection });
if (!isKtxSqlServerConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSqlServerScanConnector({ connectionId, connection });
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('@ktx/connector-bigquery');
if (isKtxBigQueryConnectionConfig(connection)) {
return new KtxBigQueryScanConnector({ connectionId, connection });
if (!isKtxBigQueryConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxBigQueryScanConnector({ connectionId, connection });
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('@ktx/connector-snowflake');
if (isKtxSnowflakeConnectionConfig(connection)) {
return new KtxSnowflakeScanConnector({ connectionId, connection });
if (!isKtxSnowflakeConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSnowflakeScanConnector({ connectionId, connection });
}
throw new Error(
`Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`,
);
}
function invalidConnectionConfigError(connectionId: string, driver: string): Error {
return new Error(
`Connection "${connectionId}" uses driver "${driver}" but its configuration in ktx.yaml does not match the expected shape for that driver. Check the required fields for ${driver} (e.g. url/host/database).`,
);
}

View file

@ -59,7 +59,7 @@ export async function createKtxMcpServerFactory(input: {
try {
memoryIngest = createLocalProjectMemoryIngest(input.project, { semanticLayerCompute, queryExecutor });
} catch (error) {
input.io?.stderr.write(`KTX MCP memory_ingest disabled: ${error instanceof Error ? error.message : String(error)}\n`);
io.stderr.write(`KTX MCP memory_ingest disabled: ${error instanceof Error ? error.message : String(error)}\n`);
}
return () =>

View file

@ -23,7 +23,7 @@ export async function runKtxMcpStdioServer(options: RunKtxMcpStdioServerOptions)
: undefined;
const protocolIo: KtxCliIo = {
stdout: { write() {} },
stderr: options.io?.stderr ?? { write() {} },
stderr: options.io?.stderr ?? process.stderr,
};
const createMcpServer =
options.createMcpServer ??

View file

@ -206,7 +206,7 @@ describe('setup context build state', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-abc123'),
commands: contextBuildCommands(tempDir),
failureReason: 'Previous foreground context build did not finish. Rerun setup or ktx ingest.',
sourceProgress: [
{
@ -638,7 +638,7 @@ describe('setup context build state', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-stale'),
commands: contextBuildCommands(tempDir),
failureReason: 'Previous foreground context build did not finish. Rerun setup or ktx ingest.',
});
const io = makeIo();

View file

@ -125,7 +125,7 @@ async function pathExists(path: string): Promise<boolean> {
}
}
export function contextBuildCommands(projectDir: string, runId?: string): KtxSetupContextCommands {
export function contextBuildCommands(projectDir: string): KtxSetupContextCommands {
const resolvedProjectDir = resolve(projectDir);
return {
build: `ktx setup --project-dir ${resolvedProjectDir}`,
@ -177,7 +177,7 @@ function normalizeState(projectDir: string, value: unknown): KtxSetupContextStat
retryableFailedTargets: Array.isArray(record.retryableFailedTargets)
? record.retryableFailedTargets.filter((item): item is string => typeof item === 'string')
: [],
commands: contextBuildCommands(projectDir, runId),
commands: contextBuildCommands(projectDir),
...(typeof record.failureReason === 'string' ? { failureReason: record.failureReason } : {}),
...(normalizeSourceProgress(record.sourceProgress) ? { sourceProgress: normalizeSourceProgress(record.sourceProgress) } : {}),
};
@ -241,7 +241,7 @@ export async function writeKtxSetupContextState(projectDir: string, state: KtxSe
await mkdir(join(resolvedProjectDir, '.ktx', 'setup'), { recursive: true });
const normalized = normalizeState(resolvedProjectDir, {
...state,
commands: contextBuildCommands(resolvedProjectDir, state.runId),
commands: contextBuildCommands(resolvedProjectDir),
});
await writeFile(statePath(resolvedProjectDir), `${JSON.stringify(normalized, null, 2)}\n`, 'utf-8');
}
@ -323,8 +323,11 @@ function stringArrayValue(value: unknown): string[] {
async function readJsonFile(path: string): Promise<unknown | null> {
try {
return JSON.parse(await readFile(path, 'utf-8')) as unknown;
} catch {
return null;
} catch (error) {
if (error instanceof Error && (error as NodeJS.ErrnoException).code === 'ENOENT') {
return null;
}
throw new Error(`Failed to read JSON file ${path}: ${error instanceof Error ? error.message : String(error)}`);
}
}
@ -549,7 +552,7 @@ async function runBuild(
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(args.projectDir, runId),
commands: contextBuildCommands(args.projectDir),
failureReason: 'Previous foreground context build did not finish. Rerun setup or ktx ingest.',
};
await writeKtxSetupContextState(args.projectDir, incompleteState);
@ -663,7 +666,7 @@ async function completeExistingContext(
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(args.projectDir, runId),
commands: contextBuildCommands(args.projectDir),
});
writeExistingContextSuccess(readiness, io);
return { status: 'ready', projectDir: args.projectDir, runId };

View file

@ -331,7 +331,7 @@ describe('setup status', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-abc123'),
commands: contextBuildCommands(tempDir),
failureReason: 'Previous foreground context build did not finish. Rerun setup or ktx ingest.',
});
@ -505,7 +505,7 @@ describe('setup status', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-test'),
commands: contextBuildCommands(tempDir),
});
await writeKtxSetupState(tempDir, { completed_steps: ['project', 'context'] });
return { status: 'ready', projectDir: tempDir, runId: 'setup-context-local-test' };
@ -2043,7 +2043,7 @@ describe('setup status', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-ready'),
commands: contextBuildCommands(tempDir),
});
const previousRuntimeRoot = process.env.KTX_RUNTIME_ROOT;
@ -2148,7 +2148,7 @@ describe('setup status', () => {
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-ready'),
commands: contextBuildCommands(tempDir),
});
const readyMenuSelect = vi.fn();

View file

@ -2,13 +2,14 @@ import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project';
import type { KtxQueryResult, KtxScanConnector } from '@ktx/context/scan';
import type { SqlAnalysisDialect, SqlAnalysisPort } from '@ktx/context/sql-analysis';
import type { KtxCliIo } from './cli-runtime.js';
import { type KtxOutputMode, resolveOutputMode } from './io/mode.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import { createManagedDaemonSqlAnalysisPort } from './managed-python-http.js';
import { profileMark } from './startup-profile.js';
profileMark('module:sql');
type KtxSqlOutputMode = 'pretty' | 'plain' | 'json';
type KtxSqlOutputMode = KtxOutputMode;
export type KtxSqlArgs = {
command: 'execute';
@ -53,11 +54,6 @@ function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDia
return map[normalized] ?? 'postgres';
}
function resolveOutputMode(args: KtxSqlArgs): KtxSqlOutputMode {
if (args.json === true) return 'json';
return args.output ?? 'pretty';
}
function formatValue(value: unknown): string {
if (value === null || value === undefined) return '';
if (typeof value === 'string') return value;
@ -159,7 +155,8 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
},
{ runId: 'cli-sql' },
);
printSqlResult(resultOutput(args.connectionId, result), resolveOutputMode(args), io);
const mode = resolveOutputMode({ explicit: args.output, json: args.json, io });
printSqlResult(resultOutput(args.connectionId, result), mode, io);
return 0;
} finally {
await cleanupConnector(connector);

View file

@ -290,8 +290,6 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined;
const result = await this.executeSnowflakeQuery(connection, sql, binds);
return { ...result, totalRows: result.rows.length, rowCount: result.rows.length };
} catch {
return { headers: [], rows: [], totalRows: 0, rowCount: 0 };
} finally {
if (connection) {
await this.destroyConnection(connection);

View file

@ -90,6 +90,8 @@ function resolveStringReference(key: keyof KtxSqliteConnectionConfig, value: str
if (value.startsWith('env:')) {
return process.env[value.slice('env:'.length)] ?? '';
}
// `file:` on the `url` key is SQLite's native URI form (e.g. `file:///db.sqlite`), not a
// file-contents reference — skip the read so the URI passes through verbatim.
if (key !== 'url' && value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;

View file

@ -1,31 +0,0 @@
import type { KtxLlmProvider } from '@ktx/llm';
import type { KtxLogger } from '../core/index.js';
import { AiSdkKtxLlmRuntime, type AgentTelemetryPort } from '../llm/ai-sdk-runtime.js';
import type { KtxLlmDebugRequestRecorder } from '../llm/debug-request-recorder.js';
import type { AgentRunnerPort, RunLoopParams, RunLoopResult } from '../llm/runtime-port.js';
export type {
RunLoopParams,
RunLoopResult,
RunLoopStepInfo,
RunLoopStopReason,
} from '../llm/runtime-port.js';
export type { AgentTelemetryPort } from '../llm/ai-sdk-runtime.js';
export interface AgentRunnerServiceDeps {
llmProvider: KtxLlmProvider;
telemetry?: AgentTelemetryPort;
debugRequestRecorder?: KtxLlmDebugRequestRecorder;
logger?: KtxLogger;
}
export class AgentRunnerService implements AgentRunnerPort {
private readonly runtime: AiSdkKtxLlmRuntime;
constructor(deps: AgentRunnerServiceDeps) {
this.runtime = new AiSdkKtxLlmRuntime(deps);
}
runLoop(params: RunLoopParams): Promise<RunLoopResult> {
return this.runtime.runAgentLoop(params);
}
}

View file

@ -1,9 +1,9 @@
export type {
AgentRunnerServiceDeps,
AgentTelemetryPort,
AgentRunnerPort,
RunLoopParams,
RunLoopResult,
RunLoopStepInfo,
RunLoopStopReason,
} from './agent-runner.service.js';
export { AgentRunnerService } from './agent-runner.service.js';
} from '../llm/runtime-port.js';
export { RuntimeAgentRunner } from '../llm/runtime-port.js';
export type { AgentTelemetryPort } from '../llm/ai-sdk-runtime.js';

View file

@ -70,7 +70,6 @@ export type {
KtxEnrichedRelationship,
KtxEnrichedSchema,
KtxEnrichedTable,
KtxEnrichmentScanPhaseResult,
KtxGenerateColumnDescriptionsInput,
KtxGenerateDataSourceDescriptionInput,
KtxGenerateTableDescriptionInput,
@ -92,9 +91,6 @@ export type {
KtxScanLoggerPort,
KtxScanMetadataStore,
KtxScanMode,
KtxScanOrchestratorOptions,
KtxScanOrchestratorRunInput,
KtxScanOrchestratorRunResult,
KtxScanRelationshipSummary,
KtxScanReport,
KtxScanTrigger,
@ -108,7 +104,6 @@ export type {
KtxSchemaTable,
KtxSchemaTableKind,
KtxSkippedRelationship,
KtxStructuralScanPhaseResult,
KtxStructuralSyncPlan,
KtxStructuralSyncStats,
KtxTableDescriptionPromptInput,
@ -128,7 +123,6 @@ export {
isKtxDataDictionaryCandidate,
ktxColumnTypeMappingFromNative,
KtxDescriptionGenerator,
KtxScanOrchestrator,
normalizeKtxNativeType,
REDACTED_KTX_CREDENTIAL_VALUE,
redactKtxCredentialEnvelope,

View file

@ -7,10 +7,11 @@ vi.mock('ai', () => ({
}));
import { generateText } from 'ai';
import { AgentRunnerService, type RunLoopStepInfo } from './agent-runner.service.js';
import { AiSdkKtxLlmRuntime } from './ai-sdk-runtime.js';
import type { RunLoopStepInfo } from './runtime-port.js';
describe('AgentRunnerService.runLoop', () => {
let runner: AgentRunnerService;
describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
let runtime: AiSdkKtxLlmRuntime;
const llmProvider = {
getModel: vi.fn().mockReturnValue({ modelId: 'claude-sonnet-4-6', provider: 'anthropic' }),
getModelByName: vi.fn(),
@ -33,7 +34,7 @@ describe('AgentRunnerService.runLoop', () => {
beforeEach(() => {
vi.clearAllMocks();
runner = new AgentRunnerService({ llmProvider: llmProvider as any });
runtime = new AiSdkKtxLlmRuntime({ llmProvider: llmProvider as any });
});
afterEach(() => vi.clearAllMocks());
@ -43,7 +44,7 @@ describe('AgentRunnerService.runLoop', () => {
const repairHandler = vi.fn();
llmProvider.repairToolCallHandler.mockReturnValueOnce(repairHandler);
const tools = { noop: { description: 'noop', inputSchema: {}, execute: vi.fn() } };
await runner.runLoop({
await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: 'SYS',
userPrompt: 'USR',
@ -72,7 +73,7 @@ describe('AgentRunnerService.runLoop', () => {
it('returns stopReason=natural when the loop completes without error', async () => {
(generateText as any).mockResolvedValue({ text: 'done', toolCalls: [], steps: [] });
const result = await runner.runLoop({
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: 'system',
userPrompt: 'user',
@ -94,7 +95,7 @@ describe('AgentRunnerService.runLoop', () => {
it('returns stopReason=error with the error on generateText failure', async () => {
const err = new Error('LLM unavailable');
(generateText as any).mockRejectedValue(err);
const result = await runner.runLoop({
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -115,7 +116,7 @@ describe('AgentRunnerService.runLoop', () => {
return { text: 'ok', toolCalls: [], steps: [] };
});
await runner.runLoop({
await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -140,7 +141,7 @@ describe('AgentRunnerService.runLoop', () => {
return { text: 'ok', toolCalls: [], steps: [] };
});
const result = await runner.runLoop({
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -167,7 +168,7 @@ describe('AgentRunnerService.runLoop', () => {
config: { instance: { name: 'test-instance' } },
},
} as any;
const runnerWithTelemetry = new AgentRunnerService({
const runtimeWithTelemetry = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
telemetry: {
createTelemetry: (tags) => ({
@ -180,7 +181,7 @@ describe('AgentRunnerService.runLoop', () => {
}),
},
});
await runnerWithTelemetry.runLoop({
await runtimeWithTelemetry.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -204,7 +205,7 @@ describe('AgentRunnerService.runLoop', () => {
config: { instance: { name: 'test-instance' } },
},
} as any;
const runnerWithTelemetry = new AgentRunnerService({
const runtimeWithTelemetry = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
telemetry: {
createTelemetry: (tags) => ({
@ -217,7 +218,7 @@ describe('AgentRunnerService.runLoop', () => {
}),
},
});
await runnerWithTelemetry.runLoop({
await runtimeWithTelemetry.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -241,7 +242,7 @@ describe('AgentRunnerService.runLoop', () => {
config: { instance: { name: 'test-instance' } },
},
} as any;
const runnerWithTelemetry = new AgentRunnerService({
const runtimeWithTelemetry = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
telemetry: {
createTelemetry: (tags) => ({
@ -254,7 +255,7 @@ describe('AgentRunnerService.runLoop', () => {
}),
},
});
await runnerWithTelemetry.runLoop({
await runtimeWithTelemetry.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
@ -286,12 +287,12 @@ describe('AgentRunnerService.runLoop', () => {
vertexFallbackTo5m: false,
})),
};
const runnerWithDebug = new AgentRunnerService({
const runtimeWithDebug = new AiSdkKtxLlmRuntime({
llmProvider: provider as any,
debugRequestRecorder: { record },
});
await runnerWithDebug.runLoop({
await runtimeWithDebug.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: 'SECRET SYSTEM PROMPT',
userPrompt: 'SECRET USER PROMPT',

View file

@ -1,12 +0,0 @@
import type { z } from 'zod';
import type { KtxGenerateObjectInput, KtxGenerateTextInput, KtxLlmRuntimePort } from './runtime-port.js';
export async function generateKtxText(input: KtxGenerateTextInput & { runtime: KtxLlmRuntimePort }): Promise<string> {
return input.runtime.generateText(input);
}
export async function generateKtxObject<TOutput, TSchema extends z.ZodType<TOutput>>(
input: KtxGenerateObjectInput<TOutput, TSchema> & { runtime: KtxLlmRuntimePort },
): Promise<TOutput> {
return input.runtime.generateObject(input);
}

View file

@ -4,7 +4,6 @@ export type { AgentTelemetryPort, AiSdkKtxLlmRuntimeDeps } from './ai-sdk-runtim
export { createKtxClaudeCodeEnv, CLAUDE_CODE_PROVIDER_ENV_DENYLIST } from './claude-code-env.js';
export { resolveClaudeCodeModel } from './claude-code-models.js';
export { ClaudeCodeKtxLlmRuntime, mapClaudeCodeStopReason, runClaudeCodeAuthProbe } from './claude-code-runtime.js';
export { generateKtxObject, generateKtxText } from './generation.js';
export type {
AgentRunnerPort,
KtxGenerateObjectInput,

View file

@ -225,8 +225,8 @@ export class MemoryAgentService {
chatId,
},
});
if (runResult.stopReason === 'error' && runResult.error) {
this.logger.warn(`[memory-agent] chat=${chatId} loop failed: ${runResult.error.message}`);
if (runResult.stopReason === 'error') {
throw runResult.error ?? new Error(`[memory-agent] chat=${chatId} loop failed with no error detail`);
}
// Cross-ref + revert gate: still scoped to the session worktree (writes via

View file

@ -65,7 +65,6 @@ describe('@ktx/context package exports', () => {
expect(scan.isKtxDataDictionaryCandidate).toBeTypeOf('function');
expect(scan.buildKtxColumnEmbeddingText).toBeTypeOf('function');
expect(scan.KtxDescriptionGenerator).toBeTypeOf('function');
expect(scan.KtxScanOrchestrator).toBeTypeOf('function');
expect(scan.runLocalScan).toBeTypeOf('function');
expect(scan.writeLocalScanEnrichmentArtifacts).toBeTypeOf('function');
expect(scan.readLocalScanStructuralSnapshot).toBeTypeOf('function');
@ -144,8 +143,8 @@ describe('@ktx/context package exports', () => {
expect(root.assertSearchBackendCapabilities).toBeTypeOf('function');
expect(root.createLocalKtxEmbeddingProviderFromConfig).toBeTypeOf('function');
expect(agent).toBeDefined();
expect(agent.AgentRunnerService).toBeTypeOf('function');
expect(root.AgentRunnerService).toBeTypeOf('function');
expect(agent.RuntimeAgentRunner).toBeTypeOf('function');
expect(root.RuntimeAgentRunner).toBeTypeOf('function');
expect(root.createLocalKtxLlmProviderFromConfig).toBeTypeOf('function');
expect(prompts).toBeDefined();
expect(skills).toBeDefined();

View file

@ -28,7 +28,7 @@ export class PromptService {
constructor(private readonly options: PromptServiceOptions) {
this.logger = options.logger ?? noopLogger;
this.partials = options.partials ?? ['clinical_policy'];
this.partials = options.partials ?? [];
Handlebars.registerHelper('eq', (a: unknown, b: unknown) => a === b);
Handlebars.registerHelper('json', (context: unknown) => JSON.stringify(context, null, 2));
Handlebars.registerHelper('truncate', (str: string, len: number) =>

View file

@ -124,14 +124,6 @@ export type {
export { filterSnapshotTables, getLocalScanReport, getLocalScanStatus, resolveEnabledTables, runLocalScan } from './local-scan.js';
export type { ReadLocalScanStructuralSnapshotInput } from './local-structural-artifacts.js';
export { readLocalScanStructuralSnapshot } from './local-structural-artifacts.js';
export type {
KtxEnrichmentScanPhaseResult,
KtxScanOrchestratorOptions,
KtxScanOrchestratorRunInput,
KtxScanOrchestratorRunResult,
KtxStructuralScanPhaseResult,
} from './orchestrator.js';
export { KtxScanOrchestrator } from './orchestrator.js';
export type {
KtxRelationshipArtifactStatus,
ReadLocalScanRelationshipArtifactsResult,

View file

@ -32,9 +32,40 @@ import type {
KtxScanMode,
KtxScanReport,
KtxScanTrigger,
KtxScanWarning,
KtxSchemaSnapshot,
} from './types.js';
function enrichmentResolutionWarning(
status: 'missing-embeddings-config' | 'missing-llm' | 'missing-embeddings-provider',
): KtxScanWarning {
if (status === 'missing-llm') {
return {
code: 'llm_unavailable',
message:
'scan.enrichment.mode is "llm" but the LLM provider could not be resolved from llm.provider config; LLM-driven enrichment was skipped.',
recoverable: true,
metadata: { reason: status },
};
}
if (status === 'missing-embeddings-config') {
return {
code: 'embedding_unavailable',
message:
'scan.enrichment.mode is "llm" but scan.enrichment.embeddings is not configured; embedding enrichment was skipped.',
recoverable: true,
metadata: { reason: status },
};
}
return {
code: 'embedding_unavailable',
message:
'scan.enrichment.mode is "llm" but the embedding provider could not be resolved from scan.enrichment.embeddings config; embedding enrichment was skipped.',
recoverable: true,
metadata: { reason: status },
};
}
export interface RunLocalScanOptions {
project: KtxLocalProject;
connectionId: string;
@ -152,34 +183,58 @@ interface LocalScanEnrichmentProviderDeps {
embeddingProvider?: KtxEmbeddingProvider | null;
}
export function createLocalScanEnrichmentProvidersFromConfig(
type LocalScanEnrichmentProviderResolution =
| { status: 'ready'; providers: KtxLocalScanEnrichmentProviders }
| { status: 'disabled' }
| { status: 'missing-embeddings-config' }
| { status: 'missing-llm' }
| { status: 'missing-embeddings-provider' };
function resolveLocalScanEnrichmentProviders(
config: KtxScanEnrichmentConfig,
llmConfig: KtxProjectLlmConfig,
deps: LocalScanEnrichmentProviderDeps = {},
): KtxLocalScanEnrichmentProviders | null {
): LocalScanEnrichmentProviderResolution {
if (config.mode === 'deterministic') {
return createDeterministicLocalScanEnrichmentProviders();
return { status: 'ready', providers: createDeterministicLocalScanEnrichmentProviders() };
}
if (config.mode !== 'llm' || !config.embeddings) {
return null;
if (config.mode !== 'llm') {
return { status: 'disabled' };
}
if (!config.embeddings) {
return { status: 'missing-embeddings-config' };
}
const llmRuntime = createLocalKtxLlmRuntimeFromConfig(llmConfig, {
...deps,
projectDir: deps.projectDir,
});
if (!llmRuntime) {
return { status: 'missing-llm' };
}
const embeddingProvider = deps.embeddingProvider ?? null;
if (!llmRuntime || !embeddingProvider) {
return null;
if (!embeddingProvider) {
return { status: 'missing-embeddings-provider' };
}
return {
llmRuntime,
embedding: new KtxScanEmbeddingPortAdapter(embeddingProvider),
status: 'ready',
providers: {
llmRuntime,
embedding: new KtxScanEmbeddingPortAdapter(embeddingProvider),
},
};
}
export function createLocalScanEnrichmentProvidersFromConfig(
config: KtxScanEnrichmentConfig,
llmConfig: KtxProjectLlmConfig,
deps: LocalScanEnrichmentProviderDeps = {},
): KtxLocalScanEnrichmentProviders | null {
const resolved = resolveLocalScanEnrichmentProviders(config, llmConfig, deps);
return resolved.status === 'ready' ? resolved.providers : null;
}
function createLocalScanEnrichmentStateStore(options: RunLocalScanOptions): SqliteLocalScanEnrichmentStateStore | null {
if (options.dryRun) {
return null;
@ -314,8 +369,13 @@ async function readScanReport(
try {
const raw = await project.fileStore.readFile(scanReportPath(connectionId, syncId));
return JSON.parse(raw.content) as KtxScanReport;
} catch {
return null;
} catch (error) {
if (error instanceof Error && (error as NodeJS.ErrnoException).code === 'ENOENT') {
return null;
}
throw new Error(
`Failed to read scan report for ${connectionId}/${syncId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
@ -363,14 +423,22 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
const adapters =
options.adapters ??
createDefaultLocalIngestAdapters(options.project, { databaseIntrospectionUrl: options.databaseIntrospectionUrl });
let enrichmentResolution: LocalScanEnrichmentProviderResolution | null = null;
const enrichmentProviders =
connector && (mode !== 'structural' || options.detectRelationships)
? options.enrichmentProviders !== undefined
? options.enrichmentProviders
: createLocalScanEnrichmentProvidersFromConfig(options.project.config.scan.enrichment, options.project.config.llm, {
projectDir: options.project.projectDir,
embeddingProvider: options.embeddingProvider ?? null,
})
: (() => {
enrichmentResolution = resolveLocalScanEnrichmentProviders(
options.project.config.scan.enrichment,
options.project.config.llm,
{
projectDir: options.project.projectDir,
embeddingProvider: options.embeddingProvider ?? null,
},
);
return enrichmentResolution.status === 'ready' ? enrichmentResolution.providers : null;
})()
: null;
await options.progress?.update(0.15, 'Inspecting database schema');
@ -483,6 +551,9 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
enrichmentState = enrichment.state;
report.enrichmentState = enrichmentState;
report.warnings.push(...enrichment.warnings);
if (enrichmentResolution && enrichmentResolution.status !== 'ready' && enrichmentResolution.status !== 'disabled') {
report.warnings.push(enrichmentResolutionWarning(enrichmentResolution.status));
}
report.artifactPaths.enrichmentArtifacts = artifacts.enrichmentArtifacts;
report.artifactPaths.manifestShards = artifacts.manifestShards;
report.manifestShardsWritten = artifacts.manifestShardsWritten;

View file

@ -1,376 +0,0 @@
import { describe, expect, it, vi } from 'vitest';
import {
createKtxConnectorCapabilities,
type KtxScanConnector,
type KtxScanContext,
type KtxScanEnrichmentStateSummary,
type KtxScanInput,
KtxScanOrchestrator,
type KtxSchemaSnapshot,
} from './index.js';
function snapshot(): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-04-29T00:00:00.000Z',
scope: { schemas: ['public'] },
metadata: { source: 'test' },
tables: [
{
catalog: null,
db: 'public',
name: 'orders',
kind: 'table',
comment: 'Orders table',
estimatedRows: null,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
],
foreignKeys: [],
},
],
};
}
function connector(
capabilities = createKtxConnectorCapabilities({ tableSampling: true, columnSampling: true }),
): KtxScanConnector {
return {
id: 'connector-1',
driver: 'postgres',
capabilities,
introspect: vi.fn(async () => snapshot()),
};
}
function context(): KtxScanContext {
return {
runId: 'scan-run-1',
logger: {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
};
}
const input: KtxScanInput = {
connectionId: 'warehouse',
driver: 'postgres',
mode: 'structural',
};
describe('KtxScanOrchestrator', () => {
it('runs structural scans through connector introspection and structural host callback', async () => {
const scanConnector = connector();
const scanContext = context();
const runStructural = vi.fn(async (scanSnapshot: KtxSchemaSnapshot) => ({
result: { synced: true },
diffSummary: { tablesAdded: scanSnapshot.tables.length, columnsAdded: 1 },
structuralSyncStats: { tablesCreated: 1, columnsCreated: 1 },
artifactPaths: { manifestShards: ['semantic-layer/warehouse/_schema/public.yaml'] },
}));
const result = await new KtxScanOrchestrator({
now: () => new Date('2026-04-29T00:10:00.000Z'),
syncIdFactory: () => 'sync-1',
}).run({
connector: scanConnector,
input,
trigger: 'schema_scan',
context: scanContext,
runStructural,
});
expect(scanConnector.introspect).toHaveBeenCalledWith(input, scanContext);
expect(runStructural).toHaveBeenCalledWith(snapshot(), scanContext);
expect(result.snapshot.connectionId).toBe('warehouse');
expect(result.structural.result).toEqual({ synced: true });
expect(result.enrichment).toBeNull();
expect(result.report).toMatchObject({
connectionId: 'warehouse',
driver: 'postgres',
syncId: 'sync-1',
runId: 'scan-run-1',
trigger: 'schema_scan',
mode: 'structural',
dryRun: false,
diffSummary: {
tablesAdded: 1,
columnsAdded: 1,
},
structuralSyncStats: {
tablesCreated: 1,
columnsCreated: 1,
},
manifestShardsWritten: 1,
artifactPaths: {
manifestShards: ['semantic-layer/warehouse/_schema/public.yaml'],
},
enrichment: {
dataDictionary: 'skipped',
columnDescriptions: 'skipped',
tableDescriptions: 'skipped',
embeddings: 'skipped',
deterministicRelationships: 'skipped',
llmRelationshipValidation: 'skipped',
statisticalValidation: 'skipped',
},
enrichmentState: {
resumedStages: [],
completedStages: [],
failedStages: [],
},
createdAt: '2026-04-29T00:10:00.000Z',
});
});
it('runs enriched scans through structural and enrichment host callbacks', async () => {
const scanConnector = connector(
createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
}),
);
const scanContext = context();
const result = await new KtxScanOrchestrator({ syncIdFactory: () => 'sync-2' }).run({
connector: scanConnector,
input: { ...input, mode: 'enriched', detectRelationships: true },
trigger: 'schema_scan',
context: scanContext,
runStructural: vi.fn(async () => ({
result: { schemaId: 'schema-1' },
structuralSyncStats: { tablesCreated: 1 },
})),
runEnrichment: vi.fn(async () => ({
result: { enriched: true },
enrichment: {
dataDictionary: 'completed',
columnDescriptions: 'completed',
tableDescriptions: 'completed',
embeddings: 'completed',
deterministicRelationships: 'completed',
statisticalValidation: 'completed',
} as const,
relationships: { accepted: 2, rejected: 1 },
})),
});
expect(result.enrichment?.result).toEqual({ enriched: true });
expect(result.report.enrichment.columnDescriptions).toBe('completed');
expect(result.report.relationships).toEqual({ accepted: 2, review: 0, rejected: 1, skipped: 0 });
expect(result.report.capabilityGaps).toEqual([]);
expect(result.report.warnings).toEqual([]);
});
it('reports host enrichment state summaries from enriched scan phases', async () => {
const scanConnector = connector(
createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
}),
);
const enrichmentState: Partial<KtxScanEnrichmentStateSummary> = {
resumedStages: ['relationships', 'descriptions', 'descriptions'],
completedStages: ['embeddings', 'descriptions', 'relationships'],
failedStages: [],
};
const result = await new KtxScanOrchestrator({ syncIdFactory: () => 'sync-state' }).run({
connector: scanConnector,
input: { ...input, mode: 'enriched', detectRelationships: true },
trigger: 'schema_scan',
context: context(),
runStructural: vi.fn(async () => ({ result: { synced: true } })),
runEnrichment: vi.fn(async () => ({
result: { enriched: true },
enrichmentState,
})),
});
expect(result.report.enrichmentState).toEqual({
resumedStages: ['descriptions', 'relationships'],
completedStages: ['descriptions', 'embeddings', 'relationships'],
failedStages: [],
});
});
it('records recoverable warnings for missing optional capabilities during enriched scans', async () => {
const result = await new KtxScanOrchestrator({ syncIdFactory: () => 'sync-3' }).run({
connector: connector(createKtxConnectorCapabilities()),
input: { ...input, mode: 'enriched', detectRelationships: true },
trigger: 'schema_scan',
context: context(),
runStructural: vi.fn(async () => ({ result: {} })),
runEnrichment: vi.fn(async () => ({ result: {} })),
});
expect(result.report.capabilityGaps).toEqual(['tableSampling', 'columnSampling', 'columnStats', 'readOnlySql']);
expect(result.report.warnings.map((warning) => warning.code)).toEqual([
'connector_capability_missing',
'connector_capability_missing',
'connector_capability_missing',
'connector_capability_missing',
]);
expect(result.report.warnings.every((warning) => warning.recoverable)).toBe(true);
});
it('redacts structural and enrichment warning metadata before returning reports', async () => {
const result = await new KtxScanOrchestrator({ syncIdFactory: () => 'sync-redacted' }).run({
connector: connector(
createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
}),
),
input: { ...input, mode: 'enriched' },
trigger: 'schema_scan',
context: context(),
runStructural: vi.fn(async () => ({
result: {},
warnings: [
{
code: 'sampling_failed',
message: 'structural warning',
recoverable: true,
metadata: {
url: 'postgres://reader:secret@example.test/db', // pragma: allowlist secret
table: 'orders',
},
} as const,
],
})),
runEnrichment: vi.fn(async () => ({
result: {},
warnings: [
{
code: 'embedding_unavailable',
message: 'enrichment warning',
recoverable: true,
metadata: {
nested: {
api_key: 'sk_test_123', // pragma: allowlist secret
schema: 'public',
},
},
} as const,
],
})),
});
expect(result.report.warnings).toEqual([
{
code: 'sampling_failed',
message: 'structural warning',
recoverable: true,
metadata: {
url: '<redacted>',
table: 'orders',
},
},
{
code: 'embedding_unavailable',
message: 'enrichment warning',
recoverable: true,
metadata: {
nested: {
api_key: '<redacted>',
schema: 'public',
},
},
},
]);
});
it('keeps structural results when the enrichment phase fails after structural sync', async () => {
const scanConnector = connector(
createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
}),
);
const runStructural = vi.fn(async () => ({
result: { synced: true },
artifactPaths: {
rawSourcesDir: 'raw-sources/warehouse/live-database/sync-failed-enrichment',
manifestShards: ['semantic-layer/warehouse/_schema/public.yaml'],
},
manifestShardsWritten: 1,
}));
const runEnrichment = vi.fn(async () => {
throw new Error('AI Gateway timed out');
});
const result = await new KtxScanOrchestrator({
now: () => new Date('2026-04-29T18:00:00.000Z'),
syncIdFactory: () => 'sync-failed-enrichment',
}).run({
connector: scanConnector,
input: { ...input, mode: 'enriched', detectRelationships: true },
trigger: 'schema_scan',
context: context(),
runStructural,
runEnrichment,
});
expect(result.structural.result).toEqual({ synced: true });
expect(result.enrichment).toBeNull();
expect(result.report.artifactPaths.manifestShards).toEqual(['semantic-layer/warehouse/_schema/public.yaml']);
expect(result.report.manifestShardsWritten).toBe(1);
expect(result.report.enrichment).toEqual({
dataDictionary: 'failed',
tableDescriptions: 'failed',
columnDescriptions: 'failed',
embeddings: 'failed',
deterministicRelationships: 'failed',
llmRelationshipValidation: 'failed',
statisticalValidation: 'failed',
});
expect(result.report.warnings).toEqual([
{
code: 'enrichment_failed',
message: 'KTX scan enrichment failed after structural scan completed: AI Gateway timed out',
recoverable: true,
metadata: {
mode: 'enriched',
detectRelationships: true,
},
},
]);
});
it('marks dry-run reports without changing host callback behavior', async () => {
const runStructural = vi.fn(async () => ({ result: { planned: true }, manifestShardsWritten: 0 }));
const result = await new KtxScanOrchestrator({ syncIdFactory: () => 'sync-4' }).run({
connector: connector(),
input: { ...input, dryRun: true },
trigger: 'cli',
context: context(),
runStructural,
});
expect(runStructural).toHaveBeenCalledTimes(1);
expect(result.report.dryRun).toBe(true);
expect(result.report.trigger).toBe('cli');
});
});

View file

@ -1,297 +0,0 @@
import { redactKtxScanReport } from './credentials.js';
import { completedKtxScanEnrichmentStateSummary, summarizeKtxScanEnrichmentState } from './enrichment-state.js';
import {
failedKtxScanEnrichmentSummary,
ktxScanErrorMessage,
skippedKtxScanEnrichmentSummary,
} from './enrichment-summary.js';
import type {
KtxConnectorCapabilities,
KtxScanArtifactPaths,
KtxScanConnector,
KtxScanContext,
KtxScanDiffSummary,
KtxScanEnrichmentSummary,
KtxScanEnrichmentStateSummary,
KtxScanInput,
KtxScanRelationshipSummary,
KtxScanReport,
KtxScanTrigger,
KtxScanWarning,
KtxSchemaSnapshot,
KtxStructuralSyncStats,
} from './types.js';
type CapabilityGap = keyof Omit<KtxConnectorCapabilities, 'structuralIntrospection'>;
export interface KtxStructuralScanPhaseResult<TResult = unknown> {
result: TResult;
diffSummary?: Partial<KtxScanDiffSummary>;
structuralSyncStats?: Partial<KtxStructuralSyncStats>;
manifestShardsWritten?: number;
artifactPaths?: Partial<KtxScanArtifactPaths>;
relationships?: Partial<KtxScanRelationshipSummary>;
warnings?: KtxScanWarning[];
}
export interface KtxEnrichmentScanPhaseResult<TResult = unknown> {
result: TResult;
enrichment?: Partial<KtxScanEnrichmentSummary>;
enrichmentState?: Partial<KtxScanEnrichmentStateSummary>;
manifestShardsWritten?: number;
artifactPaths?: Partial<KtxScanArtifactPaths>;
relationships?: Partial<KtxScanRelationshipSummary>;
warnings?: KtxScanWarning[];
}
export interface KtxScanOrchestratorRunInput<TStructuralResult = unknown, TEnrichmentResult = unknown> {
connector: KtxScanConnector;
input: KtxScanInput;
trigger: KtxScanTrigger;
context: KtxScanContext;
syncId?: string;
runStructural: (
snapshot: KtxSchemaSnapshot,
context: KtxScanContext,
) => Promise<KtxStructuralScanPhaseResult<TStructuralResult>>;
runEnrichment?: (
snapshot: KtxSchemaSnapshot,
structural: KtxStructuralScanPhaseResult<TStructuralResult>,
context: KtxScanContext,
) => Promise<KtxEnrichmentScanPhaseResult<TEnrichmentResult>>;
}
export interface KtxScanOrchestratorRunResult<TStructuralResult = unknown, TEnrichmentResult = unknown> {
snapshot: KtxSchemaSnapshot;
structural: KtxStructuralScanPhaseResult<TStructuralResult>;
enrichment: KtxEnrichmentScanPhaseResult<TEnrichmentResult> | null;
report: KtxScanReport;
}
export interface KtxScanOrchestratorOptions {
now?: () => Date;
syncIdFactory?: (input: KtxScanInput, context: KtxScanContext) => string;
}
const emptyDiffSummary: KtxScanDiffSummary = {
tablesAdded: 0,
tablesModified: 0,
tablesDeleted: 0,
tablesUnchanged: 0,
columnsAdded: 0,
columnsModified: 0,
columnsDeleted: 0,
};
const emptyStructuralSyncStats: KtxStructuralSyncStats = {
tablesCreated: 0,
tablesUpdated: 0,
tablesDeleted: 0,
columnsCreated: 0,
columnsUpdated: 0,
columnsDeleted: 0,
};
const emptyArtifactPaths: KtxScanArtifactPaths = {
rawSourcesDir: null,
reportPath: null,
manifestShards: [],
enrichmentArtifacts: [],
};
function mergeDiffSummary(input?: Partial<KtxScanDiffSummary>): KtxScanDiffSummary {
return { ...emptyDiffSummary, ...input };
}
function mergeStructuralSyncStats(input?: Partial<KtxStructuralSyncStats>): KtxStructuralSyncStats {
return { ...emptyStructuralSyncStats, ...input };
}
function mergeEnrichmentSummary(input?: Partial<KtxScanEnrichmentSummary>): KtxScanEnrichmentSummary {
return { ...skippedKtxScanEnrichmentSummary, ...input };
}
function mergeEnrichmentState(input?: Partial<KtxScanEnrichmentStateSummary>): KtxScanEnrichmentStateSummary {
if (!input) {
return completedKtxScanEnrichmentStateSummary();
}
return summarizeKtxScanEnrichmentState({
resumedStages: input.resumedStages ?? [],
completedStages: input.completedStages ?? [],
failedStages: input.failedStages ?? [],
});
}
function mergeArtifactPaths(
structural?: Partial<KtxScanArtifactPaths>,
enrichment?: Partial<KtxScanArtifactPaths>,
): KtxScanArtifactPaths {
return {
...emptyArtifactPaths,
...structural,
...enrichment,
manifestShards: [...(structural?.manifestShards ?? []), ...(enrichment?.manifestShards ?? [])],
enrichmentArtifacts: [...(structural?.enrichmentArtifacts ?? []), ...(enrichment?.enrichmentArtifacts ?? [])],
};
}
function mergeRelationshipSummary(
structural?: Partial<KtxScanRelationshipSummary>,
enrichment?: Partial<KtxScanRelationshipSummary>,
): KtxScanRelationshipSummary {
return {
accepted: (structural?.accepted ?? 0) + (enrichment?.accepted ?? 0),
review: (structural?.review ?? 0) + (enrichment?.review ?? 0),
rejected: (structural?.rejected ?? 0) + (enrichment?.rejected ?? 0),
skipped: (structural?.skipped ?? 0) + (enrichment?.skipped ?? 0),
};
}
function manifestShardsWritten(phase: {
manifestShardsWritten?: number;
artifactPaths?: Partial<KtxScanArtifactPaths>;
}): number {
return phase.manifestShardsWritten ?? phase.artifactPaths?.manifestShards?.length ?? 0;
}
function requiredCapabilities(mode: KtxScanInput['mode'], detectRelationships: boolean | undefined): CapabilityGap[] {
const required = new Set<CapabilityGap>();
if (mode === 'enriched') {
required.add('tableSampling');
required.add('columnSampling');
required.add('columnStats');
required.add('readOnlySql');
}
if (mode === 'relationships' || detectRelationships) {
required.add('columnStats');
required.add('readOnlySql');
}
return [...required];
}
function capabilityGaps(capabilities: KtxConnectorCapabilities, input: KtxScanInput): CapabilityGap[] {
return requiredCapabilities(input.mode ?? 'structural', input.detectRelationships).filter(
(capability) => !capabilities[capability],
);
}
function warningsForCapabilityGaps(gaps: CapabilityGap[]): KtxScanWarning[] {
return gaps.map((gap) => ({
code: 'connector_capability_missing',
message: `KTX scan connector is missing optional capability: ${gap}`,
recoverable: true,
metadata: { capability: gap },
}));
}
function assertNotAborted(context: KtxScanContext): void {
if (context.signal?.aborted) {
throw new Error('KTX scan aborted');
}
}
export class KtxScanOrchestrator {
private readonly now: () => Date;
private readonly syncIdFactory: (input: KtxScanInput, context: KtxScanContext) => string;
constructor(options: KtxScanOrchestratorOptions = {}) {
this.now = options.now ?? (() => new Date());
this.syncIdFactory = options.syncIdFactory ?? ((_, context) => context.runId);
}
async run<TStructuralResult = unknown, TEnrichmentResult = unknown>(
input: KtxScanOrchestratorRunInput<TStructuralResult, TEnrichmentResult>,
): Promise<KtxScanOrchestratorRunResult<TStructuralResult, TEnrichmentResult>> {
const mode = input.input.mode ?? 'structural';
const syncId = input.syncId ?? this.syncIdFactory(input.input, input.context);
const gaps = capabilityGaps(input.connector.capabilities, input.input);
const warnings = warningsForCapabilityGaps(gaps);
input.context.logger?.info('Starting KTX scan', {
connectionId: input.input.connectionId,
connectorId: input.connector.id,
mode,
trigger: input.trigger,
});
assertNotAborted(input.context);
const snapshot = await input.connector.introspect(input.input, input.context);
assertNotAborted(input.context);
const structural = await input.runStructural(snapshot, input.context);
let enrichment: KtxEnrichmentScanPhaseResult<TEnrichmentResult> | null = null;
let failedEnrichment: KtxScanEnrichmentSummary | null = null;
if (mode !== 'structural' || input.input.detectRelationships) {
if (input.runEnrichment) {
assertNotAborted(input.context);
try {
enrichment = await input.runEnrichment(snapshot, structural, input.context);
} catch (error) {
const message = ktxScanErrorMessage(error);
failedEnrichment = failedKtxScanEnrichmentSummary(mode, input.input.detectRelationships ?? false);
warnings.push({
code: 'enrichment_failed',
message: `KTX scan enrichment failed after structural scan completed: ${message}`,
recoverable: true,
metadata: { mode, detectRelationships: input.input.detectRelationships ?? false },
});
input.context.logger?.warn('KTX scan enrichment failed after structural scan completed', {
connectionId: input.input.connectionId,
runId: input.context.runId,
mode,
error: message,
});
}
} else {
failedEnrichment = failedKtxScanEnrichmentSummary(mode, input.input.detectRelationships ?? false);
warnings.push({
code: 'connector_capability_missing',
message: 'KTX scan requested enrichment or relationship detection, but no enrichment phase was provided',
recoverable: true,
metadata: { mode, detectRelationships: input.input.detectRelationships ?? false },
});
}
}
const manifestShardCount = manifestShardsWritten(structural) + (enrichment ? manifestShardsWritten(enrichment) : 0);
const report: KtxScanReport = redactKtxScanReport({
connectionId: input.input.connectionId,
driver: input.input.driver,
syncId,
runId: input.context.runId,
trigger: input.trigger,
mode,
dryRun: input.input.dryRun ?? false,
artifactPaths: mergeArtifactPaths(structural.artifactPaths, enrichment?.artifactPaths),
diffSummary: mergeDiffSummary(structural.diffSummary),
manifestShardsWritten: manifestShardCount,
structuralSyncStats: mergeStructuralSyncStats(structural.structuralSyncStats),
enrichment: mergeEnrichmentSummary(enrichment?.enrichment ?? failedEnrichment ?? undefined),
capabilityGaps: gaps,
warnings: [...warnings, ...(structural.warnings ?? []), ...(enrichment?.warnings ?? [])],
relationships: mergeRelationshipSummary(structural.relationships, enrichment?.relationships),
enrichmentState: mergeEnrichmentState(enrichment?.enrichmentState),
createdAt: this.now().toISOString(),
});
input.context.logger?.info('Completed KTX scan', {
connectionId: report.connectionId,
runId: report.runId,
syncId: report.syncId,
warnings: report.warnings.length,
});
return {
snapshot,
structural,
enrichment,
report,
};
}
}

View file

@ -1,5 +1,5 @@
import { z } from 'zod';
import { generateKtxObject, type KtxLlmRuntimePort } from '../llm/index.js';
import type { KtxLlmRuntimePort } from '../llm/index.js';
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import {
normalizeKtxRelationshipName,
@ -240,11 +240,10 @@ export async function proposeKtxRelationshipCandidatesWithLlm(
const prompt = JSON.stringify(evidence);
try {
const generated = await generateKtxObject<
const generated = await input.llmRuntime.generateObject<
KtxRelationshipLlmProposalOutput,
typeof relationshipLlmProposalSchema
>({
runtime: input.llmRuntime,
role: 'candidateExtraction',
system,
prompt,

View file

@ -115,6 +115,7 @@ describe('relationship validation', () => {
profiles,
executor,
ctx: { runId: 'validate-test' },
tableCount: testSchema.tables.length,
});
expect(validated).toHaveLength(1);
@ -163,6 +164,7 @@ describe('relationship validation', () => {
profiles,
executor,
ctx: { runId: 'validate-test' },
tableCount: testSchema.tables.length,
settings: {
minSourceCoverage: 0.9,
maxViolationRatio: 0.01,
@ -332,6 +334,7 @@ describe('relationship validation', () => {
profiles,
executor,
ctx: { runId: 'llm-rejected-validation' },
tableCount: testSchema.tables.length,
});
expect(validated?.status).toBe('rejected');
@ -385,6 +388,7 @@ describe('relationship validation', () => {
profiles,
executor: throttled,
ctx: { runId: 'validation-concurrency' },
tableCount: testSchema.tables.length,
settings: { concurrency: 1 },
});
@ -476,6 +480,7 @@ describe('relationship validation', () => {
profiles,
executor,
ctx: { runId: 'rule-b-validation-score' },
tableCount: 2,
});
expect(validated).toMatchObject({

View file

@ -337,7 +337,7 @@ export async function validateKtxRelationshipDiscoveryCandidates(
const budgeted = applyKtxRelationshipValidationBudget({
candidates: input.candidates,
tableCount: input.tableCount ?? 0,
budget: settings.validationBudget ?? (input.tableCount === undefined ? 'all' : undefined),
budget: settings.validationBudget,
score: (candidate) => candidate.confidence,
});
const validated = await mapWithConcurrency(

View file

@ -74,10 +74,12 @@ export class ContextEvidenceSearchTool extends BaseTool<typeof contextEvidenceSe
}
let queryEmbedding: number[] | null = null;
let embeddingUnhealthyReason: string | null = null;
try {
queryEmbedding = await this.embeddingService.computeEmbedding(input.query);
} catch {
} catch (error) {
queryEmbedding = null;
embeddingUnhealthyReason = error instanceof Error ? error.message : String(error);
}
const connectionId = input.connectionId ?? context.connectionId ?? context.session?.connectionId;
@ -102,16 +104,20 @@ export class ContextEvidenceSearchTool extends BaseTool<typeof contextEvidenceSe
currentRunId: ingest.runId,
});
const embeddingHealthSuffix = embeddingUnhealthyReason
? ` (semantic lane skipped: embedding_unhealthy:${embeddingUnhealthyReason})`
: '';
if (results.length === 0) {
return {
markdown: `No context evidence found for "${input.query}".`,
markdown: `No context evidence found for "${input.query}"${embeddingHealthSuffix}.`,
structured: { success: true, results: [], totalFound: 0 },
};
}
return {
markdown: [
`Found ${results.length} evidence chunk(s):`,
`Found ${results.length} evidence chunk(s)${embeddingHealthSuffix}:`,
'',
...results.map((result, index) => {
const reasonLine =

View file

@ -46,7 +46,10 @@ export class WikiSearchTool extends BaseTool<typeof WikiSearchInputSchema> {
get description(): string {
return (
'Search knowledge blocks by hybrid lexical, semantic, and token matching. ' +
'Search knowledge blocks. Active lanes vary by project storage: ' +
'projects on sqlite-fts5 storage use hybrid lexical + token + semantic matching, ' +
'others fall back to token-only matching. ' +
'Inspect `lanes` and `matchReasons` on each result to see which lanes contributed. ' +
'Use this when you need to find knowledge on a topic not visible in the discovery index. ' +
'Returns ranked summaries — use wiki_read to load the full content of specific results.'
);