feat(scan): enforce table scope at fetch boundary

This commit is contained in:
Andrey Avtomonov 2026-05-22 18:36:53 +02:00
parent a698389bc9
commit 5b8292cacd
19 changed files with 208 additions and 160 deletions

View file

@ -1,6 +1,7 @@
import { once } from 'node:events';
import { createServer } from 'node:http';
import { describe, expect, it, vi } from 'vitest';
import { tableRefSet } from '../../../scan/table-ref.js';
import { createDaemonLiveDatabaseIntrospection } from './daemon-introspection.js';
const daemonResponse = {
@ -161,7 +162,11 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
baseUrl: `http://127.0.0.1:${address.port}`,
});
await expect(introspection.extractSchema('warehouse')).resolves.toMatchObject({
await expect(
introspection.extractSchema('warehouse', {
tableScope: tableRefSet([{ catalog: 'warehouse', db: 'public', name: 'orders' }]),
}),
).resolves.toMatchObject({
connectionId: 'warehouse',
tables: [{ name: 'customers' }, { name: 'orders' }],
});
@ -176,6 +181,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
schemas: ['public'],
statement_timeout_ms: 30_000,
connection_timeout_seconds: 5,
table_scope: [{ catalog: 'warehouse', db: 'public', name: 'orders' }],
},
},
]);
@ -217,7 +223,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
expect(runJson).not.toHaveBeenCalled();
});
it('filters out tables not on the enabled_tables allowlist', async () => {
it('does not use connection enabled_tables as a response filter', async () => {
const runJson = vi.fn(async () => daemonResponse);
const introspection = createDaemonLiveDatabaseIntrospection({
connections: {
@ -232,7 +238,8 @@ describe('createDaemonLiveDatabaseIntrospection', () => {
});
const snapshot = await introspection.extractSchema('warehouse');
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.orders']);
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.customers', 'public.orders']);
expect(runJson).toHaveBeenCalledWith('database-introspect', expect.not.objectContaining({ table_scope: expect.anything() }));
});
it('passes through every table when enabled_tables is omitted or empty', async () => {

View file

@ -3,7 +3,7 @@ import { request as httpRequest } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { URL } from 'node:url';
import type { KtxProjectConnectionConfig } from '../../../project/config.js';
import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js';
import { tableRefFromKey } from '../../../scan/table-ref.js';
import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js';
import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js';
import type { LiveDatabaseIntrospectionOptions, LiveDatabaseIntrospectionPort } from './types.js';
@ -220,6 +220,18 @@ function mapDaemonSnapshot(
};
}
/**
 * Converts the optional `tableScope` carried by the introspection options into
 * plain `{ catalog, db, name }` records.
 *
 * @param options - Introspection options; may be `undefined`.
 * @returns One record per key in `options.tableScope` (in the set's iteration
 *   order), or `undefined` when no `tableScope` is present.
 */
function serializeTableScope(options: LiveDatabaseIntrospectionOptions | undefined): Array<{
  catalog: string | null;
  db: string | null;
  name: string;
}> | undefined {
  const scope = options?.tableScope;
  if (!scope) {
    return undefined;
  }
  // Each key is decoded back into its table ref, then narrowed to the three serialized fields.
  return Array.from(scope, (key) => {
    const { catalog, db, name } = tableRefFromKey(key);
    return { catalog, db, name };
  });
}
export function createDaemonLiveDatabaseIntrospection(
options: DaemonLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
@ -231,8 +243,9 @@ export function createDaemonLiveDatabaseIntrospection(
const now = options.now ?? (() => new Date());
return {
async extractSchema(connectionId: string, _options?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot> {
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot> {
const connection = requirePostgresConnection(options.connections, connectionId);
const tableScope = serializeTableScope(introspectionOptions);
const payload = {
connection_id: connectionId,
driver: normalizeDriver(connection.driver),
@ -240,17 +253,16 @@ export function createDaemonLiveDatabaseIntrospection(
schemas,
statement_timeout_ms: options.statementTimeoutMs ?? 30_000,
connection_timeout_seconds: options.connectionTimeoutSeconds ?? 5,
...(tableScope !== undefined ? { table_scope: tableScope } : {}),
};
const raw = requestJson
? await requestJson('/database/introspect', payload)
: await runJson('database-introspect', payload);
const snapshot = mapDaemonSnapshot(raw, {
return mapDaemonSnapshot(raw, {
connectionId,
extractedAt: now().toISOString(),
schemas,
});
const enabledTables = resolveEnabledTables(connection);
return enabledTables ? filterSnapshotTables(snapshot, enabledTables) : snapshot;
},
};
}

View file

@ -1,4 +1,4 @@
import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises';
import { mkdtemp, readdir, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
@ -58,7 +58,7 @@ describe('LiveDatabaseSourceAdapter', () => {
expect(adapter.skillNames).toEqual(['live_database_ingest']);
});
it('threads tableScope into the introspection port and applies a defensive final filter', async () => {
it('threads tableScope from fetch context into the introspection port without post-filtering', async () => {
const extractSchema = vi.fn(
async (_connectionId: string, _options?: { tableScope?: ReadonlySet<KtxTableRefKey> }) => ({
connectionId: 'warehouse',
@ -93,19 +93,17 @@ describe('LiveDatabaseSourceAdapter', () => {
const scope = tableRefSet([{ catalog: 'A', db: 'MARTS', name: 'IN_SCOPE' }]);
const adapter = new LiveDatabaseSourceAdapter({
introspection: { extractSchema },
resolveTableScope: (connectionId) => (connectionId === 'warehouse' ? scope : undefined),
});
const stagedDir = await mkdtemp(join(tmpdir(), 'ktx-livedb-scope-'));
try {
await adapter.fetch(undefined, stagedDir, {
connectionId: 'warehouse',
sourceKey: 'live-database',
} as never);
tableScope: scope,
});
expect(extractSchema).toHaveBeenCalledWith('warehouse', { tableScope: scope });
const tables = await readdir(join(stagedDir, 'tables'));
expect(tables).toHaveLength(1);
const table = JSON.parse(await readFile(join(stagedDir, 'tables', tables[0]!), 'utf8')) as { name?: string };
expect(table.name).toBe('IN_SCOPE');
expect(tables).toHaveLength(2);
} finally {
await rm(stagedDir, { recursive: true, force: true });
}

View file

@ -1,5 +1,4 @@
import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js';
import { filterSnapshotTables } from '../../../scan/enabled-tables.js';
import { chunkLiveDatabaseStagedDir } from './chunk.js';
import { detectLiveDatabaseStagedDir, writeLiveDatabaseSnapshot } from './stage.js';
import type { LiveDatabaseSourceAdapterDeps } from './types.js';
@ -15,13 +14,12 @@ export class LiveDatabaseSourceAdapter implements SourceAdapter {
}
async fetch(_pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
const tableScope = this.deps.resolveTableScope?.(ctx.connectionId);
const tableScope = ctx.tableScope;
const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId, { tableScope });
const filtered = tableScope ? filterSnapshotTables(snapshot, tableScope) : snapshot;
await writeLiveDatabaseSnapshot(stagedDir, {
...filtered,
...snapshot,
connectionId: ctx.connectionId,
extractedAt: filtered.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(),
extractedAt: snapshot.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(),
});
}

View file

@ -12,5 +12,4 @@ export interface LiveDatabaseIntrospectionPort {
/** Dependencies injected into the live-database source adapter. */
export interface LiveDatabaseSourceAdapterDeps {
  /** Port whose `extractSchema` the adapter calls to obtain the schema snapshot. */
  introspection: LiveDatabaseIntrospectionPort;
  /** Optional clock; when absent the adapter falls back to `new Date()` for a missing `extractedAt`. */
  now?: () => Date;
  /** Optional lookup of the table scope for a connection id; `undefined` means no scope. */
  resolveTableScope?: (connectionId: string) => ReadonlySet<KtxTableRefKey> | undefined;
}

View file

@ -4,7 +4,6 @@ import { notionConnectionToPullConfig, parseNotionConnectionConfig } from '../..
import { resolveKtxConfigReference } from '../core/config-reference.js';
import { ktxLocalStateDbPath } from '../../context/project/local-state-db.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { resolveEnabledTables } from '../../context/scan/enabled-tables.js';
import type { SqlAnalysisPort } from '../../context/sql-analysis/ports.js';
import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js';
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
@ -91,10 +90,6 @@ export function createDefaultLocalIngestAdapters(
...options.databaseIntrospection,
...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}),
}),
resolveTableScope: (connectionId) => {
const connection = project.config.connections[connectionId];
return connection ? resolveEnabledTables(connection) ?? undefined : undefined;
},
}),
new LookmlSourceAdapter({
homeDir: join(project.projectDir, '.ktx/cache'),

View file

@ -9,6 +9,7 @@ import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js';
import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js';
import { buildSyncId } from './raw-sources-paths.js';
import { SqliteLocalIngestStore } from './sqlite-local-ingest-store.js';
import type { KtxTableRefKey } from '../scan/table-ref.js';
import type { IngestTrigger, SourceAdapter, WorkUnit } from './types.js';
type LocalIngestStatus = 'running' | 'done' | 'error';
@ -62,6 +63,7 @@ export interface RunLocalStageOnlyIngestOptions {
now?: () => Date;
dryRun?: boolean;
memoryFlow?: MemoryFlowEventSink;
tableScope?: ReadonlySet<KtxTableRefKey>;
}
const LOCAL_AUTHOR = 'ktx';
@ -225,6 +227,7 @@ async function prepareLocalStagedDir(
stagedDir: string,
sourceDir: string | undefined,
connectionId: string,
tableScope: ReadonlySet<KtxTableRefKey> | undefined,
): Promise<string | null> {
await rm(stagedDir, { recursive: true, force: true });
await mkdir(stagedDir, { recursive: true });
@ -242,7 +245,7 @@ async function prepareLocalStagedDir(
);
}
const pullConfig = await localPullConfigForAdapter(project, adapter, connectionId);
await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source });
await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source, tableScope });
return null;
}
@ -274,7 +277,14 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
assertCompatibleExistingRun(existingRun, runId, adapter.source, connectionId);
const stagedDir = join(options.project.projectDir, '.ktx/cache/local-ingest', runId, 'staged');
const sourceDir = await prepareLocalStagedDir(options.project, adapter, stagedDir, options.sourceDir, connectionId);
const sourceDir = await prepareLocalStagedDir(
options.project,
adapter,
stagedDir,
options.sourceDir,
connectionId,
options.tableScope,
);
const detected = await adapter.detect(stagedDir);
if (!detected) {

View file

@ -2,6 +2,7 @@ import type { KtxEmbeddingPort } from '../core/embedding.js';
import type { MemoryAction } from '../../context/memory/types.js';
import type { SemanticLayerService } from '../../context/sl/semantic-layer.service.js';
import type { TouchedSlSource } from '../../context/tools/touched-sl-sources.js';
import type { KtxTableRefKey } from '../scan/table-ref.js';
import type { MemoryFlowEventSink } from './memory-flow/types.js';
import type { StageIndex } from './stages/stage-index.types.js';
import type { WorkUnitOutcome } from './stages/stage-3-work-units.js';
@ -52,6 +53,7 @@ export interface ChunkResult {
/** Context object passed to `SourceAdapter.fetch` alongside the pull config and staged directory. */
export interface FetchContext {
  /** Id of the connection being fetched. */
  connectionId: string;
  /** Source key of the adapter being run (the local ingest runner passes `adapter.source`). */
  sourceKey: string;
  /** Optional set of table-ref keys restricting which tables to fetch; `undefined` when no scope was supplied. */
  tableScope?: ReadonlySet<KtxTableRefKey>;
  /** Optional memory-flow event sink. NOTE(review): how adapters use it is not visible here — confirm. */
  memoryFlow?: MemoryFlowEventSink;
}

View file

@ -1,5 +1,5 @@
import { hasTableRef, tableRefSet, type KtxTableRefKey } from './table-ref.js';
import type { KtxSchemaSnapshot, KtxTableRef } from './types.js';
import { tableRefSet, type KtxTableRefKey } from './table-ref.js';
import type { KtxTableRef } from './types.js';
/**
* Parses the `enabled_tables` field on a connection into a scope of
@ -61,16 +61,3 @@ function parseDottedEntry(value: string): KtxTableRef | null {
}
return null;
}
/**
 * @internal — kept as a defensive backstop for the live-database adapter and tests.
 *
 * Produces a copy of `snapshot` whose `tables` list keeps only the tables that
 * `hasTableRef` finds in `enabledTables`. All other snapshot fields are copied
 * over unchanged and the input snapshot is not mutated.
 *
 * @param snapshot - Snapshot to filter.
 * @param enabledTables - Scope of table-ref keys that are allowed through.
 * @returns A new snapshot containing only the in-scope tables.
 */
export function filterSnapshotTables(
  snapshot: KtxSchemaSnapshot,
  enabledTables: ReadonlySet<KtxTableRefKey>,
): KtxSchemaSnapshot {
  const tables = snapshot.tables.filter(({ catalog, db, name }) =>
    hasTableRef(enabledTables, { catalog, db, name }),
  );
  return { ...snapshot, tables };
}

View file

@ -6,15 +6,14 @@ import YAML from 'yaml';
import type { SourceAdapter } from '../../context/ingest/types.js';
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js';
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
import { resolveEnabledTables } from './enabled-tables.js';
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
import { tableRefKey, tableRefSet } from './table-ref.js';
import { tableRefKey, tableRefSet, type KtxTableRefKey } from './table-ref.js';
import type {
KtxQueryResult,
KtxReadOnlyQueryInput,
KtxScanConnector,
KtxSchemaSnapshot,
KtxSchemaTable,
} from './types.js';
function relationshipSqlResult(
@ -251,6 +250,73 @@ describe('local scan', () => {
});
});
// Verifies that the connection's `enabled_tables` reaches the adapter as
// `ctx.tableScope`, and that tables staged outside that scope are NOT filtered
// out afterwards: both staged tables are counted and written to the manifest.
it('passes enabled_tables as fetch context tableScope and does not post-filter staged snapshots', async () => {
  // Restrict the connection to a single table.
  project.config.connections.warehouse = {
    ...project.config.connections.warehouse,
    enabled_tables: ['public.orders'],
  };
  let capturedTableScope: ReadonlySet<KtxTableRefKey> | undefined;
  // Stub adapter that records the scope it was given but deliberately ignores
  // it, staging both `customers` and `orders`.
  const adapter: SourceAdapter = {
    source: 'live-database',
    skillNames: ['live_database_ingest'],
    async fetch(_pullConfig, stagedDir, ctx) {
      capturedTableScope = ctx.tableScope;
      await mkdir(join(stagedDir, 'tables'), { recursive: true });
      await writeFile(
        join(stagedDir, 'connection.json'),
        '{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n',
        'utf-8',
      );
      await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
      // Out-of-scope table: staged anyway to prove nothing filters it later.
      await writeFile(
        join(stagedDir, 'tables', 'customers.json'),
        '{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
        'utf-8',
      );
      // In-scope table.
      await writeFile(
        join(stagedDir, 'tables', 'orders.json'),
        '{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
        'utf-8',
      );
    },
    async detect() {
      return true;
    },
    // One work unit per staged table file.
    async chunk() {
      return {
        workUnits: [
          {
            unitKey: 'live-database-public-customers',
            rawFiles: ['tables/customers.json'],
            dependencyPaths: ['connection.json', 'foreign-keys.json'],
            peerFileIndex: [],
          },
          {
            unitKey: 'live-database-public-orders',
            rawFiles: ['tables/orders.json'],
            dependencyPaths: ['connection.json', 'foreign-keys.json'],
            peerFileIndex: [],
          },
        ],
      };
    },
  };
  const result = await runLocalScan({
    project,
    adapters: [adapter],
    connectionId: 'warehouse',
    jobId: 'scan-strict-scope-fetch',
    now: () => new Date('2026-05-22T00:00:00.000Z'),
  });
  // The scope handed to fetch is exactly the key set for `public.orders` with a null catalog.
  expect([...(capturedTableScope ?? [])]).toEqual([...tableRefSet([{ catalog: null, db: 'public', name: 'orders' }])]);
  // Both staged tables are reported and written, including the out-of-scope one.
  expect(result.report.diffSummary.tablesAdded).toBe(2);
  const structuralManifest = await readFile(join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8');
  expect(structuralManifest).toContain('customers:');
  expect(structuralManifest).toContain('orders:');
});
it('runs a structural database scan when live-database is not listed in ktx.yaml', async () => {
await writeDatabaseConfigWithoutIngestAdapters(project.projectDir);
project = await loadKtxProject({ projectDir: project.projectDir });
@ -1670,57 +1736,3 @@ describe('resolveEnabledTables', () => {
expect(resolveEnabledTables(undefined)).toBeNull();
});
});
// Unit tests for filterSnapshotTables: table filtering by scope and
// preservation of the remaining snapshot fields.
describe('filterSnapshotTables', () => {
  // Builds a minimal postgres snapshot with one empty table (null catalog, no
  // columns or foreign keys) per given `{ db, name }` pair.
  function makeSnapshot(tables: Array<{ db: string; name: string }>): KtxSchemaSnapshot {
    return {
      connectionId: 'test',
      driver: 'postgres',
      extractedAt: '2026-01-01T00:00:00Z',
      scope: {},
      metadata: {},
      tables: tables.map(
        (t): KtxSchemaTable => ({
          catalog: null,
          db: t.db,
          name: t.name,
          kind: 'table',
          comment: null,
          estimatedRows: null,
          columns: [],
          foreignKeys: [],
        }),
      ),
    };
  }
  // Tables outside the scope are dropped; the kept tables retain their original order.
  it('keeps only enabled tables', () => {
    const snapshot = makeSnapshot([
      { db: 'public', name: 'users' },
      { db: 'public', name: 'orders' },
      { db: 'public', name: 'logs' },
    ]);
    const enabled = tableRefSet([
      { catalog: null, db: 'public', name: 'users' },
      { catalog: null, db: 'public', name: 'orders' },
    ]);
    const filtered = filterSnapshotTables(snapshot, enabled);
    expect(filtered.tables).toHaveLength(2);
    expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']);
  });
  // A scope that matches nothing yields an empty table list rather than an error.
  it('returns empty tables when none match', () => {
    const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
    const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]);
    const filtered = filterSnapshotTables(snapshot, enabled);
    expect(filtered.tables).toHaveLength(0);
  });
  // Non-table fields are carried over untouched.
  it('preserves other snapshot fields', () => {
    const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
    const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'users' }]);
    const filtered = filterSnapshotTables(snapshot, enabled);
    expect(filtered.connectionId).toBe('test');
    expect(filtered.driver).toBe('postgres');
  });
});

View file

@ -10,7 +10,7 @@ import type { KtxProjectLlmConfig, KtxScanEnrichmentConfig, KtxScanRelationshipC
import type { KtxLocalProject } from '../../context/project/project.js';
import { ktxLocalStateDbPath } from '../project/local-state-db.js';
import { redactKtxScanReport } from './credentials.js';
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
import { resolveEnabledTables } from './enabled-tables.js';
import { completedKtxScanEnrichmentStateSummary } from './enrichment-state.js';
import { failedKtxScanEnrichmentSummary, ktxScanErrorMessage } from './enrichment-summary.js';
import {
@ -427,6 +427,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
jobId: options.jobId,
now: options.now,
dryRun: options.dryRun,
tableScope,
});
await options.progress?.update(0.55, scanChangeSummary(scanDiffSummaryFromRecord(record)));
let report = reportFromIngest({
@ -462,27 +463,12 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
rawSourcesDir: report.artifactPaths.rawSourcesDir,
extractedAtFallback: report.createdAt,
});
const structuralSnapshot = tableScope ? filterSnapshotTables(rawSnapshot, tableScope) : rawSnapshot;
if (tableScope && structuralSnapshot.tables.length < rawSnapshot.tables.length) {
const excluded = rawSnapshot.tables.length - structuralSnapshot.tables.length;
let remaining = excluded;
const ds = report.diffSummary;
const subFrom = (field: 'tablesAdded' | 'tablesUnchanged' | 'tablesModified') => {
const take = Math.min(remaining, ds[field]);
ds[field] -= take;
remaining -= take;
};
subFrom('tablesAdded');
subFrom('tablesUnchanged');
subFrom('tablesModified');
await options.progress?.update(0.6, scanChangeSummary(report.diffSummary));
}
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,
syncId: record.syncId,
driver,
snapshot: structuralSnapshot,
snapshot: rawSnapshot,
dryRun: false,
});
report.artifactPaths.manifestShards = manifestArtifacts.manifestShards;

View file

@ -1,6 +1,5 @@
import { describe, expect, it } from 'vitest';
import {
hasTableRef,
scopedTableNames,
tableRefFromKey,
tableRefKey,
@ -37,26 +36,6 @@ describe('tableRefSet', () => {
});
});
// Unit tests for hasTableRef: exact matches, catalog-less scope entries, and misses.
describe('hasTableRef', () => {
  // Scope with one fully qualified entry and one entry whose catalog is null.
  const scope = tableRefSet([
    { catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' },
    { catalog: null, db: 'public', name: 'users' },
  ]);
  it('matches fully qualified entries exactly', () => {
    expect(hasTableRef(scope, { catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' })).toBe(true);
  });
  // A ref carrying a catalog still matches a scope entry stored with `catalog: null`.
  it('matches when the scope omits catalog (legacy 2-part entry)', () => {
    expect(hasTableRef(scope, { catalog: 'PRODUCTION_DB', db: 'public', name: 'users' })).toBe(true);
  });
  // A different db or a different table name does not match.
  it('rejects refs not in the scope', () => {
    expect(hasTableRef(scope, { catalog: 'ANALYTICS', db: 'STAGING', name: 'LISTINGS' })).toBe(false);
    expect(hasTableRef(scope, { catalog: null, db: 'public', name: 'orders' })).toBe(false);
  });
});
describe('scopedTableNames', () => {
it('projects to the requested (catalog, db) namespace', () => {
const scope = tableRefSet([

View file

@ -31,17 +31,6 @@ export function tableRefSet(refs: readonly KtxTableRef[]): ReadonlySet<KtxTableR
return new Set(refs.map(tableRefKey));
}
/**
 * Tests whether `ref` is covered by `scope`.
 *
 * Up to three keys are probed, in this order:
 * 1. the ref exactly as given;
 * 2. the ref with `catalog` cleared to `null` (only when the ref has a catalog);
 * 3. the ref with `db` cleared to `null` (only when the ref has a db).
 *
 * @param scope - Set of table-ref keys to search.
 * @param ref - Table reference to look up.
 * @returns `true` as soon as any probed key is present in `scope`, otherwise `false`.
 */
export function hasTableRef(scope: ReadonlySet<KtxTableRefKey>, ref: KtxTableRef): boolean {
  const candidates: KtxTableRef[] = [ref];
  if (ref.catalog !== null) {
    candidates.push({ ...ref, catalog: null });
  }
  if (ref.db !== null) {
    candidates.push({ ...ref, db: null });
  }
  // `some` stops at the first hit, so keys are computed in the documented order.
  return candidates.some((candidate) => scope.has(tableRefKey(candidate)));
}
/**
* Return the bare table names from a scope that fall within the given
* (catalog, db) namespace. `catalog: null` is treated as a wildcard so that

View file

@ -142,10 +142,9 @@ export interface KtxScanInput {
/**
* Restricts introspection to a specific set of fully-qualified tables.
* `undefined` means "all tables within {@link scope}". Connectors that honor
* this field should push the filter into their metadata queries; the
* live-database adapter also applies a final filter before writing, so a
* connector that ignores `tableScope` will over-fetch but produce correct
* output.
* this field should push the filter into their metadata queries. Callers do
* not post-filter, so a connector that ignores `tableScope` will over-fetch
* and surface the extra tables in output.
*/
tableScope?: ReadonlySet<KtxTableRefKey>;
mode?: KtxScanMode;

View file

@ -24,7 +24,6 @@ import { PostgresPgssReader } from './context/ingest/adapters/historic-sql/postg
import { SnowflakeHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/snowflake-query-history-reader.js';
import type { SourceAdapter } from './context/ingest/types.js';
import type { KtxLocalProject } from './context/project/project.js';
import { resolveEnabledTables } from './context/scan/enabled-tables.js';
import { createHttpSqlAnalysisPort } from './context/sql-analysis/http-sql-analysis-port.js';
import type { SqlAnalysisPort } from './context/sql-analysis/ports.js';
import {
@ -367,10 +366,6 @@ export function createKtxCliLocalIngestAdapters(
});
const liveDatabase = new LiveDatabaseSourceAdapter({
introspection: createKtxCliLiveDatabaseIntrospection(project, options),
resolveTableScope: (connectionId) => {
const connection = project.config.connections[connectionId];
return connection ? resolveEnabledTables(connection) ?? undefined : undefined;
},
});
return base.map((adapter) => (adapter.source === 'live-database' ? liveDatabase : adapter));
}

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import json
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass
from datetime import datetime, timezone
@ -24,6 +25,16 @@ join pg_catalog.pg_class c
and c.relname = t.table_name
where t.table_schema = any(%s)
and t.table_type = 'BASE TABLE'
and (
%s::jsonb is null
or exists (
select 1
from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text)
where (scope.catalog is null or scope.catalog = current_database())
and (scope.db is null or scope.db = t.table_schema)
and scope.name = t.table_name
)
)
order by t.table_schema, t.table_name
"""
@ -52,6 +63,16 @@ where n.nspname = any(%s)
and c.relkind in ('r', 'p')
and a.attnum > 0
and not a.attisdropped
and (
%s::jsonb is null
or exists (
select 1
from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text)
where (scope.catalog is null or scope.catalog = current_database())
and (scope.db is null or scope.db = n.nspname)
and scope.name = c.relname
)
)
order by n.nspname, c.relname, a.attnum
"""
@ -80,6 +101,16 @@ join information_schema.key_column_usage target_key
and target_key.ordinal_position = source_key.position_in_unique_constraint
where source_constraint.constraint_type = 'FOREIGN KEY'
and source_constraint.table_schema = any(%s)
and (
%s::jsonb is null
or exists (
select 1
from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text)
where (scope.catalog is null or scope.catalog = current_database())
and (scope.db is null or scope.db = source_constraint.table_schema)
and scope.name = source_constraint.table_name
)
)
order by source_constraint.table_schema, source_constraint.table_name, source_constraint.constraint_name, source_key.ordinal_position
"""
@ -108,6 +139,12 @@ class LiveDatabaseTable(BaseModel):
foreign_keys: list[LiveDatabaseForeignKey] = Field(default_factory=list)
class LiveDatabaseTableScopeRef(BaseModel):
    # One entry of a request's `table_scope`: identifies a table that the
    # introspection queries are allowed to return.

    # Catalog to match against current_database(); None (the default) is
    # treated as a wildcard by the SQL scope filter.
    catalog: str | None = None
    # Schema to match; None (the default) is treated as a wildcard by the SQL
    # scope filter.
    db: str | None = None
    # Table name; required and always compared for equality.
    name: str
class DatabaseIntrospectionRequest(BaseModel):
connection_id: str
driver: str = "postgres"
@ -115,6 +152,7 @@ class DatabaseIntrospectionRequest(BaseModel):
schemas: list[str] = Field(default_factory=lambda: ["public"])
statement_timeout_ms: int = Field(default=30_000, ge=1)
connection_timeout_seconds: int = Field(default=5, ge=1)
table_scope: list[LiveDatabaseTableScopeRef] | None = None
@field_validator("schemas")
@classmethod
@ -169,6 +207,23 @@ def _statement_timeout_config(statement_timeout_ms: int) -> tuple[str, tuple[str
)
def _table_scope_json(
    table_scope: Sequence[LiveDatabaseTableScopeRef] | None,
) -> str | None:
    """Serialize a table scope to a JSON array string.

    Args:
        table_scope: Scope entries to serialize, or ``None`` for "no scope".

    Returns:
        ``None`` when ``table_scope`` is ``None``; otherwise a JSON array with
        one ``{"catalog", "db", "name"}`` object per entry, in input order.
    """
    if table_scope is None:
        return None
    entries = []
    for ref in table_scope:
        # Key order is fixed so the serialized text is stable.
        entries.append({"catalog": ref.catalog, "db": ref.db, "name": ref.name})
    return json.dumps(entries)
def _load_postgres_rows(
request: DatabaseIntrospectionRequest,
) -> DatabaseIntrospectionRows:
@ -190,7 +245,8 @@ def _load_postgres_rows(
connection.execute("BEGIN READ ONLY")
try:
connection.execute(*_statement_timeout_config(request.statement_timeout_ms))
params = (request.schemas,)
scope_json = _table_scope_json(request.table_scope)
params = (request.schemas, scope_json, scope_json)
table_rows = list(connection.execute(TABLES_SQL, params))
column_rows = list(connection.execute(COLUMNS_SQL, params))
foreign_key_rows = list(connection.execute(FOREIGN_KEYS_SQL, params))

View file

@ -114,6 +114,7 @@ def test_database_introspect_endpoint_returns_snapshot() -> None:
"driver": "postgres",
"url": "postgresql://readonly@example.test/warehouse",
"schemas": ["public"],
"table_scope": [{"db": "public", "name": "orders"}],
},
)
@ -121,6 +122,8 @@ def test_database_introspect_endpoint_returns_snapshot() -> None:
assert response.json()["connection_id"] == "warehouse"
assert response.json()["tables"][0]["name"] == "orders"
assert calls[0].connection_id == "warehouse"
assert calls[0].table_scope[0].db == "public"
assert calls[0].table_scope[0].name == "orders"
def test_database_introspect_endpoint_maps_value_error_to_400() -> None:

View file

@ -311,6 +311,9 @@ def test_database_introspect_command_reads_stdin_and_writes_json(
assert request.connection_id == "warehouse"
assert request.driver == "postgres"
assert request.schemas == ["public"]
assert request.table_scope is not None
assert request.table_scope[0].db == "public"
assert request.table_scope[0].name == "orders"
return DatabaseIntrospectionResponse(
connection_id="warehouse",
extracted_at="2026-04-28T10:00:00+00:00",
@ -337,7 +340,7 @@ def test_database_introspect_command_reads_stdin_and_writes_json(
sys,
"stdin",
io.StringIO(
'{"connection_id":"warehouse","driver":"postgres","url":"postgresql://readonly@example.test/warehouse","schemas":["public"]}'
'{"connection_id":"warehouse","driver":"postgres","url":"postgresql://readonly@example.test/warehouse","schemas":["public"],"table_scope":[{"db":"public","name":"orders"}]}'
),
)

View file

@ -5,7 +5,9 @@ import pytest
from ktx_daemon.database_introspection import (
DatabaseIntrospectionRequest,
DatabaseIntrospectionRows,
LiveDatabaseTableScopeRef,
_statement_timeout_config,
_table_scope_json,
introspect_database_response,
)
@ -146,6 +148,22 @@ def test_database_introspection_request_rejects_empty_schema_list() -> None:
)
def test_table_scope_json_serializes_null_wildcards() -> None:
    """A ``None`` catalog is emitted as JSON ``null`` and entry/key order is preserved."""
    assert _table_scope_json(
        [
            # Catalog left unset: expected to serialize as null.
            LiveDatabaseTableScopeRef(catalog=None, db="public", name="orders"),
            # Fully qualified entry.
            LiveDatabaseTableScopeRef(
                catalog="warehouse",
                db="marts",
                name="customers",
            ),
        ]
    ) == (
        '[{"catalog": null, "db": "public", "name": "orders"}, '
        '{"catalog": "warehouse", "db": "marts", "name": "customers"}]'
    )
def test_statement_timeout_config_uses_parameterized_set_config() -> None:
assert _statement_timeout_config(30_000) == (
"SELECT set_config('statement_timeout', %s, true)",