feat: Add duckdb connector (#308)

* refactor(duckdb): extract shared json-safe bigint helper

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(duckdb): add and register the duckdb primary connector

Add KtxDuckDbDialect, KtxDuckDbScanConnector (local file-backed, read-only,
never-create, main-schema introspection via information_schema and
duckdb_constraints() for foreign keys), and register the duckdb driver across
the dialect factory, driver registry, connection-type enum, warehouse descriptor,
config schema, scan normalization, connection test drivers, and status display.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(duckdb): route live-database ingest through the DuckDB connector

Add the DuckDB live-database introspection bridge and dispatch duckdb
connections to it in local-adapters, matching the SQLite path. Repoint the
config-rejection test off duckdb (now a valid driver) onto the no-driver case.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(duckdb): add duckdb to the setup database flow

Offer DuckDB in the interactive checklist and via ktx setup --database duckdb,
with a file-path prompt and duckdb-local default connection id, parallel to SQLite.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(duckdb): attach native duckdb files in federation

Native .duckdb members ATTACH with (READ_ONLY) and no TYPE/INSTALL/LOAD, since
the duckdb format needs no extension. attachTypeForDriver returns null for the
native case; buildAttachStatements builds load statements from non-null types
only and emits a conditional ATTACH clause.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* docs(duckdb): document the duckdb primary-source connector

Add a DuckDB section to the primary-sources integration page (config, read-only
never-create behavior, main-schema scope, federation) and update the
supported-driver assertion in dialects.test.ts to include duckdb.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(duckdb): use single-namespace display shape for main-only refs

DuckDB v1 introspects the main schema and sets db=null on every table, so its
display refs are single-namespace like SQLite. The ansi shape emitted a 1-part
table display it then refused to parse, breaking column-level display resolution.
Switch the dialect to the sqlite display shape and add a round-trip test plus a
composite-foreign-key test that were missing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor(duckdb): resolve connector dialect via getDialectForDriver

Route the connector's dialect through the shared factory like every other
connector, now that duckdb is registered. Single construction path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(duckdb): skip schema picker for single-file duckdb setup

DuckDB is a single-file, single-namespace ('main') database like SQLite,
but the setup scope step only skipped the schema picker for sqlite. DuckDB
fell into the multi-schema path with an empty schema list, rendering a
broken picker ("No matches found" for main). Extend the file-based-driver
early-return to cover duckdb so it ingests every table directly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor(duckdb): reuse shared config helper and derive scope skip

Route duckdb path resolution through the shared resolveStringReference
helper instead of a local third copy of env:/file: handling. Derive the
setup scope-picker skip from SCOPE_DISCOVERY_SPECS membership rather than
a hardcoded sqlite/duckdb driver list.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* test(duckdb): use a genuinely unknown driver in the rejection test

The merged "rejects unknown drivers" test used `driver: duckdb` as its
unknown-driver stand-in, which stopped being unknown once this branch
added the duckdb connector. Switch to `nonsense` so it again exercises
the unsupported-driver config error.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* test(duckdb): cover dialect, connector, and live-introspection branches

Codecov flagged uncovered branches as dead code; all are real connector,
dialect, and live-ingest behavior. Add unit tests instead of removing them.

- dialect: precedence ladder, sample/clause builders, profiling expressions
- connector: url/env config forms, error throws, never-create guard,
  cardinality cap branches, table-scope empty/non-empty paths
- live-introspection: full-schema and table-scope extraction

Functions 100%, lines ~99% across the duckdb connector dir.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* docs: add DuckDB to supported-driver references

The DuckDB connector PR documented the connector itself but left the
scattered supported-driver enumerations stale. Add duckdb to the
federation concept page (participation table, activation, table naming,
limitations), the ktx setup CLI reference, the ktx.yaml warehouse-driver
table, the primary-sources field reference, and the quickstart driver
list (which also restores the missing ClickHouse entry).

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Andrey Avtomonov <andreybavt@gmail.com>
This commit is contained in:
Kevin Messiaen 2026-07-01 19:06:02 +07:00 committed by GitHub
parent f21594c42a
commit 3c4fcc27c7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1366 additions and 59 deletions

View file

@ -38,6 +38,7 @@ function llmBackend(value: string): KtxSetupLlmBackend {
function databaseDriver(value: string): KtxSetupDatabaseDriver {
if (
value === 'sqlite' ||
value === 'duckdb' ||
value === 'postgres' ||
value === 'mysql' ||
value === 'clickhouse' ||

View file

@ -3,6 +3,7 @@ import type { KtxProjectConnectionConfig } from './context/project/config.js';
/** @internal Canonical SQL-warehouse driver ids; the dialect-notes coverage test derives its required coverage from this set. */
export const KTX_DATABASE_DRIVER_IDS = [
'sqlite',
'duckdb',
'postgres',
'mysql',
'clickhouse',

View file

@ -51,6 +51,7 @@ export interface KtxConnectionDeps {
const SUPPORTED_TEST_DRIVERS = [
'sqlite',
'duckdb',
'postgres',
'mysql',
'clickhouse',

View file

@ -0,0 +1,395 @@
import { DuckDBInstance, type DuckDBConnection } from '@duckdb/node-api';
import { existsSync, statSync } from 'node:fs';
import { isAbsolute, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { resolveStringReference } from '../shared/string-reference.js';
import { getSqlDialectForDriver } from '../../context/connections/dialects.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { normalizeQueryRows } from '../../context/connections/query-executor.js';
import { toJsonSafeRows } from '../shared/duckdb-json-safe.js';
import {
connectorTestFailure,
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxConnectorTestResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
const MAIN_SCHEMA = 'main';
export interface KtxDuckDbConnectionConfig {
driver?: string;
path?: string;
url?: string;
[key: string]: unknown;
}
/** @internal */
export interface DuckDbDatabasePathInput {
connectionId: string;
projectDir?: string;
connection: KtxDuckDbConnectionConfig | undefined;
}
export interface KtxDuckDbScanConnectorOptions extends DuckDbDatabasePathInput {
now?: () => Date;
}
export interface KtxDuckDbColumnDistinctValuesOptions {
maxCardinality: number;
limit: number;
sampleSize?: number;
}
export interface KtxDuckDbColumnDistinctValuesResult {
values: string[] | null;
cardinality: number;
}
interface InfoSchemaTableRow {
table_name: string;
table_type: string;
}
interface InfoSchemaColumnRow {
column_name: string;
data_type: string;
is_nullable: string;
}
// `path` may be an env:/file: reference; `url` resolves env: only, since file:
// on a url is a native URI form (handled by duckDbPathFromUrl), not a file read.
function stringConfigValue(
connection: KtxDuckDbConnectionConfig | undefined,
key: 'path' | 'url',
): string | undefined {
const value = connection?.[key];
if (typeof value !== 'string' || value.trim().length === 0) {
return undefined;
}
const trimmed = value.trim();
if (key === 'url') {
return trimmed.startsWith('env:') ? (process.env[trimmed.slice('env:'.length)] ?? '') : trimmed;
}
return resolveStringReference(trimmed, process.env);
}
function duckDbPathFromUrl(url: string): string {
if (url.startsWith('file:')) {
return fileURLToPath(url);
}
if (url.startsWith('duckdb:')) {
const parsed = new URL(url);
return decodeURIComponent(parsed.pathname);
}
return url;
}
export function isKtxDuckDbConnectionConfig(
connection: KtxDuckDbConnectionConfig | undefined,
): connection is KtxDuckDbConnectionConfig {
return String(connection?.driver ?? '').toLowerCase() === 'duckdb';
}
/** @internal */
export function duckDbDatabasePathFromConfig(input: DuckDbDatabasePathInput): string {
const inputDriver = input.connection?.driver ?? 'unknown';
if (!isKtxDuckDbConnectionConfig(input.connection)) {
throw new Error(`Native DuckDB connector cannot run driver "${inputDriver}"`);
}
const configuredPath =
stringConfigValue(input.connection, 'path') ?? duckDbPathFromUrl(stringConfigValue(input.connection, 'url') ?? '');
if (!configuredPath) {
throw new Error(`Native DuckDB connector requires connections.${input.connectionId}.path or url`);
}
return isAbsolute(configuredPath) ? configuredPath : resolve(input.projectDir ?? process.cwd(), configuredPath);
}
export class KtxDuckDbScanConnector implements KtxScanConnector {
readonly id: string;
readonly driver = 'duckdb' as const;
readonly capabilities = createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: true,
nestedAnalysis: false,
formalForeignKeys: true,
estimatedRowCounts: true,
});
private readonly connectionId: string;
private readonly dbPath: string;
private readonly now: () => Date;
private readonly dialect = getSqlDialectForDriver('duckdb');
private instance: DuckDBInstance | null = null;
private connection: DuckDBConnection | null = null;
constructor(options: KtxDuckDbScanConnectorOptions) {
this.connectionId = options.connectionId;
this.dbPath = duckDbDatabasePathFromConfig(options);
this.now = options.now ?? (() => new Date());
this.id = `duckdb:${options.connectionId}`;
}
async testConnection(): Promise<KtxConnectorTestResult> {
try {
if (!existsSync(this.dbPath) || !statSync(this.dbPath).isFile()) {
return { success: false, error: `File not found: ${this.dbPath}` };
}
await this.query('SELECT 1');
return { success: true };
} catch (error) {
return connectorTestFailure(error);
}
}
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: null }) : null;
const tableRows = await this.readTableRows(scopedNames);
const tables: KtxSchemaTable[] = [];
for (const row of tableRows) {
tables.push(await this.readTable(row));
}
return {
connectionId: this.connectionId,
driver: 'duckdb' as const,
extractedAt: this.now().toISOString(),
scope: {},
metadata: {
file_path: this.dbPath,
table_count: tables.length,
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables,
};
}
async listSchemas(): Promise<string[]> {
return [MAIN_SCHEMA];
}
async listTables(_schemas?: string[]): Promise<KtxTableListEntry[]> {
const rows = await this.readTableRows(null);
return rows.map((row) => ({
catalog: null,
schema: MAIN_SCHEMA,
name: row.table_name,
kind: row.table_type === 'VIEW' ? ('view' as const) : ('table' as const),
}));
}
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns),
);
return { headers: result.headers, rows: result.rows, totalRows: result.totalRows };
}
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
const values = result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]);
return { values, nullCount: null, distinctCount: null };
}
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
return null;
}
async executeReadOnly(input: KtxReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
this.assertConnection(input.connectionId);
const result = await this.query(limitSqlForExecution(input.sql, input.maxRows));
return { ...result, rowCount: result.rows.length };
}
async getColumnDistinctValues(
table: KtxTableRef,
columnName: string,
options: KtxDuckDbColumnDistinctValuesOptions,
): Promise<KtxDuckDbColumnDistinctValuesResult | null> {
const sampleSize = options.sampleSize ?? 10000;
const tableName = this.qTableName(table);
const quotedColumn = this.dialect.quoteIdentifier(columnName);
const cardinalityResult = await this.query(
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize),
);
if (cardinalityResult.rows.length === 0) {
return null;
}
const cardinality = Number(cardinalityResult.rows[0][0]);
if (Number.isNaN(cardinality)) {
return null;
}
if (cardinality === 0) {
return { values: [], cardinality: 0 };
}
if (cardinality > options.maxCardinality) {
return { values: null, cardinality };
}
const valuesResult = await this.query(
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
);
return {
values: valuesResult.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => String(row[0])),
cardinality,
};
}
async getTableRowCount(tableName: string): Promise<number> {
const result = await this.query(`SELECT COUNT(*) AS count FROM ${this.dialect.quoteIdentifier(tableName)}`);
return Number(result.rows[0]?.[0] ?? 0);
}
qTableName(table: Pick<KtxTableRef, 'name'>): string {
return this.dialect.formatTableName(table);
}
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
async cleanup(): Promise<void> {
this.connection?.closeSync();
this.instance?.closeSync();
this.connection = null;
this.instance = null;
}
private async db(): Promise<DuckDBConnection> {
if (!this.connection) {
// DuckDBInstance.create() creates the file if missing, so this pre-check
// enforces the never-create rule. Do not remove it.
if (!existsSync(this.dbPath) || !statSync(this.dbPath).isFile()) {
throw new Error(`File not found: ${this.dbPath}`);
}
this.instance = await DuckDBInstance.create(this.dbPath, { access_mode: 'read_only' });
this.connection = await this.instance.connect();
}
return this.connection;
}
private async query(sql: string): Promise<Omit<KtxQueryResult, 'rowCount'>> {
const connection = await this.db();
const reader = await connection.runAndReadAll(assertReadOnlySql(sql));
const rows = toJsonSafeRows(normalizeQueryRows(reader.getRows()));
return {
headers: reader.columnNames(),
rows,
totalRows: rows.length,
};
}
private async readTableRows(scopedNames: string[] | null): Promise<InfoSchemaTableRow[]> {
if (scopedNames && scopedNames.length === 0) {
return [];
}
const scopeClause = scopedNames
? `AND table_name IN (${scopedNames.map((name) => `'${name.replaceAll("'", "''")}'`).join(', ')})`
: '';
const result = await this.query(
`SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = '${MAIN_SCHEMA}' ${scopeClause}
ORDER BY table_name`,
);
return result.rows.map((row) => ({ table_name: String(row[0]), table_type: String(row[1]) }));
}
private async readTable(table: InfoSchemaTableRow): Promise<KtxSchemaTable> {
const columnsResult = await this.query(
`SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '${MAIN_SCHEMA}' AND table_name = '${table.table_name.replaceAll("'", "''")}'
ORDER BY ordinal_position`,
);
const columns = columnsResult.rows.map<InfoSchemaColumnRow>((row) => ({
column_name: String(row[0]),
data_type: String(row[1]),
is_nullable: String(row[2]),
}));
const primaryKeys = await this.readPrimaryKeyColumns(table.table_name);
const isView = table.table_type === 'VIEW';
const estimatedRows = isView ? null : await this.getTableRowCount(table.table_name);
return {
catalog: null,
db: null,
name: table.table_name,
kind: isView ? 'view' : 'table',
comment: null,
estimatedRows,
columns: columns.map((column) => ({
name: column.column_name,
nativeType: column.data_type,
normalizedType: this.dialect.mapDataType(column.data_type),
dimensionType: this.dialect.mapToDimensionType(column.data_type),
nullable: column.is_nullable === 'YES' && !primaryKeys.has(column.column_name),
primaryKey: primaryKeys.has(column.column_name),
comment: null,
})),
foreignKeys: await this.readForeignKeys(table.table_name),
};
}
private async readPrimaryKeyColumns(tableName: string): Promise<Set<string>> {
const result = await this.query(
`SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema
WHERE tc.table_schema = '${MAIN_SCHEMA}'
AND tc.table_name = '${tableName.replaceAll("'", "''")}'
AND tc.constraint_type = 'PRIMARY KEY'`,
);
return new Set(result.rows.map((row) => String(row[0])));
}
private async readForeignKeys(tableName: string): Promise<KtxSchemaForeignKey[]> {
// information_schema.constraint_column_usage in DuckDB returns the constrained
// columns (source), not the referenced columns. Use duckdb_constraints() which
// exposes constraint_column_names and referenced_column_names directly.
const result = await this.query(
`SELECT unnest(constraint_column_names) AS from_column,
referenced_table,
unnest(referenced_column_names) AS to_column,
constraint_name
FROM duckdb_constraints()
WHERE schema_name = '${MAIN_SCHEMA}'
AND table_name = '${tableName.replaceAll("'", "''")}'
AND constraint_type = 'FOREIGN KEY'`,
);
return result.rows.map((row) => ({
fromColumn: String(row[0]),
toCatalog: null,
toDb: null,
toTable: String(row[1]),
toColumn: String(row[2]),
constraintName: row[3] === null ? null : String(row[3]),
}));
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.connectionId) {
throw new Error(`ktx DuckDB connector ${this.id} cannot serve connection ${connectionId}`);
}
}
}

View file

@ -0,0 +1,192 @@
import type { KtxSqlDialect } from '../../context/connections/dialects.js';
import {
columnDisplayPartCount,
formatDialectDisplayRef,
formatDialectTableName,
limitOffsetClause,
parseDialectDisplayRef,
} from '../../context/connections/dialect-helpers.js';
import type { KtxSchemaDimensionType, KtxTableRef } from '../../context/scan/types.js';
type DuckDbTableNameRef = Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>;
/** @internal */
export class KtxDuckDbDialect implements KtxSqlDialect {
readonly type = 'duckdb' as const;
private readonly typeMappings: Record<string, KtxSchemaDimensionType> = {
TIMESTAMP: 'time',
'TIMESTAMP WITH TIME ZONE': 'time',
TIMESTAMPTZ: 'time',
DATE: 'time',
TIME: 'time',
BIGINT: 'number',
INTEGER: 'number',
SMALLINT: 'number',
TINYINT: 'number',
HUGEINT: 'number',
UBIGINT: 'number',
UINTEGER: 'number',
DECIMAL: 'number',
NUMERIC: 'number',
REAL: 'number',
FLOAT: 'number',
DOUBLE: 'number',
VARCHAR: 'string',
CHAR: 'string',
TEXT: 'string',
BLOB: 'string',
UUID: 'string',
BOOLEAN: 'boolean',
BOOL: 'boolean',
};
quoteIdentifier(identifier: string): string {
return `"${identifier.replace(/"/g, '""')}"`;
}
// v1 introspects the `main` schema only and sets db=null on every table, so
// refs are single-namespace like SQLite — use the matching display shape.
formatTableName(table: DuckDbTableNameRef): string {
return formatDialectTableName(table, this.quoteIdentifier.bind(this), 'sqlite');
}
formatDisplayRef(table: DuckDbTableNameRef): string {
return formatDialectDisplayRef(table, 'sqlite');
}
parseDisplayRef(display: string): KtxTableRef | null {
return parseDialectDisplayRef(display, 'sqlite');
}
columnDisplayTablePartCount(): 1 | 2 | 3 {
return columnDisplayPartCount('sqlite');
}
mapDataType(nativeType: string): string {
return nativeType;
}
mapToDimensionType(nativeType: string): KtxSchemaDimensionType {
if (!nativeType) {
return 'string';
}
let normalized = nativeType.toUpperCase().trim();
if (normalized.includes('(')) {
normalized = normalized.split('(')[0].trim();
}
if (this.typeMappings[normalized]) {
return this.typeMappings[normalized];
}
if (normalized.includes('TIME') || normalized.includes('DATE')) {
return 'time';
}
if (
normalized.includes('INT') ||
normalized.includes('DEC') ||
normalized.includes('NUM') ||
normalized.includes('REAL') ||
normalized.includes('FLOAT') ||
normalized.includes('DOUBLE')
) {
return 'number';
}
if (normalized.includes('BOOL')) {
return 'boolean';
}
return 'string';
}
generateSampleQuery(tableName: string, limit: number, columns?: string[]): string {
const columnList =
columns && columns.length > 0 ? columns.map((column) => this.quoteIdentifier(column)).join(', ') : '*';
return `SELECT ${columnList} FROM ${tableName} LIMIT ${limit}`;
}
generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string {
const quoted = this.quoteIdentifier(columnName);
return `SELECT ${quoted} FROM ${tableName} WHERE ${quoted} IS NOT NULL AND TRIM(CAST(${quoted} AS VARCHAR)) != '' LIMIT ${limit}`;
}
getRandomSampleFilter(samplePct: number): string {
if (samplePct <= 0 || samplePct >= 1) {
return '';
}
return `RANDOM() < ${samplePct}`;
}
getTableSampleClause(samplePct: number): string {
if (samplePct <= 0 || samplePct >= 1) {
return '';
}
return `USING SAMPLE ${Math.round(samplePct * 100)} PERCENT (bernoulli)`;
}
getLimitOffsetClause(limit: number, offset?: number): string {
return limitOffsetClause(limit, offset);
}
getTopClause(_limit: number): string {
return '';
}
getNullCountExpression(column: string): string {
return `SUM(CASE WHEN ${column} IS NULL THEN 1 ELSE 0 END)`;
}
getDistinctCountExpression(column: string): string {
return `COUNT(DISTINCT ${column})`;
}
textLengthExpression(columnSql: string): string {
return `LENGTH(CAST(${columnSql} AS VARCHAR))`;
}
castToText(columnSql: string): string {
return `CAST(${columnSql} AS VARCHAR)`;
}
getSampleValueAggregation(innerSql: string): string {
return `(SELECT STRING_AGG(CAST(value AS VARCHAR), chr(31)) FROM (${innerSql}) AS relationship_profile_values)`;
}
generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
WITH sampled AS (
SELECT ${columnName} AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
LIMIT ${sampleSize}
)
SELECT COUNT(DISTINCT val) AS cardinality
FROM sampled
`;
}
generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string {
return `
SELECT DISTINCT CAST(${columnName} AS VARCHAR) AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
ORDER BY val
LIMIT ${limit}
`;
}
generateColumnStatisticsQuery(_schemaName: string, _tableName: string): string | null {
return null;
}
generateRandomizedCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
WITH sampled AS (
SELECT ${columnName} AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
USING SAMPLE ${sampleSize} ROWS
)
SELECT COUNT(DISTINCT val) AS cardinality
FROM sampled
`;
}
}

View file

@ -4,6 +4,7 @@ import {
mysqlConnectionPoolConfigFromConfig,
type KtxMysqlConnectionConfig,
} from '../mysql/connector.js';
import { duckDbDatabasePathFromConfig, type KtxDuckDbConnectionConfig } from '../../connectors/duckdb/connector.js';
import type { FederatedMember } from '../../context/connections/federation.js';
function kvKeyword(value: string): string {
@ -74,6 +75,12 @@ function mysqlAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): str
*/
export function federatedAttachTarget(member: FederatedMember, env: NodeJS.ProcessEnv): string {
switch (member.driver.toLowerCase()) {
case 'duckdb':
return duckDbDatabasePathFromConfig({
connectionId: member.connectionId,
projectDir: member.projectDir,
connection: member.connection as KtxDuckDbConnectionConfig,
});
case 'sqlite':
return sqliteDatabasePathFromConfig({
connectionId: member.connectionId,

View file

@ -7,25 +7,12 @@ import type {
import { normalizeQueryRows } from '../../context/connections/query-executor.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { attachTypeForDriver, type FederatedMember } from '../../context/connections/federation.js';
import { toJsonSafeRows } from '../shared/duckdb-json-safe.js';
function quoteDuckdbIdentifier(id: string): string {
return `"${id.replaceAll('"', '""')}"`;
}
const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER);
const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER);
// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values
// in Number's safe range become Number; larger magnitudes become strings so a
// BIGINT beyond 2^53 keeps its exact value instead of silently rounding.
function jsonSafeBigint(value: bigint): number | string {
return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? Number(value) : value.toString();
}
function toJsonSafeRows(rows: unknown[][]): unknown[][] {
return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell)));
}
/** @internal */
export function buildAttachStatements(members: FederatedMember[], env: NodeJS.ProcessEnv): string[] {
const attachments = members.map((member) => ({
@ -34,13 +21,12 @@ export function buildAttachStatements(members: FederatedMember[], env: NodeJS.Pr
alias: member.connectionId,
}));
const loadStatements = [...new Set(attachments.map((a) => a.type))].map(
(type) => `INSTALL ${type}; LOAD ${type};`,
);
const attachStatements = attachments.map(
({ type, url, alias }) =>
`ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (TYPE ${type}, READ_ONLY);`,
);
const loadTypes = [...new Set(attachments.map((a) => a.type).filter((type): type is string => type !== null))];
const loadStatements = loadTypes.map((type) => `INSTALL ${type}; LOAD ${type};`);
const attachStatements = attachments.map(({ type, url, alias }) => {
const options = type === null ? 'READ_ONLY' : `TYPE ${type}, READ_ONLY`;
return `ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (${options});`;
});
return [...loadStatements, ...attachStatements];
}

View file

@ -0,0 +1,40 @@
import type {
LiveDatabaseIntrospectionOptions,
LiveDatabaseIntrospectionPort,
} from '../../context/ingest/adapters/live-database/types.js';
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
import { KtxDuckDbScanConnector, type KtxDuckDbConnectionConfig } from './connector.js';
export interface CreateDuckDbLiveDatabaseIntrospectionOptions {
projectDir?: string;
connections: Record<string, KtxProjectConnectionConfig>;
now?: () => Date;
}
export function createDuckDbLiveDatabaseIntrospection(
options: CreateDuckDbLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
return {
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
const connection = options.connections[connectionId] as KtxDuckDbConnectionConfig | undefined;
const connector = new KtxDuckDbScanConnector({
connectionId,
connection,
projectDir: options.projectDir,
now: options.now,
});
try {
return await connector.introspect(
{
connectionId,
driver: 'duckdb',
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
},
{ runId: `duckdb-${connectionId}` },
);
} finally {
await connector.cleanup();
}
},
};
}

View file

@ -0,0 +1,14 @@
const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER);
const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER);
// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values
// in Number's safe range become Number; larger magnitudes become strings so a
// BIGINT beyond 2^53 keeps its exact value instead of silently rounding.
/** @internal */
export function jsonSafeBigint(value: bigint): number | string {
return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? Number(value) : value.toString();
}
export function toJsonSafeRows(rows: unknown[][]): unknown[][] {
return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell)));
}

View file

@ -3,6 +3,7 @@ import { z } from 'zod';
export const connectionTypeSchema = z.enum([
'POSTGRESQL',
'SQLITE',
'DUCKDB',
'SQLSERVER',
'BIGQUERY',
'SNOWFLAKE',

View file

@ -1,5 +1,6 @@
import { KtxBigQueryDialect } from '../../connectors/bigquery/dialect.js';
import { KtxClickHouseDialect } from '../../connectors/clickhouse/dialect.js';
import { KtxDuckDbDialect } from '../../connectors/duckdb/dialect.js';
import { KtxMongoDbDialect } from '../../connectors/mongodb/dialect.js';
import { KtxMysqlDialect } from '../../connectors/mysql/dialect.js';
import { KtxPostgresDialect } from '../../connectors/postgres/dialect.js';
@ -55,6 +56,7 @@ type KtxSqlDriver = Exclude<KtxConnectionDriver, 'mongodb'>;
const sqlDialectFactories: Record<KtxSqlDriver, () => KtxSqlDialect> = {
bigquery: () => new KtxBigQueryDialect(),
clickhouse: () => new KtxClickHouseDialect(),
duckdb: () => new KtxDuckDbDialect(),
mysql: () => new KtxMysqlDialect(),
postgres: () => new KtxPostgresDialect(),
sqlite: () => new KtxSqliteDialect(),

View file

@ -68,6 +68,27 @@ export const driverRegistrations: Record<KtxConnectionDriver, KtxDriverRegistrat
};
},
},
duckdb: {
driver: 'duckdb',
scopeConfigKey: null,
hasHistoricSqlReader: false,
load: async () => {
const m = await import('../../connectors/duckdb/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxDuckDbConnectionConfig>[0];
return m.isKtxDuckDbConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection, projectDir }) => {
const typedConnection = connection as Parameters<typeof m.isKtxDuckDbConnectionConfig>[0];
if (!m.isKtxDuckDbConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('duckdb');
}
return new m.KtxDuckDbScanConnector({ connectionId, connection: typedConnection, projectDir });
},
};
},
},
mongodb: {
driver: 'mongodb',
scopeConfigKey: 'databases',

View file

@ -4,18 +4,19 @@ import type { KtxProjectConnectionConfig } from '../project/config.js';
export const FEDERATED_CONNECTION_ID = '_ktx_federated';
/**
* Drivers DuckDB can ATTACH for federation. The driver name doubles as the
* DuckDB extension/TYPE name, so this set is the single source of truth for
* both membership (a driver participates iff it appears here) and attach type.
* Drivers DuckDB can ATTACH for federation. Membership is governed by this set;
* the attach TYPE is governed by attachTypeForDriver, which returns the driver
* name for extension-backed engines and null for a native DuckDB file (attached
* with no INSTALL/LOAD and no TYPE).
*/
const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite']);
const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite', 'duckdb']);
export function attachTypeForDriver(driver: string): string {
export function attachTypeForDriver(driver: string): string | null {
const normalized = driver.toLowerCase();
if (!ATTACH_COMPATIBLE_DRIVERS.has(normalized)) {
throw new Error(`Driver "${driver}" cannot be attached by DuckDB federation.`);
}
return normalized;
return normalized === 'duckdb' ? null : normalized;
}
export interface FederatedMember {

View file

@ -23,6 +23,7 @@ export interface LocalConnectionInfo {
const DRIVER_TO_CONNECTION_TYPE: Record<string, ConnectionType> = {
postgres: 'POSTGRESQL',
sqlite: 'SQLITE',
duckdb: 'DUCKDB',
sqlserver: 'SQLSERVER',
mysql: 'MYSQL',
clickhouse: 'CLICKHOUSE',

View file

@ -11,6 +11,7 @@ const warehouseDrivers = [
'snowflake',
'bigquery',
'sqlite',
'duckdb',
'clickhouse',
'sqlserver',
] as const;
@ -52,6 +53,7 @@ const warehouseConnectionSchemas = [
warehouseConnectionSchema('snowflake'),
warehouseConnectionSchema('bigquery'),
warehouseConnectionSchema('sqlite'),
warehouseConnectionSchema('duckdb'),
warehouseConnectionSchema('clickhouse'),
warehouseConnectionSchema('sqlserver'),
] as const;

View file

@ -141,6 +141,7 @@ function normalizeDriver(driver: string | undefined): KtxConnectionDriver {
if (
normalized === 'postgres' ||
normalized === 'sqlite' ||
normalized === 'duckdb' ||
normalized === 'mysql' ||
normalized === 'clickhouse' ||
normalized === 'sqlserver' ||
@ -151,7 +152,7 @@ function normalizeDriver(driver: string | undefined): KtxConnectionDriver {
return normalized;
}
throw new Error(
`Standalone ktx scan supports postgres/sqlite/mysql/clickhouse/sqlserver/bigquery/snowflake/mongodb in this phase, received "${driver ?? 'unknown'}"`,
`Standalone ktx scan supports postgres/sqlite/duckdb/mysql/clickhouse/sqlserver/bigquery/snowflake/mongodb in this phase, received "${driver ?? 'unknown'}"`,
);
}

View file

@ -2,6 +2,7 @@ import type { KtxTableRefKey } from './table-ref.js';
export type KtxConnectionDriver =
| 'sqlite'
| 'duckdb'
| 'postgres'
| 'sqlserver'
| 'bigquery'

View file

@ -6,8 +6,7 @@ import type { SqlAnalysisDialect } from './ports.js';
// dialect), served by the sql_dialect_notes MCP tool. They are package-internal:
// copy-runtime-assets.mjs ships them to dist, and they are never installed onto an
// agent target. The set covers every dialect reachable from a configured warehouse
// driver; duckdb/databricks are intentionally absent because no connector produces
// them.
// driver; databricks is intentionally absent because no connector produces it.
/** @internal Dialects with an authored ./dialects/<dialect>.md file. */
export const DIALECTS_WITH_NOTES = [
@ -16,6 +15,7 @@ export const DIALECTS_WITH_NOTES = [
'snowflake',
'bigquery',
'sqlite',
'duckdb',
'clickhouse',
'tsql',
] as const;

View file

@ -0,0 +1,10 @@
**duckdb** SQL conventions:
- **FQTN:** `schema.table` within one database (e.g. `main.orders`); a bare `table` resolves against `main`. A second, attached database is `db.schema.table` (e.g. `sales.main.orders`).
- **Identifiers:** case-insensitive; double-quote (`"Name"`) to keep a name with spaces or a reserved word.
- **Date/time:** native `DATE`/`TIMESTAMP`. Bucket with `date_trunc('month', ts)`, pull parts with `EXTRACT(YEAR FROM ts)`, format with `strftime(ts, '%Y-%m')`, and use `CURRENT_DATE`; cast text with `col::DATE` (or `TRY_CAST(col AS DATE)` to null bad values).
- **Series:** `FROM generate_series(DATE '2023-01-01', DATE '2023-12-01', INTERVAL 1 MONTH) AS s(d)` builds a date spine (use `range(...)` for integers), then `LEFT JOIN` the aggregated facts onto it so empty periods still appear.
- **Rolling window over time:** a native calendar-range frame spans real dates and tolerates gaps — `AVG(amount) OVER (ORDER BY day RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW)` is a trailing 30-day average without a spine; guard minimum periods with `COUNT(*) OVER (<same frame>)`.
- **Integer division:** unlike postgres, `/` is true division (`5 / 2``2.5`), so a ratio keeps its fraction; use `//` for floor division (`5 // 2``2`) when you want the integer quotient, and round only in the final projection.
- **Safe cast:** duckdb has `TRY_CAST``TRY_CAST(x AS DOUBLE)` yields `NULL` for a value that does not parse instead of raising, so counting residual `NULL`s among non-sentinel rows catches an encoding the sample missed without a regex guard.
- **Top-N / windows:** filter a window inline with `QUALIFY``SELECT ... QUALIFY ROW_NUMBER() OVER (PARTITION BY key ORDER BY x DESC) = 1` returns one row per key without a wrapping CTE; use `ORDER BY ... LIMIT n` for a global top-N.
- **JSON / semi-structured:** `col->'k'` returns JSON, `col->>'k'` returns text, deep path `json_extract(col, '$.a.b')`; duckdb also has native `STRUCT`, `LIST`, and `MAP` — read a struct field with `col.field` and a list element with `col[1]` (1-indexed).

View file

@ -9,6 +9,8 @@ import { isKtxPostgresConnectionConfig, type KtxPostgresConnectionConfig } from
import { KtxPostgresHistoricSqlQueryClient } from './connectors/postgres/historic-sql-query-client.js';
import { createSqliteLiveDatabaseIntrospection } from './connectors/sqlite/live-database-introspection.js';
import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js';
import { createDuckDbLiveDatabaseIntrospection } from './connectors/duckdb/live-database-introspection.js';
import { isKtxDuckDbConnectionConfig } from './connectors/duckdb/connector.js';
import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js';
import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js';
import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js';
@ -104,6 +106,10 @@ function createKtxCliLiveDatabaseIntrospection(
projectDir: project.projectDir,
connections: project.config.connections,
});
const duckdb = createDuckDbLiveDatabaseIntrospection({
projectDir: project.projectDir,
connections: project.config.connections,
});
const mysql = createMysqlLiveDatabaseIntrospection({
connections: project.config.connections,
});
@ -139,6 +145,9 @@ function createKtxCliLiveDatabaseIntrospection(
if (isKtxSqliteConnectionConfig(connection)) {
return sqlite.extractSchema(connectionId, options);
}
if (isKtxDuckDbConnectionConfig(connection)) {
return duckdb.extractSchema(connectionId, options);
}
if (isKtxMysqlConnectionConfig(connection)) {
return mysql.extractSchema(connectionId, options);
}

View file

@ -64,6 +64,7 @@ const execFileAsync = promisify(execFileCallback);
export type KtxSetupDatabaseDriver =
| 'sqlite'
| 'duckdb'
| 'postgres'
| 'mysql'
| 'clickhouse'
@ -159,6 +160,7 @@ const DRIVER_OPTIONS: Array<{ value: KtxSetupDatabaseDriver; label: string }> =
{ value: 'sqlserver', label: 'SQL Server' },
{ value: 'mongodb', label: 'MongoDB' },
{ value: 'sqlite', label: 'SQLite' },
{ value: 'duckdb', label: 'DuckDB' },
];
const DRIVER_LABELS = Object.fromEntries(DRIVER_OPTIONS.map((option) => [option.value, option.label])) as Record<
@ -174,6 +176,7 @@ const HISTORIC_SQL_DIALECT_BY_DRIVER: Partial<Record<KtxSetupDatabaseDriver, His
const DEFAULT_CONNECTION_IDS: Record<KtxSetupDatabaseDriver, string> = {
sqlite: 'sqlite-local',
duckdb: 'duckdb-local',
postgres: 'postgres-warehouse',
mysql: 'mysql-warehouse',
clickhouse: 'clickhouse-warehouse',
@ -813,6 +816,18 @@ async function buildConnectionConfig(input: {
if (path === undefined) return 'back';
return path ? { driver: 'sqlite', path } : null;
}
if (driver === 'duckdb') {
if (args.inputMode === 'disabled' && !args.databaseUrl) return null;
const path =
args.databaseUrl ??
(await promptText(
prompts,
'DuckDB database file\nEnter a relative or absolute path, for example ./warehouse.duckdb.',
stringConfigField(input.existingConnection, 'path'),
));
if (path === undefined) return 'back';
return path ? { driver: 'duckdb', path } : null;
}
if (driver === 'postgres' || driver === 'mysql' || driver === 'clickhouse' || driver === 'sqlserver') {
return await buildUrlConnectionConfig({
driver,
@ -1417,9 +1432,11 @@ async function maybeConfigureDatabaseScope(input: {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
const driver = normalizeDriver(connection?.driver);
if (!driver || driver === 'sqlite') return okValidateResult();
const spec = driver ? SCOPE_DISCOVERY_SPECS[driver] : undefined;
// Drivers with no scope spec are single-namespace (sqlite, duckdb): there is no
// schema to choose, so skip the scope picker and ingest every table.
if (!driver || !spec) return okValidateResult();
const spec = SCOPE_DISCOVERY_SPECS[driver];
const existingTables = connection?.enabled_tables;
const hasExistingTables = Array.isArray(existingTables) && existingTables.length > 0;
const existingScope = spec ? configuredScopeValues(connection, spec) : [];

View file

@ -412,6 +412,7 @@ function buildConnectionStatus(
const hint = envHint((conn as Record<string, unknown>).credentials_json);
return warn(hint ? `credentials missing (env: ${hint})` : 'credentials not set', hint ? `Set ${hint}` : 'Rerun `ktx setup`');
}
case 'duckdb':
case 'sqlite': {
const path = (conn as Record<string, unknown>).path;
if (typeof path === 'string' && path.length > 0) return ok(`path: ${path}`);
@ -553,7 +554,7 @@ async function buildQueryHistoryStatus(
}
const ADAPTER_DRIVER_REQUIREMENT: Record<string, string[]> = {
'live-database': ['postgres', 'mysql', 'snowflake', 'bigquery', 'clickhouse', 'sqlite', 'sqlserver'],
'live-database': ['postgres', 'mysql', 'snowflake', 'bigquery', 'clickhouse', 'sqlite', 'duckdb', 'sqlserver'],
dbt: ['dbt', 'dbt-core', 'dbt-cloud'],
notion: ['notion'],
metabase: ['metabase'],

View file

@ -688,7 +688,7 @@ describe('runKtxConnection', () => {
await initKtxProject({ projectDir });
await writeFile(
join(projectDir, 'ktx.yaml'),
'connections:\n mystery:\n driver: duckdb\n',
'connections:\n mystery:\n driver: nonsense\n',
'utf-8',
);
const io = makeIo();

View file

@ -0,0 +1,280 @@
import { DuckDBInstance } from '@duckdb/node-api';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { pathToFileURL } from 'node:url';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import {
KtxDuckDbScanConnector,
duckDbDatabasePathFromConfig,
isKtxDuckDbConnectionConfig,
} from '../../../src/connectors/duckdb/connector.js';
import { tableRefSet } from '../../../src/context/scan/table-ref.js';
let dir: string;
let dbPath: string;
beforeAll(async () => {
dir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-'));
dbPath = join(dir, 'warehouse.duckdb');
const instance = await DuckDBInstance.create(dbPath);
const connection = await instance.connect();
await connection.run('CREATE TABLE customers (id BIGINT PRIMARY KEY, name VARCHAR, big BIGINT)');
await connection.run(
`INSERT INTO customers VALUES (1, 'Ada', 9223372036854775807), (2, 'Lin', 10)`,
);
await connection.run('CREATE TABLE orders (id BIGINT, customer_id BIGINT REFERENCES customers(id))');
await connection.run('INSERT INTO orders VALUES (1, 1), (2, 2)');
// Composite primary key + composite foreign key, to exercise the parallel
// unnest() zip of constraint/referenced column names in readForeignKeys.
await connection.run('CREATE TABLE regions (country VARCHAR, code VARCHAR, PRIMARY KEY (country, code))');
await connection.run(
'CREATE TABLE stores (id BIGINT, country VARCHAR, code VARCHAR, FOREIGN KEY (country, code) REFERENCES regions(country, code))',
);
await connection.run('CREATE TABLE empty_table (id BIGINT)');
connection.closeSync();
instance.closeSync();
});
afterAll(async () => {
await rm(dir, { recursive: true, force: true });
});
function connector(connection: Record<string, unknown> = { driver: 'duckdb', path: dbPath }) {
return new KtxDuckDbScanConnector({ connectionId: 'warehouse', connection, projectDir: dir });
}
describe('isKtxDuckDbConnectionConfig', () => {
it('accepts duckdb driver, rejects others', () => {
expect(isKtxDuckDbConnectionConfig({ driver: 'duckdb' })).toBe(true);
expect(isKtxDuckDbConnectionConfig({ driver: 'sqlite' })).toBe(false);
});
});
describe('duckDbDatabasePathFromConfig', () => {
it('resolves a relative path against projectDir', () => {
const resolved = duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
projectDir: dir,
connection: { driver: 'duckdb', path: 'warehouse.duckdb' },
});
expect(resolved).toBe(dbPath);
});
it('derives the path from a file: url', () => {
const resolved = duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
connection: { driver: 'duckdb', url: pathToFileURL(dbPath).href },
});
expect(resolved).toBe(dbPath);
});
it('derives the path from a duckdb: url', () => {
const resolved = duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
connection: { driver: 'duckdb', url: `duckdb://${dbPath}` },
});
expect(resolved).toBe(dbPath);
});
it('resolves an env: reference in path', () => {
process.env.KTX_TEST_DUCKDB_PATH = dbPath;
try {
const resolved = duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
connection: { driver: 'duckdb', path: 'env:KTX_TEST_DUCKDB_PATH' },
});
expect(resolved).toBe(dbPath);
} finally {
delete process.env.KTX_TEST_DUCKDB_PATH;
}
});
it('rejects a non-duckdb driver', () => {
expect(() =>
duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
connection: { driver: 'sqlite', path: dbPath },
}),
).toThrow(/cannot run driver "sqlite"/);
});
it('requires a path or url', () => {
expect(() =>
duckDbDatabasePathFromConfig({
connectionId: 'warehouse',
connection: { driver: 'duckdb' },
}),
).toThrow(/requires connections\.warehouse\.path or url/);
});
});
describe('KtxDuckDbScanConnector', () => {
it('testConnection succeeds for an existing file', async () => {
const c = connector();
expect(await c.testConnection()).toEqual({ success: true });
await c.cleanup();
});
it('testConnection fails (never creating) for a missing file', async () => {
const c = connector({ driver: 'duckdb', path: join(dir, 'absent.duckdb') });
const result = await c.testConnection();
expect(result.success).toBe(false);
await c.cleanup();
});
it('introspects main-schema tables, columns, and foreign keys', async () => {
const c = connector();
const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' }, { runId: 't' });
const names = snapshot.tables.map((t) => t.name).sort();
expect(names).toEqual(['customers', 'empty_table', 'orders', 'regions', 'stores']);
const orders = snapshot.tables.find((t) => t.name === 'orders');
expect(orders?.foreignKeys[0]).toMatchObject({ fromColumn: 'customer_id', toTable: 'customers', toColumn: 'id' });
await c.cleanup();
});
it('maps a composite foreign key column-for-column to the referenced table', async () => {
const c = connector();
const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' }, { runId: 't' });
const stores = snapshot.tables.find((t) => t.name === 'stores');
const fks = stores?.foreignKeys.map((fk) => ({ fromColumn: fk.fromColumn, toTable: fk.toTable, toColumn: fk.toColumn }));
expect(fks).toEqual([
{ fromColumn: 'country', toTable: 'regions', toColumn: 'country' },
{ fromColumn: 'code', toTable: 'regions', toColumn: 'code' },
]);
await c.cleanup();
});
it('lists tables', async () => {
const c = connector();
const tables = (await c.listTables()).map((t) => t.name).sort();
expect(tables).toEqual(['customers', 'empty_table', 'orders', 'regions', 'stores']);
await c.cleanup();
});
it('samples a table', async () => {
const c = connector();
const sample = await c.sampleTable(
{ connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, limit: 1 },
{ runId: 't' },
);
expect(sample.rows.length).toBe(1);
await c.cleanup();
});
it('stringifies BIGINT beyond 2^53 in read-only results', async () => {
const c = connector();
const result = await c.executeReadOnly(
{ connectionId: 'warehouse', sql: 'SELECT big FROM customers WHERE id = 1', maxRows: 10 },
{ runId: 't' },
);
expect(result.rows[0][0]).toBe('9223372036854775807');
await c.cleanup();
});
it('rejects non-read-only SQL', async () => {
const c = connector();
await expect(
c.executeReadOnly({ connectionId: 'warehouse', sql: 'DELETE FROM customers', maxRows: 10 }, { runId: 't' }),
).rejects.toThrow();
await c.cleanup();
});
it('returns distinct values under the cardinality cap', async () => {
const c = connector();
const distinct = await c.getColumnDistinctValues({ name: 'customers', catalog: null, db: null }, 'name', {
maxCardinality: 10,
limit: 10,
});
expect(distinct?.values?.sort()).toEqual(['Ada', 'Lin']);
await c.cleanup();
});
it('withholds values but reports the count when cardinality exceeds the cap', async () => {
const c = connector();
const distinct = await c.getColumnDistinctValues({ name: 'customers', catalog: null, db: null }, 'name', {
maxCardinality: 1,
limit: 10,
});
expect(distinct).toEqual({ values: null, cardinality: 2 });
await c.cleanup();
});
it('samples a single column, dropping null rows', async () => {
const c = connector();
const sample = await c.sampleColumn(
{ connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, column: 'name', limit: 10 },
{ runId: 't' },
);
expect(sample.values.sort()).toEqual(['Ada', 'Lin']);
expect(sample.nullCount).toBeNull();
await c.cleanup();
});
it('counts table rows', async () => {
const c = connector();
expect(await c.getTableRowCount('customers')).toBe(2);
await c.cleanup();
});
it('lists only the main schema and reports no column stats', async () => {
const c = connector();
expect(await c.listSchemas()).toEqual(['main']);
expect(await c.columnStats({ connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, column: 'id' }, { runId: 't' })).toBeNull();
await c.cleanup();
});
it('rejects operations for a mismatched connection id', async () => {
const c = connector();
await expect(
c.executeReadOnly({ connectionId: 'other', sql: 'SELECT 1', maxRows: 1 }, { runId: 't' }),
).rejects.toThrow(/cannot serve connection other/);
await c.cleanup();
});
it('exposes the dialect identifier quoting', () => {
expect(connector().quoteIdentifier('a"b')).toBe('"a""b"');
});
// Opening a connection must never create the file: the db() guard throws
// rather than letting DuckDBInstance.create() materialize a missing path.
it('refuses to open (never creating) a missing file when a query runs', async () => {
const c = connector({ driver: 'duckdb', path: join(dir, 'absent.duckdb') });
await expect(c.listTables()).rejects.toThrow(/File not found/);
await c.cleanup();
});
it('returns no tables for an empty table scope', async () => {
const c = connector();
const snapshot = await c.introspect(
{ connectionId: 'warehouse', driver: 'duckdb', tableScope: new Set() },
{ runId: 't' },
);
expect(snapshot.tables).toEqual([]);
await c.cleanup();
});
it('restricts introspection to the named tables in a non-empty scope', async () => {
const c = connector();
const snapshot = await c.introspect(
{
connectionId: 'warehouse',
driver: 'duckdb',
tableScope: tableRefSet([{ catalog: null, db: null, name: 'customers' }]),
},
{ runId: 't' },
);
expect(snapshot.tables.map((t) => t.name)).toEqual(['customers']);
await c.cleanup();
});
it('reports zero cardinality and an empty value list for an empty column', async () => {
const c = connector();
const distinct = await c.getColumnDistinctValues({ name: 'empty_table', catalog: null, db: null }, 'id', {
maxCardinality: 10,
limit: 10,
});
expect(distinct).toEqual({ values: [], cardinality: 0 });
await c.cleanup();
});
});

View file

@ -0,0 +1,108 @@
import { describe, expect, it } from 'vitest';
import { KtxDuckDbDialect } from '../../../src/connectors/duckdb/dialect.js';
describe('KtxDuckDbDialect', () => {
const dialect = new KtxDuckDbDialect();
it('quotes identifiers with double quotes and escapes embedded quotes', () => {
expect(dialect.quoteIdentifier('order"s')).toBe('"order""s"');
});
it('maps integer types to number dimension', () => {
expect(dialect.mapToDimensionType('BIGINT')).toBe('number');
expect(dialect.mapToDimensionType('DOUBLE')).toBe('number');
});
it('maps timestamp types to time dimension', () => {
expect(dialect.mapToDimensionType('TIMESTAMP')).toBe('time');
expect(dialect.mapToDimensionType('DATE')).toBe('time');
});
it('maps text types to string dimension', () => {
expect(dialect.mapToDimensionType('VARCHAR')).toBe('string');
});
it('maps boolean types to boolean dimension', () => {
expect(dialect.mapToDimensionType('BOOLEAN')).toBe('boolean');
expect(dialect.mapToDimensionType('BOOL')).toBe('boolean');
});
it('falls back to string for an empty or unknown native type', () => {
expect(dialect.mapToDimensionType('')).toBe('string');
expect(dialect.mapToDimensionType('JSON')).toBe('string');
});
// The precedence ladder strips parameters before substring rules fire, so a
// parameterized DECIMAL still resolves through the numeric branch rather than
// the string fallback.
it('strips type parameters before resolving the dimension', () => {
expect(dialect.mapToDimensionType('DECIMAL(10,2)')).toBe('number');
expect(dialect.mapToDimensionType('VARCHAR(255)')).toBe('string');
});
// Types absent from the exact-match table still resolve via substring rules:
// TIMESTAMP_NS (time), UINT128/HUGEINT-like (number), and lowercase input.
it('resolves unlisted types through substring matching, case-insensitively', () => {
expect(dialect.mapToDimensionType('timestamp_ns')).toBe('time');
expect(dialect.mapToDimensionType('INT128')).toBe('number');
expect(dialect.mapToDimensionType(' double ')).toBe('number');
});
it('generates a limited sample query', () => {
expect(dialect.generateSampleQuery('"t"', 5)).toBe('SELECT * FROM "t" LIMIT 5');
});
it('quotes selected columns in a sample query', () => {
expect(dialect.generateSampleQuery('"t"', 5, ['a', 'b'])).toBe('SELECT "a", "b" FROM "t" LIMIT 5');
});
it('builds a non-null, non-blank column sample query', () => {
expect(dialect.generateColumnSampleQuery('"t"', 'email', 3)).toBe(
`SELECT "email" FROM "t" WHERE "email" IS NOT NULL AND TRIM(CAST("email" AS VARCHAR)) != '' LIMIT 3`,
);
});
// A degenerate sample percentage (<=0 or >=1) means "no sampling", so both the
// random filter and the TABLESAMPLE clause must collapse to an empty string.
it('returns empty sample clauses outside the (0,1) range and real clauses inside it', () => {
expect(dialect.getRandomSampleFilter(0)).toBe('');
expect(dialect.getRandomSampleFilter(1)).toBe('');
expect(dialect.getRandomSampleFilter(0.25)).toBe('RANDOM() < 0.25');
expect(dialect.getTableSampleClause(0)).toBe('');
expect(dialect.getTableSampleClause(0.1)).toBe('USING SAMPLE 10 PERCENT (bernoulli)');
});
// A type missing from the exact-match table but containing BOOL still resolves
// through the substring branch rather than the string fallback.
it('resolves a BOOL-substring type to boolean', () => {
expect(dialect.mapToDimensionType('MYBOOL')).toBe('boolean');
});
it('builds limit/offset, sample-value aggregation, and randomized cardinality clauses', () => {
expect(dialect.getLimitOffsetClause(10, 5)).toContain('LIMIT 10');
expect(dialect.getSampleValueAggregation('SELECT 1')).toContain('STRING_AGG');
expect(dialect.generateRandomizedCardinalitySampleQuery('"t"', 'c', 100)).toContain('USING SAMPLE 100 ROWS');
});
it('exposes profiling expressions and a null column-statistics query', () => {
expect(dialect.getNullCountExpression('c')).toBe('SUM(CASE WHEN c IS NULL THEN 1 ELSE 0 END)');
expect(dialect.getDistinctCountExpression('c')).toBe('COUNT(DISTINCT c)');
expect(dialect.textLengthExpression('c')).toBe('LENGTH(CAST(c AS VARCHAR))');
expect(dialect.castToText('c')).toBe('CAST(c AS VARCHAR)');
expect(dialect.mapDataType('BIGINT')).toBe('BIGINT');
expect(dialect.getTopClause(5)).toBe('');
expect(dialect.generateColumnStatisticsQuery('main', 't')).toBeNull();
});
// Guards the single-namespace (db=null) display shape: v1 introspects only
// `main`, so a display ref must round-trip as a bare table name. An ANSI shape
// would emit a 1-part name it then refuses to parse, breaking column lookups.
it('round-trips a single-namespace display ref and reports a 1-part column shape', () => {
const table = { catalog: null, db: null, name: 'orders' };
const display = dialect.formatDisplayRef(table);
expect(display).toBe('orders');
expect(dialect.parseDisplayRef(display)).toMatchObject({ name: 'orders' });
expect(dialect.columnDisplayTablePartCount()).toBe(1);
expect(dialect.formatTableName(table)).toBe('"orders"');
});
});

View file

@ -135,6 +135,14 @@ describe('federatedAttachTarget', () => {
expect(target).toContain('ssl_mode=REQUIRED');
});
it('resolves a duckdb member to its database file path', () => {
const target = federatedAttachTarget(
{ connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: 'a.duckdb' } },
{},
);
expect(target).toBe('/p/a.duckdb');
});
it('throws for an unsupported driver', () => {
expect(() => federatedAttachTarget(member({ driver: 'snowflake', connection: { driver: 'snowflake' } }), {})).toThrow(
/cannot be attached/i,

View file

@ -67,4 +67,27 @@ describe('buildAttachStatements', () => {
);
expect(stmts.at(-1)).toBe('ATTACH \'postgresql://u:it\'\'s@h/db\' AS "pg" (TYPE postgres, READ_ONLY);');
});
it('attaches a native duckdb member with no TYPE and no INSTALL/LOAD', () => {
const statements = buildAttachStatements(
[{ connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: '/p/a.duckdb' } }],
{},
);
expect(statements.some((s) => s.startsWith('INSTALL'))).toBe(false);
expect(statements.find((s) => s.startsWith('ATTACH'))).toContain('(READ_ONLY)');
expect(statements.find((s) => s.startsWith('ATTACH'))).not.toContain('TYPE');
});
it('mixes a duckdb member with a postgres member, loading only postgres', () => {
const statements = buildAttachStatements(
[
{ connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: '/p/a.duckdb' } },
{ connectionId: 'pg', driver: 'postgres', projectDir: '/p', connection: { driver: 'postgres', url: 'postgres://h/db' } },
],
{},
);
expect(statements).toContain('INSTALL postgres; LOAD postgres;');
expect(statements.some((s) => s.includes('INSTALL duckdb'))).toBe(false);
expect(statements.filter((s) => s.startsWith('ATTACH')).length).toBe(2);
});
});

View file

@ -0,0 +1,45 @@
import { DuckDBInstance } from '@duckdb/node-api';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import { createDuckDbLiveDatabaseIntrospection } from '../../../src/connectors/duckdb/live-database-introspection.js';
import { tableRefSet } from '../../../src/context/scan/table-ref.js';
let dir: string;
let dbPath: string;
beforeAll(async () => {
dir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-live-'));
dbPath = join(dir, 'warehouse.duckdb');
const instance = await DuckDBInstance.create(dbPath);
const connection = await instance.connect();
await connection.run('CREATE TABLE customers (id BIGINT, name VARCHAR)');
await connection.run('CREATE TABLE orders (id BIGINT)');
connection.closeSync();
instance.closeSync();
});
afterAll(async () => {
await rm(dir, { recursive: true, force: true });
});
function port() {
return createDuckDbLiveDatabaseIntrospection({
projectDir: dir,
connections: { warehouse: { driver: 'duckdb', path: dbPath } },
});
}
describe('createDuckDbLiveDatabaseIntrospection', () => {
it('extracts the full schema for a connection', async () => {
const snapshot = await port().extractSchema('warehouse');
expect(snapshot.tables.map((t) => t.name).sort()).toEqual(['customers', 'orders']);
});
it('restricts extraction to a table scope', async () => {
const tableScope = tableRefSet([{ catalog: null, db: null, name: 'customers' }]);
const snapshot = await port().extractSchema('warehouse', { tableScope });
expect(snapshot.tables.map((t) => t.name)).toEqual(['customers']);
});
});

View file

@ -0,0 +1,17 @@
import { describe, expect, it } from 'vitest';
import { jsonSafeBigint, toJsonSafeRows } from '../../../src/connectors/shared/duckdb-json-safe.js';
describe('duckdb json-safe bigint', () => {
it('keeps safe-range bigints as numbers', () => {
expect(jsonSafeBigint(42n)).toBe(42);
});
it('stringifies bigints beyond Number.MAX_SAFE_INTEGER', () => {
const big = BigInt(Number.MAX_SAFE_INTEGER) + 10n;
expect(jsonSafeBigint(big)).toBe(big.toString());
});
it('converts only bigint cells in a row matrix', () => {
expect(toJsonSafeRows([[1n, 'a', null]])).toEqual([[1, 'a', null]]);
});
});

View file

@ -305,7 +305,7 @@ describe('getDialectForDriver', () => {
it('throws with a supported-driver list for unknown drivers', () => {
expect(() => getDialectForDriver('oracle')).toThrow(
'Unsupported driver "oracle". Supported drivers: bigquery, clickhouse, mongodb, mysql, postgres, snowflake, sqlite, sqlserver',
'Unsupported driver "oracle". Supported drivers: bigquery, clickhouse, duckdb, mongodb, mysql, postgres, snowflake, sqlite, sqlserver',
);
});

View file

@ -22,6 +22,7 @@ const connectionFixtures: Record<KtxConnectionDriver, FixtureFactory> = {
schemas: ['public'],
}),
sqlite: () => ({ driver: 'sqlite', path: 'warehouse.db' }),
duckdb: (projectDir) => ({ driver: 'duckdb', path: join(projectDir, 'warehouse.duckdb') }),
mongodb: () => ({
driver: 'mongodb',
url: 'mongodb://localhost:27017/app',
@ -101,6 +102,7 @@ describe('driverRegistrations', () => {
expect(listSupportedDrivers()).toEqual([
'bigquery',
'clickhouse',
'duckdb',
'mongodb',
'mysql',
'postgres',
@ -138,7 +140,7 @@ describe('driverRegistrations', () => {
expect(connector.listTables).toEqual(expect.any(Function));
await connector.cleanup?.();
if (registration.driver === 'sqlite') {
if (registration.driver === 'sqlite' || registration.driver === 'duckdb') {
expect(registration.scopeConfigKey).toBeNull();
} else {
expect(registration.scopeConfigKey).not.toBeNull();

View file

@ -33,8 +33,7 @@ describe('per-dialect SQL notes', () => {
});
it('does not author notes for unreachable dialects', () => {
// duckdb/databricks appear in the resolver map but no connector produces them.
expect(DIALECTS_WITH_NOTES).not.toContain('duckdb');
// databricks appears in the resolver map but no connector produces it.
expect(DIALECTS_WITH_NOTES).not.toContain('databricks');
});

View file

@ -92,7 +92,7 @@ describe('createKtxCliScanConnector', () => {
expect(bigQueryMock.constructorInputs[0]).not.toHaveProperty('maxBytesBilled');
});
it('rejects daemon-only fallback driver configs at config parse time', async () => {
it('resolves a duckdb connection to the DuckDB scan connector', async () => {
await initKtxProject({ projectDir: tempDir });
await writeFile(
join(tempDir, 'ktx.yaml'),
@ -105,10 +105,12 @@ describe('createKtxCliScanConnector', () => {
].join('\n'),
'utf-8',
);
const project = await loadKtxProject({ projectDir: tempDir });
await expect(loadKtxProject({ projectDir: tempDir })).rejects.toThrow(
/connections\.warehouse\.driver:.*Invalid discriminator value/,
);
const connector = await createKtxCliScanConnector(project, 'warehouse');
expect(connector.id).toBe('duckdb:warehouse');
expect(connector.driver).toBe('duckdb');
});
it('rejects connection blocks with no driver field at config parse time', async () => {

View file

@ -245,6 +245,7 @@ describe('setup databases step', () => {
{ value: 'sqlserver', label: 'SQL Server' },
{ value: 'mongodb', label: 'MongoDB' },
{ value: 'sqlite', label: 'SQLite' },
{ value: 'duckdb', label: 'DuckDB' },
],
required: true,
});
@ -3515,4 +3516,72 @@ describe('setup databases step', () => {
expect(io.stdout()).toContain('ktx cannot work until you add a database.');
expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:');
});
it('adds one non-interactive DuckDB connection from --database-url without prompting', async () => {
const io = makeIo();
const prompts = makePromptAdapter({});
const testConnection = vi.fn(async () => 0);
const scanConnection = vi.fn(async () => 0);
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
databaseDrivers: ['duckdb'],
databaseConnectionId: 'duckdb-local',
databaseUrl: './warehouse.duckdb',
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{ prompts, testConnection, scanConnection },
);
expect(result.status).toBe('ready');
expect(prompts.text).not.toHaveBeenCalled();
expect(testConnection).toHaveBeenCalledWith(tempDir, 'duckdb-local', expect.anything());
expect(scanConnection).toHaveBeenCalledWith(tempDir, 'duckdb-local', expect.anything());
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections['duckdb-local']).toEqual({
driver: 'duckdb',
path: './warehouse.duckdb',
});
expect(config.setup).toEqual({
database_connection_ids: ['duckdb-local'],
});
expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases');
});
it('adds an interactive DuckDB connection without prompting for a schema', async () => {
const prompts = makePromptAdapter({
selectValues: ['no'],
textValues: ['', './warehouse.duckdb'],
});
const pickers = makePickerStubs();
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'auto',
databaseDrivers: ['duckdb'],
databaseSchemas: [],
skipDatabases: false,
},
makeIo().io,
{
prompts,
testConnection: vi.fn(async () => 0),
scanConnection: vi.fn(async () => 0),
pickDatabaseScope: pickers.pickDatabaseScope,
},
);
expect(result.status).toBe('ready');
expect(pickers.scopeCalls).toHaveLength(0);
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections['duckdb-local']).toEqual({
driver: 'duckdb',
path: './warehouse.duckdb',
});
});
});