Add connector table listing

This commit is contained in:
Luca Martial 2026-05-12 18:22:05 -07:00
parent 52ddb061a4
commit 9704d8632b
7 changed files with 165 additions and 0 deletions

View file

@ -14,6 +14,7 @@ import {
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -63,6 +64,7 @@ export interface KtxBigQueryQueryJob {
export interface KtxBigQueryTableRef {
id?: string;
metadata?: { type?: string };
get(): Promise<
[
{
@ -369,6 +371,25 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
return datasets.map((dataset) => dataset.id).filter((id): id is string => Boolean(id));
}
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
const filterDatasets = datasetIds ?? (await this.listDatasets());
const entries: KtxTableListEntry[] = [];
for (const datasetId of filterDatasets) {
const dataset = this.getClient().dataset(datasetId);
const [tables] = await dataset.getTables();
for (const table of tables) {
if (!table.id) continue;
entries.push({
schema: datasetId,
name: table.id,
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
});
}
}
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
return entries;
}
async cleanup(): Promise<void> {
this.client = null;
}

View file

@ -16,6 +16,7 @@ import {
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableListEntry,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import { readFileSync } from 'node:fs';
@ -128,6 +129,12 @@ interface ClickHouseDatabaseRow {
name: string;
}
interface ClickHouseTableListRow {
database: string;
name: string;
engine: string;
}
interface ClickHouseCompactResponse {
meta?: Array<{ name: string; type: string }>;
data?: unknown[][];
@ -417,6 +424,25 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
return rows.map((row) => row.name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryEachRow<ClickHouseTableListRow>(
`
SELECT database, name, engine
FROM system.tables
WHERE database IN ({schemas:Array(String)})
ORDER BY database, name
`,
{ schemas: filterSchemas },
);
return rows.map((row) => ({
schema: row.database,
name: row.name,
kind: row.engine === 'View' || row.engine === 'MaterializedView' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.client) {
await this.client.close();

View file

@ -15,6 +15,7 @@ import {
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxTableListEntry,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
@ -129,6 +130,12 @@ interface MysqlSchemaRow extends RowDataPacket {
SCHEMA_NAME: string;
}
interface MysqlTableListRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
TABLE_TYPE: string;
}
interface MysqlCountRow extends RowDataPacket {
count?: unknown;
cardinality?: unknown;
@ -466,6 +473,27 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
return rows.map((row) => row.SCHEMA_NAME);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const placeholders = filterSchemas.map(() => '?').join(', ');
const rows = await this.queryRaw<MysqlTableListRow>(
`
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA IN (${placeholders})
AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_SCHEMA, TABLE_NAME
`,
filterSchemas,
);
return rows.map((row) => ({
schema: row.TABLE_SCHEMA,
name: row.TABLE_NAME,
kind: row.TABLE_TYPE === 'VIEW' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();

View file

@ -17,6 +17,7 @@ import {
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -179,6 +180,12 @@ interface PostgresSchemaRow {
schema_name: string;
}
interface PostgresTableListRow {
schema_name: string;
table_name: string;
table_kind: string;
}
interface PostgresCountRow {
count?: unknown;
cardinality?: unknown;
@ -523,6 +530,27 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
return rows.map((row) => row.schema_name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryRaw<PostgresTableListRow>(
`
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relkind AS table_kind
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND c.relkind IN ('r', 'v')
ORDER BY n.nspname, c.relname
`,
[filterSchemas],
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_kind === 'v' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();

View file

@ -60,6 +60,10 @@ function fakeDriverFactory(): KtxSnowflakeDriverFactory {
},
]),
listSchemas: vi.fn(async () => ['PUBLIC', 'MART']),
listTables: vi.fn(async () => [
{ schema: 'PUBLIC', name: 'ORDERS', kind: 'table' as const },
{ schema: 'PUBLIC', name: 'ORDER_SUMMARY', kind: 'view' as const },
]),
cleanup: vi.fn(async () => undefined),
};
return { createDriver: vi.fn(() => driver) };

View file

@ -19,6 +19,7 @@ import {
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableListEntry,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import * as snowflake from 'snowflake-sdk';
@ -75,6 +76,7 @@ export interface KtxSnowflakeDriver {
query(sql: string, params?: unknown): Promise<KtxQueryResult>;
getSchemaMetadata(schemaName?: string): Promise<KtxSnowflakeRawTableMetadata[]>;
listSchemas(): Promise<string[]>;
listTables(schemas?: string[]): Promise<KtxTableListEntry[]>;
cleanup(): Promise<void>;
}
@ -344,6 +346,31 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
return result.rows.map((row) => String(row[1])).filter((name) => name !== 'INFORMATION_SCHEMA');
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const entries: KtxTableListEntry[] = [];
for (const schemaName of filterSchemas) {
const result = await this.query(
`
SELECT TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
ORDER BY TABLE_NAME
`,
[schemaName, this.resolved.database],
);
for (const row of result.rows) {
entries.push({
schema: schemaName,
name: String(row[0]),
kind: String(row[1]) === 'VIEW' ? 'view' : 'table',
});
}
}
return entries;
}
async cleanup(): Promise<void> {
const closers = this.closeSdkOptions;
this.closeSdkOptions = [];
@ -594,6 +621,10 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
return this.getDriver().listSchemas();
}
listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
return this.getDriver().listTables(schemas);
}
async cleanup(): Promise<void> {
if (this.driverInstance) {
await this.driverInstance.cleanup();

View file

@ -14,6 +14,7 @@ import {
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -441,6 +442,32 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
return rows.map((row) => row.schema_name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const params: Record<string, unknown> = {};
const placeholders = filterSchemas.map((s, i) => {
params[`schema${i}`] = s;
return `@schema${i}`;
});
const rows = await this.queryRaw<{ schema_name: string; table_name: string; table_type: string }>(
`
SELECT s.name AS schema_name, o.name AS table_name, o.type_desc AS table_type
FROM sys.objects o
JOIN sys.schemas s ON o.schema_id = s.schema_id
WHERE o.type IN ('U', 'V')
AND s.name IN (${placeholders.join(', ')})
ORDER BY s.name, o.name
`,
params,
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_type === 'VIEW' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.close();