From 9f095457dced21747bd339822001da7bd288b211 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 18 May 2026 15:13:26 +0200 Subject: [PATCH] feat: implement duckdb connector runtime --- .../connector-duckdb/src/connector.test.ts | 116 ++++++ packages/connector-duckdb/src/connector.ts | 391 +++++++++++++++++- packages/connector-duckdb/src/dialect.ts | 71 ++++ packages/connector-duckdb/src/index.ts | 9 + .../src/live-database-introspection.ts | 30 ++ 5 files changed, 614 insertions(+), 3 deletions(-) create mode 100644 packages/connector-duckdb/src/dialect.ts create mode 100644 packages/connector-duckdb/src/live-database-introspection.ts diff --git a/packages/connector-duckdb/src/connector.test.ts b/packages/connector-duckdb/src/connector.test.ts index 25ba7739..626598dd 100644 --- a/packages/connector-duckdb/src/connector.test.ts +++ b/packages/connector-duckdb/src/connector.test.ts @@ -99,3 +99,119 @@ describe('DuckDB connection config and path resolution', () => { await expect(readFile(directory)).rejects.toThrow(); }); }); + +async function createDuckDbFixture(dbPath: string): Promise { + const { DuckDBInstance } = await import('@duckdb/node-api'); + const instance = await DuckDBInstance.create(dbPath); + const connection = await instance.connect(); + try { + await connection.run(` + CREATE TABLE customers ( + id INTEGER PRIMARY KEY, + segment VARCHAR NOT NULL + ) + `); + await connection.run(` + CREATE TABLE orders ( + id INTEGER PRIMARY KEY, + customer_id INTEGER REFERENCES customers(id), + amount DOUBLE, + status VARCHAR + ) + `); + await connection.run(`CREATE VIEW paid_orders AS SELECT id, customer_id, amount FROM orders WHERE status = 'paid'`); + await connection.run(`INSERT INTO customers VALUES (1, 'enterprise'), (2, 'self-serve')`); + await connection.run(`INSERT INTO orders VALUES (10, 1, 25.5, 'paid'), (11, 1, 5.0, 'open'), (12, 2, NULL, 'paid')`); + } finally { + connection.disconnectSync(); + instance.closeSync(); + } +} + +describe('KtxDuckDbScanConnector runtime behavior', () => { + let tempDir: string; + let dbPath: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-runtime-')); + dbPath = join(tempDir, 'warehouse.duckdb'); + await createDuckDbFixture(dbPath); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + function connector() { + return new KtxDuckDbScanConnector({ + connectionId: 'warehouse', + projectDir: tempDir, + connection: { driver: 'duckdb', path: 'warehouse.duckdb' }, + now: () => new Date('2026-05-18T12:00:00.000Z'), + }); + } + + it('tests the connection without mutating the database', async () => { + const c = connector(); + await expect(c.testConnection()).resolves.toEqual({ success: true }); + await c.cleanup(); + }); + + it('introspects tables, views, primary keys, foreign keys, and row counts', async () => { + const c = connector(); + const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' as never }, { runId: 'test' }); + await c.cleanup(); + + expect(snapshot).toMatchObject({ + connectionId: 'warehouse', + driver: 'duckdb', + extractedAt: '2026-05-18T12:00:00.000Z', + metadata: { table_count: 3 }, + }); + const orders = snapshot.tables.find((table) => table.name === 'orders'); + expect(orders?.kind).toBe('table'); + expect(orders?.estimatedRows).toBe(3); + expect(orders?.columns.find((column) => column.name === 'id')?.primaryKey).toBe(true); + expect(orders?.foreignKeys).toContainEqual( + expect.objectContaining({ + fromColumn: 'customer_id', + toTable: 'customers', + toColumn: 'id', + }), + ); + expect(snapshot.tables.find((table) => table.name === 'paid_orders')?.kind).toBe('view'); + }); + + it('samples tables, samples columns, returns distinct values, and counts rows', async () => { + const c = connector(); + await expect( + c.sampleTable?.( + { connectionId: 'warehouse', table: { catalog: null, db: 'main', name: 'orders' }, columns: ['id', 'status'], limit: 2 }, + { runId: 'test' }, + ), + ).resolves.toMatchObject({ headers: ['id', 'status'], totalRows: 2 }); + await expect( + c.sampleColumn?.( + { connectionId: 'warehouse', table: { catalog: null, db: 'main', name: 'orders' }, column: 'status', limit: 2 }, + { runId: 'test' }, + ), + ).resolves.toMatchObject({ values: ['paid', 'open'] }); + await expect(c.getColumnDistinctValues({ catalog: null, db: 'main', name: 'orders' }, 'status', { + maxCardinality: 10, + limit: 10, + })).resolves.toEqual({ values: ['open', 'paid'], cardinality: 2 }); + await expect(c.getTableRowCount('orders')).resolves.toBe(3); + await c.cleanup(); + }); + + it('executes read-only SQL and rejects mutating SQL before execution', async () => { + const c = connector(); + await expect( + c.executeReadOnly?.({ connectionId: 'warehouse', sql: 'select id from orders order by id', maxRows: 2 }, { runId: 'test' }), + ).resolves.toMatchObject({ headers: ['id'], rows: [[10], [11]], rowCount: 2 }); + await expect( + c.executeReadOnly?.({ connectionId: 'warehouse', sql: 'create table created_by_test(id int)' }, { runId: 'test' }), + ).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally.'); + await c.cleanup(); + }); +}); diff --git a/packages/connector-duckdb/src/connector.ts b/packages/connector-duckdb/src/connector.ts index 9f4aaf9e..52fef8f9 100644 --- a/packages/connector-duckdb/src/connector.ts +++ b/packages/connector-duckdb/src/connector.ts @@ -2,13 +2,59 @@ import { existsSync, readFileSync, statSync } from 'node:fs'; import { homedir } from 'node:os'; import { isAbsolute, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; +import type { DuckDBConnection, DuckDBInstance } from '@duckdb/node-api'; +import { + assertReadOnlySql, + limitSqlForExecution, + normalizeQueryRows, + type KtxSqlQueryExecutionInput, + type KtxSqlQueryExecutionResult, + type KtxSqlQueryExecutorPort, +} from '@ktx/context/connections'; import { createKtxConnectorCapabilities, + type KtxColumnSampleInput, + type KtxColumnSampleResult, + type KtxColumnStatsInput, + type KtxColumnStatsResult, type KtxConnectionDriver, + type KtxQueryResult, + type KtxReadOnlyQueryInput, type KtxScanConnector, + type KtxScanContext, + type KtxScanInput, + type KtxSchemaColumn, + type KtxSchemaForeignKey, + type KtxSchemaSnapshot, + type KtxSchemaTable, + type KtxTableRef, + type KtxTableSampleInput, + type KtxTableSampleResult, } from '@ktx/context/scan'; +import { KtxDuckDbDialect } from './dialect.js'; import { loadDuckDbNodeApi, type DuckDbNativeLoader } from './native.js'; +const TABLES_SQL = ` + SELECT table_catalog AS catalog, table_schema AS db, table_name AS name, table_type AS type + FROM information_schema.tables + WHERE table_schema NOT IN ('information_schema', 'pg_catalog') + ORDER BY table_schema, table_name +`; + +const COLUMNS_SQL = ` + SELECT table_catalog AS catalog, table_schema AS db, table_name, column_name, data_type, is_nullable, ordinal_position + FROM information_schema.columns + WHERE table_schema NOT IN ('information_schema', 'pg_catalog') + ORDER BY table_schema, table_name, ordinal_position +`; + +const CONSTRAINTS_SQL = ` + SELECT database_name, schema_name, table_name, constraint_type, constraint_name, + constraint_column_names, referenced_table, referenced_column_names + FROM duckdb_constraints() + WHERE constraint_type IN ('PRIMARY KEY', 'FOREIGN KEY') +`; + export interface KtxDuckDbConnectionConfig { driver?: string; path?: string; @@ -27,6 +73,53 @@ export interface KtxDuckDbScanConnectorOptions extends DuckDbDatabasePathInput { nativeLoader?: DuckDbNativeLoader; } +export interface KtxDuckDbReadOnlyQueryInput extends KtxReadOnlyQueryInput {} + +export interface KtxDuckDbColumnDistinctValuesOptions { + maxCardinality: number; + limit: number; + sampleSize?: number; +} + +export interface KtxDuckDbColumnDistinctValuesResult { + values: string[] | null; + cardinality: number; +} + +interface DuckDbConnectionState { + instance: DuckDBInstance; + connection: DuckDBConnection; +} + +interface DuckDbTableRow { + catalog: string | null; + db: string | null; + name: string; + type: string; +} + +interface DuckDbColumnRow { + catalog: string | null; + db: string | null; + name: string; + tableName: string; + columnName: string; + dataType: string; + isNullable: string; +} + +interface DuckDbConstraintRow { + catalog: string | null; + db: string | null; + name: string; + tableName: string; + constraintType: string; + constraintName: string | null; + columnNames: string[]; + referencedTable: string | null; + referencedColumnNames: string[]; +} + function resolveTilde(path: string): string { return path.startsWith('~') ? resolve(homedir(), path.slice(1)) : path; } @@ -112,11 +205,15 @@ export class KtxDuckDbScanConnector implements KtxScanConnector { private readonly connectionId: string; private readonly dbPath: string; + private readonly now: () => Date; + private readonly dialect = new KtxDuckDbDialect(); private readonly nativeLoader: DuckDbNativeLoader; + private state: DuckDbConnectionState | null = null; constructor(options: KtxDuckDbScanConnectorOptions) { this.connectionId = options.connectionId; this.dbPath = duckDbDatabasePathFromConfig(options); + this.now = options.now ?? (() => new Date()); this.nativeLoader = options.nativeLoader ?? { load: loadDuckDbNodeApi }; this.id = `duckdb:${options.connectionId}`; } @@ -139,9 +236,297 @@ export class KtxDuckDbScanConnector implements KtxScanConnector { } } - async introspect(): Promise { - throw new Error('DuckDB schema introspection is implemented in Task 2.'); + async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const tableRows = (await this.query(TABLES_SQL)).rows.map(tableRowFromQueryRow); + const columnRows = (await this.query(COLUMNS_SQL)).rows.map(columnRowFromQueryRow); + const constraintRows = (await this.query(CONSTRAINTS_SQL)).rows.map(constraintRowFromQueryRow); + const columnsByTable = groupByTableKey(columnRows); + const constraintsByTable = groupByTableKey(constraintRows); + const tables = await Promise.all( + tableRows.map(async (table) => this.readTable(table, columnsByTable.get(tableKey(table)) ?? [], constraintsByTable.get(tableKey(table)) ?? [])), + ); + const fileStats = existsSync(this.dbPath) ? statSync(this.dbPath) : null; + return { + connectionId: this.connectionId, + driver: this.driver, + extractedAt: this.now().toISOString(), + scope: {}, + metadata: { + file_path: this.dbPath, + file_size: fileStats ? fileStats.size : 0, + table_count: tables.length, + total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0), + }, + tables, + }; } - async cleanup(): Promise {} + async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns)); + return { headers: result.headers, rows: result.rows, totalRows: result.totalRows }; + } + + async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query( + this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit), + ); + const values = result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]); + return { values, nullCount: null, distinctCount: null }; + } + + async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise { + return null; + } + + async executeReadOnly(input: KtxDuckDbReadOnlyQueryInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query(limitSqlForExecution(input.sql, input.maxRows)); + return { ...result, rowCount: result.rows.length }; + } + + async getColumnDistinctValues( + table: KtxTableRef, + columnName: string, + options: KtxDuckDbColumnDistinctValuesOptions, + ): Promise { + const sampleSize = options.sampleSize ?? 10000; + const tableName = this.qTableName(table); + const quotedColumn = this.dialect.quoteIdentifier(columnName); + const cardinalityResult = await this.query( + this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize), + ); + if (cardinalityResult.rows.length === 0) { + return null; + } + const cardinality = Number(cardinalityResult.rows[0][0]); + if (Number.isNaN(cardinality)) { + return null; + } + if (cardinality === 0) { + return { values: [], cardinality: 0 }; + } + if (cardinality > options.maxCardinality) { + return { values: null, cardinality }; + } + const valuesResult = await this.query(this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit)); + return { + values: valuesResult.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => String(row[0])), + cardinality, + }; + } + + async getTableRowCount(tableName: string): Promise { + const result = await this.query(`SELECT COUNT(*) AS count FROM ${this.dialect.quoteIdentifier(tableName)}`); + return Number(result.rows[0]?.[0] ?? 0); + } + + qTableName(table: Pick): string { + return this.dialect.formatTableName(table); + } + + quoteIdentifier(identifier: string): string { + return this.dialect.quoteIdentifier(identifier); + } + + async cleanup(): Promise { + if (this.state) { + this.state.connection.disconnectSync(); + this.state.instance.closeSync(); + this.state = null; + } + } + + private async database(): Promise { + if (!this.state) { + assertDuckDbDatabaseFile(this.dbPath); + const { DuckDBInstance } = await this.nativeLoader.load(); + const instance = await DuckDBInstance.create(this.dbPath, { access_mode: 'READ_ONLY' }); + const connection = await instance.connect(); + this.state = { instance, connection }; + } + return this.state; + } + + private async query(sql: string): Promise> { + const { connection } = await this.database(); + const reader = await connection.runAndReadAll(assertReadOnlySql(sql)); + const rows = normalizeQueryRows(reader.getRowsJS()).map((row) => row.map(normalizeDuckDbValue)); + return { + headers: reader.columnNames(), + rows, + totalRows: rows.length, + }; + } + + private async readTable( + table: DuckDbTableRow, + columns: DuckDbColumnRow[], + constraints: DuckDbConstraintRow[], + ): Promise { + const primaryKeyColumns = new Set( + constraints + .filter((constraint) => constraint.constraintType === 'PRIMARY KEY') + .flatMap((constraint) => constraint.columnNames), + ); + const estimatedRows = + table.type.toUpperCase().includes('VIEW') + ? null + : Number( + (await this.query(`SELECT COUNT(*) AS count FROM ${this.qTableName(table)}`)).rows[0]?.[0] ?? 0, + ); + return { + catalog: table.catalog, + db: table.db, + name: table.name, + kind: table.type.toUpperCase().includes('VIEW') ? 'view' : 'table', + comment: null, + estimatedRows, + columns: columns.map((column) => this.mapColumn(column, primaryKeyColumns)), + foreignKeys: this.mapForeignKeys(constraints), + }; + } + + private mapColumn(column: DuckDbColumnRow, primaryKeyColumns: Set): KtxSchemaColumn { + return { + name: column.columnName, + nativeType: column.dataType, + normalizedType: this.dialect.mapDataType(column.dataType), + dimensionType: this.dialect.mapToDimensionType(column.dataType), + nullable: column.isNullable.toUpperCase() === 'YES' && !primaryKeyColumns.has(column.columnName), + primaryKey: primaryKeyColumns.has(column.columnName), + comment: null, + }; + } + + private mapForeignKeys(rows: DuckDbConstraintRow[]): KtxSchemaForeignKey[] { + const foreignKeys: KtxSchemaForeignKey[] = []; + for (const row of rows) { + if (row.constraintType !== 'FOREIGN KEY' || !row.referencedTable) continue; + row.columnNames.forEach((fromColumn, index) => { + const toColumn = row.referencedColumnNames[index]; + if (!fromColumn || !toColumn || !row.referencedTable) return; + foreignKeys.push({ + fromColumn, + toCatalog: null, + toDb: row.db, + toTable: row.referencedTable, + toColumn, + constraintName: row.constraintName, + }); + }); + } + return foreignKeys; + } + + private assertConnection(connectionId: string): void { + if (connectionId !== this.connectionId) { + throw new Error(`KTX DuckDB connector ${this.id} cannot serve connection ${connectionId}`); + } + } +} + +export function createDuckDbQueryExecutor(): KtxSqlQueryExecutorPort { + return { + async execute(input: KtxSqlQueryExecutionInput): Promise { + const connector = new KtxDuckDbScanConnector({ + connectionId: input.connectionId, + projectDir: input.projectDir, + connection: input.connection as KtxDuckDbConnectionConfig | undefined, + }); + try { + const result = await connector.executeReadOnly( + { connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows }, + { runId: 'duckdb-query-executor' }, + ); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + command: 'SELECT', + rowCount: result.rowCount, + }; + } finally { + await connector.cleanup(); + } + }, + }; +} + +function normalizeDuckDbValue(value: unknown): unknown { + if (typeof value === 'bigint') { + return Number.isSafeInteger(Number(value)) ? Number(value) : value.toString(); + } + if (Array.isArray(value)) { + return value.map(normalizeDuckDbValue); + } + if (value && typeof value === 'object' && value.constructor === Object) { + return Object.fromEntries( + Object.entries(value as Record).map(([key, nested]) => [key, normalizeDuckDbValue(nested)]), + ); + } + return value; +} + +function stringArray(value: unknown): string[] { + return Array.isArray(value) ? value.filter((item): item is string => typeof item === 'string') : []; +} + +function nullableString(value: unknown): string | null { + return typeof value === 'string' && value.length > 0 ? value : null; +} + +function tableKey(table: Pick): string { + return `${table.catalog ?? ''}\0${table.db ?? ''}\0${table.name}`; +} + +function groupByTableKey>(rows: T[]): Map { + const byTable = new Map(); + for (const row of rows) { + const key = tableKey(row); + const current = byTable.get(key); + if (current) { + current.push(row); + } else { + byTable.set(key, [row]); + } + } + return byTable; +} + +function tableRowFromQueryRow(row: unknown[]): DuckDbTableRow { + return { + catalog: nullableString(row[0]), + db: nullableString(row[1]), + name: String(row[2]), + type: String(row[3]), + }; +} + +function columnRowFromQueryRow(row: unknown[]): DuckDbColumnRow { + return { + catalog: nullableString(row[0]), + db: nullableString(row[1]), + name: String(row[2]), + tableName: String(row[2]), + columnName: String(row[3]), + dataType: String(row[4]), + isNullable: String(row[5]), + }; +} + +function constraintRowFromQueryRow(row: unknown[]): DuckDbConstraintRow { + return { + catalog: nullableString(row[0]), + db: nullableString(row[1]), + name: String(row[2]), + tableName: String(row[2]), + constraintType: String(row[3]), + constraintName: nullableString(row[4]), + columnNames: stringArray(row[5]), + referencedTable: nullableString(row[6]), + referencedColumnNames: stringArray(row[7]), + }; } diff --git a/packages/connector-duckdb/src/dialect.ts b/packages/connector-duckdb/src/dialect.ts new file mode 100644 index 00000000..91d89c1e --- /dev/null +++ b/packages/connector-duckdb/src/dialect.ts @@ -0,0 +1,71 @@ +import type { KtxSchemaDimensionType, KtxTableRef } from '@ktx/context/scan'; + +export class KtxDuckDbDialect { + readonly type = 'duckdb'; + + quoteIdentifier(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"`; + } + + formatTableName(table: Pick): string { + return [table.catalog, table.db, table.name] + .filter((part): part is string => !!part) + .map((part) => this.quoteIdentifier(part)) + .join('.'); + } + + mapDataType(nativeType: string): string { + return nativeType; + } + + mapToDimensionType(nativeType: string): KtxSchemaDimensionType { + const normalized = nativeType.toUpperCase().trim(); + if (normalized.includes('DATE') || normalized.includes('TIME')) return 'time'; + if ( + normalized.includes('INT') || + normalized.includes('DECIMAL') || + normalized.includes('DOUBLE') || + normalized.includes('FLOAT') || + normalized.includes('NUMERIC') || + normalized.includes('REAL') + ) { + return 'number'; + } + if (normalized.includes('BOOL')) return 'boolean'; + return 'string'; + } + + generateSampleQuery(tableName: string, limit: number, columns?: string[]): string { + const columnList = + columns && columns.length > 0 ? columns.map((column) => this.quoteIdentifier(column)).join(', ') : '*'; + return `SELECT ${columnList} FROM ${tableName} LIMIT ${limit}`; + } + + generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string { + const quoted = this.quoteIdentifier(columnName); + return `SELECT ${quoted} FROM ${tableName} WHERE ${quoted} IS NOT NULL AND TRIM(CAST(${quoted} AS VARCHAR)) != '' LIMIT ${limit}`; + } + + generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string { + return ` + WITH sampled AS ( + SELECT ${columnName} AS val + FROM ${tableName} + WHERE ${columnName} IS NOT NULL + LIMIT ${sampleSize} + ) + SELECT COUNT(DISTINCT val) AS cardinality + FROM sampled + `; + } + + generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string { + return ` + SELECT DISTINCT CAST(${columnName} AS VARCHAR) AS val + FROM ${tableName} + WHERE ${columnName} IS NOT NULL + ORDER BY val + LIMIT ${limit} + `; + } +} diff --git a/packages/connector-duckdb/src/index.ts b/packages/connector-duckdb/src/index.ts index 88bdddd2..17f55027 100644 --- a/packages/connector-duckdb/src/index.ts +++ b/packages/connector-duckdb/src/index.ts @@ -1,12 +1,21 @@ export { assertDuckDbDatabaseFile, + createDuckDbQueryExecutor, duckDbDatabasePathFromConfig, isKtxDuckDbConnectionConfig, KtxDuckDbScanConnector, + type KtxDuckDbColumnDistinctValuesOptions, + type KtxDuckDbColumnDistinctValuesResult, type DuckDbDatabasePathInput, type KtxDuckDbConnectionConfig, + type KtxDuckDbReadOnlyQueryInput, type KtxDuckDbScanConnectorOptions, } from './connector.js'; +export { KtxDuckDbDialect } from './dialect.js'; +export { + createDuckDbLiveDatabaseIntrospection, + type CreateDuckDbLiveDatabaseIntrospectionOptions, +} from './live-database-introspection.js'; export { assertSupportedDuckDbPlatform, currentDuckDbPlatform, diff --git a/packages/connector-duckdb/src/live-database-introspection.ts b/packages/connector-duckdb/src/live-database-introspection.ts new file mode 100644 index 00000000..2815d59d --- /dev/null +++ b/packages/connector-duckdb/src/live-database-introspection.ts @@ -0,0 +1,30 @@ +import type { LiveDatabaseIntrospectionPort } from '@ktx/context/ingest'; +import type { KtxProjectConnectionConfig } from '@ktx/context/project'; +import { KtxDuckDbScanConnector, type KtxDuckDbConnectionConfig } from './connector.js'; + +export interface CreateDuckDbLiveDatabaseIntrospectionOptions { + projectDir?: string; + connections: Record; + now?: () => Date; +} + +export function createDuckDbLiveDatabaseIntrospection( + options: CreateDuckDbLiveDatabaseIntrospectionOptions, +): LiveDatabaseIntrospectionPort { + return { + async extractSchema(connectionId: string) { + const connection = options.connections[connectionId] as KtxDuckDbConnectionConfig | undefined; + const connector = new KtxDuckDbScanConnector({ + connectionId, + connection, + projectDir: options.projectDir, + now: options.now, + }); + try { + return await connector.introspect({ connectionId, driver: 'duckdb' as never }, { runId: `duckdb-${connectionId}` }); + } finally { + await connector.cleanup(); + } + }, + }; +}