fix(snowflake): unblock multi-schema ingest and relationship discovery (#204)

* feat(setup): drop redundant Snowflake schema prompt; fall back to free-text on listSchemas failure

Snowflake setup previously asked for a single schema as free text, then
ran a multiselect against the discovered schemas — two schema questions
back-to-back, with the first being only a session bootstrap. The SDK's
`schema` is optional, so the bootstrap step is unnecessary.

- Remove the free-text Snowflake schema prompt; only pass `schema` to
  snowflake-sdk when one is configured.
- When `listSchemas()` fails (e.g. role lacks SHOW SCHEMAS), prompt the
  user for a comma-separated list, persist it as `schema_names`, and use
  it as both the table-list filter and the multiselect default. Applies
  to every driver with a scope-discovery spec, not just Snowflake.
- Update docs to lead with `schema_names`; keep `schema_name` as a
  documented single-schema shorthand.
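
The fallback in the second bullet can be sketched roughly as below. This is a minimal illustration, not the wizard's actual code: `parseSchemaNames`, `resolveSchemaScope`, and the two callbacks are hypothetical names introduced here.

```typescript
// Hypothetical helper: turn "PUBLIC, MARTS ,,PUBLIC" into a clean, de-duplicated list.
function parseSchemaNames(input: string): string[] {
  const seen = new Set<string>();
  for (const part of input.split(',')) {
    const name = part.trim();
    if (name.length > 0) seen.add(name); // drop empty segments, keep first-seen order
  }
  return Array.from(seen);
}

// Hypothetical flow: try discovery first, fall back to free text when it fails.
async function resolveSchemaScope(
  listSchemas: () => Promise<string[]>,
  promptForList: () => Promise<string>,
): Promise<{ schemaNames: string[]; source: 'discovered' | 'prompted' }> {
  try {
    return { schemaNames: await listSchemas(), source: 'discovered' };
  } catch {
    // e.g. the role lacks SHOW SCHEMAS; the typed list would then be persisted
    // as `schema_names` and reused as filter and multiselect default.
    return { schemaNames: parseSchemaNames(await promptForList()), source: 'prompted' };
  }
}
```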

* fix(snowflake): keep introspecting when primary-key discovery is denied

The PK query joins INFORMATION_SCHEMA.TABLE_CONSTRAINTS and
INFORMATION_SCHEMA.KEY_COLUMN_USAGE, which require grants the
connection role may not have. Previously a 'SQL compilation error:
Object ANALYTICS.INFORMATION_SCHEMA.KEY_COLUMN_USAGE does not exist
or not authorized' aborted the entire introspection run: schemas, columns,
and row counts were all discarded over a missing nice-to-have.

Wrap the constraint query in try/catch, log a one-line warning per
schema, and return an empty PK map. Columns end up with
primaryKey=false; relationship inference still has FK and profiling
to fall back on.
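
A minimal sketch of the soft-failure pattern described above, assuming a PK map keyed by table name. `loadPrimaryKeysSoftly`, `isPrimaryKey`, and the injected `queryPrimaryKeys` / `warn` callbacks are hypothetical stand-ins, not the connector's real API.

```typescript
// Assumed shape: table name -> set of primary-key column names.
type PrimaryKeyMap = Map<string, Set<string>>;

async function loadPrimaryKeysSoftly(
  schema: string,
  queryPrimaryKeys: (schema: string) => Promise<PrimaryKeyMap>,
  warn: (message: string) => void,
): Promise<PrimaryKeyMap> {
  try {
    return await queryPrimaryKeys(schema);
  } catch (error) {
    // A denied INFORMATION_SCHEMA read must not abort the whole introspection.
    const reason = error instanceof Error ? error.message : String(error);
    warn(`primary-key discovery skipped for schema ${schema}: ${reason}`);
    return new Map();
  }
}

// With an empty map every column reports primaryKey=false.
function isPrimaryKey(pks: PrimaryKeyMap, table: string, column: string): boolean {
  return pks.get(table)?.has(column) ?? false;
}
```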

* fix(scan): unblock relationship discovery on Snowflake

Two adjacent bugs prevented the scan's relationship pipeline from producing
any joins on a Snowflake warehouse:

- relationship-profiling.ts fell through to a default `GROUP_CONCAT` branch
  for unknown drivers. Snowflake has no GROUP_CONCAT, so every per-table
  profile query failed with "Unknown function GROUP_CONCAT". Add an explicit
  Snowflake branch that uses LISTAGG with a literal '\x1f' delimiter
  (Snowflake requires the delimiter to be a constant, so CHR(31) is rejected).
- description-generation.ts destructured `connector.sampleTable` and
  `connector.sampleColumn` into bare locals, losing the `this` binding when
  the class-method connectors (Snowflake, Postgres, MySQL) were invoked.
  Every sample call threw "Cannot read properties of undefined (reading
  'assertConnection')" and degraded LLM descriptions to metadata-only
  prompts. Call the methods through the connector instead.
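
The lost `this` binding from the second bullet can be reproduced in isolation. `SketchConnector` below is a hypothetical stand-in for a class-method connector; only the method names mirror the ones mentioned above.

```typescript
class SketchConnector {
  private connected = true;

  private assertConnection(): void {
    if (!this.connected) throw new Error('not connected');
  }

  sampleTable(table: string): string {
    this.assertConnection(); // needs `this` to be the connector instance
    return `sample of ${table}`;
  }
}

const sketchConnector = new SketchConnector();

// Destructuring detaches the method; class bodies are strict mode,
// so `this` is undefined when the bare local is called.
const { sampleTable: detachedSampleTable } = sketchConnector;
let detachedThrewTypeError = false;
try {
  detachedSampleTable('orders');
} catch (error) {
  detachedThrewTypeError = error instanceof TypeError;
}

// Calling through the instance keeps the binding intact.
const boundSample = sketchConnector.sampleTable('orders');
```

Calling through the connector, as the fix does, avoids having to `bind` each method at every call site.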

Without these, even after the primary-key probe is allowed to fail softly,
the scan ends up with 0 validated relationships and an empty `joins:` block
in every shard YAML.
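
For the first bug, the driver-specific aggregate could look roughly like this. It is a sketch under assumptions: `SketchDriver` and `sampleAggregateSql` are hypothetical names, and the real query in relationship-profiling.ts may wrap the column differently (casts, DISTINCT, limits).

```typescript
type SketchDriver = 'snowflake' | 'other';

// Returns the SQL fragment that concatenates sampled values for one column.
function sampleAggregateSql(driver: SketchDriver, columnSql: string): string {
  if (driver === 'snowflake') {
    // Snowflake needs a constant delimiter, so the SQL text carries the
    // literal '\x1f' escape rather than a CHR(31) call.
    return `LISTAGG(${columnSql}, '\\x1f')`;
  }
  // Previous default branch, which Snowflake rejects.
  return `GROUP_CONCAT(${columnSql})`;
}

const snowflakeAggregate = sampleAggregateSql('snowflake', '"STATUS"');
```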

* test(scan): cover table-ref helpers

* feat(scan): plumb tableScope through live-database introspection port

* feat(scan): apply tableScope during metadata fetch

* feat(scan): enforce table scope at fetch boundary

* feat(scan): pool Snowflake sessions and batch enrichment for faster ingest (#206)

* feat(cli): add RSA key-pair auth option to Snowflake setup wizard

Extends the interactive Snowflake setup flow with an authentication-method
prompt (password vs RSA/JWT key-pair). The RSA branch collects a private-key
path (env/file/absolute) and an optional passphrase; the resulting connection
config records `authMethod: 'rsa'` with `privateKey` and `passphrase` instead
of `password`.

* feat(scan): pool Snowflake sessions

* fix(scan): reuse structural snapshots and cleanup connectors

* feat(scan): parallelize relationship profiling

* feat(scan): batch table description generation

* docs: document Snowflake ingest concurrency knobs

* fix(scan): close Snowflake ingest perf verification gaps

* fix(scan): keep batched description failure bounded

* feat(scan): dispatch query-history probes by connection driver

Extract historic-sql dialect resolution into a shared helper so the
status-project readiness check and the local ingest factory agree on
which connections enable query history and which probe to run. The
status command now picks the postgres/snowflake/bigquery probe based on
the connection's driver instead of always reporting against postgres,
which previously caused snowflake connections with queryHistory.enabled
to surface a misleading "driver is snowflake" failure.

Also drops a noisy console.warn from Snowflake primary-key discovery —
INFORMATION_SCHEMA.KEY_COLUMN_USAGE is commonly ungranted for read-only
roles and the FK + profiling paths handle the empty PK map already.
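
The driver-based dispatch can be pictured as a small lookup table. The driver-to-dialect mapping mirrors the shared helper in this commit; the probe functions and `runQueryHistoryProbe` are hypothetical placeholders for the real readiness probes.

```typescript
type SketchDialect = 'postgres' | 'snowflake' | 'bigquery';

// Hypothetical probes; the real ones query each warehouse's history source.
const sketchProbes: Record<SketchDialect, () => string> = {
  postgres: () => 'probe:postgres',
  snowflake: () => 'probe:snowflake',
  bigquery: () => 'probe:bigquery',
};

function sketchDialectForDriver(driver: string): SketchDialect | null {
  const normalized = driver.toLowerCase();
  if (normalized === 'postgres' || normalized === 'postgresql') return 'postgres';
  if (normalized === 'bigquery') return 'bigquery';
  if (normalized === 'snowflake') return 'snowflake';
  return null;
}

// Pick the probe from the connection's driver instead of assuming postgres.
function runQueryHistoryProbe(driver: string): string | null {
  const dialect = sketchDialectForDriver(driver);
  return dialect ? sketchProbes[dialect]() : null;
}
```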

* fix(llm): allow StructuredOutput tool and raise maxTurns for generateObject

The Claude Code agent SDK announces an internal pseudo-tool named
StructuredOutput in the system/init message whenever outputFormat is set
to { type: 'json_schema' }. The runtime's isolation check built its
allowedToolIds set only from MCP tool ids and treated StructuredOutput
as an unexpected host-injected tool, so every generateObject call threw
"Claude Code runtime isolation failed: tools=StructuredOutput ..." and
the table-descriptions and relationship-LLM-proposal enrichment stages
recorded null output across the board.

Whitelist StructuredOutput specifically in generateObject's
allowedToolIds — the check also enforces missing_tools symmetry, so
generateText and runAgentLoop, which do not see StructuredOutput, must
not require it.
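
Why the allowance has to be scoped to generateObject can be seen in a simplified model of the check. `checkToolIsolation` and the tool id `mcp__ktx__search` are hypothetical; the runtime's real check may differ in detail.

```typescript
interface SketchIsolationResult {
  unexpectedTools: string[]; // announced by the SDK but not allowed
  missingTools: string[]; // allowed/expected but never announced
}

function checkToolIsolation(
  announced: readonly string[],
  allowed: ReadonlySet<string>,
): SketchIsolationResult {
  const announcedSet = new Set(announced);
  return {
    unexpectedTools: announced.filter((tool) => !allowed.has(tool)),
    missingTools: Array.from(allowed).filter((tool) => !announcedSet.has(tool)),
  };
}

const sketchMcpIds = ['mcp__ktx__search'];
const objectAllowed = new Set([...sketchMcpIds, 'StructuredOutput']);
const textAllowed = new Set(sketchMcpIds);

// generateObject: the init message announces StructuredOutput, and it is allowed.
const objectCheck = checkToolIsolation(['mcp__ktx__search', 'StructuredOutput'], objectAllowed);
// Old behavior: the same init message checked against MCP ids only.
const oldObjectCheck = checkToolIsolation(['mcp__ktx__search', 'StructuredOutput'], textAllowed);
// If generateText also required StructuredOutput, the symmetry check would trip.
const overAllowedTextCheck = checkToolIsolation(['mcp__ktx__search'], objectAllowed);
```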

generateObject also ran with maxTurns: 1, which the model intermittently
breached when it emitted thinking text before the structured response.
Raised to 5 to give the schema-bound call enough headroom without
allowing unbounded loops. The existing tests now exercise the path with
an init message that announces StructuredOutput so the regression cannot
slip back in.

* chore(scripts): add ktx-reset.sh project-cleanup helper

Convenience script for repeatable ingest testing: takes a project
directory and prunes everything except ktx.yaml and .ktx/secrets/, so
the next ktx setup or ktx ingest run starts from a known-clean state.
Andrey Avtomonov 2026-05-23 10:41:30 +02:00 committed by GitHub
parent b0dd13ce7c
commit 394a985d2a
72 changed files with 3508 additions and 655 deletions

@ -0,0 +1,48 @@
import type { HistoricSqlDialect } from './types.js';
const KNOWN_DIALECTS = ['postgres', 'bigquery', 'snowflake'] as const;
function isKnownDialect(value: string): value is HistoricSqlDialect {
return (KNOWN_DIALECTS as readonly string[]).includes(value);
}
function recordOrNull(value: unknown): Record<string, unknown> | null {
return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record<string, unknown>) : null;
}
function historicSqlRecord(connection: unknown): Record<string, unknown> | null {
const conn = recordOrNull(connection);
return conn ? recordOrNull(conn.historicSql) : null;
}
function queryHistoryRecord(connection: unknown): Record<string, unknown> | null {
const conn = recordOrNull(connection);
const context = conn ? recordOrNull(conn.context) : null;
return context ? recordOrNull(context.queryHistory) : null;
}
export function isQueryHistoryEnabled(connection: unknown): boolean {
const queryHistory = queryHistoryRecord(connection);
if (queryHistory) {
return queryHistory.enabled === true;
}
return historicSqlRecord(connection)?.enabled === true;
}
/**
* Resolves the query-history dialect for a connection. Returns null when
* query history is disabled, or when the connection's driver has no
* query-history reader.
*/
export function queryHistoryDialectForConnection(connection: unknown): HistoricSqlDialect | null {
if (!isQueryHistoryEnabled(connection)) {
return null;
}
const conn = recordOrNull(connection);
const driver = String(conn?.driver ?? '').toLowerCase();
if (driver === 'postgres' || driver === 'postgresql') return 'postgres';
if (driver === 'bigquery') return 'bigquery';
if (driver === 'snowflake') return 'snowflake';
const legacy = String(historicSqlRecord(connection)?.dialect ?? '').toLowerCase();
return isKnownDialect(legacy) ? legacy : null;
}

@ -1,6 +1,7 @@
import { once } from 'node:events';
import { createServer } from 'node:http';
import { describe, expect, it, vi } from 'vitest';
import { tableRefSet } from '../../../scan/table-ref.js';
import { createDaemonLiveDatabaseIntrospection } from './daemon-introspection.js';
const daemonResponse = {
@ -161,7 +162,11 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
baseUrl: `http://127.0.0.1:${address.port}`,
});
await expect(introspection.extractSchema('warehouse')).resolves.toMatchObject({
await expect(
introspection.extractSchema('warehouse', {
tableScope: tableRefSet([{ catalog: 'warehouse', db: 'public', name: 'orders' }]),
}),
).resolves.toMatchObject({
connectionId: 'warehouse',
tables: [{ name: 'customers' }, { name: 'orders' }],
});
@ -176,6 +181,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
schemas: ['public'],
statement_timeout_ms: 30_000,
connection_timeout_seconds: 5,
table_scope: [{ catalog: 'warehouse', db: 'public', name: 'orders' }],
},
},
]);
@ -217,7 +223,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
expect(runJson).not.toHaveBeenCalled();
});
it('filters out tables not on the enabled_tables allowlist', async () => {
it('does not use connection enabled_tables as a response filter', async () => {
const runJson = vi.fn(async () => daemonResponse);
const introspection = createDaemonLiveDatabaseIntrospection({
connections: {
@ -232,7 +238,8 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
});
const snapshot = await introspection.extractSchema('warehouse');
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.orders']);
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.customers', 'public.orders']);
expect(runJson).toHaveBeenCalledWith('database-introspect', expect.not.objectContaining({ table_scope: expect.anything() }));
});
it('passes through every table when enabled_tables is omitted or empty', async () => {

@ -3,10 +3,10 @@ import { request as httpRequest } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { URL } from 'node:url';
import type { KtxProjectConnectionConfig } from '../../../project/config.js';
import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js';
import { tableRefFromKey } from '../../../scan/table-ref.js';
import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js';
import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js';
import type { LiveDatabaseIntrospectionPort } from './types.js';
import type { LiveDatabaseIntrospectionOptions, LiveDatabaseIntrospectionPort } from './types.js';
type KtxDaemonDatabaseIntrospectionCommand = 'database-introspect';
@ -220,6 +220,18 @@ function mapDaemonSnapshot(
};
}
function serializeTableScope(options: LiveDatabaseIntrospectionOptions | undefined): Array<{
catalog: string | null;
db: string | null;
name: string;
}> | undefined {
if (!options?.tableScope) return undefined;
return [...options.tableScope].map((key) => {
const ref = tableRefFromKey(key);
return { catalog: ref.catalog, db: ref.db, name: ref.name };
});
}
export function createDaemonLiveDatabaseIntrospection(
options: DaemonLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
@ -231,8 +243,9 @@ export function createDaemonLiveDatabaseIntrospection(
const now = options.now ?? (() => new Date());
return {
async extractSchema(connectionId: string): Promise<KtxSchemaSnapshot> {
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot> {
const connection = requirePostgresConnection(options.connections, connectionId);
const tableScope = serializeTableScope(introspectionOptions);
const payload = {
connection_id: connectionId,
driver: normalizeDriver(connection.driver),
@ -240,17 +253,16 @@ export function createDaemonLiveDatabaseIntrospection(
schemas,
statement_timeout_ms: options.statementTimeoutMs ?? 30_000,
connection_timeout_seconds: options.connectionTimeoutSeconds ?? 5,
...(tableScope !== undefined ? { table_scope: tableScope } : {}),
};
const raw = requestJson
? await requestJson('/database/introspect', payload)
: await runJson('database-introspect', payload);
const snapshot = mapDaemonSnapshot(raw, {
return mapDaemonSnapshot(raw, {
connectionId,
extractedAt: now().toISOString(),
schemas,
});
const enabledTables = resolveEnabledTables(connection);
return enabledTables ? filterSnapshotTables(snapshot, enabledTables) : snapshot;
},
};
}

@ -1,7 +1,8 @@
import { mkdtemp } from 'node:fs/promises';
import { mkdtemp, readdir, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { tableRefSet, type KtxTableRefKey } from '../../../scan/table-ref.js';
import { LiveDatabaseSourceAdapter } from './live-database.adapter.js';
describe('LiveDatabaseSourceAdapter', () => {
@ -43,7 +44,7 @@ describe('LiveDatabaseSourceAdapter', () => {
await adapter.fetch(undefined, dir, { connectionId: 'conn-1', sourceKey: 'live-database' });
expect(extractSchema).toHaveBeenCalledWith('conn-1');
expect(extractSchema).toHaveBeenCalledWith('conn-1', { tableScope: undefined });
await expect(adapter.detect(dir)).resolves.toBe(true);
const chunked = await adapter.chunk(dir);
expect(chunked.workUnits.map((wu) => wu.unitKey)).toEqual(['live-database-public-orders']);
@ -56,4 +57,55 @@ describe('LiveDatabaseSourceAdapter', () => {
expect(adapter.source).toBe('live-database');
expect(adapter.skillNames).toEqual(['live_database_ingest']);
});
it('threads tableScope from fetch context into the introspection port without post-filtering', async () => {
const extractSchema = vi.fn(
async (_connectionId: string, _options?: { tableScope?: ReadonlySet<KtxTableRefKey> }) => ({
connectionId: 'warehouse',
driver: 'snowflake' as const,
extractedAt: '2026-05-22T00:00:00.000Z',
scope: {},
metadata: {},
tables: [
{
catalog: 'A',
db: 'MARTS',
name: 'IN_SCOPE',
kind: 'table' as const,
comment: null,
estimatedRows: 0,
columns: [],
foreignKeys: [],
},
{
catalog: 'A',
db: 'MARTS',
name: 'OUT_OF_SCOPE',
kind: 'table' as const,
comment: null,
estimatedRows: 0,
columns: [],
foreignKeys: [],
},
],
}),
);
const scope = tableRefSet([{ catalog: 'A', db: 'MARTS', name: 'IN_SCOPE' }]);
const adapter = new LiveDatabaseSourceAdapter({
introspection: { extractSchema },
});
const stagedDir = await mkdtemp(join(tmpdir(), 'ktx-livedb-scope-'));
try {
await adapter.fetch(undefined, stagedDir, {
connectionId: 'warehouse',
sourceKey: 'live-database',
tableScope: scope,
});
expect(extractSchema).toHaveBeenCalledWith('warehouse', { tableScope: scope });
const tables = await readdir(join(stagedDir, 'tables'));
expect(tables).toHaveLength(2);
} finally {
await rm(stagedDir, { recursive: true, force: true });
}
});
});

@ -14,7 +14,8 @@ export class LiveDatabaseSourceAdapter implements SourceAdapter {
}
async fetch(_pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId);
const tableScope = ctx.tableScope;
const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId, { tableScope });
await writeLiveDatabaseSnapshot(stagedDir, {
...snapshot,
connectionId: ctx.connectionId,

@ -1,7 +1,12 @@
import type { KtxSchemaSnapshot } from '../../../scan/types.js';
import type { KtxTableRefKey } from '../../../scan/table-ref.js';
export interface LiveDatabaseIntrospectionOptions {
tableScope?: ReadonlySet<KtxTableRefKey>;
}
export interface LiveDatabaseIntrospectionPort {
extractSchema(connectionId: string): Promise<KtxSchemaSnapshot>;
extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot>;
}
export interface LiveDatabaseSourceAdapterDeps {

@ -9,6 +9,7 @@ import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js';
import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js';
import { buildSyncId } from './raw-sources-paths.js';
import { SqliteLocalIngestStore } from './sqlite-local-ingest-store.js';
import type { KtxTableRefKey } from '../scan/table-ref.js';
import type { IngestTrigger, SourceAdapter, WorkUnit } from './types.js';
type LocalIngestStatus = 'running' | 'done' | 'error';
@ -62,6 +63,7 @@ export interface RunLocalStageOnlyIngestOptions {
now?: () => Date;
dryRun?: boolean;
memoryFlow?: MemoryFlowEventSink;
tableScope?: ReadonlySet<KtxTableRefKey>;
}
const LOCAL_AUTHOR = 'ktx';
@ -225,6 +227,7 @@ async function prepareLocalStagedDir(
stagedDir: string,
sourceDir: string | undefined,
connectionId: string,
tableScope: ReadonlySet<KtxTableRefKey> | undefined,
): Promise<string | null> {
await rm(stagedDir, { recursive: true, force: true });
await mkdir(stagedDir, { recursive: true });
@ -242,7 +245,7 @@ async function prepareLocalStagedDir(
);
}
const pullConfig = await localPullConfigForAdapter(project, adapter, connectionId);
await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source });
await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source, tableScope });
return null;
}
@ -274,7 +277,14 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
assertCompatibleExistingRun(existingRun, runId, adapter.source, connectionId);
const stagedDir = join(options.project.projectDir, '.ktx/cache/local-ingest', runId, 'staged');
const sourceDir = await prepareLocalStagedDir(options.project, adapter, stagedDir, options.sourceDir, connectionId);
const sourceDir = await prepareLocalStagedDir(
options.project,
adapter,
stagedDir,
options.sourceDir,
connectionId,
options.tableScope,
);
const detected = await adapter.detect(stagedDir);
if (!detected) {

@ -2,6 +2,7 @@ import type { KtxEmbeddingPort } from '../core/embedding.js';
import type { MemoryAction } from '../../context/memory/types.js';
import type { SemanticLayerService } from '../../context/sl/semantic-layer.service.js';
import type { TouchedSlSource } from '../../context/tools/touched-sl-sources.js';
import type { KtxTableRefKey } from '../scan/table-ref.js';
import type { MemoryFlowEventSink } from './memory-flow/types.js';
import type { StageIndex } from './stages/stage-index.types.js';
import type { WorkUnitOutcome } from './stages/stage-3-work-units.js';
@ -52,6 +53,7 @@ export interface ChunkResult {
export interface FetchContext {
connectionId: string;
sourceKey: string;
tableScope?: ReadonlySet<KtxTableRefKey>;
memoryFlow?: MemoryFlowEventSink;
}

@ -91,9 +91,14 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
});
});
it('validates structured output with the caller schema', async () => {
it('validates structured output with the caller schema and whitelists the SDK StructuredOutput tool', async () => {
const schema = z.object({ answer: z.string() });
const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ structured_output: { answer: 'yes' } })]));
const query = vi.fn((_input: any) =>
stream([
initMessage({ tools: ['StructuredOutput'] }),
resultMessage({ structured_output: { answer: 'yes' } }),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
@ -341,7 +346,10 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
it('passes scrubbed env to object generation and agent loops', async () => {
const schema = z.object({ answer: z.string() });
const objectQuery = vi.fn((_input: any) =>
stream([initMessage(), resultMessage({ structured_output: { answer: 'yes' } })]),
stream([
initMessage({ tools: ['StructuredOutput'] }),
resultMessage({ structured_output: { answer: 'yes' } }),
]),
);
const objectRuntime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',

@ -47,6 +47,13 @@ const BUILTIN_TOOLS = [
const KTX_MCP_SERVER_NAME = 'ktx';
// SDK-internal pseudo-tool that the Claude Code CLI announces in its
// system/init message whenever outputFormat: { type: 'json_schema' } is set.
// Structured output is returned via result.structured_output (not through
// canUseTool), so the tool only needs to be whitelisted for generateObject's
// init isolation check; generateText / runAgentLoop never see it.
const STRUCTURED_OUTPUT_TOOL_NAME = 'StructuredOutput';
function isResult(message: SDKMessage): message is SDKResultMessage {
return message.type === 'result';
}
@ -238,7 +245,12 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
projectDir: this.deps.projectDir,
model: modelForRole(this.deps.modelSlots, input.role),
env: this.deps.env,
maxTurns: 1,
// Structured output occasionally takes more than one assistant turn —
// the model may emit thinking/text before the StructuredOutput tool
// call, or the SDK may count assistant + tool-result as separate turns.
// 5 leaves headroom without enabling unbounded loops; the json_schema
// constraint still forces the final answer to be the schema.
maxTurns: 5,
tools: input.tools,
}),
outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) },
@ -247,7 +259,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
options,
allowedToolIds: new Set(mcpToolIds(input.tools ?? {})),
allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]),
expectedMcpServerNames: expectedMcpServerNames(input.tools),
});
const error = resultError(result);

@ -74,6 +74,7 @@ connections:
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
},
@ -278,6 +279,7 @@ scan:
maxLlmTablesPerBatch: 12
maxCandidatesPerColumn: 7
profileSampleRows: 500
profileConcurrency: 3
validationConcurrency: 2
validationBudget: 0
`);
@ -291,6 +293,7 @@ scan:
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
validationBudget: 0,
});
@ -302,6 +305,7 @@ scan:
expect(serializeKtxProjectConfig(config)).toContain('maxLlmTablesPerBatch: 12');
expect(serializeKtxProjectConfig(config)).toContain('maxCandidatesPerColumn: 7');
expect(serializeKtxProjectConfig(config)).toContain('profileSampleRows: 500');
expect(serializeKtxProjectConfig(config)).toContain('profileConcurrency: 3');
expect(serializeKtxProjectConfig(config)).toContain('validationConcurrency: 2');
expect(serializeKtxProjectConfig(config)).toContain('validationBudget: 0');
});
@ -326,6 +330,7 @@ scan:
maxLlmTablesPerBatch: 0
maxCandidatesPerColumn: -4
profileSampleRows: 0
profileConcurrency: 0
validationConcurrency: 0
validationBudget: 1.5
`;
@ -341,6 +346,7 @@ scan:
'scan.relationships.maxLlmTablesPerBatch',
'scan.relationships.maxCandidatesPerColumn',
'scan.relationships.profileSampleRows',
'scan.relationships.profileConcurrency',
'scan.relationships.validationConcurrency',
'scan.relationships.validationBudget',
]),

@ -163,6 +163,11 @@ const scanRelationshipsSchema = z
.default(25)
.describe('Maximum number of candidate join partners considered per column during relationship discovery.'),
profileSampleRows: z.int().positive().default(10000).describe('Number of rows sampled per table when profiling values for relationship inference.'),
profileConcurrency: z
.int()
.positive()
.default(4)
.describe('Parallel relationship-profile queries run against the database during scan.'),
validationConcurrency: z.int().positive().default(4).describe('Number of relationship validation queries run in parallel against the database.'),
validationBudget: z
.union([z.literal('all'), z.int().nonnegative()])

@ -378,6 +378,121 @@ describe('KtxDescriptionGenerator', () => {
expect(cache.set).toHaveBeenCalledWith('warehouse.public.orders', 'Commerce orders');
expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders');
});
it('generates one structured table description and reuses table samples for all columns', async () => {
const llmRuntime = createLlmProvider('unused');
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [
{ name: 'status', description: 'Current order state' },
{ name: 'amount', description: 'Order amount in dollars' },
],
}));
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
rawDescriptions: { db: 'Orders fact table' },
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
expect(result.tableDescription).toBe('Commerce orders');
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Current order state',
amount: 'Order amount in dollars',
});
expect(connector.sampleTable).toHaveBeenCalledTimes(1);
expect(connector.sampleColumn).not.toHaveBeenCalled();
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).not.toHaveBeenCalled();
});
it('falls back to one column generateText call for each missing structured column', async () => {
const llmRuntime = createLlmProvider('Fallback status');
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [{ name: 'amount', description: 'Order amount in dollars' }],
}));
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Fallback status',
amount: 'Order amount in dollars',
});
expect(connector.sampleColumn).not.toHaveBeenCalled();
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).toHaveBeenCalledTimes(1);
});
it('does not run per-column fallback when structured object generation throws', async () => {
const llmRuntime = createLlmProvider('Fallback description');
llmRuntime.generateObject = vi.fn(async () => {
throw new Error('object output unavailable');
});
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmRuntime,
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector: createConnector(),
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'status', type: 'text' }],
},
});
expect(result.tableDescription).toBeNull();
expect(Object.fromEntries(result.columnDescriptions)).toEqual({ status: null });
expect(warnings).toContain('enrichment_failed');
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).not.toHaveBeenCalled();
});
});
describe('KtxDescriptionGenerator resilience', () => {

@ -1,4 +1,5 @@
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { z } from 'zod';
import type {
KtxColumnSampleInput,
KtxColumnSampleResult,
@ -53,7 +54,7 @@ export interface KtxDescriptionColumn {
sampleValues?: unknown[];
}
export interface KtxDescriptionColumnTable extends KtxTableRef {
interface KtxDescriptionColumnTable extends KtxTableRef {
columns: KtxDescriptionColumn[];
}
@ -112,6 +113,23 @@ export interface KtxGenerateTableDescriptionInput {
table: KtxDescriptionTableInput;
}
export interface KtxGenerateBatchedTableDescriptionsInput {
connectionId: string;
connector: KtxDescriptionSamplingPort;
context: KtxScanContext;
dataSourceType: string;
supportsNestedAnalysis: boolean;
table: KtxDescriptionColumnTable & {
rawDescriptions?: Record<string, string>;
columns: Array<KtxDescriptionColumn & { type?: string; comment?: string | null }>;
};
}
export interface KtxBatchedTableDescriptionsResult {
tableDescription: string | null;
columnDescriptions: Map<string, string | null>;
}
export interface KtxGenerateDataSourceDescriptionInput {
connectionId: string;
connector: KtxDescriptionSamplingPort;
@ -136,6 +154,18 @@ interface ColumnTaskResult {
skipped: boolean;
}
const batchedTableDescriptionSchema = z.object({
tableDescription: z.string(),
columns: z.array(
z.object({
name: z.string(),
description: z.string(),
}),
),
});
type BatchedTableDescriptionOutput = z.infer<typeof batchedTableDescriptionSchema>;
function descriptionSources(rawDescriptions: Record<string, string> | undefined): Array<[string, string]> {
if (!rawDescriptions) {
return [];
@ -250,6 +280,76 @@ function wordLimitLine(maxWords: number): string {
return `Please provide a concise description in ${maxWords} words or less.`;
}
function sampleValuesByColumn(
columns: readonly KtxDescriptionColumn[],
sampleData: KtxTableSampleResult | null,
): Map<string, unknown[]> {
const values = new Map<string, unknown[]>();
for (const column of columns) {
const existingValues = column.sampleValues?.filter((value) => value !== null && value !== undefined) ?? [];
if (existingValues.length > 0) {
values.set(column.name, existingValues);
}
}
if (!sampleData) {
return values;
}
for (const column of columns) {
const index = sampleData.headers.findIndex((header) => header.toLowerCase() === column.name.toLowerCase());
if (index < 0) {
continue;
}
const sampledValues = sampleData.rows
.map((row) => row[index])
.filter((value) => value !== null && value !== undefined);
if (sampledValues.length > 0) {
values.set(column.name, sampledValues);
}
}
return values;
}
function batchedPrompt(input: {
table: KtxGenerateBatchedTableDescriptionsInput['table'];
sampleData: KtxTableSampleResult | null;
dataSourceType: string;
tableMaxWords: number;
columnMaxWords: number;
}): KtxDescriptionPrompt {
const columnLines = input.table.columns
.map((column) => {
const typePart = column.type ? ` (${column.type})` : '';
const commentPart = column.rawDescriptions?.db ? ` - ${column.rawDescriptions.db}` : '';
return `- ${column.name}${typePart}${commentPart}`;
})
.join('\n');
const sampleLines =
input.sampleData && input.sampleData.rows.length > 0
? input.sampleData.rows
.slice(0, 5)
.map((row) =>
input.sampleData!.headers.map((header, index) => `${header}=${String(row[index] ?? '')}`).join(', '),
)
.join('\n')
: 'unavailable';
return {
system: [
'Analyze one database table and return structured JSON matching the supplied schema.',
`The table description must be ${input.tableMaxWords} words or less.`,
`Each column description must be ${input.columnMaxWords} words or less.`,
'Describe business meaning directly. Do not repeat table or column names as filler.',
].join('\n'),
user: [
`Table: ${input.table.name}`,
`Data source type: ${input.dataSourceType}`,
'Columns:',
columnLines,
'Sample rows:',
sampleLines,
].join('\n'),
};
}
/** @internal */
export function buildKtxColumnDescriptionPrompt(
input: KtxColumnDescriptionPromptInput & { maxWords?: number },
@ -463,11 +563,11 @@ export class KtxDescriptionGenerator {
}
}
const sampleTable = input.connector.sampleTable;
const connector = input.connector;
let sampleData: KtxTableSampleResult | null = null;
let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null;
if (!sampleTable) {
if (!connector.sampleTable) {
fallbackReason = 'capability_missing';
this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', {
connectorId: input.connector.id,
@@ -484,7 +584,7 @@ export class KtxDescriptionGenerator {
try {
sampleData = await retryAsync(
() =>
sampleTable(
connector.sampleTable!(
{
connectionId: input.connectionId,
table: tableRef,
@@ -582,6 +682,156 @@ export class KtxDescriptionGenerator {
}
}
async generateBatchedTableDescriptions(
input: KtxGenerateBatchedTableDescriptionsInput,
): Promise<KtxBatchedTableDescriptionsResult> {
const tableRef = toTableRef(input.table);
let sampleData: KtxTableSampleResult | null = null;
let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null;
if (!input.connector.sampleTable) {
fallbackReason = 'capability_missing';
this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'connector_capability_missing',
message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, capability: 'sampleTable' },
});
} else {
try {
sampleData = await retryAsync(
() =>
input.connector.sampleTable!(
{
connectionId: input.connectionId,
table: tableRef,
limit: 20,
},
input.context,
),
{
attempts: 3,
baseDelayMs: 200,
signal: input.context.signal,
onAttemptFailure: (error, attempt) => {
this.logger?.warn(`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
attempt,
});
},
},
);
if (sampleData.rows.length === 0) {
fallbackReason = 'empty_sample';
this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
}
} catch (error) {
if (error instanceof KtxAbortedError) {
throw error;
}
fallbackReason = 'sampling_failed';
this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'sampling_failed',
message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, error: errorMessage(error) },
});
}
}
const sampleValues = sampleValuesByColumn(input.table.columns, sampleData);
const descriptions = new Map<string, string | null>();
let tableDescription: string | null = null;
let structuredGenerationSucceeded = false;
try {
const prompt = batchedPrompt({
table: input.table,
sampleData,
dataSourceType: input.dataSourceType,
tableMaxWords: this.settings.tableMaxWords,
columnMaxWords: this.settings.columnMaxWords,
});
const generated = await this.llmRuntime.generateObject<
BatchedTableDescriptionOutput,
typeof batchedTableDescriptionSchema
>({
role: 'candidateExtraction',
system: prompt.system,
prompt: prompt.user,
schema: batchedTableDescriptionSchema,
temperature: this.settings.temperature,
});
structuredGenerationSucceeded = true;
tableDescription = generated.tableDescription.trim() || null;
const generatedColumns = new Map(
generated.columns.map((column) => [column.name.toLowerCase(), column.description.trim() || null]),
);
for (const column of input.table.columns) {
const description = generatedColumns.get(column.name.toLowerCase()) ?? null;
descriptions.set(column.name, description);
}
if (tableDescription && fallbackReason !== null) {
this.onWarning?.({
code: 'description_fallback_used',
message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, reason: fallbackReason },
});
}
} catch (error) {
this.logger?.warn(`Batched table description failed for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'enrichment_failed',
message: `Failed to generate batched description for table ${input.table.name}: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id },
});
}
if (!structuredGenerationSucceeded) {
for (const column of input.table.columns) {
descriptions.set(column.name, null);
}
return { tableDescription, columnDescriptions: descriptions };
}
const tableContext = `Table: ${input.table.name} | Columns: ${input.table.columns.map((column) => column.name).join(', ')} | Data source: ${input.dataSourceType}`;
for (const column of input.table.columns) {
if (descriptions.get(column.name)) {
continue;
}
const fallback = await this.generateColumnDescriptionFromPreparedValues({
column,
columnValues: sampleValues.get(column.name) ?? [],
tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
});
descriptions.set(column.name, fallback);
}
return { tableDescription, columnDescriptions: descriptions };
}
async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise<string | null> {
if (input.tables.length === 0) {
return 'No tables found in database';
@@ -684,11 +934,11 @@ export class KtxDescriptionGenerator {
});
columnValues = [];
} else {
const sampleColumn = input.connector.sampleColumn;
const connector = input.connector;
try {
const sample = await retryAsync(
() =>
sampleColumn(
connector.sampleColumn!(
{
connectionId: input.connectionId,
table: tableRef,
@@ -732,27 +982,13 @@ export class KtxDescriptionGenerator {
}
}
const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined);
const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return {
columnName: column.name,
description: null,
skipped: false,
processed: false,
};
}
const prompt = buildKtxColumnDescriptionPrompt({
columnName: column.name,
columnValues: nonNullValues,
const description = await this.generateColumnDescriptionFromPreparedValues({
column,
columnValues: columnValues ?? [],
tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
rawDescriptions: column.rawDescriptions,
maxWords: this.settings.columnMaxWords,
});
const description = await this.generateAiDescription(prompt, 'ktx-column-description');
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
@@ -782,6 +1018,30 @@ export class KtxDescriptionGenerator {
}
}
private async generateColumnDescriptionFromPreparedValues(input: {
column: KtxDescriptionColumn;
columnValues: unknown[];
tableContext: string;
dataSourceType: string;
supportsNestedAnalysis: boolean;
}): Promise<string | null> {
const nonNullValues = input.columnValues.filter((value) => value !== null && value !== undefined);
const hasRawDescriptions = descriptionSources(input.column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return null;
}
const prompt = buildKtxColumnDescriptionPrompt({
columnName: input.column.name,
columnValues: nonNullValues,
tableContext: input.tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
rawDescriptions: input.column.rawDescriptions,
maxWords: this.settings.columnMaxWords,
});
return this.generateAiDescription(prompt, 'ktx-column-description');
}
private async generateAiDescription(prompt: KtxDescriptionPrompt, _operationName: string): Promise<string | null> {
try {
const text = await this.llmRuntime.generateText({
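A minimal standalone sketch of the merge step in `generateBatchedTableDescriptions` above: model output is matched back to the table's own column names case-insensitively, and blank descriptions are treated as missing so they can go through the per-column fallback. `mergeColumnDescriptions` and `GeneratedColumn` are hypothetical names used only for illustration; they are not part of the diff.

```typescript
// Hypothetical shape of one column entry in the structured model output.
type GeneratedColumn = { name: string; description: string };

// Hypothetical helper: map generated descriptions onto the table's column names.
// Names are compared case-insensitively; whitespace-only descriptions become null.
function mergeColumnDescriptions(
  columnNames: string[],
  generated: GeneratedColumn[],
): { descriptions: Map<string, string | null>; missing: string[] } {
  const byLowerName = new Map<string, string | null>();
  for (const column of generated) {
    byLowerName.set(column.name.toLowerCase(), column.description.trim() || null);
  }
  const descriptions = new Map<string, string | null>();
  const missing: string[] = [];
  for (const name of columnNames) {
    const description = byLowerName.get(name.toLowerCase()) ?? null;
    descriptions.set(name, description);
    // Columns without a usable description are candidates for a per-column retry.
    if (description === null) missing.push(name);
  }
  return { descriptions, missing };
}

const merged = mergeColumnDescriptions(
  ['ID', 'Created_At', 'notes'],
  [
    { name: 'id', description: ' Order identifier ' },
    { name: 'created_at', description: '   ' },
  ],
);
console.log(merged.missing);
```

In this sketch `Created_At` (blank description) and `notes` (absent from the output) both land in `missing`, which corresponds to the columns the diff retries through `generateColumnDescriptionFromPreparedValues`.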


@@ -1,17 +1,63 @@
import type { KtxSchemaSnapshot } from './types.js';
import { tableRefSet, type KtxTableRefKey } from './table-ref.js';
import type { KtxTableRef } from './types.js';
export function resolveEnabledTables(connection: Record<string, unknown> | undefined): Set<string> | null {
/**
* Parses the `enabled_tables` field on a connection into a scope of
* fully-qualified table refs. Returns `null` when the field is absent or
* empty (meaning "no scope — include every table in the resolved schemas").
*
* Accepted entry forms:
* "catalog.db.name" fully qualified
* "db.name" schema-qualified (catalog = null; legacy / Postgres-shape)
* "name" bare (catalog = db = null; SQLite-shape)
* { catalog?, db?, name } escape hatch for identifiers containing dots
*
* The setup wizard writes the fully-qualified form going forward; the lenient
* parser keeps existing project configs working.
*/
export function resolveEnabledTables(
connection: Record<string, unknown> | undefined,
): ReadonlySet<KtxTableRefKey> | null {
const raw = connection?.enabled_tables;
if (!Array.isArray(raw) || raw.length === 0) return null;
return new Set(raw.filter((v): v is string => typeof v === 'string'));
const refs: KtxTableRef[] = [];
for (const value of raw) {
const parsed = parseEnabledTableEntry(value);
if (parsed) refs.push(parsed);
}
if (refs.length === 0) return null;
return tableRefSet(refs);
}
export function filterSnapshotTables(snapshot: KtxSchemaSnapshot, enabledTables: Set<string>): KtxSchemaSnapshot {
return {
...snapshot,
tables: snapshot.tables.filter((table) => {
const key = table.db ? `${table.db}.${table.name}` : table.name;
return enabledTables.has(key);
}),
};
function parseEnabledTableEntry(value: unknown): KtxTableRef | null {
if (typeof value === 'string') {
return parseDottedEntry(value);
}
if (value && typeof value === 'object' && !Array.isArray(value)) {
const entry = value as { catalog?: unknown; db?: unknown; name?: unknown };
const name = typeof entry.name === 'string' ? entry.name : null;
if (!name) return null;
return {
catalog: typeof entry.catalog === 'string' ? entry.catalog : null,
db: typeof entry.db === 'string' ? entry.db : null,
name,
};
}
return null;
}
function parseDottedEntry(value: string): KtxTableRef | null {
const trimmed = value.trim();
if (trimmed.length === 0) return null;
const parts = trimmed.split('.');
if (parts.length === 3) {
return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! };
}
if (parts.length === 2) {
return { catalog: null, db: parts[0]!, name: parts[1]! };
}
if (parts.length === 1) {
return { catalog: null, db: null, name: parts[0]! };
}
return null;
}
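To illustrate the lenient string forms listed in the doc comment above, here is a small standalone sketch of the dotted-entry rules. `TableRef` is a local stand-in for `KtxTableRef`, and `parseEntry` is a hypothetical name, not the module's export; the behaviour is assumed to match `parseDottedEntry` as shown in the diff.

```typescript
// Local stand-in for KtxTableRef; the real type lives in ./types.js.
type TableRef = { catalog: string | null; db: string | null; name: string };

// Hypothetical helper mirroring the dotted-entry rules from the diff:
// 3 parts -> catalog.db.name, 2 parts -> db.name, 1 part -> bare name.
function parseEntry(value: string): TableRef | null {
  const parts = value.trim().split('.');
  if (parts.length === 1 && parts[0] === '') return null; // empty or whitespace-only entry
  if (parts.length > 3) return null; // too many dots: the object form is needed instead
  const name = parts[parts.length - 1] as string;
  const db = parts.length >= 2 ? (parts[parts.length - 2] as string) : null;
  const catalog = parts.length === 3 ? (parts[0] as string) : null;
  return { catalog, db, name };
}

console.log(parseEntry('ANALYTICS.PUBLIC.ORDERS')); // fully qualified
console.log(parseEntry('public.orders')); // schema-qualified, catalog is null
console.log(parseEntry('orders')); // bare name, catalog and db are null
console.log(parseEntry('a.b.c.d')); // null: more than three parts
```

Because the string form splits on every dot, an identifier that itself contains a dot cannot be expressed this way, which is presumably why the object form `{ catalog?, db?, name }` exists as an escape hatch.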


@@ -289,6 +289,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
});
@@ -378,6 +379,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
profileWarnings: [],
@@ -472,6 +474,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,
@@ -741,6 +744,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,


@@ -382,6 +382,7 @@ export async function writeLocalScanEnrichmentArtifacts(
validationRequiredForManifest: input.relationshipSettings.validationRequiredForManifest,
maxCandidatesPerColumn: input.relationshipSettings.maxCandidatesPerColumn,
profileSampleRows: input.relationshipSettings.profileSampleRows,
profileConcurrency: input.relationshipSettings.profileConcurrency,
validationConcurrency: input.relationshipSettings.validationConcurrency,
}
: undefined,


@@ -299,6 +299,38 @@ describe('local scan enrichment', () => {
]);
});
it('uses the supplied snapshot without calling connector.introspect', async () => {
const scanConnector = connector();
const introspect = vi.mocked(scanConnector.introspect);
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
snapshot,
context: { runId: 'scan-run-snapshot' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(introspect).not.toHaveBeenCalled();
});
it('falls back to connector.introspect when no snapshot is supplied', async () => {
const scanConnector = connector();
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
context: { runId: 'scan-run-introspect' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(scanConnector.introspect).toHaveBeenCalledTimes(1);
});
it('runs deterministic relationship detection for relationship scans', async () => {
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@@ -473,7 +505,7 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
it('generates batched table descriptions with bounded table-level concurrency', async () => {
const concurrentSnapshot: KtxSchemaSnapshot = {
...snapshot,
tables: Array.from({ length: 8 }, (_, index) => ({
@@ -497,27 +529,27 @@ describe('local scan enrichment', () => {
],
})),
};
let activeColumnSamples = 0;
let maxActiveColumnSamples = 0;
let activeTableSamples = 0;
let maxActiveTableSamples = 0;
const scanConnector = {
...connector(),
introspect: vi.fn(async () => concurrentSnapshot),
sampleColumn: vi.fn(async () => {
activeColumnSamples += 1;
maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples);
sampleColumn: vi.fn(async () => ({
values: ['1'],
nullCount: 0,
distinctCount: 1,
})),
sampleTable: vi.fn(async () => {
activeTableSamples += 1;
maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples);
await new Promise((resolve) => setTimeout(resolve, 10));
activeColumnSamples -= 1;
activeTableSamples -= 1;
return {
values: ['1'],
nullCount: 0,
distinctCount: 1,
headers: ['id'],
rows: [[1]],
totalRows: 1,
};
}),
sampleTable: vi.fn(async () => ({
headers: ['id'],
rows: [[1]],
totalRows: 1,
})),
};
const settings = {
...buildDefaultKtxProjectConfig().scan.relationships,
@@ -533,7 +565,8 @@ describe('local scan enrichment', () => {
relationshipSettings: settings,
});
expect(maxActiveColumnSamples).toBe(6);
expect(maxActiveTableSamples).toBe(4);
expect(scanConnector.sampleColumn).not.toHaveBeenCalled();
});
it('reports enrichment progress for countable stages', async () => {
@@ -675,7 +708,7 @@ describe('local scan enrichment', () => {
providerIdentity: { provider: 'fake', embeddingDimensions: 6 },
});
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const embedBatch = vi.spyOn(providers.embedding, 'embedBatch');
const second = await runLocalScanEnrichment({
connectionId: 'warehouse',
@@ -693,7 +726,7 @@ describe('local scan enrichment', () => {
expect(first.state.resumedStages).toEqual([]);
expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).not.toHaveBeenCalled();
expect(generateObject).not.toHaveBeenCalled();
expect(embedBatch).not.toHaveBeenCalled();
expect(second.descriptionUpdates).toEqual(first.descriptionUpdates);
expect(second.embeddingUpdates).toEqual(first.embeddingUpdates);
@@ -731,7 +764,7 @@ describe('local scan enrichment', () => {
tables: [{ ...firstTable, name: 'customers' }],
})),
};
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@@ -747,7 +780,7 @@ describe('local scan enrichment', () => {
expect(result.state.resumedStages).toEqual([]);
expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).toHaveBeenCalled();
expect(generateObject).toHaveBeenCalled();
});
it('runs providerless enriched scans as relationship-only discovery enrichment', async () => {


@@ -1,7 +1,7 @@
import pLimit from 'p-limit';
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
import {
completedKtxScanEnrichmentStateSummary,
@@ -41,7 +41,7 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
const DESCRIPTION_TABLE_CONCURRENCY = 4;
export interface KtxLocalScanEnrichmentProviders {
llmRuntime: KtxLlmRuntimePort;
@@ -53,6 +53,7 @@ export interface KtxLocalScanEnrichmentInput {
mode: KtxScanMode;
detectRelationships?: boolean;
connector: KtxScanConnector;
snapshot?: KtxSchemaSnapshot;
context: KtxScanContext;
providers: KtxLocalScanEnrichmentProviders | null;
stateStore?: KtxScanEnrichmentStateStore | null;
@@ -179,7 +180,17 @@ function deterministicLlmRuntime(): KtxLlmRuntimePort {
async generateText(input) {
return `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'data source'}`;
},
async generateObject() {
async generateObject(input) {
if (input.prompt.includes('Sample rows:')) {
const columns = Array.from(input.prompt.matchAll(/^- ([^\s(]+)/gm), (match) => ({
name: match[1] ?? 'column',
description: `Deterministic description for ${match[1] ?? 'column'}`,
}));
return {
tableDescription: `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'table'}`,
columns,
} as never;
}
return { pkCandidates: [], fkCandidates: [] } as never;
},
async runAgentLoop() {
@@ -234,30 +245,6 @@ export function snapshotToKtxEnrichedSchema(
};
}
function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable {
return {
catalog: table.catalog,
db: table.db,
name: table.name,
columns: table.columns.map((column) => ({
name: column.name,
...(column.comment ? { sampleValues: [column.comment], rawDescriptions: { db: column.comment } } : {}),
})),
};
}
function tableMetadataColumns(table: KtxSchemaTable): Array<{
name: string;
nativeType?: string | null;
comment?: string | null;
}> {
return table.columns.map((column) => ({
name: column.name,
nativeType: column.nativeType ?? null,
comment: column.comment ?? null,
}));
}
function embeddingBatchSize(maxBatchSize: number): number {
return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100;
}
@@ -306,32 +293,28 @@ async function generateDescriptions(input: {
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
const batched = await generator.generateBatchedTableDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
columns: tableMetadataColumns(table),
columns: table.columns.map((column) => ({
name: column.name,
type: column.nativeType,
...(column.comment ? { rawDescriptions: { db: column.comment } } : {}),
})),
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
tableDescription: batched.tableDescription,
columnDescriptions: Object.fromEntries(batched.columnDescriptions),
};
}),
),
@@ -472,15 +455,17 @@ export async function runLocalScanEnrichment(
): Promise<KtxLocalScanEnrichmentResult> {
const progress = input.context.progress;
await progress?.update(0, 'Loading enrichment schema snapshot');
const snapshot = await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
);
const snapshot =
input.snapshot ??
(await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
));
await progress?.update(0.05, `Loaded schema snapshot with ${snapshot.tables.length} tables`);
const now = input.now ?? (() => new Date());
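The table-level bound (`DESCRIPTION_TABLE_CONCURRENCY = 4`, applied through `p-limit` in the file above) can be sketched without the dependency. The helper below is a hypothetical stand-in, not the project's code: `mapWithLimit` and `measurePeak` are illustrative names, and the 5 ms delay only simulates a slow `sampleTable` call so that overlapping calls are observable, much as the concurrency test earlier in this diff does.

```typescript
// Hypothetical dependency-free stand-in for a p-limit style limiter:
// at most `limit` calls to `fn` are in flight at any time.
async function mapWithLimit<T, R>(items: T[], limit: number, fn: (item: T) => Promise<R>): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;
  async function worker(): Promise<void> {
    // Each worker claims the next unprocessed index until none remain.
    while (next < items.length) {
      const index = next;
      next += 1;
      results[index] = await fn(items[index] as T);
    }
  }
  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Records the peak number of overlapping calls for a given table count and limit.
async function measurePeak(tableCount: number, limit: number): Promise<number> {
  let active = 0;
  let peak = 0;
  const tables = Array.from({ length: tableCount }, (_, index) => `table_${index}`);
  await mapWithLimit(tables, limit, async (table) => {
    active += 1;
    peak = Math.max(peak, active);
    await new Promise<void>((resolve) => setTimeout(resolve, 5)); // simulated sampling latency
    active -= 1;
    return table;
  });
  return peak;
}

measurePeak(8, 4).then((peak) => console.log(`peak concurrency: ${peak}`));
```

With eight tables and a limit of four, the peak never exceeds four because only four workers exist; the real pipeline gets the same guarantee from `p-limit`.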


@@ -6,9 +6,15 @@ import YAML from 'yaml';
import type { SourceAdapter } from '../../context/ingest/types.js';
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js';
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
import { resolveEnabledTables } from './enabled-tables.js';
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js';
import { tableRefKey, tableRefSet, type KtxTableRefKey } from './table-ref.js';
import type {
KtxQueryResult,
KtxReadOnlyQueryInput,
KtxScanConnector,
KtxSchemaSnapshot,
} from './types.js';
function relationshipSqlResult(
input: KtxReadOnlyQueryInput,
@@ -120,7 +126,43 @@ async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Pro
);
}
function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter {
function defaultFetchSnapshot(options: { extractedAt?: () => string } = {}): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: options.extractedAt?.() ?? '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
name: 'orders',
catalog: null,
db: 'public',
kind: 'table',
comment: null,
estimatedRows: null,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: null,
},
],
foreignKeys: [],
},
],
};
}
function fetchOnlyAdapter(options: { extractedAt?: () => string; snapshot?: KtxSchemaSnapshot } = {}): SourceAdapter {
const scanSnapshot = options.snapshot
? { ...options.snapshot, ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}) }
: defaultFetchSnapshot(options);
return {
source: 'live-database',
skillNames: ['live_database_ingest'],
@@ -129,39 +171,89 @@ function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceA
await writeFile(
join(stagedDir, 'connection.json'),
`${JSON.stringify({
connectionId: 'warehouse',
driver: 'postgres',
...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}),
scope: { schemas: ['public'] },
metadata: {},
connectionId: scanSnapshot.connectionId,
driver: scanSnapshot.driver,
extractedAt: scanSnapshot.extractedAt,
scope: scanSnapshot.scope,
metadata: scanSnapshot.metadata,
})}\n`,
'utf-8',
);
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
await writeFile(
join(stagedDir, 'tables', 'orders.json'),
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":null,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
for (const table of scanSnapshot.tables) {
await writeFile(join(stagedDir, 'tables', `${table.name}.json`), `${JSON.stringify(table)}\n`, 'utf-8');
}
},
async detect() {
return true;
},
async chunk() {
return {
workUnits: [
{
unitKey: 'live-database-public-orders',
rawFiles: ['tables/orders.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
],
workUnits: scanSnapshot.tables.map((table) => ({
unitKey: `live-database-${table.db ?? 'default'}-${table.name}`,
rawFiles: [`tables/${table.name}.json`],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
})),
};
},
};
}
function nativeScanSnapshot(): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
catalog: null,
db: 'public',
name: 'orders',
kind: 'table',
comment: 'Orders',
estimatedRows: 1,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
],
},
],
};
}
function nativeScanConnector(options: { cleanup?: () => Promise<void> } = {}): KtxScanConnector {
return {
id: 'test:warehouse',
driver: 'postgres',
capabilities: {
structuralIntrospection: true,
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: false,
nestedAnalysis: false,
eventStreamDiscovery: false,
formalForeignKeys: false,
estimatedRowCounts: false,
},
introspect: vi.fn(async () => nativeScanSnapshot()),
sampleTable: vi.fn(async () => ({ headers: ['id'], rows: [[1]], totalRows: 1 })),
sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1 })),
...(options.cleanup ? { cleanup: options.cleanup } : {}),
};
}
describe('local scan', () => {
let tempDir: string;
let project: KtxLocalProject;
@@ -244,6 +336,73 @@ describe('local scan', () => {
});
});
it('passes enabled_tables as fetch context tableScope and does not post-filter staged snapshots', async () => {
project.config.connections.warehouse = {
...project.config.connections.warehouse,
enabled_tables: ['public.orders'],
};
let capturedTableScope: ReadonlySet<KtxTableRefKey> | undefined;
const adapter: SourceAdapter = {
source: 'live-database',
skillNames: ['live_database_ingest'],
async fetch(_pullConfig, stagedDir, ctx) {
capturedTableScope = ctx.tableScope;
await mkdir(join(stagedDir, 'tables'), { recursive: true });
await writeFile(
join(stagedDir, 'connection.json'),
'{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n',
'utf-8',
);
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
await writeFile(
join(stagedDir, 'tables', 'customers.json'),
'{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
await writeFile(
join(stagedDir, 'tables', 'orders.json'),
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
},
async detect() {
return true;
},
async chunk() {
return {
workUnits: [
{
unitKey: 'live-database-public-customers',
rawFiles: ['tables/customers.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
{
unitKey: 'live-database-public-orders',
rawFiles: ['tables/orders.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
],
};
},
};
const result = await runLocalScan({
project,
adapters: [adapter],
connectionId: 'warehouse',
jobId: 'scan-strict-scope-fetch',
now: () => new Date('2026-05-22T00:00:00.000Z'),
});
expect([...(capturedTableScope ?? [])]).toEqual([...tableRefSet([{ catalog: null, db: 'public', name: 'orders' }])]);
expect(result.report.diffSummary.tablesAdded).toBe(2);
const structuralManifest = await readFile(join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8');
expect(structuralManifest).toContain('customers:');
expect(structuralManifest).toContain('orders:');
});
it('runs a structural database scan when live-database is not listed in ktx.yaml', async () => {
await writeDatabaseConfigWithoutIngestAdapters(project.projectDir);
project = await loadKtxProject({ projectDir: project.projectDir });
@@ -265,6 +424,59 @@ describe('local scan', () => {
});
});
it('threads the structural snapshot into enrichment without connector re-introspection', async () => {
project.config.scan.enrichment = { mode: 'deterministic' };
const connector = nativeScanConnector();
const introspect = vi.mocked(connector.introspect);
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'enriched',
connector,
jobId: 'scan-enrichment-snapshot-threading',
now: () => new Date('2026-04-29T09:11:00.000Z'),
});
expect(result.report.enrichment.tableDescriptions).toBe('completed');
expect(introspect).not.toHaveBeenCalled();
});
it('cleans up a scan connector constructed by local scan', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
createConnector: vi.fn(async () => nativeScanConnector({ cleanup })),
jobId: 'scan-owned-connector-cleanup',
now: () => new Date('2026-04-29T09:13:00.000Z'),
});
expect(cleanup).toHaveBeenCalledTimes(1);
});
it('does not clean up a caller-supplied scan connector', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
connector: nativeScanConnector({ cleanup }),
jobId: 'scan-supplied-connector-cleanup',
now: () => new Date('2026-04-29T09:13:30.000Z'),
});
expect(cleanup).not.toHaveBeenCalled();
});
it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => {
const first = await runLocalScan({
project,
@@ -447,10 +659,11 @@ describe('local scan', () => {
};
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@@ -534,10 +747,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@@ -551,6 +765,142 @@ describe('local scan', () => {
expect(result.report.warnings).toEqual([]);
});
it('keeps prototype connector methods when enabled_tables is configured', async () => {
project.config.connections.warehouse = {
...project.config.connections.warehouse,
enabled_tables: ['public.customers', 'public.orders'],
};
const scopedAdapter: SourceAdapter = {
source: 'live-database',
skillNames: ['live_database_ingest'],
async fetch(_pullConfig, stagedDir) {
await mkdir(join(stagedDir, 'tables'), { recursive: true });
await writeFile(
join(stagedDir, 'connection.json'),
'{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n',
'utf-8',
);
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
await writeFile(
join(stagedDir, 'tables', 'customers.json'),
'{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
await writeFile(
join(stagedDir, 'tables', 'orders.json'),
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"customer_id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":false,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
},
async detect() {
return true;
},
async chunk() {
return {
workUnits: [
{
unitKey: 'live-database-public-customers',
rawFiles: ['tables/customers.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
{
unitKey: 'live-database-public-orders',
rawFiles: ['tables/orders.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
],
};
},
};
class FakeClassConnector implements KtxScanConnector {
readonly id = 'test:warehouse';
readonly driver = 'postgres' as const;
readonly capabilities = {
structuralIntrospection: true as const,
tableSampling: false,
columnSampling: false,
columnStats: true,
readOnlySql: true,
nestedAnalysis: false,
eventStreamDiscovery: false,
formalForeignKeys: false,
estimatedRowCounts: true,
};
async introspect(): Promise<KtxSchemaSnapshot> {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-05-22T00:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
catalog: null,
db: 'public',
name: 'customers',
kind: 'table',
comment: null,
estimatedRows: 100,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: null,
},
],
},
{
catalog: null,
db: 'public',
name: 'orders',
kind: 'table',
comment: null,
estimatedRows: 1000,
foreignKeys: [],
columns: [
{
name: 'customer_id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: false,
comment: null,
},
],
},
],
};
}
async executeReadOnly(input: KtxReadOnlyQueryInput): Promise<KtxQueryResult> {
return relationshipSqlResult(input);
}
}
const result = await runLocalScan({
project,
adapters: [scopedAdapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
connector: new FakeClassConnector(),
jobId: 'scan-prototype-connector-scope',
now: () => new Date('2026-05-22T00:00:00.000Z'),
});
expect(result.report.relationships.accepted).toBe(1);
expect(result.report.warnings).toEqual([]);
});
it('threads scan relationship settings into relationship-only local scans', async () => {
project.config.scan.enrichment = { mode: 'deterministic' };
project.config.scan.relationships = {
@ -628,10 +978,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -737,10 +1088,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -863,10 +1215,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -993,10 +1346,11 @@ describe('local scan', () => {
return relationshipSqlResult(input, { throwOnCoverage: true });
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -1128,7 +1482,8 @@ describe('local scan', () => {
join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'),
'utf-8',
);
expect(manifestRaw).toContain('ai: "Deterministic description');
expect(manifestRaw).toContain('ai: |-');
expect(manifestRaw).toContain('Deterministic description');
});
it('persists structural artifacts and a recoverable warning when standalone enrichment execution fails', async () => {
@ -1301,10 +1656,11 @@ describe('local scan', () => {
},
};
const llmRuntime = deterministicLlmRuntime();
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const first = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1333,7 +1689,7 @@ describe('local scan', () => {
const generateObject = vi.spyOn(llmRuntime, 'generateObject');
const retry = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1359,7 +1715,6 @@ describe('local scan', () => {
failedStages: [],
});
expect(retry.report.enrichment.embeddings).toBe('completed');
expect(generateObject).toHaveBeenCalledTimes(1);
expect(generateObject).toHaveBeenCalledWith(expect.objectContaining({ role: 'candidateExtraction' }));
expect(embeddingAttempts).toBe(2);
@ -1512,69 +1867,18 @@ describe('resolveEnabledTables', () => {
expect(resolveEnabledTables({ driver: 'postgres', enabled_tables: [] })).toBeNull();
});
it('returns Set of enabled table names', () => {
it('returns a canonical set of enabled table refs', () => {
const result = resolveEnabledTables({
driver: 'postgres',
enabled_tables: ['public.users', 'public.orders'],
});
expect(result).toBeInstanceOf(Set);
expect(result!.size).toBe(2);
expect(result!.has('public.users')).toBe(true);
expect(result!.has('public.orders')).toBe(true);
expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'users' }))).toBe(true);
expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'orders' }))).toBe(true);
});
it('returns null for undefined connection', () => {
expect(resolveEnabledTables(undefined)).toBeNull();
});
});
describe('filterSnapshotTables', () => {
function makeSnapshot(tables: Array<{ db: string; name: string }>): KtxSchemaSnapshot {
return {
connectionId: 'test',
driver: 'postgres',
extractedAt: '2026-01-01T00:00:00Z',
scope: {},
metadata: {},
tables: tables.map(
(t): KtxSchemaTable => ({
catalog: null,
db: t.db,
name: t.name,
kind: 'table',
comment: null,
estimatedRows: null,
columns: [],
foreignKeys: [],
}),
),
};
}
it('keeps only enabled tables', () => {
const snapshot = makeSnapshot([
{ db: 'public', name: 'users' },
{ db: 'public', name: 'orders' },
{ db: 'public', name: 'logs' },
]);
const enabled = new Set(['public.users', 'public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(2);
expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']);
});
it('returns empty tables when none match', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(0);
});
it('preserves other snapshot fields', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.users']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.connectionId).toBe('test');
expect(filtered.driver).toBe('postgres');
});
});

@ -10,7 +10,7 @@ import type { KtxProjectLlmConfig, KtxScanEnrichmentConfig, KtxScanRelationshipC
import type { KtxLocalProject } from '../../context/project/project.js';
import { ktxLocalStateDbPath } from '../project/local-state-db.js';
import { redactKtxScanReport } from './credentials.js';
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
import { resolveEnabledTables } from './enabled-tables.js';
import { completedKtxScanEnrichmentStateSummary } from './enrichment-state.js';
import { failedKtxScanEnrichmentSummary, ktxScanErrorMessage } from './enrichment-summary.js';
import {
@ -25,9 +25,7 @@ import type {
KtxConnectionDriver,
KtxProgressPort,
KtxScanConnector,
KtxScanContext,
KtxScanEnrichmentStateSummary,
KtxScanInput,
KtxScanMode,
KtxScanReport,
KtxScanTrigger,
@ -370,17 +368,6 @@ async function readScanReport(
}
}
function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set<string>): KtxScanConnector {
return {
...connector,
async introspect(input: KtxScanInput, ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
const snapshot = await connector.introspect(input, ctx);
return filterSnapshotTables(snapshot, enabledTables);
},
};
}
function withInternalLiveDatabaseAdapter(project: KtxLocalProject): KtxLocalProject {
if (project.config.ingest.adapters.includes(LIVE_DATABASE_ADAPTER)) {
return project;
@ -402,14 +389,17 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
assertSupportedMode(mode);
await options.progress?.update(0.05, 'Preparing scan');
const rawConnector = await resolveScanConnector(options, mode);
const ownsConnector = !!rawConnector && !options.connector;
try {
const connection = options.project.config.connections[options.connectionId];
if (!connection) {
throw new Error(`Connection "${options.connectionId}" is not configured in ktx.yaml`);
}
const driver = normalizeDriver(connection.driver);
const enabledTables = resolveEnabledTables(connection);
const connector = rawConnector && enabledTables ? createFilteredConnector(rawConnector, enabledTables) : rawConnector;
const tableScope = resolveEnabledTables(connection) ?? undefined;
const connector = rawConnector;
const adapters =
options.adapters ??
createDefaultLocalIngestAdapters(options.project, { databaseIntrospectionUrl: options.databaseIntrospectionUrl });
@ -441,6 +431,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
jobId: options.jobId,
now: options.now,
dryRun: options.dryRun,
tableScope,
});
await options.progress?.update(0.55, scanChangeSummary(scanDiffSummaryFromRecord(record)));
let report = reportFromIngest({
@ -467,6 +458,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
}
const enrichmentStateStore = connector ? createLocalScanEnrichmentStateStore(options) : null;
let enrichmentState: KtxScanEnrichmentStateSummary = completedKtxScanEnrichmentStateSummary();
let enrichmentSnapshot: KtxSchemaSnapshot | null = null;
if (!reusedExistingScanArtifacts && !report.dryRun && report.artifactPaths.rawSourcesDir) {
await options.progress?.update(0.7, 'Writing schema artifacts');
const rawSnapshot = await readLocalScanStructuralSnapshot({
@ -476,27 +468,13 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
rawSourcesDir: report.artifactPaths.rawSourcesDir,
extractedAtFallback: report.createdAt,
});
const structuralSnapshot = enabledTables ? filterSnapshotTables(rawSnapshot, enabledTables) : rawSnapshot;
if (enabledTables && structuralSnapshot.tables.length < rawSnapshot.tables.length) {
const excluded = rawSnapshot.tables.length - structuralSnapshot.tables.length;
let remaining = excluded;
const ds = report.diffSummary;
const subFrom = (field: 'tablesAdded' | 'tablesUnchanged' | 'tablesModified') => {
const take = Math.min(remaining, ds[field]);
ds[field] -= take;
remaining -= take;
};
subFrom('tablesAdded');
subFrom('tablesUnchanged');
subFrom('tablesModified');
await options.progress?.update(0.6, scanChangeSummary(report.diffSummary));
}
enrichmentSnapshot = rawSnapshot;
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,
syncId: record.syncId,
driver,
snapshot: structuralSnapshot,
snapshot: rawSnapshot,
dryRun: false,
});
report.artifactPaths.manifestShards = manifestArtifacts.manifestShards;
@ -515,6 +493,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
mode,
detectRelationships: options.detectRelationships,
connector,
...(enrichmentSnapshot ? { snapshot: enrichmentSnapshot } : {}),
context: { runId: record.runId, progress: options.progress?.startPhase(0.18) },
providers: enrichmentProviders,
stateStore: enrichmentStateStore,
@ -585,6 +564,11 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
syncId: record.syncId,
report,
};
} finally {
if (ownsConnector) {
await rawConnector?.cleanup?.();
}
}
}
/** @internal */

@ -70,6 +70,7 @@ interface KtxRelationshipDiagnosticsPolicy {
validationRequiredForManifest: boolean;
maxCandidatesPerColumn: number;
profileSampleRows: number;
profileConcurrency: number;
validationConcurrency: number;
}
@ -118,6 +119,7 @@ const DEFAULT_POLICY: KtxRelationshipDiagnosticsPolicy = {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
};

@ -228,6 +228,7 @@ export async function discoverKtxRelationships(
executor,
ctx: input.context,
profileSampleRows: input.settings.profileSampleRows,
profileConcurrency: input.settings.profileConcurrency,
cache: profileCache,
});
const deterministicCandidates: KtxRelationshipDiscoveryCandidate[] = generateKtxRelationshipDiscoveryCandidates(

@ -1,7 +1,7 @@
import { readFile } from 'node:fs/promises';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import { afterEach, describe, expect, it } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js';
@ -351,4 +351,94 @@ describe('relationship profiling', () => {
scaleExecutor.close();
}
});
it('profiles tables concurrently up to profileConcurrency', async () => {
let inFlight = 0;
let maxInFlight = 0;
const executor = {
executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => {
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await new Promise((resolve) => setTimeout(resolve, 10));
inFlight -= 1;
return {
headers: [
'column_name',
'table_row_count',
'row_count',
'null_count',
'distinct_count',
'min_text_length',
'max_text_length',
'sample_values',
],
rows: [[input.sql.includes('accounts') ? 'id' : 'account_id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
totalRows: 1,
rowCount: 1,
};
}),
};
await profileKtxRelationshipSchema({
connectionId: 'warehouse',
driver: 'sqlite',
schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds']),
executor,
ctx: { runId: 'profile-concurrency' },
profileConcurrency: 4,
});
expect(maxInFlight).toBe(4);
});
it('keeps profiling other tables when one table profile fails', async () => {
const executor = {
executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => {
if (input.sql.includes('"orders"')) {
throw new Error('orders unavailable');
}
return {
headers: [
'column_name',
'table_row_count',
'row_count',
'null_count',
'distinct_count',
'min_text_length',
'max_text_length',
'sample_values',
],
rows: [['id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
totalRows: 1,
rowCount: 1,
};
}),
};
const result = await profileKtxRelationshipSchema({
connectionId: 'warehouse',
driver: 'sqlite',
schema: schemaWithTables(['accounts', 'orders']),
executor,
ctx: { runId: 'profile-error-isolated' },
profileConcurrency: 2,
});
expect(result.warnings).toContain('profile_failed:orders:orders unavailable');
expect(result.tables).toHaveLength(2);
expect(Object.keys(result.columns)).toContain('accounts.id');
});
});
function schemaWithTables(names: string[]): KtxEnrichedSchema {
return schema(
names.map((name) =>
table(name, [
column(name, name === 'orders' ? 'account_id' : 'id', {
nullable: false,
primaryKey: name !== 'orders',
}),
]),
),
);
}

@ -1,4 +1,5 @@
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { mapWithConcurrency } from './relationship-validation.js';
import type {
KtxConnectionDriver,
KtxQueryResult,
@ -60,6 +61,7 @@ export interface ProfileKtxRelationshipSchemaInput {
ctx: KtxScanContext;
sampleValuesPerColumn?: number;
profileSampleRows?: number;
profileConcurrency?: number;
cache?: KtxRelationshipProfileCache;
}
@ -227,6 +229,9 @@ function sampleAggregateSql(driver: KtxConnectionDriver, innerSql: string): stri
if (driver === 'clickhouse') {
return `(SELECT arrayStringConcat(groupArray(toString(value)), '\\x1F') FROM (${innerSql}) AS relationship_profile_values)`;
}
if (driver === 'snowflake') {
return `(SELECT LISTAGG(CAST(value AS VARCHAR), '\\x1f') FROM (${innerSql}) AS relationship_profile_values)`;
}
return `(SELECT GROUP_CONCAT(CAST(value AS TEXT), char(31)) FROM (${innerSql}) AS relationship_profile_values)`;
}
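
Each branch of `sampleAggregateSql` packs a column's sample values into one string joined by the unit separator (U+001F), which is why the test fixtures carry cells such as `'1\u001f2'`. The diff does not show how the result is decoded, so the following is only a minimal sketch of the consuming side; `splitSampleValues` and `UNIT_SEPARATOR` are hypothetical names, not identifiers from this change.

```typescript
// Assumption: the decoder splits on the same U+001F byte the SQL aggregate joins with.
const UNIT_SEPARATOR = '\x1f';

// Hypothetical helper; the real decoding code is not part of this diff.
function splitSampleValues(raw: string | null): string[] {
  // A NULL aggregate (no sampled rows) or an empty string yields no samples.
  if (raw === null || raw.length === 0) {
    return [];
  }
  return raw.split(UNIT_SEPARATOR);
}

// Same shape as the sample_values cell used in the profiling tests.
const decoded = splitSampleValues('1\u001f2');
const empty = splitSampleValues(null);
```

Using a control character as the delimiter keeps commas and spaces inside sampled values from being mistaken for separators.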
@ -386,6 +391,10 @@ async function queryTableProfile(input: {
};
}
type TableProfileResult =
| { tableProfile: Awaited<ReturnType<typeof queryTableProfile>> }
| { cached: KtxRelationshipCachedTableProfile; queryCount: 0 };
export async function profileKtxRelationshipSchema(
input: ProfileKtxRelationshipSchemaInput,
): Promise<KtxRelationshipProfileArtifact> {
@ -405,54 +414,68 @@ export async function profileKtxRelationshipSchema(
const tables: KtxRelationshipTableProfile[] = [];
const columns: Record<string, KtxRelationshipColumnProfile> = {};
const warnings: string[] = [];
const executor = input.executor;
for (const table of input.schema.tables.filter((candidate) => candidate.enabled)) {
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
const profileSampleRows = input.profileSampleRows ?? 10000;
const cacheKey = tableProfileCacheKey({
connectionId: input.connectionId,
driver: input.driver,
ctx: input.ctx,
table: table.ref,
sampleValuesPerColumn,
profileSampleRows,
});
const cached = input.cache?.tableProfiles.get(cacheKey);
if (cached) {
tables.push(cached.table);
Object.assign(columns, cached.columns);
for (const warning of cached.warnings) {
warnings.push(warning);
}
continue;
}
try {
const tableProfile = await queryTableProfile({
const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled);
const tableResults = await mapWithConcurrency<KtxEnrichedTable, TableProfileResult>(
enabledTables,
input.profileConcurrency ?? 4,
async (table) => {
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
const profileSampleRows = input.profileSampleRows ?? 10000;
const cacheKey = tableProfileCacheKey({
connectionId: input.connectionId,
driver: input.driver,
table,
executor: input.executor,
ctx: input.ctx,
table: table.ref,
sampleValuesPerColumn,
profileSampleRows,
});
queryTotal += tableProfile.queryCount;
tables.push(tableProfile.table);
Object.assign(columns, tableProfile.columns);
input.cache?.tableProfiles.set(cacheKey, {
table: tableProfile.table,
columns: tableProfile.columns,
warnings: [],
});
} catch (error) {
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
warnings.push(failureWarning);
input.cache?.tableProfiles.set(cacheKey, {
table: { table: table.ref, rowCount: 0 },
columns: {},
warnings: [failureWarning],
});
const cached = input.cache?.tableProfiles.get(cacheKey);
if (cached) {
return { cached, queryCount: 0 };
}
try {
const tableProfile = await queryTableProfile({
connectionId: input.connectionId,
driver: input.driver,
table,
executor,
ctx: input.ctx,
sampleValuesPerColumn,
profileSampleRows,
});
input.cache?.tableProfiles.set(cacheKey, {
table: tableProfile.table,
columns: tableProfile.columns,
warnings: [],
});
return { tableProfile };
} catch (error) {
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
const cachedFailure = {
table: { table: table.ref, rowCount: 0 },
columns: {},
warnings: [failureWarning],
};
input.cache?.tableProfiles.set(cacheKey, cachedFailure);
return { cached: cachedFailure, queryCount: 0 };
}
},
);
for (const result of tableResults) {
if ('tableProfile' in result) {
queryTotal += result.tableProfile.queryCount;
tables.push(result.tableProfile.table);
Object.assign(columns, result.tableProfile.columns);
continue;
}
tables.push(result.cached.table);
Object.assign(columns, result.cached.columns);
for (const warning of result.cached.warnings) {
warnings.push(warning);
}
}
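
The rewritten loop above delegates to `mapWithConcurrency`, whose body is not shown in this diff (only its signature is exported from relationship-validation.ts). As a rough illustration of what a bounded, order-preserving map can look like, here is a sketch that assumes a simple worker-pool design; `mapWithConcurrencySketch` and `demo` are hypothetical names, and the real helper may be implemented differently.

```typescript
// Sketch only: a fixed number of workers pull the next index from a shared cursor.
async function mapWithConcurrencySketch<TInput, TOutput>(
  inputs: readonly TInput[],
  concurrency: number,
  mapOne: (input: TInput) => Promise<TOutput>,
): Promise<TOutput[]> {
  const results = new Array<TOutput>(inputs.length);
  let next = 0;
  const worker = async (): Promise<void> => {
    while (next < inputs.length) {
      const index = next;
      next += 1;
      // Writing by index keeps results in input order regardless of completion order.
      results[index] = await mapOne(inputs[index] as TInput);
    }
  };
  const workerCount = Math.max(1, Math.min(concurrency, inputs.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Records the peak number of overlapping calls, like the concurrency test in this change.
async function demo(): Promise<{ doubled: number[]; maxInFlight: number }> {
  let inFlight = 0;
  let maxInFlight = 0;
  const doubled = await mapWithConcurrencySketch([1, 2, 3, 4, 5], 2, async (n) => {
    inFlight += 1;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await Promise.resolve(); // yield so the second worker can overlap
    inFlight -= 1;
    return n * 2;
  });
  return { doubled, maxInFlight };
}
```

Because the per-table callback in the diff catches its own errors and returns a cached failure record, one failing table does not reject the whole batch; the sketch itself would reject on the first thrown error, as `Promise.all` does.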

@ -193,7 +193,7 @@ function statusFor(input: {
return 'rejected';
}
async function mapWithConcurrency<TInput, TOutput>(
export async function mapWithConcurrency<TInput, TOutput>(
inputs: readonly TInput[],
concurrency: number,
mapOne: (input: TInput) => Promise<TOutput>,

@ -0,0 +1,67 @@
import { describe, expect, it } from 'vitest';
import {
scopedTableNames,
tableRefFromKey,
tableRefKey,
tableRefSet,
type KtxTableRefKey,
} from './table-ref.js';
describe('tableRefKey roundtrip', () => {
it('encodes and decodes a three-part ref', () => {
const ref = { catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' };
expect(tableRefFromKey(tableRefKey(ref))).toEqual(ref);
});
it('treats null catalog/db as the empty segment', () => {
const ref = { catalog: null, db: 'public', name: 'users' };
expect(tableRefFromKey(tableRefKey(ref))).toEqual(ref);
});
it('roundtrips a bare-name ref', () => {
const ref = { catalog: null, db: null, name: 'orders' };
expect(tableRefFromKey(tableRefKey(ref))).toEqual(ref);
});
});
describe('tableRefSet', () => {
it('produces a set with member-equality on canonical keys', () => {
const scope = tableRefSet([
{ catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' },
{ catalog: 'ANALYTICS', db: 'MARTS', name: 'ITEMS' },
]);
expect(scope.size).toBe(2);
expect(scope.has(tableRefKey({ catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' }))).toBe(true);
expect(scope.has(tableRefKey({ catalog: 'ANALYTICS', db: 'MARTS', name: 'OTHER' }))).toBe(false);
});
});
describe('scopedTableNames', () => {
it('projects to the requested (catalog, db) namespace', () => {
const scope = tableRefSet([
{ catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' },
{ catalog: 'ANALYTICS', db: 'MARTS', name: 'ITEMS' },
{ catalog: 'ANALYTICS', db: 'STAGING', name: 'LISTINGS' },
]);
expect(scopedTableNames(scope, { catalog: 'ANALYTICS', db: 'MARTS' }).sort()).toEqual(['ITEMS', 'LISTINGS']);
expect(scopedTableNames(scope, { catalog: 'ANALYTICS', db: 'STAGING' })).toEqual(['LISTINGS']);
});
it('treats null in the scope entry as a wildcard for that segment', () => {
const scope = tableRefSet([{ catalog: null, db: 'public', name: 'users' }]);
expect(scopedTableNames(scope, { catalog: 'any-catalog', db: 'public' })).toEqual(['users']);
});
it('returns empty when no scope entry matches the namespace', () => {
const scope = tableRefSet([{ catalog: 'A', db: 'B', name: 'C' }]);
expect(scopedTableNames(scope, { catalog: 'X', db: 'Y' })).toEqual([]);
});
it('dedupes when the same name appears under different catalog projections', () => {
const scope: ReadonlySet<KtxTableRefKey> = tableRefSet([
{ catalog: null, db: 'public', name: 'users' },
{ catalog: 'A', db: 'public', name: 'users' },
]);
expect(scopedTableNames(scope, { catalog: 'A', db: 'public' })).toEqual(['users']);
});
});

@ -0,0 +1,53 @@
import type { KtxTableRef } from './types.js';
/**
* Branded canonical string representation of a {@link KtxTableRef}.
*
* Connectors compare scopes for set membership via these keys instead of the
* raw object (JS `Set<object>` uses identity equality, which would be useless
* here). Build a key with {@link tableRefKey} and decode with
* {@link tableRefFromKey}.
*/
export type KtxTableRefKey = string & { readonly __brand: 'KtxTableRefKey' };
const SEPARATOR = '\x1f';
/** @internal */
export function tableRefKey(ref: KtxTableRef): KtxTableRefKey {
return `${ref.catalog ?? ''}${SEPARATOR}${ref.db ?? ''}${SEPARATOR}${ref.name}` as KtxTableRefKey;
}
/** @internal */
export function tableRefFromKey(key: KtxTableRefKey): KtxTableRef {
const [catalog = '', db = '', name = ''] = key.split(SEPARATOR);
return {
catalog: catalog.length > 0 ? catalog : null,
db: db.length > 0 ? db : null,
name,
};
}
export function tableRefSet(refs: readonly KtxTableRef[]): ReadonlySet<KtxTableRefKey> {
return new Set(refs.map(tableRefKey));
}
/**
* Return the bare table names from a scope that fall within the given
* (catalog, db) namespace. `catalog: null` is treated as a wildcard so that
* legacy 2-part `"db.name"` entries continue to match. Same for `db: null`.
*/
export function scopedTableNames(
scope: ReadonlySet<KtxTableRefKey>,
namespace: { catalog?: string | null; db?: string | null },
): string[] {
const names = new Set<string>();
const wantCatalog = namespace.catalog ?? null;
const wantDb = namespace.db ?? null;
for (const key of scope) {
const ref = tableRefFromKey(key);
if (wantCatalog !== null && ref.catalog !== null && ref.catalog !== wantCatalog) continue;
if (wantDb !== null && ref.db !== null && ref.db !== wantDb) continue;
names.add(ref.name);
}
return [...names];
}
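
The doc comments in this file make two claims that are easy to check by hand: a `Set` of ref objects compares by identity, and a `null` catalog or db in a scope entry acts as a wildcard. The snippet below is a condensed restatement of the functions above so that it runs on its own; the shortened type names (`TableRef`, `TableRefKey`) are local stand-ins for `KtxTableRef` and `KtxTableRefKey`, and the sample refs are made up for illustration.

```typescript
type TableRef = { catalog: string | null; db: string | null; name: string };
type TableRefKey = string & { readonly __brand: 'TableRefKey' };

const SEPARATOR = '\x1f';

function tableRefKey(ref: TableRef): TableRefKey {
  return `${ref.catalog ?? ''}${SEPARATOR}${ref.db ?? ''}${SEPARATOR}${ref.name}` as TableRefKey;
}

function tableRefFromKey(key: TableRefKey): TableRef {
  const [catalog = '', db = '', name = ''] = key.split(SEPARATOR);
  return { catalog: catalog.length > 0 ? catalog : null, db: db.length > 0 ? db : null, name };
}

function scopedTableNames(
  scope: ReadonlySet<TableRefKey>,
  namespace: { catalog?: string | null; db?: string | null },
): string[] {
  const names = new Set<string>();
  const wantCatalog = namespace.catalog ?? null;
  const wantDb = namespace.db ?? null;
  scope.forEach((key) => {
    const ref = tableRefFromKey(key);
    // A null segment on either side skips the comparison, i.e. acts as a wildcard.
    if (wantCatalog !== null && ref.catalog !== null && ref.catalog !== wantCatalog) return;
    if (wantDb !== null && ref.db !== null && ref.db !== wantDb) return;
    names.add(ref.name);
  });
  return Array.from(names);
}

// Identity equality: a structurally equal object is not found in a Set of objects.
const byObject = new Set<TableRef>([{ catalog: null, db: 'public', name: 'users' }]);
const objectHit = byObject.has({ catalog: null, db: 'public', name: 'users' });

// Canonical keys: equal refs encode to equal strings, so membership works.
const scope = new Set([tableRefKey({ catalog: null, db: 'public', name: 'users' })]);
const keyHit = scope.has(tableRefKey({ catalog: null, db: 'public', name: 'users' }));

// The catalog-less entry matches any catalog, but only its own db.
const wildcardNames = scopedTableNames(scope, { catalog: 'ANALYTICS', db: 'public' });
const otherDb = scopedTableNames(scope, { catalog: 'ANALYTICS', db: 'staging' });
```

One consequence of the encoding worth keeping in mind: an identifier that itself contains U+001F would not survive the roundtrip, and an empty-string catalog or db decodes to `null`.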

@ -1,3 +1,5 @@
import type { KtxTableRefKey } from './table-ref.js';
export type KtxConnectionDriver =
| 'sqlite'
| 'postgres'
@ -137,6 +139,14 @@ export interface KtxScanInput {
connectionId: string;
driver: KtxConnectionDriver;
scope?: KtxSchemaScope;
/**
* Restricts introspection to a specific set of fully-qualified tables.
* `undefined` means "all tables within {@link scope}". Connectors that honor
* this field should push the filter into their metadata queries. Callers do
* not post-filter, so a connector that ignores `tableScope` will over-fetch
* and surface the extra tables in output.
*/
tableScope?: ReadonlySet<KtxTableRefKey>;
mode?: KtxScanMode;
dryRun?: boolean;
detectRelationships?: boolean;