diff --git a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.test.ts b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.test.ts index 8237d903..9310f148 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.test.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.test.ts @@ -1,6 +1,7 @@ import { once } from 'node:events'; import { createServer } from 'node:http'; import { describe, expect, it, vi } from 'vitest'; +import { tableRefSet } from '../../../scan/table-ref.js'; import { createDaemonLiveDatabaseIntrospection } from './daemon-introspection.js'; const daemonResponse = { @@ -161,7 +162,11 @@ describe('createDaemonLiveDatabaseIntrospection', () => { baseUrl: `http://127.0.0.1:${address.port}`, }); - await expect(introspection.extractSchema('warehouse')).resolves.toMatchObject({ + await expect( + introspection.extractSchema('warehouse', { + tableScope: tableRefSet([{ catalog: 'warehouse', db: 'public', name: 'orders' }]), + }), + ).resolves.toMatchObject({ connectionId: 'warehouse', tables: [{ name: 'customers' }, { name: 'orders' }], }); @@ -176,6 +181,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => { schemas: ['public'], statement_timeout_ms: 30_000, connection_timeout_seconds: 5, + table_scope: [{ catalog: 'warehouse', db: 'public', name: 'orders' }], }, }, ]); @@ -217,7 +223,7 @@ describe('createDaemonLiveDatabaseIntrospection', () => { expect(runJson).not.toHaveBeenCalled(); }); - it('filters out tables not on the enabled_tables allowlist', async () => { + it('does not use connection enabled_tables as a response filter', async () => { const runJson = vi.fn(async () => daemonResponse); const introspection = createDaemonLiveDatabaseIntrospection({ connections: { @@ -232,7 +238,8 @@ describe('createDaemonLiveDatabaseIntrospection', () => { }); const snapshot = await introspection.extractSchema('warehouse'); - expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.orders']); + expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual(['public.customers', 'public.orders']); + expect(runJson).toHaveBeenCalledWith('database-introspect', expect.not.objectContaining({ table_scope: expect.anything() })); }); it('passes through every table when enabled_tables is omitted or empty', async () => { diff --git a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts index c39fcf43..f71e332d 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts @@ -3,7 +3,7 @@ import { request as httpRequest } from 'node:http'; import { request as httpsRequest } from 'node:https'; import { URL } from 'node:url'; import type { KtxProjectConnectionConfig } from '../../../project/config.js'; -import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js'; +import { tableRefFromKey } from '../../../scan/table-ref.js'; import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js'; import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js'; import type { LiveDatabaseIntrospectionOptions, LiveDatabaseIntrospectionPort } from './types.js'; @@ -220,6 +220,18 @@ function mapDaemonSnapshot( }; } +function serializeTableScope(options: LiveDatabaseIntrospectionOptions | undefined): Array<{ + catalog: string | null; + db: string | null; + name: string; +}> | undefined { + if (!options?.tableScope) return undefined; + return [...options.tableScope].map((key) => { + const ref = tableRefFromKey(key); + return { catalog: ref.catalog, db: ref.db, name: ref.name }; + }); +} + export function createDaemonLiveDatabaseIntrospection( options: DaemonLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { @@ -231,8 +243,9 @@ export function createDaemonLiveDatabaseIntrospection( const now = options.now ?? (() => new Date()); return { - async extractSchema(connectionId: string, _options?: LiveDatabaseIntrospectionOptions): Promise { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions): Promise { const connection = requirePostgresConnection(options.connections, connectionId); + const tableScope = serializeTableScope(introspectionOptions); const payload = { connection_id: connectionId, driver: normalizeDriver(connection.driver), @@ -240,17 +253,16 @@ export function createDaemonLiveDatabaseIntrospection( schemas, statement_timeout_ms: options.statementTimeoutMs ?? 30_000, connection_timeout_seconds: options.connectionTimeoutSeconds ?? 5, + ...(tableScope !== undefined ? { table_scope: tableScope } : {}), }; const raw = requestJson ? await requestJson('/database/introspect', payload) : await runJson('database-introspect', payload); - const snapshot = mapDaemonSnapshot(raw, { + return mapDaemonSnapshot(raw, { connectionId, extractedAt: now().toISOString(), schemas, }); - const enabledTables = resolveEnabledTables(connection); - return enabledTables ? filterSnapshotTables(snapshot, enabledTables) : snapshot; }, }; } diff --git a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts index 0a04c87e..6cd543e1 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts @@ -1,4 +1,4 @@ -import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises'; +import { mkdtemp, readdir, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; @@ -58,7 +58,7 @@ describe('LiveDatabaseSourceAdapter', () => { expect(adapter.skillNames).toEqual(['live_database_ingest']); }); - it('threads tableScope into the introspection port and applies a defensive final filter', async () => { + it('threads tableScope from fetch context into the introspection port without post-filtering', async () => { const extractSchema = vi.fn( async (_connectionId: string, _options?: { tableScope?: ReadonlySet }) => ({ connectionId: 'warehouse', @@ -93,19 +93,17 @@ describe('LiveDatabaseSourceAdapter', () => { const scope = tableRefSet([{ catalog: 'A', db: 'MARTS', name: 'IN_SCOPE' }]); const adapter = new LiveDatabaseSourceAdapter({ introspection: { extractSchema }, - resolveTableScope: (connectionId) => (connectionId === 'warehouse' ? scope : undefined), }); const stagedDir = await mkdtemp(join(tmpdir(), 'ktx-livedb-scope-')); try { await adapter.fetch(undefined, stagedDir, { connectionId: 'warehouse', sourceKey: 'live-database', - } as never); + tableScope: scope, + }); expect(extractSchema).toHaveBeenCalledWith('warehouse', { tableScope: scope }); const tables = await readdir(join(stagedDir, 'tables')); - expect(tables).toHaveLength(1); - const table = JSON.parse(await readFile(join(stagedDir, 'tables', tables[0]!), 'utf8')) as { name?: string }; - expect(table.name).toBe('IN_SCOPE'); + expect(tables).toHaveLength(2); } finally { await rm(stagedDir, { recursive: true, force: true }); } diff --git a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts index 1b6160f7..68087bc0 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts @@ -1,5 +1,4 @@ import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js'; -import { filterSnapshotTables } from '../../../scan/enabled-tables.js'; import { chunkLiveDatabaseStagedDir } from './chunk.js'; import { detectLiveDatabaseStagedDir, writeLiveDatabaseSnapshot } from './stage.js'; import type { LiveDatabaseSourceAdapterDeps } from './types.js'; @@ -15,13 +14,12 @@ export class LiveDatabaseSourceAdapter implements SourceAdapter { } async fetch(_pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise { - const tableScope = this.deps.resolveTableScope?.(ctx.connectionId); + const tableScope = ctx.tableScope; const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId, { tableScope }); - const filtered = tableScope ? filterSnapshotTables(snapshot, tableScope) : snapshot; await writeLiveDatabaseSnapshot(stagedDir, { - ...filtered, + ...snapshot, connectionId: ctx.connectionId, - extractedAt: filtered.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(), + extractedAt: snapshot.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(), }); } diff --git a/packages/cli/src/context/ingest/adapters/live-database/types.ts b/packages/cli/src/context/ingest/adapters/live-database/types.ts index cfe33670..6115387f 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/types.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/types.ts @@ -12,5 +12,4 @@ export interface LiveDatabaseIntrospectionPort { export interface LiveDatabaseSourceAdapterDeps { introspection: LiveDatabaseIntrospectionPort; now?: () => Date; - resolveTableScope?: (connectionId: string) => ReadonlySet | undefined; } diff --git a/packages/cli/src/context/ingest/local-adapters.ts b/packages/cli/src/context/ingest/local-adapters.ts index 9e2b52a5..ea7556e7 100644 --- a/packages/cli/src/context/ingest/local-adapters.ts +++ b/packages/cli/src/context/ingest/local-adapters.ts @@ -4,7 +4,6 @@ import { notionConnectionToPullConfig, parseNotionConnectionConfig } from '../.. import { resolveKtxConfigReference } from '../core/config-reference.js'; import { ktxLocalStateDbPath } from '../../context/project/local-state-db.js'; import type { KtxLocalProject } from '../../context/project/project.js'; -import { resolveEnabledTables } from '../../context/scan/enabled-tables.js'; import type { SqlAnalysisPort } from '../../context/sql-analysis/ports.js'; import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; @@ -91,10 +90,6 @@ export function createDefaultLocalIngestAdapters( ...options.databaseIntrospection, ...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}), }), - resolveTableScope: (connectionId) => { - const connection = project.config.connections[connectionId]; - return connection ? resolveEnabledTables(connection) ?? undefined : undefined; - }, }), new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache'), diff --git a/packages/cli/src/context/ingest/local-stage-ingest.ts b/packages/cli/src/context/ingest/local-stage-ingest.ts index 5897281f..f10a4a78 100644 --- a/packages/cli/src/context/ingest/local-stage-ingest.ts +++ b/packages/cli/src/context/ingest/local-stage-ingest.ts @@ -9,6 +9,7 @@ import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js'; import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js'; import { buildSyncId } from './raw-sources-paths.js'; import { SqliteLocalIngestStore } from './sqlite-local-ingest-store.js'; +import type { KtxTableRefKey } from '../scan/table-ref.js'; import type { IngestTrigger, SourceAdapter, WorkUnit } from './types.js'; type LocalIngestStatus = 'running' | 'done' | 'error'; @@ -62,6 +63,7 @@ export interface RunLocalStageOnlyIngestOptions { now?: () => Date; dryRun?: boolean; memoryFlow?: MemoryFlowEventSink; + tableScope?: ReadonlySet; } const LOCAL_AUTHOR = 'ktx'; @@ -225,6 +227,7 @@ async function prepareLocalStagedDir( stagedDir: string, sourceDir: string | undefined, connectionId: string, + tableScope: ReadonlySet | undefined, ): Promise { await rm(stagedDir, { recursive: true, force: true }); await mkdir(stagedDir, { recursive: true }); @@ -242,7 +245,7 @@ async function prepareLocalStagedDir( ); } const pullConfig = await localPullConfigForAdapter(project, adapter, connectionId); - await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source }); + await adapter.fetch(pullConfig, stagedDir, { connectionId, sourceKey: adapter.source, tableScope }); return null; } @@ -274,7 +277,14 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti assertCompatibleExistingRun(existingRun, runId, adapter.source, connectionId); const stagedDir = join(options.project.projectDir, '.ktx/cache/local-ingest', runId, 'staged'); - const sourceDir = await prepareLocalStagedDir(options.project, adapter, stagedDir, options.sourceDir, connectionId); + const sourceDir = await prepareLocalStagedDir( + options.project, + adapter, + stagedDir, + options.sourceDir, + connectionId, + options.tableScope, + ); const detected = await adapter.detect(stagedDir); if (!detected) { diff --git a/packages/cli/src/context/ingest/types.ts b/packages/cli/src/context/ingest/types.ts index 991670f6..337885af 100644 --- a/packages/cli/src/context/ingest/types.ts +++ b/packages/cli/src/context/ingest/types.ts @@ -2,6 +2,7 @@ import type { KtxEmbeddingPort } from '../core/embedding.js'; import type { MemoryAction } from '../../context/memory/types.js'; import type { SemanticLayerService } from '../../context/sl/semantic-layer.service.js'; import type { TouchedSlSource } from '../../context/tools/touched-sl-sources.js'; +import type { KtxTableRefKey } from '../scan/table-ref.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; import type { StageIndex } from './stages/stage-index.types.js'; import type { WorkUnitOutcome } from './stages/stage-3-work-units.js'; @@ -52,6 +53,7 @@ export interface ChunkResult { export interface FetchContext { connectionId: string; sourceKey: string; + tableScope?: ReadonlySet; memoryFlow?: MemoryFlowEventSink; } diff --git a/packages/cli/src/context/scan/enabled-tables.ts b/packages/cli/src/context/scan/enabled-tables.ts index e1f53c09..327992ac 100644 --- a/packages/cli/src/context/scan/enabled-tables.ts +++ b/packages/cli/src/context/scan/enabled-tables.ts @@ -1,5 +1,5 @@ -import { hasTableRef, tableRefSet, type KtxTableRefKey } from './table-ref.js'; -import type { KtxSchemaSnapshot, KtxTableRef } from './types.js'; +import { tableRefSet, type KtxTableRefKey } from './table-ref.js'; +import type { KtxTableRef } from './types.js'; /** * Parses the `enabled_tables` field on a connection into a scope of @@ -61,16 +61,3 @@ function parseDottedEntry(value: string): KtxTableRef | null { } return null; } - -/** @internal — kept as a defensive backstop for the live-database adapter and tests. */ -export function filterSnapshotTables( - snapshot: KtxSchemaSnapshot, - enabledTables: ReadonlySet, -): KtxSchemaSnapshot { - return { - ...snapshot, - tables: snapshot.tables.filter((table) => - hasTableRef(enabledTables, { catalog: table.catalog, db: table.db, name: table.name }), - ), - }; -} diff --git a/packages/cli/src/context/scan/local-scan.test.ts b/packages/cli/src/context/scan/local-scan.test.ts index 9e8c7391..0c397322 100644 --- a/packages/cli/src/context/scan/local-scan.test.ts +++ b/packages/cli/src/context/scan/local-scan.test.ts @@ -6,15 +6,14 @@ import YAML from 'yaml'; import type { SourceAdapter } from '../../context/ingest/types.js'; import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js'; -import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; +import { resolveEnabledTables } from './enabled-tables.js'; import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js'; -import { tableRefKey, tableRefSet } from './table-ref.js'; +import { tableRefKey, tableRefSet, type KtxTableRefKey } from './table-ref.js'; import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxScanConnector, KtxSchemaSnapshot, - KtxSchemaTable, } from './types.js'; function relationshipSqlResult( @@ -251,6 +250,73 @@ describe('local scan', () => { }); }); + it('passes enabled_tables as fetch context tableScope and does not post-filter staged snapshots', async () => { + project.config.connections.warehouse = { + ...project.config.connections.warehouse, + enabled_tables: ['public.orders'], + }; + let capturedTableScope: ReadonlySet | undefined; + const adapter: SourceAdapter = { + source: 'live-database', + skillNames: ['live_database_ingest'], + async fetch(_pullConfig, stagedDir, ctx) { + capturedTableScope = ctx.tableScope; + await mkdir(join(stagedDir, 'tables'), { recursive: true }); + await writeFile( + join(stagedDir, 'connection.json'), + '{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n', + 'utf-8', + ); + await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); + await writeFile( + join(stagedDir, 'tables', 'customers.json'), + '{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n', + 'utf-8', + ); + await writeFile( + join(stagedDir, 'tables', 'orders.json'), + '{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n', + 'utf-8', + ); + }, + async detect() { + return true; + }, + async chunk() { + return { + workUnits: [ + { + unitKey: 'live-database-public-customers', + rawFiles: ['tables/customers.json'], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + }, + { + unitKey: 'live-database-public-orders', + rawFiles: ['tables/orders.json'], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + }, + ], + }; + }, + }; + + const result = await runLocalScan({ + project, + adapters: [adapter], + connectionId: 'warehouse', + jobId: 'scan-strict-scope-fetch', + now: () => new Date('2026-05-22T00:00:00.000Z'), + }); + + expect([...(capturedTableScope ?? [])]).toEqual([...tableRefSet([{ catalog: null, db: 'public', name: 'orders' }])]); + expect(result.report.diffSummary.tablesAdded).toBe(2); + const structuralManifest = await readFile(join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'); + expect(structuralManifest).toContain('customers:'); + expect(structuralManifest).toContain('orders:'); + }); + it('runs a structural database scan when live-database is not listed in ktx.yaml', async () => { await writeDatabaseConfigWithoutIngestAdapters(project.projectDir); project = await loadKtxProject({ projectDir: project.projectDir }); @@ -1670,57 +1736,3 @@ describe('resolveEnabledTables', () => { expect(resolveEnabledTables(undefined)).toBeNull(); }); }); - -describe('filterSnapshotTables', () => { - function makeSnapshot(tables: Array<{ db: string; name: string }>): KtxSchemaSnapshot { - return { - connectionId: 'test', - driver: 'postgres', - extractedAt: '2026-01-01T00:00:00Z', - scope: {}, - metadata: {}, - tables: tables.map( - (t): KtxSchemaTable => ({ - catalog: null, - db: t.db, - name: t.name, - kind: 'table', - comment: null, - estimatedRows: null, - columns: [], - foreignKeys: [], - }), - ), - }; - } - - it('keeps only enabled tables', () => { - const snapshot = makeSnapshot([ - { db: 'public', name: 'users' }, - { db: 'public', name: 'orders' }, - { db: 'public', name: 'logs' }, - ]); - const enabled = tableRefSet([ - { catalog: null, db: 'public', name: 'users' }, - { catalog: null, db: 'public', name: 'orders' }, - ]); - const filtered = filterSnapshotTables(snapshot, enabled); - expect(filtered.tables).toHaveLength(2); - expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']); - }); - - it('returns empty tables when none match', () => { - const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]); - const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]); - const filtered = filterSnapshotTables(snapshot, enabled); - expect(filtered.tables).toHaveLength(0); - }); - - it('preserves other snapshot fields', () => { - const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]); - const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'users' }]); - const filtered = filterSnapshotTables(snapshot, enabled); - expect(filtered.connectionId).toBe('test'); - expect(filtered.driver).toBe('postgres'); - }); -}); diff --git a/packages/cli/src/context/scan/local-scan.ts b/packages/cli/src/context/scan/local-scan.ts index 17aa587a..9a06dabe 100644 --- a/packages/cli/src/context/scan/local-scan.ts +++ b/packages/cli/src/context/scan/local-scan.ts @@ -10,7 +10,7 @@ import type { KtxProjectLlmConfig, KtxScanEnrichmentConfig, KtxScanRelationshipC import type { KtxLocalProject } from '../../context/project/project.js'; import { ktxLocalStateDbPath } from '../project/local-state-db.js'; import { redactKtxScanReport } from './credentials.js'; -import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; +import { resolveEnabledTables } from './enabled-tables.js'; import { completedKtxScanEnrichmentStateSummary } from './enrichment-state.js'; import { failedKtxScanEnrichmentSummary, ktxScanErrorMessage } from './enrichment-summary.js'; import { @@ -427,6 +427,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise { - const take = Math.min(remaining, ds[field]); - ds[field] -= take; - remaining -= take; - }; - subFrom('tablesAdded'); - subFrom('tablesUnchanged'); - subFrom('tablesModified'); - await options.progress?.update(0.6, scanChangeSummary(report.diffSummary)); - } const manifestArtifacts = await writeLocalScanManifestShards({ project: options.project, connectionId: options.connectionId, syncId: record.syncId, driver, - snapshot: structuralSnapshot, + snapshot: rawSnapshot, dryRun: false, }); report.artifactPaths.manifestShards = manifestArtifacts.manifestShards; diff --git a/packages/cli/src/context/scan/table-ref.test.ts b/packages/cli/src/context/scan/table-ref.test.ts index 8e3ddae1..eb52ac9b 100644 --- a/packages/cli/src/context/scan/table-ref.test.ts +++ b/packages/cli/src/context/scan/table-ref.test.ts @@ -1,6 +1,5 @@ import { describe, expect, it } from 'vitest'; import { - hasTableRef, scopedTableNames, tableRefFromKey, tableRefKey, @@ -37,26 +36,6 @@ describe('tableRefSet', () => { }); }); -describe('hasTableRef', () => { - const scope = tableRefSet([ - { catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' }, - { catalog: null, db: 'public', name: 'users' }, - ]); - - it('matches fully qualified entries exactly', () => { - expect(hasTableRef(scope, { catalog: 'ANALYTICS', db: 'MARTS', name: 'LISTINGS' })).toBe(true); - }); - - it('matches when the scope omits catalog (legacy 2-part entry)', () => { - expect(hasTableRef(scope, { catalog: 'PRODUCTION_DB', db: 'public', name: 'users' })).toBe(true); - }); - - it('rejects refs not in the scope', () => { - expect(hasTableRef(scope, { catalog: 'ANALYTICS', db: 'STAGING', name: 'LISTINGS' })).toBe(false); - expect(hasTableRef(scope, { catalog: null, db: 'public', name: 'orders' })).toBe(false); - }); -}); - describe('scopedTableNames', () => { it('projects to the requested (catalog, db) namespace', () => { const scope = tableRefSet([ diff --git a/packages/cli/src/context/scan/table-ref.ts b/packages/cli/src/context/scan/table-ref.ts index 7d5a75c9..1a2abd70 100644 --- a/packages/cli/src/context/scan/table-ref.ts +++ b/packages/cli/src/context/scan/table-ref.ts @@ -31,17 +31,6 @@ export function tableRefSet(refs: readonly KtxTableRef[]): ReadonlySet, ref: KtxTableRef): boolean { - if (scope.has(tableRefKey(ref))) return true; - if (ref.catalog !== null) { - if (scope.has(tableRefKey({ ...ref, catalog: null }))) return true; - } - if (ref.db !== null) { - if (scope.has(tableRefKey({ ...ref, db: null }))) return true; - } - return false; -} - /** * Return the bare table names from a scope that fall within the given * (catalog, db) namespace. `catalog: null` is treated as a wildcard so that diff --git a/packages/cli/src/context/scan/types.ts b/packages/cli/src/context/scan/types.ts index f5f6c7f1..95e6b590 100644 --- a/packages/cli/src/context/scan/types.ts +++ b/packages/cli/src/context/scan/types.ts @@ -142,10 +142,9 @@ export interface KtxScanInput { /** * Restricts introspection to a specific set of fully-qualified tables. * `undefined` means "all tables within {@link scope}". Connectors that honor - * this field should push the filter into their metadata queries; the - * live-database adapter also applies a final filter before writing, so a - * connector that ignores `tableScope` will over-fetch but produce correct - * output. + * this field should push the filter into their metadata queries. Callers do + * not post-filter, so a connector that ignores `tableScope` will over-fetch + * and surface the extra tables in output. */ tableScope?: ReadonlySet; mode?: KtxScanMode; diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index 31dd12c3..77e18b14 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -24,7 +24,6 @@ import { PostgresPgssReader } from './context/ingest/adapters/historic-sql/postg import { SnowflakeHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'; import type { SourceAdapter } from './context/ingest/types.js'; import type { KtxLocalProject } from './context/project/project.js'; -import { resolveEnabledTables } from './context/scan/enabled-tables.js'; import { createHttpSqlAnalysisPort } from './context/sql-analysis/http-sql-analysis-port.js'; import type { SqlAnalysisPort } from './context/sql-analysis/ports.js'; import { @@ -367,10 +366,6 @@ export function createKtxCliLocalIngestAdapters( }); const liveDatabase = new LiveDatabaseSourceAdapter({ introspection: createKtxCliLiveDatabaseIntrospection(project, options), - resolveTableScope: (connectionId) => { - const connection = project.config.connections[connectionId]; - return connection ? resolveEnabledTables(connection) ?? undefined : undefined; - }, }); return base.map((adapter) => (adapter.source === 'live-database' ? liveDatabase : adapter)); } diff --git a/python/ktx-daemon/src/ktx_daemon/database_introspection.py b/python/ktx-daemon/src/ktx_daemon/database_introspection.py index ba9fa1d8..82058f95 100644 --- a/python/ktx-daemon/src/ktx_daemon/database_introspection.py +++ b/python/ktx-daemon/src/ktx_daemon/database_introspection.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass from datetime import datetime, timezone @@ -24,6 +25,16 @@ join pg_catalog.pg_class c and c.relname = t.table_name where t.table_schema = any(%s) and t.table_type = 'BASE TABLE' + and ( + %s::jsonb is null + or exists ( + select 1 + from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text) + where (scope.catalog is null or scope.catalog = current_database()) + and (scope.db is null or scope.db = t.table_schema) + and scope.name = t.table_name + ) + ) order by t.table_schema, t.table_name """ @@ -52,6 +63,16 @@ where n.nspname = any(%s) and c.relkind in ('r', 'p') and a.attnum > 0 and not a.attisdropped + and ( + %s::jsonb is null + or exists ( + select 1 + from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text) + where (scope.catalog is null or scope.catalog = current_database()) + and (scope.db is null or scope.db = n.nspname) + and scope.name = c.relname + ) + ) order by n.nspname, c.relname, a.attnum """ @@ -80,6 +101,16 @@ join information_schema.key_column_usage target_key and target_key.ordinal_position = source_key.position_in_unique_constraint where source_constraint.constraint_type = 'FOREIGN KEY' and source_constraint.table_schema = any(%s) + and ( + %s::jsonb is null + or exists ( + select 1 + from jsonb_to_recordset(%s::jsonb) as scope(catalog text, db text, name text) + where (scope.catalog is null or scope.catalog = current_database()) + and (scope.db is null or scope.db = source_constraint.table_schema) + and scope.name = source_constraint.table_name + ) + ) order by source_constraint.table_schema, source_constraint.table_name, source_constraint.constraint_name, source_key.ordinal_position """ @@ -108,6 +139,12 @@ class LiveDatabaseTable(BaseModel): foreign_keys: list[LiveDatabaseForeignKey] = Field(default_factory=list) +class LiveDatabaseTableScopeRef(BaseModel): + catalog: str | None = None + db: str | None = None + name: str + + class DatabaseIntrospectionRequest(BaseModel): connection_id: str driver: str = "postgres" @@ -115,6 +152,7 @@ class DatabaseIntrospectionRequest(BaseModel): schemas: list[str] = Field(default_factory=lambda: ["public"]) statement_timeout_ms: int = Field(default=30_000, ge=1) connection_timeout_seconds: int = Field(default=5, ge=1) + table_scope: list[LiveDatabaseTableScopeRef] | None = None @field_validator("schemas") @classmethod @@ -169,6 +207,23 @@ def _statement_timeout_config(statement_timeout_ms: int) -> tuple[str, tuple[str ) +def _table_scope_json( + table_scope: Sequence[LiveDatabaseTableScopeRef] | None, +) -> str | None: + if table_scope is None: + return None + return json.dumps( + [ + { + "catalog": ref.catalog, + "db": ref.db, + "name": ref.name, + } + for ref in table_scope + ] + ) + + def _load_postgres_rows( request: DatabaseIntrospectionRequest, ) -> DatabaseIntrospectionRows: @@ -190,7 +245,8 @@ def _load_postgres_rows( connection.execute("BEGIN READ ONLY") try: connection.execute(*_statement_timeout_config(request.statement_timeout_ms)) - params = (request.schemas,) + scope_json = _table_scope_json(request.table_scope) + params = (request.schemas, scope_json, scope_json) table_rows = list(connection.execute(TABLES_SQL, params)) column_rows = list(connection.execute(COLUMNS_SQL, params)) foreign_key_rows = list(connection.execute(FOREIGN_KEYS_SQL, params)) diff --git a/python/ktx-daemon/tests/test_app.py b/python/ktx-daemon/tests/test_app.py index 3c1ce18d..07e0f319 100644 --- a/python/ktx-daemon/tests/test_app.py +++ b/python/ktx-daemon/tests/test_app.py @@ -114,6 +114,7 @@ def test_database_introspect_endpoint_returns_snapshot() -> None: "driver": "postgres", "url": "postgresql://readonly@example.test/warehouse", "schemas": ["public"], + "table_scope": [{"db": "public", "name": "orders"}], }, ) @@ -121,6 +122,8 @@ def test_database_introspect_endpoint_returns_snapshot() -> None: assert response.json()["connection_id"] == "warehouse" assert response.json()["tables"][0]["name"] == "orders" assert calls[0].connection_id == "warehouse" + assert calls[0].table_scope[0].db == "public" + assert calls[0].table_scope[0].name == "orders" def test_database_introspect_endpoint_maps_value_error_to_400() -> None: diff --git a/python/ktx-daemon/tests/test_cli.py b/python/ktx-daemon/tests/test_cli.py index 88e06de7..76376320 100644 --- a/python/ktx-daemon/tests/test_cli.py +++ b/python/ktx-daemon/tests/test_cli.py @@ -311,6 +311,9 @@ def test_database_introspect_command_reads_stdin_and_writes_json( assert request.connection_id == "warehouse" assert request.driver == "postgres" assert request.schemas == ["public"] + assert request.table_scope is not None + assert request.table_scope[0].db == "public" + assert request.table_scope[0].name == "orders" return DatabaseIntrospectionResponse( connection_id="warehouse", extracted_at="2026-04-28T10:00:00+00:00", @@ -337,7 +340,7 @@ def test_database_introspect_command_reads_stdin_and_writes_json( sys, "stdin", io.StringIO( - '{"connection_id":"warehouse","driver":"postgres","url":"postgresql://readonly@example.test/warehouse","schemas":["public"]}' + '{"connection_id":"warehouse","driver":"postgres","url":"postgresql://readonly@example.test/warehouse","schemas":["public"],"table_scope":[{"db":"public","name":"orders"}]}' ), ) diff --git a/python/ktx-daemon/tests/test_database_introspection.py b/python/ktx-daemon/tests/test_database_introspection.py index 7dd2f3f9..0a018046 100644 --- a/python/ktx-daemon/tests/test_database_introspection.py +++ b/python/ktx-daemon/tests/test_database_introspection.py @@ -5,7 +5,9 @@ import pytest from ktx_daemon.database_introspection import ( DatabaseIntrospectionRequest, DatabaseIntrospectionRows, + LiveDatabaseTableScopeRef, _statement_timeout_config, + _table_scope_json, introspect_database_response, ) @@ -146,6 +148,22 @@ def test_database_introspection_request_rejects_empty_schema_list() -> None: ) +def test_table_scope_json_serializes_null_wildcards() -> None: + assert _table_scope_json( + [ + LiveDatabaseTableScopeRef(catalog=None, db="public", name="orders"), + LiveDatabaseTableScopeRef( + catalog="warehouse", + db="marts", + name="customers", + ), + ] + ) == ( + '[{"catalog": null, "db": "public", "name": "orders"}, ' + '{"catalog": "warehouse", "db": "marts", "name": "customers"}]' + ) + + def test_statement_timeout_config_uses_parameterized_set_config() -> None: assert _statement_timeout_config(30_000) == ( "SELECT set_config('statement_timeout', %s, true)",