diff --git a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts index ff01fda9..c39fcf43 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/daemon-introspection.ts @@ -6,7 +6,7 @@ import type { KtxProjectConnectionConfig } from '../../../project/config.js'; import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js'; import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js'; import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js'; -import type { LiveDatabaseIntrospectionPort } from './types.js'; +import type { LiveDatabaseIntrospectionOptions, LiveDatabaseIntrospectionPort } from './types.js'; type KtxDaemonDatabaseIntrospectionCommand = 'database-introspect'; @@ -231,7 +231,7 @@ export function createDaemonLiveDatabaseIntrospection( const now = options.now ?? (() => new Date()); return { - async extractSchema(connectionId: string): Promise { + async extractSchema(connectionId: string, _options?: LiveDatabaseIntrospectionOptions): Promise { const connection = requirePostgresConnection(options.connections, connectionId); const payload = { connection_id: connectionId, diff --git a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts index 7e7a3f74..0a04c87e 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.test.ts @@ -1,7 +1,8 @@ -import { mkdtemp } from 'node:fs/promises'; +import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; +import { tableRefSet, type KtxTableRefKey } from '../../../scan/table-ref.js'; import { LiveDatabaseSourceAdapter } from './live-database.adapter.js'; describe('LiveDatabaseSourceAdapter', () => { @@ -43,7 +44,7 @@ describe('LiveDatabaseSourceAdapter', () => { await adapter.fetch(undefined, dir, { connectionId: 'conn-1', sourceKey: 'live-database' }); - expect(extractSchema).toHaveBeenCalledWith('conn-1'); + expect(extractSchema).toHaveBeenCalledWith('conn-1', { tableScope: undefined }); await expect(adapter.detect(dir)).resolves.toBe(true); const chunked = await adapter.chunk(dir); expect(chunked.workUnits.map((wu) => wu.unitKey)).toEqual(['live-database-public-orders']); @@ -56,4 +57,57 @@ describe('LiveDatabaseSourceAdapter', () => { expect(adapter.source).toBe('live-database'); expect(adapter.skillNames).toEqual(['live_database_ingest']); }); + + it('threads tableScope into the introspection port and applies a defensive final filter', async () => { + const extractSchema = vi.fn( + async (_connectionId: string, _options?: { tableScope?: ReadonlySet }) => ({ + connectionId: 'warehouse', + driver: 'snowflake' as const, + extractedAt: '2026-05-22T00:00:00.000Z', + scope: {}, + metadata: {}, + tables: [ + { + catalog: 'A', + db: 'MARTS', + name: 'IN_SCOPE', + kind: 'table' as const, + comment: null, + estimatedRows: 0, + columns: [], + foreignKeys: [], + }, + { + catalog: 'A', + db: 'MARTS', + name: 'OUT_OF_SCOPE', + kind: 'table' as const, + comment: null, + estimatedRows: 0, + columns: [], + foreignKeys: [], + }, + ], + }), + ); + const scope = tableRefSet([{ catalog: 'A', db: 'MARTS', name: 'IN_SCOPE' }]); + const adapter = new LiveDatabaseSourceAdapter({ + introspection: { extractSchema }, + resolveTableScope: (connectionId) => (connectionId === 'warehouse' ? scope : undefined), + }); + const stagedDir = await mkdtemp(join(tmpdir(), 'ktx-livedb-scope-')); + try { + await adapter.fetch(undefined, stagedDir, { + connectionId: 'warehouse', + sourceKey: 'live-database', + } as never); + expect(extractSchema).toHaveBeenCalledWith('warehouse', { tableScope: scope }); + const tables = await readdir(join(stagedDir, 'tables')); + expect(tables).toHaveLength(1); + const table = JSON.parse(await readFile(join(stagedDir, 'tables', tables[0]!), 'utf8')) as { name?: string }; + expect(table.name).toBe('IN_SCOPE'); + } finally { + await rm(stagedDir, { recursive: true, force: true }); + } + }); }); diff --git a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts index 9e5076ab..1b6160f7 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/live-database.adapter.ts @@ -1,4 +1,5 @@ import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js'; +import { filterSnapshotTables } from '../../../scan/enabled-tables.js'; import { chunkLiveDatabaseStagedDir } from './chunk.js'; import { detectLiveDatabaseStagedDir, writeLiveDatabaseSnapshot } from './stage.js'; import type { LiveDatabaseSourceAdapterDeps } from './types.js'; @@ -14,11 +15,13 @@ export class LiveDatabaseSourceAdapter implements SourceAdapter { } async fetch(_pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise { - const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId); + const tableScope = this.deps.resolveTableScope?.(ctx.connectionId); + const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId, { tableScope }); + const filtered = tableScope ? filterSnapshotTables(snapshot, tableScope) : snapshot; await writeLiveDatabaseSnapshot(stagedDir, { - ...snapshot, + ...filtered, connectionId: ctx.connectionId, - extractedAt: snapshot.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(), + extractedAt: filtered.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(), }); } diff --git a/packages/cli/src/context/ingest/adapters/live-database/types.ts b/packages/cli/src/context/ingest/adapters/live-database/types.ts index b9846b1b..cfe33670 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/types.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/types.ts @@ -1,10 +1,16 @@ import type { KtxSchemaSnapshot } from '../../../scan/types.js'; +import type { KtxTableRefKey } from '../../../scan/table-ref.js'; + +export interface LiveDatabaseIntrospectionOptions { + tableScope?: ReadonlySet; +} export interface LiveDatabaseIntrospectionPort { - extractSchema(connectionId: string): Promise; + extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions): Promise; } export interface LiveDatabaseSourceAdapterDeps { introspection: LiveDatabaseIntrospectionPort; now?: () => Date; + resolveTableScope?: (connectionId: string) => ReadonlySet | undefined; } diff --git a/packages/cli/src/context/ingest/local-adapters.ts b/packages/cli/src/context/ingest/local-adapters.ts index ea7556e7..9e2b52a5 100644 --- a/packages/cli/src/context/ingest/local-adapters.ts +++ b/packages/cli/src/context/ingest/local-adapters.ts @@ -4,6 +4,7 @@ import { notionConnectionToPullConfig, parseNotionConnectionConfig } from '../.. import { resolveKtxConfigReference } from '../core/config-reference.js'; import { ktxLocalStateDbPath } from '../../context/project/local-state-db.js'; import type { KtxLocalProject } from '../../context/project/project.js'; +import { resolveEnabledTables } from '../../context/scan/enabled-tables.js'; import type { SqlAnalysisPort } from '../../context/sql-analysis/ports.js'; import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; @@ -90,6 +91,10 @@ export function createDefaultLocalIngestAdapters( ...options.databaseIntrospection, ...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}), }), + resolveTableScope: (connectionId) => { + const connection = project.config.connections[connectionId]; + return connection ? resolveEnabledTables(connection) ?? undefined : undefined; + }, }), new LookmlSourceAdapter({ homeDir: join(project.projectDir, '.ktx/cache'), diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index 88ee9880..53f6c458 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -15,12 +15,16 @@ import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js'; import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js'; import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js'; -import type { LiveDatabaseIntrospectionPort } from './context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from './context/ingest/adapters/live-database/types.js'; import { LiveDatabaseSourceAdapter } from './context/ingest/adapters/live-database/live-database.adapter.js'; import { PostgresPgssReader } from './context/ingest/adapters/historic-sql/postgres-pgss-reader.js'; import { SnowflakeHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'; import type { SourceAdapter } from './context/ingest/types.js'; import type { KtxLocalProject } from './context/project/project.js'; +import { resolveEnabledTables } from './context/scan/enabled-tables.js'; import { createHttpSqlAnalysisPort } from './context/sql-analysis/http-sql-analysis-port.js'; import type { SqlAnalysisPort } from './context/sql-analysis/ports.js'; import { @@ -116,7 +120,7 @@ function createKtxCliLiveDatabaseIntrospection( connections: project.config.connections, }); return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions) { const connection = project.config.connections[connectionId]; if (isKtxPostgresConnectionConfig(connection)) { return postgres.extractSchema(connectionId); @@ -140,14 +144,15 @@ function createKtxCliLiveDatabaseIntrospection( const { createSnowflakeLiveDatabaseIntrospection } = await import('./connectors/snowflake/live-database-introspection.js'); const { isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');; if (!isKtxSnowflakeConnectionConfig(connection)) { - return daemon.extractSchema(connectionId); + return daemon.extractSchema(connectionId, options); } const snowflake = createSnowflakeLiveDatabaseIntrospection({ connections: project.config.connections, + projectDir: project.projectDir, }); return snowflake.extractSchema(connectionId); } - return daemon.extractSchema(connectionId); + return daemon.extractSchema(connectionId, options); }, }; } @@ -263,6 +268,7 @@ async function createEphemeralSnowflakeHistoricSqlClient( const connector = new connectorModule.KtxSnowflakeScanConnector({ connectionId, connection, + projectDir: project.projectDir, }); try { const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never); @@ -361,6 +367,10 @@ export function createKtxCliLocalIngestAdapters( }); const liveDatabase = new LiveDatabaseSourceAdapter({ introspection: createKtxCliLiveDatabaseIntrospection(project, options), + resolveTableScope: (connectionId) => { + const connection = project.config.connections[connectionId]; + return connection ? resolveEnabledTables(connection) ?? undefined : undefined; + }, }); return base.map((adapter) => (adapter.source === 'live-database' ? liveDatabase : adapter)); }