feat(scan): plumb tableScope through live-database introspection port

This commit is contained in:
Andrey Avtomonov 2026-05-22 17:56:51 +02:00
parent 8c81035662
commit b1a2d4c378
6 changed files with 90 additions and 12 deletions

View file

@ -6,7 +6,7 @@ import type { KtxProjectConnectionConfig } from '../../../project/config.js';
import { filterSnapshotTables, resolveEnabledTables } from '../../../scan/enabled-tables.js';
import type { KtxSchemaColumn, KtxSchemaForeignKey, KtxSchemaSnapshot, KtxSchemaTable } from '../../../scan/types.js';
import { inferKtxDimensionType, normalizeKtxNativeType } from '../../../scan/type-normalization.js';
import type { LiveDatabaseIntrospectionPort } from './types.js';
import type { LiveDatabaseIntrospectionOptions, LiveDatabaseIntrospectionPort } from './types.js';
type KtxDaemonDatabaseIntrospectionCommand = 'database-introspect';
@ -231,7 +231,7 @@ export function createDaemonLiveDatabaseIntrospection(
const now = options.now ?? (() => new Date());
return {
async extractSchema(connectionId: string): Promise<KtxSchemaSnapshot> {
async extractSchema(connectionId: string, _options?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot> {
const connection = requirePostgresConnection(options.connections, connectionId);
const payload = {
connection_id: connectionId,

View file

@ -1,7 +1,8 @@
import { mkdtemp } from 'node:fs/promises';
import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { tableRefSet, type KtxTableRefKey } from '../../../scan/table-ref.js';
import { LiveDatabaseSourceAdapter } from './live-database.adapter.js';
describe('LiveDatabaseSourceAdapter', () => {
@ -43,7 +44,7 @@ describe('LiveDatabaseSourceAdapter', () => {
await adapter.fetch(undefined, dir, { connectionId: 'conn-1', sourceKey: 'live-database' });
expect(extractSchema).toHaveBeenCalledWith('conn-1');
expect(extractSchema).toHaveBeenCalledWith('conn-1', { tableScope: undefined });
await expect(adapter.detect(dir)).resolves.toBe(true);
const chunked = await adapter.chunk(dir);
expect(chunked.workUnits.map((wu) => wu.unitKey)).toEqual(['live-database-public-orders']);
@ -56,4 +57,57 @@ describe('LiveDatabaseSourceAdapter', () => {
expect(adapter.source).toBe('live-database');
expect(adapter.skillNames).toEqual(['live_database_ingest']);
});
it('threads tableScope into the introspection port and applies a defensive final filter', async () => {
const extractSchema = vi.fn(
async (_connectionId: string, _options?: { tableScope?: ReadonlySet<KtxTableRefKey> }) => ({
connectionId: 'warehouse',
driver: 'snowflake' as const,
extractedAt: '2026-05-22T00:00:00.000Z',
scope: {},
metadata: {},
tables: [
{
catalog: 'A',
db: 'MARTS',
name: 'IN_SCOPE',
kind: 'table' as const,
comment: null,
estimatedRows: 0,
columns: [],
foreignKeys: [],
},
{
catalog: 'A',
db: 'MARTS',
name: 'OUT_OF_SCOPE',
kind: 'table' as const,
comment: null,
estimatedRows: 0,
columns: [],
foreignKeys: [],
},
],
}),
);
const scope = tableRefSet([{ catalog: 'A', db: 'MARTS', name: 'IN_SCOPE' }]);
const adapter = new LiveDatabaseSourceAdapter({
introspection: { extractSchema },
resolveTableScope: (connectionId) => (connectionId === 'warehouse' ? scope : undefined),
});
const stagedDir = await mkdtemp(join(tmpdir(), 'ktx-livedb-scope-'));
try {
await adapter.fetch(undefined, stagedDir, {
connectionId: 'warehouse',
sourceKey: 'live-database',
} as never);
expect(extractSchema).toHaveBeenCalledWith('warehouse', { tableScope: scope });
const tables = await readdir(join(stagedDir, 'tables'));
expect(tables).toHaveLength(1);
const table = JSON.parse(await readFile(join(stagedDir, 'tables', tables[0]!), 'utf8')) as { name?: string };
expect(table.name).toBe('IN_SCOPE');
} finally {
await rm(stagedDir, { recursive: true, force: true });
}
});
});

View file

@ -1,4 +1,5 @@
import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js';
import { filterSnapshotTables } from '../../../scan/enabled-tables.js';
import { chunkLiveDatabaseStagedDir } from './chunk.js';
import { detectLiveDatabaseStagedDir, writeLiveDatabaseSnapshot } from './stage.js';
import type { LiveDatabaseSourceAdapterDeps } from './types.js';
@ -14,11 +15,13 @@ export class LiveDatabaseSourceAdapter implements SourceAdapter {
}
async fetch(_pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId);
const tableScope = this.deps.resolveTableScope?.(ctx.connectionId);
const snapshot = await this.deps.introspection.extractSchema(ctx.connectionId, { tableScope });
const filtered = tableScope ? filterSnapshotTables(snapshot, tableScope) : snapshot;
await writeLiveDatabaseSnapshot(stagedDir, {
...snapshot,
...filtered,
connectionId: ctx.connectionId,
extractedAt: snapshot.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(),
extractedAt: filtered.extractedAt ?? (this.deps.now ?? (() => new Date()))().toISOString(),
});
}

View file

@ -1,10 +1,16 @@
import type { KtxSchemaSnapshot } from '../../../scan/types.js';
import type { KtxTableRefKey } from '../../../scan/table-ref.js';
export interface LiveDatabaseIntrospectionOptions {
tableScope?: ReadonlySet<KtxTableRefKey>;
}
export interface LiveDatabaseIntrospectionPort {
extractSchema(connectionId: string): Promise<KtxSchemaSnapshot>;
extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions): Promise<KtxSchemaSnapshot>;
}
export interface LiveDatabaseSourceAdapterDeps {
introspection: LiveDatabaseIntrospectionPort;
now?: () => Date;
resolveTableScope?: (connectionId: string) => ReadonlySet<KtxTableRefKey> | undefined;
}

View file

@ -4,6 +4,7 @@ import { notionConnectionToPullConfig, parseNotionConnectionConfig } from '../..
import { resolveKtxConfigReference } from '../core/config-reference.js';
import { ktxLocalStateDbPath } from '../../context/project/local-state-db.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { resolveEnabledTables } from '../../context/scan/enabled-tables.js';
import type { SqlAnalysisPort } from '../../context/sql-analysis/ports.js';
import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js';
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
@ -90,6 +91,10 @@ export function createDefaultLocalIngestAdapters(
...options.databaseIntrospection,
...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}),
}),
resolveTableScope: (connectionId) => {
const connection = project.config.connections[connectionId];
return connection ? resolveEnabledTables(connection) ?? undefined : undefined;
},
}),
new LookmlSourceAdapter({
homeDir: join(project.projectDir, '.ktx/cache'),

View file

@ -15,12 +15,16 @@ import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters
import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js';
import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js';
import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js';
import type { LiveDatabaseIntrospectionPort } from './context/ingest/adapters/live-database/types.js';
import type {
LiveDatabaseIntrospectionOptions,
LiveDatabaseIntrospectionPort,
} from './context/ingest/adapters/live-database/types.js';
import { LiveDatabaseSourceAdapter } from './context/ingest/adapters/live-database/live-database.adapter.js';
import { PostgresPgssReader } from './context/ingest/adapters/historic-sql/postgres-pgss-reader.js';
import { SnowflakeHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/snowflake-query-history-reader.js';
import type { SourceAdapter } from './context/ingest/types.js';
import type { KtxLocalProject } from './context/project/project.js';
import { resolveEnabledTables } from './context/scan/enabled-tables.js';
import { createHttpSqlAnalysisPort } from './context/sql-analysis/http-sql-analysis-port.js';
import type { SqlAnalysisPort } from './context/sql-analysis/ports.js';
import {
@ -116,7 +120,7 @@ function createKtxCliLiveDatabaseIntrospection(
connections: project.config.connections,
});
return {
async extractSchema(connectionId: string) {
async extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions) {
const connection = project.config.connections[connectionId];
if (isKtxPostgresConnectionConfig(connection)) {
return postgres.extractSchema(connectionId);
@ -140,14 +144,15 @@ function createKtxCliLiveDatabaseIntrospection(
const { createSnowflakeLiveDatabaseIntrospection } = await import('./connectors/snowflake/live-database-introspection.js');
const { isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
if (!isKtxSnowflakeConnectionConfig(connection)) {
return daemon.extractSchema(connectionId);
return daemon.extractSchema(connectionId, options);
}
const snowflake = createSnowflakeLiveDatabaseIntrospection({
connections: project.config.connections,
projectDir: project.projectDir,
});
return snowflake.extractSchema(connectionId);
}
return daemon.extractSchema(connectionId);
return daemon.extractSchema(connectionId, options);
},
};
}
@ -263,6 +268,7 @@ async function createEphemeralSnowflakeHistoricSqlClient(
const connector = new connectorModule.KtxSnowflakeScanConnector({
connectionId,
connection,
projectDir: project.projectDir,
});
try {
const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never);
@ -361,6 +367,10 @@ export function createKtxCliLocalIngestAdapters(
});
const liveDatabase = new LiveDatabaseSourceAdapter({
introspection: createKtxCliLiveDatabaseIntrospection(project, options),
resolveTableScope: (connectionId) => {
const connection = project.config.connections[connectionId];
return connection ? resolveEnabledTables(connection) ?? undefined : undefined;
},
});
return base.map((adapter) => (adapter.source === 'live-database' ? liveDatabase : adapter));
}