feat: implement duckdb connector runtime

This commit is contained in:
Andrey Avtomonov 2026-05-18 15:13:26 +02:00
parent 1b92789c01
commit 9f095457dc
5 changed files with 614 additions and 3 deletions

View file

@ -99,3 +99,119 @@ describe('DuckDB connection config and path resolution', () => {
await expect(readFile(directory)).rejects.toThrow();
});
});
/**
 * Creates the DuckDB database file used by the runtime tests.
 *
 * The fixture contains two tables (`customers`, `orders` with a foreign key to
 * `customers`), one view (`paid_orders`), two customer rows and three order
 * rows. The connection and instance are always released, even when a
 * statement fails.
 *
 * @param dbPath Path of the database file to create.
 */
async function createDuckDbFixture(dbPath: string): Promise<void> {
  const duckdb = await import('@duckdb/node-api');
  const instance = await duckdb.DuckDBInstance.create(dbPath);
  const connection = await instance.connect();
  // Executed strictly in order: tables first, then the view over `orders`, then the rows.
  const statements = [
    `
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
segment VARCHAR NOT NULL
)
`,
    `
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
amount DOUBLE,
status VARCHAR
)
`,
    `CREATE VIEW paid_orders AS SELECT id, customer_id, amount FROM orders WHERE status = 'paid'`,
    `INSERT INTO customers VALUES (1, 'enterprise'), (2, 'self-serve')`,
    `INSERT INTO orders VALUES (10, 1, 25.5, 'paid'), (11, 1, 5.0, 'open'), (12, 2, NULL, 'paid')`,
  ];
  try {
    for (const statement of statements) {
      await connection.run(statement);
    }
  } finally {
    connection.disconnectSync();
    instance.closeSync();
  }
}
// Runtime tests for KtxDuckDbScanConnector against a real DuckDB file.
// Every test gets a fresh temporary directory containing the fixture built by
// createDuckDbFixture (2 tables, 1 view, 3 order rows).
describe('KtxDuckDbScanConnector runtime behavior', () => {
let tempDir: string;
let dbPath: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-runtime-'));
dbPath = join(tempDir, 'warehouse.duckdb');
await createDuckDbFixture(dbPath);
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
// Builds a connector whose relative `path` is resolved against the temp
// project directory and whose clock is pinned so `extractedAt` is stable.
function connector() {
return new KtxDuckDbScanConnector({
connectionId: 'warehouse',
projectDir: tempDir,
connection: { driver: 'duckdb', path: 'warehouse.duckdb' },
now: () => new Date('2026-05-18T12:00:00.000Z'),
});
}
it('tests the connection without mutating the database', async () => {
const c = connector();
await expect(c.testConnection()).resolves.toEqual({ success: true });
await c.cleanup();
});
it('introspects tables, views, primary keys, foreign keys, and row counts', async () => {
const c = connector();
const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' as never }, { runId: 'test' });
await c.cleanup();
// table_count is 3 because the view is counted alongside the two tables.
expect(snapshot).toMatchObject({
connectionId: 'warehouse',
driver: 'duckdb',
extractedAt: '2026-05-18T12:00:00.000Z',
metadata: { table_count: 3 },
});
const orders = snapshot.tables.find((table) => table.name === 'orders');
expect(orders?.kind).toBe('table');
expect(orders?.estimatedRows).toBe(3);
expect(orders?.columns.find((column) => column.name === 'id')?.primaryKey).toBe(true);
expect(orders?.foreignKeys).toContainEqual(
expect.objectContaining({
fromColumn: 'customer_id',
toTable: 'customers',
toColumn: 'id',
}),
);
expect(snapshot.tables.find((table) => table.name === 'paid_orders')?.kind).toBe('view');
});
it('samples tables, samples columns, returns distinct values, and counts rows', async () => {
const c = connector();
await expect(
c.sampleTable?.(
{ connectionId: 'warehouse', table: { catalog: null, db: 'main', name: 'orders' }, columns: ['id', 'status'], limit: 2 },
{ runId: 'test' },
),
).resolves.toMatchObject({ headers: ['id', 'status'], totalRows: 2 });
// NOTE(review): the sample query has no ORDER BY, so this expectation relies
// on DuckDB returning rows in insertion order — confirm this is stable.
await expect(
c.sampleColumn?.(
{ connectionId: 'warehouse', table: { catalog: null, db: 'main', name: 'orders' }, column: 'status', limit: 2 },
{ runId: 'test' },
),
).resolves.toMatchObject({ values: ['paid', 'open'] });
// Distinct values come back sorted because the dialect's query orders by the value.
await expect(c.getColumnDistinctValues({ catalog: null, db: 'main', name: 'orders' }, 'status', {
maxCardinality: 10,
limit: 10,
})).resolves.toEqual({ values: ['open', 'paid'], cardinality: 2 });
await expect(c.getTableRowCount('orders')).resolves.toBe(3);
await c.cleanup();
});
it('executes read-only SQL and rejects mutating SQL before execution', async () => {
const c = connector();
// maxRows: 2 truncates the three fixture rows to the first two ids.
await expect(
c.executeReadOnly?.({ connectionId: 'warehouse', sql: 'select id from orders order by id', maxRows: 2 }, { runId: 'test' }),
).resolves.toMatchObject({ headers: ['id'], rows: [[10], [11]], rowCount: 2 });
await expect(
c.executeReadOnly?.({ connectionId: 'warehouse', sql: 'create table created_by_test(id int)' }, { runId: 'test' }),
).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally.');
await c.cleanup();
});
});

View file

@ -2,13 +2,59 @@ import { existsSync, readFileSync, statSync } from 'node:fs';
import { homedir } from 'node:os';
import { isAbsolute, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import type { DuckDBConnection, DuckDBInstance } from '@duckdb/node-api';
import {
assertReadOnlySql,
limitSqlForExecution,
normalizeQueryRows,
type KtxSqlQueryExecutionInput,
type KtxSqlQueryExecutionResult,
type KtxSqlQueryExecutorPort,
} from '@ktx/context/connections';
import {
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxConnectionDriver,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import { KtxDuckDbDialect } from './dialect.js';
import { loadDuckDbNodeApi, type DuckDbNativeLoader } from './native.js';
// Lists every table and view outside the `information_schema` and `pg_catalog`
// schemas. Columns are read by position in tableRowFromQueryRow, so the select
// order (catalog, db, name, type) must not change.
const TABLES_SQL = `
SELECT table_catalog AS catalog, table_schema AS db, table_name AS name, table_type AS type
FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY table_schema, table_name
`;
// Lists every column of those same schemas, ordered by position within its
// table. columnRowFromQueryRow reads the first six columns by position;
// `ordinal_position` is selected but only used for ordering.
const COLUMNS_SQL = `
SELECT table_catalog AS catalog, table_schema AS db, table_name, column_name, data_type, is_nullable, ordinal_position
FROM information_schema.columns
WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY table_schema, table_name, ordinal_position
`;
// Reads primary-key and foreign-key constraints from `duckdb_constraints()`.
// constraintRowFromQueryRow reads all eight columns by position.
const CONSTRAINTS_SQL = `
SELECT database_name, schema_name, table_name, constraint_type, constraint_name,
constraint_column_names, referenced_table, referenced_column_names
FROM duckdb_constraints()
WHERE constraint_type IN ('PRIMARY KEY', 'FOREIGN KEY')
`;
export interface KtxDuckDbConnectionConfig {
driver?: string;
path?: string;
@ -27,6 +73,53 @@ export interface KtxDuckDbScanConnectorOptions extends DuckDbDatabasePathInput {
nativeLoader?: DuckDbNativeLoader;
}
/** Input accepted by `executeReadOnly`; currently adds nothing to `KtxReadOnlyQueryInput`. */
export interface KtxDuckDbReadOnlyQueryInput extends KtxReadOnlyQueryInput {}
/** Tuning options for `KtxDuckDbScanConnector.getColumnDistinctValues`. */
export interface KtxDuckDbColumnDistinctValuesOptions {
/** When the sampled cardinality exceeds this, values are not fetched and `values` is `null`. */
maxCardinality: number;
/** Maximum number of distinct values to return. */
limit: number;
/** Number of non-null rows sampled to estimate cardinality; the connector defaults to 10000. */
sampleSize?: number;
}
/** Result of `KtxDuckDbScanConnector.getColumnDistinctValues`. */
export interface KtxDuckDbColumnDistinctValuesResult {
/** Distinct values as strings, or `null` when the cardinality exceeded `maxCardinality`. */
values: string[] | null;
/** Number of distinct non-null values counted within the sampled rows. */
cardinality: number;
}
/** The open database instance and its connection, cached together by the connector. */
interface DuckDbConnectionState {
instance: DuckDBInstance;
connection: DuckDBConnection;
}
/** One row of `TABLES_SQL`, as mapped by tableRowFromQueryRow. */
interface DuckDbTableRow {
catalog: string | null;
db: string | null;
name: string;
/** Raw `table_type`; the connector treats any value containing "VIEW" as a view. */
type: string;
}
/** One row of `COLUMNS_SQL`, as mapped by columnRowFromQueryRow. */
interface DuckDbColumnRow {
catalog: string | null;
db: string | null;
/** Owning table's name (same value as `tableName`); present so rows can be grouped with tableKey. */
name: string;
tableName: string;
columnName: string;
dataType: string;
/** Raw `is_nullable` text; compared case-insensitively against "YES". */
isNullable: string;
}
/** One row of `CONSTRAINTS_SQL`, as mapped by constraintRowFromQueryRow. */
interface DuckDbConstraintRow {
catalog: string | null;
db: string | null;
/** Owning table's name (same value as `tableName`); present so rows can be grouped with tableKey. */
name: string;
tableName: string;
/** Either 'PRIMARY KEY' or 'FOREIGN KEY' (the query filters to these two). */
constraintType: string;
constraintName: string | null;
/** Constrained columns on the owning table. */
columnNames: string[];
/** Referenced table for foreign keys; `null` when absent or empty. */
referencedTable: string | null;
/** Referenced columns, paired by index with `columnNames`. */
referencedColumnNames: string[];
}
/**
 * Expands a leading `~` to the current user's home directory.
 *
 * `~` becomes the home directory, and `~/x` or `~x` becomes `<home>/x`.
 * Paths that do not start with `~` are returned unchanged.
 *
 * @param path Path that may start with `~`.
 * @returns The expanded absolute path, or `path` itself when there is no leading tilde.
 */
function resolveTilde(path: string): string {
  if (!path.startsWith('~')) {
    return path;
  }
  // `resolve()` restarts from any absolute segment, so passing "/data/db"
  // (what remains of "~/data/db" after dropping the tilde) would discard
  // homedir() entirely. Strip leading separators until the rest is relative.
  let remainder = path.slice(1);
  while (remainder.length > 0 && isAbsolute(remainder)) {
    remainder = remainder.slice(1);
  }
  return resolve(homedir(), remainder);
}
@ -112,11 +205,15 @@ export class KtxDuckDbScanConnector implements KtxScanConnector {
/** Connection id this connector serves; requests for any other id are rejected by assertConnection. */
private readonly connectionId: string;
/** Database file path computed by duckDbDatabasePathFromConfig in the constructor. */
private readonly dbPath: string;
/** Clock used for the snapshot's `extractedAt`; can be overridden through the constructor options. */
private readonly now: () => Date;
/** SQL quoting and query generation for DuckDB. */
private readonly dialect = new KtxDuckDbDialect();
/** Loader for the native `@duckdb/node-api` module; can be overridden through the constructor options. */
private readonly nativeLoader: DuckDbNativeLoader;
/** Lazily opened instance and connection; `null` until first use and again after cleanup(). */
private state: DuckDbConnectionState | null = null;
/**
 * Stores the connector configuration. No database is opened here; that
 * happens lazily on the first query.
 *
 * @param options Connection id, path configuration, and optional clock /
 *   native-loader overrides.
 */
constructor(options: KtxDuckDbScanConnectorOptions) {
  const { connectionId, now, nativeLoader } = options;
  this.connectionId = connectionId;
  this.dbPath = duckDbDatabasePathFromConfig(options);
  // Fall back to the real clock and the real native loader when no override is given.
  this.now = now ?? (() => new Date());
  this.nativeLoader = nativeLoader ?? { load: loadDuckDbNodeApi };
  this.id = `duckdb:${connectionId}`;
}
@ -139,9 +236,297 @@ export class KtxDuckDbScanConnector implements KtxScanConnector {
}
}
async introspect(): Promise<never> {
throw new Error('DuckDB schema introspection is implemented in Task 2.');
/**
 * Builds a schema snapshot of the database: tables and views with their
 * columns, primary keys, foreign keys, and (for tables) exact row counts.
 *
 * @param input Scan input; its `connectionId` must match this connector.
 * @param _ctx Scan context (unused).
 * @returns The snapshot, stamped with the connector's clock.
 * @throws Error when `input.connectionId` does not match this connector.
 */
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
  this.assertConnection(input.connectionId);

  const tablesResult = await this.query(TABLES_SQL);
  const tableRows = tablesResult.rows.map(tableRowFromQueryRow);
  const columnsResult = await this.query(COLUMNS_SQL);
  const columnRows = columnsResult.rows.map(columnRowFromQueryRow);
  const constraintsResult = await this.query(CONSTRAINTS_SQL);
  const constraintRows = constraintsResult.rows.map(constraintRowFromQueryRow);

  // Index columns and constraints by their owning table so each table can pick up its own rows.
  const columnsByTable = groupByTableKey(columnRows);
  const constraintsByTable = groupByTableKey(constraintRows);

  const tables = await Promise.all(
    tableRows.map(async (table) => {
      const tableColumns = columnsByTable.get(tableKey(table)) ?? [];
      const tableConstraints = constraintsByTable.get(tableKey(table)) ?? [];
      return this.readTable(table, tableColumns, tableConstraints);
    }),
  );

  // The file size is reported as 0 when the file does not exist on disk.
  const fileSize = existsSync(this.dbPath) ? statSync(this.dbPath).size : 0;
  let totalColumns = 0;
  for (const table of tables) {
    totalColumns += table.columns.length;
  }

  return {
    connectionId: this.connectionId,
    driver: this.driver,
    extractedAt: this.now().toISOString(),
    scope: {},
    metadata: {
      file_path: this.dbPath,
      file_size: fileSize,
      table_count: tables.length,
      total_columns: totalColumns,
    },
    tables,
  };
}
async cleanup(): Promise<void> {}
/**
 * Returns up to `input.limit` rows from a table, optionally restricted to
 * `input.columns`.
 *
 * @throws Error when `input.connectionId` does not match this connector.
 */
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
  this.assertConnection(input.connectionId);
  const qualifiedName = this.qTableName(input.table);
  const sql = this.dialect.generateSampleQuery(qualifiedName, input.limit, input.columns);
  const { headers, rows, totalRows } = await this.query(sql);
  return { headers, rows, totalRows };
}
/**
 * Returns up to `input.limit` non-null sample values of a single column.
 * `nullCount` and `distinctCount` are not computed and are always `null`.
 *
 * @throws Error when `input.connectionId` does not match this connector.
 */
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
  this.assertConnection(input.connectionId);
  const qualifiedName = this.qTableName(input.table);
  const sql = this.dialect.generateColumnSampleQuery(qualifiedName, input.column, input.limit);
  const { rows } = await this.query(sql);
  // Keep the first cell of every non-empty row, skipping nulls.
  const values = rows.flatMap((row) => (row.length > 0 && row[0] !== null ? [row[0]] : []));
  return { values, nullCount: null, distinctCount: null };
}
/** Column statistics are not provided by this connector; always resolves to `null`. */
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
return null;
}
/**
 * Runs a caller-supplied query after applying the `maxRows` limit.
 * The read-only check itself is performed inside `query`.
 *
 * @returns The query result, with `rowCount` set to the number of rows returned.
 * @throws Error when `input.connectionId` does not match this connector.
 */
async executeReadOnly(input: KtxDuckDbReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
  this.assertConnection(input.connectionId);
  const limitedSql = limitSqlForExecution(input.sql, input.maxRows);
  const result = await this.query(limitedSql);
  const rowCount = result.rows.length;
  return { ...result, rowCount };
}
/**
 * Estimates a column's cardinality from a sample of non-null rows and, when
 * it is small enough, returns the distinct values as strings.
 *
 * @param table Table containing the column.
 * @param columnName Unquoted column name; it is quoted here before being passed to the dialect.
 * @param options Cardinality cap, value limit, and optional sample size (default 10000).
 * @returns `null` when the cardinality query yields no row or a non-numeric
 *   value; `{ values: [], cardinality: 0 }` when no non-null values were
 *   sampled; `{ values: null, cardinality }` when the cardinality exceeds
 *   `options.maxCardinality`; otherwise the distinct values.
 */
async getColumnDistinctValues(
  table: KtxTableRef,
  columnName: string,
  options: KtxDuckDbColumnDistinctValuesOptions,
): Promise<KtxDuckDbColumnDistinctValuesResult | null> {
  const sampleSize = options.sampleSize ?? 10000;
  const tableName = this.qTableName(table);
  const quotedColumn = this.dialect.quoteIdentifier(columnName);

  const cardinalitySql = this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize);
  const [firstRow] = (await this.query(cardinalitySql)).rows;
  if (firstRow === undefined) {
    return null;
  }

  const cardinality = Number(firstRow[0]);
  if (Number.isNaN(cardinality)) {
    return null;
  }
  if (cardinality === 0) {
    return { values: [], cardinality: 0 };
  }
  if (cardinality > options.maxCardinality) {
    // Too many distinct values to be worth listing; report the count only.
    return { values: null, cardinality };
  }

  const valuesSql = this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit);
  const { rows } = await this.query(valuesSql);
  const values = rows.flatMap((row) => (row.length > 0 && row[0] !== null ? [String(row[0])] : []));
  return { values, cardinality };
}
/**
 * Counts the rows of a table.
 *
 * @param tableName Unqualified table name; it is quoted as a single identifier,
 *   so a schema-qualified name such as `main.orders` is not split.
 * @returns The row count, or 0 when the query yields no row.
 */
async getTableRowCount(tableName: string): Promise<number> {
  const quotedName = this.dialect.quoteIdentifier(tableName);
  const { rows } = await this.query(`SELECT COUNT(*) AS count FROM ${quotedName}`);
  return Number(rows[0]?.[0] ?? 0);
}
/** Returns the quoted, dot-joined table name produced by the dialect's `formatTableName`. */
qTableName(table: Pick<KtxTableRef, 'catalog' | 'db' | 'name'>): string {
return this.dialect.formatTableName(table);
}
/** Quotes a single identifier by delegating to the dialect's `quoteIdentifier`. */
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
/**
 * Releases the cached connection and database instance, if any.
 *
 * Safe to call repeatedly and before anything was opened. The cached state is
 * cleared first and the instance is closed in a `finally`, so a failing
 * `disconnectSync()` can neither leak the instance nor leave a dead
 * connection cached for later queries.
 */
async cleanup(): Promise<void> {
  const state = this.state;
  if (!state) {
    return;
  }
  // Forget the handles before closing them so a throw below cannot leave
  // a half-closed state behind.
  this.state = null;
  try {
    state.connection.disconnectSync();
  } finally {
    state.instance.closeSync();
  }
}
/**
 * Returns the cached connection state, opening the database on first use.
 *
 * The database file is validated with assertDuckDbDatabaseFile and opened
 * with `access_mode: 'READ_ONLY'`. If connecting fails after the instance
 * was created, the instance is closed before the error is rethrown so it
 * is not leaked.
 *
 * NOTE(review): concurrent first calls can each pass the `this.state` check
 * before either assigns it, so each would open its own instance — confirm
 * callers never issue the first queries in parallel.
 */
private async database(): Promise<DuckDbConnectionState> {
  if (this.state) {
    return this.state;
  }
  assertDuckDbDatabaseFile(this.dbPath);
  const { DuckDBInstance } = await this.nativeLoader.load();
  const instance = await DuckDBInstance.create(this.dbPath, { access_mode: 'READ_ONLY' });
  try {
    const connection = await instance.connect();
    const state: DuckDbConnectionState = { instance, connection };
    this.state = state;
    return state;
  } catch (error) {
    // Nothing references this instance yet, so cleanup() could never close it.
    instance.closeSync();
    throw error;
  }
}
/**
 * Runs one SQL statement and returns all of its rows.
 *
 * The SQL is passed through assertReadOnlySql before it reaches DuckDB, and
 * every cell is post-processed with normalizeDuckDbValue.
 *
 * @param sql Statement to run.
 * @returns Column names, normalized rows, and the number of rows returned.
 */
private async query(sql: string): Promise<Omit<KtxQueryResult, 'rowCount'>> {
  const state = await this.database();
  const checkedSql = assertReadOnlySql(sql);
  const reader = await state.connection.runAndReadAll(checkedSql);
  const rawRows = normalizeQueryRows(reader.getRowsJS());
  const rows = rawRows.map((cells) => cells.map(normalizeDuckDbValue));
  const headers = reader.columnNames();
  return { headers, rows, totalRows: rows.length };
}
/**
 * Assembles the snapshot entry for one table or view.
 *
 * @param table Row describing the table or view.
 * @param columns Column rows belonging to this table.
 * @param constraints Primary-key and foreign-key rows belonging to this table.
 * @returns The schema entry. `estimatedRows` is an exact `COUNT(*)` for
 *   tables and `null` for views.
 */
private async readTable(
  table: DuckDbTableRow,
  columns: DuckDbColumnRow[],
  constraints: DuckDbConstraintRow[],
): Promise<KtxSchemaTable> {
  const isView = table.type.toUpperCase().includes('VIEW');

  const primaryKeyColumns = new Set<string>();
  for (const constraint of constraints) {
    if (constraint.constraintType !== 'PRIMARY KEY') continue;
    for (const columnName of constraint.columnNames) {
      primaryKeyColumns.add(columnName);
    }
  }

  // Views are not counted; only tables get a row count.
  let estimatedRows: number | null = null;
  if (!isView) {
    const countResult = await this.query(`SELECT COUNT(*) AS count FROM ${this.qTableName(table)}`);
    estimatedRows = Number(countResult.rows[0]?.[0] ?? 0);
  }

  return {
    catalog: table.catalog,
    db: table.db,
    name: table.name,
    kind: isView ? 'view' : 'table',
    comment: null,
    estimatedRows,
    columns: columns.map((column) => this.mapColumn(column, primaryKeyColumns)),
    foreignKeys: this.mapForeignKeys(constraints),
  };
}
/**
 * Converts a raw column row into the snapshot's column shape.
 *
 * @param column Raw column row.
 * @param primaryKeyColumns Names of the owning table's primary-key columns.
 * @returns The column entry; primary-key columns are always reported as non-nullable.
 */
private mapColumn(column: DuckDbColumnRow, primaryKeyColumns: Set<string>): KtxSchemaColumn {
  const { columnName, dataType } = column;
  const declaredNullable = column.isNullable.toUpperCase() === 'YES';
  const isPrimaryKey = primaryKeyColumns.has(columnName);
  return {
    name: columnName,
    nativeType: dataType,
    normalizedType: this.dialect.mapDataType(dataType),
    dimensionType: this.dialect.mapToDimensionType(dataType),
    nullable: declaredNullable && !isPrimaryKey,
    primaryKey: isPrimaryKey,
    comment: null,
  };
}
/**
 * Expands foreign-key constraint rows into one entry per column pair.
 *
 * Rows that are not foreign keys, or that have no referenced table, are
 * skipped. Column pairs are matched by index; a pair with a missing or empty
 * name on either side is skipped. The target schema is taken from the
 * constraint row's own `db`, and the target catalog is always `null`.
 *
 * @param rows Constraint rows of a single table.
 */
private mapForeignKeys(rows: DuckDbConstraintRow[]): KtxSchemaForeignKey[] {
  const result: KtxSchemaForeignKey[] = [];
  for (const constraint of rows) {
    const { referencedTable, columnNames, referencedColumnNames } = constraint;
    if (constraint.constraintType !== 'FOREIGN KEY' || !referencedTable) {
      continue;
    }
    for (let position = 0; position < columnNames.length; position += 1) {
      const fromColumn = columnNames[position];
      const toColumn = referencedColumnNames[position];
      if (!fromColumn || !toColumn) {
        continue;
      }
      result.push({
        fromColumn,
        toCatalog: null,
        toDb: constraint.db,
        toTable: referencedTable,
        toColumn,
        constraintName: constraint.constraintName,
      });
    }
  }
  return result;
}
/**
 * Guards against a request addressed to a different connection.
 *
 * @throws Error when `connectionId` differs from this connector's id.
 */
private assertConnection(connectionId: string): void {
  if (connectionId === this.connectionId) {
    return;
  }
  throw new Error(`KTX DuckDB connector ${this.id} cannot serve connection ${connectionId}`);
}
}
/**
 * Creates a query executor that runs each request on its own short-lived
 * DuckDB connector.
 *
 * A connector is created for every `execute` call and always cleaned up
 * afterwards, whether the query succeeds or fails. The reported `command` is
 * always 'SELECT'.
 */
export function createDuckDbQueryExecutor(): KtxSqlQueryExecutorPort {
  const runId = 'duckdb-query-executor';
  return {
    execute: async (input: KtxSqlQueryExecutionInput): Promise<KtxSqlQueryExecutionResult> => {
      const { connectionId, sql, maxRows } = input;
      const connector = new KtxDuckDbScanConnector({
        connectionId,
        projectDir: input.projectDir,
        connection: input.connection as KtxDuckDbConnectionConfig | undefined,
      });
      try {
        const { headers, rows, totalRows, rowCount } = await connector.executeReadOnly(
          { connectionId, sql, maxRows },
          { runId },
        );
        return { headers, rows, totalRows, command: 'SELECT', rowCount };
      } finally {
        await connector.cleanup();
      }
    },
  };
}
/**
 * Converts a DuckDB result value into a plain JS form.
 *
 * - `bigint` becomes a `number` when it fits the safe-integer range, otherwise a decimal string.
 * - Arrays and plain objects (constructor is `Object`) are normalized recursively.
 * - Everything else, including class instances and `null`, is returned unchanged.
 */
function normalizeDuckDbValue(value: unknown): unknown {
  if (typeof value === 'bigint') {
    const asNumber = Number(value);
    return Number.isSafeInteger(asNumber) ? asNumber : value.toString();
  }

  if (Array.isArray(value)) {
    return value.map((item) => normalizeDuckDbValue(item));
  }

  const isPlainObject = typeof value === 'object' && value !== null && value.constructor === Object;
  if (!isPlainObject) {
    return value;
  }

  const normalizedEntries = Object.entries(value as Record<string, unknown>).map(
    ([key, nested]): [string, unknown] => [key, normalizeDuckDbValue(nested)],
  );
  return Object.fromEntries(normalizedEntries);
}
/** Returns the string elements of `value` when it is an array, otherwise an empty array. */
function stringArray(value: unknown): string[] {
  if (!Array.isArray(value)) {
    return [];
  }
  const strings: string[] = [];
  for (const item of value) {
    if (typeof item === 'string') {
      strings.push(item);
    }
  }
  return strings;
}
/** Returns `value` when it is a non-empty string; otherwise `null`. */
function nullableString(value: unknown): string | null {
  if (typeof value !== 'string' || value === '') {
    return null;
  }
  return value;
}
/**
 * Builds the map key identifying a table: catalog, schema and name joined by
 * NUL characters. A `null` catalog or schema contributes an empty segment.
 */
function tableKey(table: Pick<DuckDbTableRow, 'catalog' | 'db' | 'name'>): string {
  const catalog = table.catalog ?? '';
  const db = table.db ?? '';
  return catalog + '\0' + db + '\0' + table.name;
}
/**
 * Groups rows by the table they belong to, keyed by `tableKey`.
 * Rows keep their input order within each group.
 */
function groupByTableKey<T extends Pick<DuckDbTableRow, 'catalog' | 'db' | 'name'>>(rows: T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const row of rows) {
    const key = tableKey(row);
    const bucket = groups.get(key);
    if (!bucket) {
      groups.set(key, [row]);
      continue;
    }
    bucket.push(row);
  }
  return groups;
}
/** Maps one positional `TABLES_SQL` result row (catalog, db, name, type) to a `DuckDbTableRow`. */
function tableRowFromQueryRow(row: unknown[]): DuckDbTableRow {
  const [catalog, db, name, type] = row;
  return {
    catalog: nullableString(catalog),
    db: nullableString(db),
    name: String(name),
    type: String(type),
  };
}
/**
 * Maps one positional `COLUMNS_SQL` result row to a `DuckDbColumnRow`.
 * The table name fills both `name` and `tableName`; the seventh selected
 * column (`ordinal_position`) is not read.
 */
function columnRowFromQueryRow(row: unknown[]): DuckDbColumnRow {
  const [catalog, db, table, column, dataType, isNullable] = row;
  const tableName = String(table);
  return {
    catalog: nullableString(catalog),
    db: nullableString(db),
    name: tableName,
    tableName,
    columnName: String(column),
    dataType: String(dataType),
    isNullable: String(isNullable),
  };
}
/**
 * Maps one positional `CONSTRAINTS_SQL` result row to a `DuckDbConstraintRow`.
 * The table name fills both `name` and `tableName`; non-array column lists
 * become empty arrays.
 */
function constraintRowFromQueryRow(row: unknown[]): DuckDbConstraintRow {
  const [catalog, db, table, constraintType, constraintName, columns, referencedTable, referencedColumns] = row;
  const tableName = String(table);
  return {
    catalog: nullableString(catalog),
    db: nullableString(db),
    name: tableName,
    tableName,
    constraintType: String(constraintType),
    constraintName: nullableString(constraintName),
    columnNames: stringArray(columns),
    referencedTable: nullableString(referencedTable),
    referencedColumnNames: stringArray(referencedColumns),
  };
}

View file

@ -0,0 +1,71 @@
import type { KtxSchemaDimensionType, KtxTableRef } from '@ktx/context/scan';
/**
 * SQL text helpers for DuckDB: identifier quoting, type classification, and
 * the queries used for sampling and profiling.
 *
 * Quoting convention differs between methods. `generateSampleQuery` and
 * `generateColumnSampleQuery` quote the column names they receive, whereas
 * `generateCardinalitySampleQuery` and `generateDistinctValuesQuery` insert
 * `columnName` verbatim and expect it to be quoted already. Every method
 * inserts `tableName` verbatim (see `formatTableName`).
 */
export class KtxDuckDbDialect {
  readonly type = 'duckdb';

  /** Wraps an identifier in double quotes, doubling any embedded double quote. */
  quoteIdentifier(identifier: string): string {
    const escaped = identifier.split('"').join('""');
    return '"' + escaped + '"';
  }

  /** Quotes and dot-joins the catalog, schema and name, skipping parts that are null or empty. */
  formatTableName(table: Pick<KtxTableRef, 'catalog' | 'db' | 'name'>): string {
    const quotedParts: string[] = [];
    for (const part of [table.catalog, table.db, table.name]) {
      if (part) {
        quotedParts.push(this.quoteIdentifier(part));
      }
    }
    return quotedParts.join('.');
  }

  /** Returns the native type name unchanged. */
  mapDataType(nativeType: string): string {
    return nativeType;
  }

  /**
   * Classifies a native type by case-insensitive substring match, checked in
   * the order time, number, boolean, then string as the fallback.
   *
   * NOTE(review): because matching is by substring, any type name containing
   * "INT" (for example `INTERVAL`) is classified as 'number' — confirm that
   * is intended.
   */
  mapToDimensionType(nativeType: string): KtxSchemaDimensionType {
    const upper = nativeType.toUpperCase().trim();
    const mentions = (marker: string): boolean => upper.includes(marker);

    if (['DATE', 'TIME'].some(mentions)) {
      return 'time';
    }
    if (['INT', 'DECIMAL', 'DOUBLE', 'FLOAT', 'NUMERIC', 'REAL'].some(mentions)) {
      return 'number';
    }
    return mentions('BOOL') ? 'boolean' : 'string';
  }

  /**
   * Builds `SELECT <columns> FROM <table> LIMIT <limit>`.
   *
   * @param tableName Table name, already quoted.
   * @param limit Maximum number of rows.
   * @param columns Unquoted column names; `*` is selected when omitted or empty.
   */
  generateSampleQuery(tableName: string, limit: number, columns?: string[]): string {
    const selection = columns?.length ? columns.map((column) => this.quoteIdentifier(column)).join(', ') : '*';
    return 'SELECT ' + selection + ' FROM ' + tableName + ' LIMIT ' + limit;
  }

  /**
   * Builds a query returning up to `limit` values of one column, skipping
   * nulls and values that are blank once cast to text and trimmed.
   *
   * @param tableName Table name, already quoted.
   * @param columnName Unquoted column name.
   */
  generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string {
    const column = this.quoteIdentifier(columnName);
    const notBlank = `${column} IS NOT NULL AND TRIM(CAST(${column} AS VARCHAR)) != ''`;
    return `SELECT ${column} FROM ${tableName} WHERE ${notBlank} LIMIT ${limit}`;
  }

  /**
   * Builds a query counting the distinct values among the first `sampleSize`
   * non-null rows of a column.
   *
   * @param tableName Table name, already quoted.
   * @param columnName Column name, ALREADY quoted by the caller.
   */
  generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
    // The empty first and last entries reproduce the leading and trailing newline.
    return [
      '',
      'WITH sampled AS (',
      `SELECT ${columnName} AS val`,
      `FROM ${tableName}`,
      `WHERE ${columnName} IS NOT NULL`,
      `LIMIT ${sampleSize}`,
      ')',
      'SELECT COUNT(DISTINCT val) AS cardinality',
      'FROM sampled',
      '',
    ].join('\n');
  }

  /**
   * Builds a query returning up to `limit` distinct non-null values of a
   * column, cast to text and sorted ascending.
   *
   * @param tableName Table name, already quoted.
   * @param columnName Column name, ALREADY quoted by the caller.
   */
  generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string {
    // The empty first and last entries reproduce the leading and trailing newline.
    return [
      '',
      `SELECT DISTINCT CAST(${columnName} AS VARCHAR) AS val`,
      `FROM ${tableName}`,
      `WHERE ${columnName} IS NOT NULL`,
      'ORDER BY val',
      `LIMIT ${limit}`,
      '',
    ].join('\n');
  }
}

View file

@ -1,12 +1,21 @@
export {
assertDuckDbDatabaseFile,
createDuckDbQueryExecutor,
duckDbDatabasePathFromConfig,
isKtxDuckDbConnectionConfig,
KtxDuckDbScanConnector,
type KtxDuckDbColumnDistinctValuesOptions,
type KtxDuckDbColumnDistinctValuesResult,
type DuckDbDatabasePathInput,
type KtxDuckDbConnectionConfig,
type KtxDuckDbReadOnlyQueryInput,
type KtxDuckDbScanConnectorOptions,
} from './connector.js';
export { KtxDuckDbDialect } from './dialect.js';
export {
createDuckDbLiveDatabaseIntrospection,
type CreateDuckDbLiveDatabaseIntrospectionOptions,
} from './live-database-introspection.js';
export {
assertSupportedDuckDbPlatform,
currentDuckDbPlatform,

View file

@ -0,0 +1,30 @@
import type { LiveDatabaseIntrospectionPort } from '@ktx/context/ingest';
import type { KtxProjectConnectionConfig } from '@ktx/context/project';
import { KtxDuckDbScanConnector, type KtxDuckDbConnectionConfig } from './connector.js';
/** Options for `createDuckDbLiveDatabaseIntrospection`. */
export interface CreateDuckDbLiveDatabaseIntrospectionOptions {
/** Project directory forwarded to each connector. */
projectDir?: string;
/** Connection configurations keyed by connection id. */
connections: Record<string, KtxProjectConnectionConfig>;
/** Optional clock forwarded to each connector. */
now?: () => Date;
}
/**
 * Creates a live-introspection port backed by DuckDB.
 *
 * Each `extractSchema` call looks up the connection by id, builds a
 * short-lived connector for it, introspects, and always cleans the connector
 * up afterwards. An unknown id is passed on as an `undefined` connection
 * config.
 */
export function createDuckDbLiveDatabaseIntrospection(
  options: CreateDuckDbLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
  return {
    extractSchema: async (connectionId: string) => {
      const connector = new KtxDuckDbScanConnector({
        connectionId,
        connection: options.connections[connectionId] as KtxDuckDbConnectionConfig | undefined,
        projectDir: options.projectDir,
        now: options.now,
      });
      const runId = `duckdb-${connectionId}`;
      try {
        const snapshot = await connector.introspect({ connectionId, driver: 'duckdb' as never }, { runId });
        return snapshot;
      } finally {
        await connector.cleanup();
      }
    },
  };
}