From 3c4fcc27c790624d45ebaff23e84722d22b3573f Mon Sep 17 00:00:00 2001 From: Kevin Messiaen <114553769+kevinmessiaen@users.noreply.github.com> Date: Wed, 1 Jul 2026 19:06:02 +0700 Subject: [PATCH] feat: Add duckdb connector (#308) * refactor(duckdb): extract shared json-safe bigint helper Co-Authored-By: Claude Opus 4.8 * feat(duckdb): add and register the duckdb primary connector Add KtxDuckDbDialect, KtxDuckDbScanConnector (local file-backed, read-only, never-create, main-schema introspection via information_schema and duckdb_constraints() for foreign keys), and register the duckdb driver across the dialect factory, driver registry, connection-type enum, warehouse descriptor, config schema, scan normalization, connection test drivers, and status display. Co-Authored-By: Claude Opus 4.8 * feat(duckdb): route live-database ingest through the DuckDB connector Add the DuckDB live-database introspection bridge and dispatch duckdb connections to it in local-adapters, matching the SQLite path. Repoint the config-rejection test off duckdb (now a valid driver) onto the no-driver case. Co-Authored-By: Claude Opus 4.8 * feat(duckdb): add duckdb to the setup database flow Offer DuckDB in the interactive checklist and via ktx setup --database duckdb, with a file-path prompt and duckdb-local default connection id, parallel to SQLite. Co-Authored-By: Claude Opus 4.8 * feat(duckdb): attach native duckdb files in federation Native .duckdb members ATTACH with (READ_ONLY) and no TYPE/INSTALL/LOAD, since the duckdb format needs no extension. attachTypeForDriver returns null for the native case; buildAttachStatements builds load statements from non-null types only and emits a conditional ATTACH clause. Co-Authored-By: Claude Opus 4.8 * docs(duckdb): document the duckdb primary-source connector Add a DuckDB section to the primary-sources integration page (config, read-only never-create behavior, main-schema scope, federation) and update the supported-driver assertion in dialects.test.ts to include duckdb. Co-Authored-By: Claude Opus 4.8 * fix(duckdb): use single-namespace display shape for main-only refs DuckDB v1 introspects the main schema and sets db=null on every table, so its display refs are single-namespace like SQLite. The ansi shape emitted a 1-part table display it then refused to parse, breaking column-level display resolution. Switch the dialect to the sqlite display shape and add a round-trip test plus a composite-foreign-key test that were missing. Co-Authored-By: Claude Opus 4.8 * refactor(duckdb): resolve connector dialect via getDialectForDriver Route the connector's dialect through the shared factory like every other connector, now that duckdb is registered. Single construction path. Co-Authored-By: Claude Opus 4.8 * fix(duckdb): skip schema picker for single-file duckdb setup DuckDB is a single-file, single-namespace ('main') database like SQLite, but the setup scope step only skipped the schema picker for sqlite. DuckDB fell into the multi-schema path with an empty schema list, rendering a broken picker ("No matches found" for main). Extend the file-based-driver early-return to cover duckdb so it ingests every table directly. Co-Authored-By: Claude Opus 4.8 * refactor(duckdb): reuse shared config helper and derive scope skip Route duckdb path resolution through the shared resolveStringReference helper instead of a local third copy of env:/file: handling. Derive the setup scope-picker skip from SCOPE_DISCOVERY_SPECS membership rather than a hardcoded sqlite/duckdb driver list. Co-Authored-By: Claude Opus 4.8 * test(duckdb): use a genuinely unknown driver in the rejection test The merged "rejects unknown drivers" test used `driver: duckdb` as its unknown-driver stand-in, which stopped being unknown once this branch added the duckdb connector. Switch to `nonsense` so it again exercises the unsupported-driver config error. Co-Authored-By: Claude Opus 4.8 * test(duckdb): cover dialect, connector, and live-introspection branches Codecov flagged uncovered branches as dead code; all are real connector, dialect, and live-ingest behavior. Add unit tests instead of removing them. - dialect: precedence ladder, sample/clause builders, profiling expressions - connector: url/env config forms, error throws, never-create guard, cardinality cap branches, table-scope empty/non-empty paths - live-introspection: full-schema and table-scope extraction Functions 100%, lines ~99% across the duckdb connector dir. Co-Authored-By: Claude Opus 4.8 * docs: add DuckDB to supported-driver references The DuckDB connector PR documented the connector itself but left the scattered supported-driver enumerations stale. Add duckdb to the federation concept page (participation table, activation, table naming, limitations), the ktx setup CLI reference, the ktx.yaml warehouse-driver table, the primary-sources field reference, and the quickstart driver list (which also restores the missing ClickHouse entry). --------- Co-authored-by: Claude Opus 4.8 Co-authored-by: Andrey Avtomonov --- .../content/docs/cli-reference/ktx-setup.mdx | 4 +- .../concepts/cross-database-federation.mdx | 24 +- .../content/docs/configuration/ktx-yaml.mdx | 1 + .../docs/getting-started/quickstart.mdx | 3 +- .../docs/integrations/primary-sources.mdx | 52 ++- packages/cli/src/commands/setup-commands.ts | 1 + packages/cli/src/connection-drivers.ts | 1 + packages/cli/src/connection.ts | 1 + .../cli/src/connectors/duckdb/connector.ts | 395 ++++++++++++++++++ packages/cli/src/connectors/duckdb/dialect.ts | 192 +++++++++ .../src/connectors/duckdb/federated-attach.ts | 7 + .../connectors/duckdb/federated-executor.ts | 28 +- .../duckdb/live-database-introspection.ts | 40 ++ .../src/connectors/shared/duckdb-json-safe.ts | 14 + .../context/connections/connection-type.ts | 1 + .../cli/src/context/connections/dialects.ts | 2 + .../cli/src/context/connections/drivers.ts | 21 + .../cli/src/context/connections/federation.ts | 13 +- .../connections/local-warehouse-descriptor.ts | 1 + .../cli/src/context/project/driver-schemas.ts | 2 + packages/cli/src/context/scan/local-scan.ts | 3 +- packages/cli/src/context/scan/types.ts | 1 + .../src/context/sql-analysis/dialect-notes.ts | 4 +- .../context/sql-analysis/dialects/duckdb.md | 10 + packages/cli/src/local-adapters.ts | 9 + packages/cli/src/setup-databases.ts | 21 +- packages/cli/src/status-project.ts | 3 +- packages/cli/test/connection.test.ts | 2 +- .../test/connectors/duckdb/connector.test.ts | 280 +++++++++++++ .../test/connectors/duckdb/dialect.test.ts | 108 +++++ .../duckdb/federated-attach.test.ts | 8 + .../duckdb/federated-executor.test.ts | 23 + .../live-database-introspection.test.ts | 45 ++ .../shared/duckdb-json-safe.test.ts | 17 + .../test/context/connections/dialects.test.ts | 2 +- .../test/context/connections/drivers.test.ts | 4 +- .../test/context/mcp/dialect-notes.test.ts | 3 +- .../cli/test/local-scan-connectors.test.ts | 10 +- packages/cli/test/setup-databases.test.ts | 69 +++ 39 files changed, 1366 insertions(+), 59 deletions(-) create mode 100644 packages/cli/src/connectors/duckdb/connector.ts create mode 100644 packages/cli/src/connectors/duckdb/dialect.ts create mode 100644 packages/cli/src/connectors/duckdb/live-database-introspection.ts create mode 100644 packages/cli/src/connectors/shared/duckdb-json-safe.ts create mode 100644 packages/cli/src/context/sql-analysis/dialects/duckdb.md create mode 100644 packages/cli/test/connectors/duckdb/connector.test.ts create mode 100644 packages/cli/test/connectors/duckdb/dialect.test.ts create mode 100644 packages/cli/test/connectors/duckdb/live-database-introspection.test.ts create mode 100644 packages/cli/test/connectors/shared/duckdb-json-safe.test.ts diff --git a/docs-site/content/docs/cli-reference/ktx-setup.mdx b/docs-site/content/docs/cli-reference/ktx-setup.mdx index a424626f..d127525a 100644 --- a/docs-site/content/docs/cli-reference/ktx-setup.mdx +++ b/docs-site/content/docs/cli-reference/ktx-setup.mdx @@ -120,9 +120,9 @@ runtime features are missing. | Flag | Description | |------|-------------| -| `--database ` | Database driver to configure; repeatable. Choices: `sqlite`, `postgres`, `mysql`, `clickhouse`, `sqlserver`, `bigquery`, `snowflake` | +| `--database ` | Database driver to configure; repeatable. Choices: `sqlite`, `duckdb`, `postgres`, `mysql`, `clickhouse`, `sqlserver`, `bigquery`, `snowflake` | | `--database-connection-id ` | Existing selected connection id; repeatable. With `--database` or `--database-url`, connection id for the new connection. | -| `--database-url ` | URL, `env:NAME`, or `file:/path` for one new URL-style database connection; also used as the SQLite path | +| `--database-url ` | URL, `env:NAME`, or `file:/path` for one new URL-style database connection; also used as the SQLite or DuckDB path | | `--database-schema ` | Database schema or dataset to include; repeatable | | `--skip-databases` | Leave database setup incomplete | diff --git a/docs-site/content/docs/concepts/cross-database-federation.mdx b/docs-site/content/docs/concepts/cross-database-federation.mdx index ee3884f2..b4a055d1 100644 --- a/docs-site/content/docs/concepts/cross-database-federation.mdx +++ b/docs-site/content/docs/concepts/cross-database-federation.mdx @@ -1,6 +1,6 @@ --- title: Cross-database federation -description: How ktx federates postgres, mysql, and sqlite connections so a single read-only SQL query can join across them without copying data. +description: How ktx federates postgres, mysql, sqlite, and duckdb connections so a single read-only SQL query can join across them without copying data. --- Cross-database federation lets a single read-only SQL query join tables that @@ -20,13 +20,14 @@ block to add. With zero or one compatible connection the behavior is unchanged. ## Which connections participate -The v1 federation engine supports three drivers: +The v1 federation engine supports four drivers: | Driver | Participates in federation | |--------|---------------------------| | `postgres` | Yes | | `mysql` | Yes | | `sqlite` | Yes | +| `duckdb` | Yes | | `snowflake` | No — standalone connection | | `bigquery` | No — standalone connection | | `clickhouse` | No — standalone connection | @@ -38,7 +39,7 @@ queried independently; they do not appear as federation members. ## How it activates **ktx** inspects the connections in `ktx.yaml` at startup. When it finds two or -more connections whose driver is `postgres`, `mysql`, or `sqlite`, it +more connections whose driver is `postgres`, `mysql`, `sqlite`, or `duckdb`, it instantiates the DuckDB federation engine and attaches each one read-only. There is no `federation:` key, no opt-in flag, and no connection-level setting to enable. The engine is derived entirely from what is already declared. @@ -60,9 +61,10 @@ Two attach-compatible connections are present, so federation is active. ## Table naming in federated queries Inside a federated query, postgres and mysql tables use a three-part name: -`connectionId.schema.table`. SQLite tables, which have no schema layer in -DuckDB, use the two-part form `connectionId.table`. In both cases the -connection's `id` field in `ktx.yaml` becomes the catalog name inside DuckDB. +`connectionId.schema.table`. SQLite and DuckDB tables use the two-part form +`connectionId.table`, since ktx addresses both as single-namespace members. In +both cases the connection's `id` field in `ktx.yaml` becomes the catalog name +inside DuckDB. If a connection `id` is not a bare SQL identifier — for example it contains a hyphen, like `books-db` — double-quote it in the query the same way DuckDB @@ -131,8 +133,8 @@ ktx sql -c _ktx_federated \ Table names follow the rules from [Table naming in federated queries](#table-naming-in-federated-queries): three-part `connectionId.schema.table` for postgres and mysql, two-part -`connectionId.table` for sqlite. The `_ktx_federated` id is virtual — it is -never written to `ktx.yaml` and only exists when two or more attach-compatible +`connectionId.table` for sqlite and duckdb. The `_ktx_federated` id is virtual — +it is never written to `ktx.yaml` and only exists when two or more attach-compatible connections are declared. It surfaces in `ktx connection` and in the agent's connection list so the id is discoverable. Querying a single member database directly with its own connection id (`ktx sql -c pg_books ...`) is unchanged. @@ -149,6 +151,6 @@ database through the federation engine. them in a source's `joins:` block and automatic discovery of cross-database relationships are not available yet. Intra-database relationship discovery for each member connection is unchanged. -- **postgres, mysql, and sqlite only.** Other drivers (snowflake, bigquery, - clickhouse, sqlserver) do not participate in federation in this version. They - remain usable as standalone connections. +- **postgres, mysql, sqlite, and duckdb only.** Other drivers (snowflake, + bigquery, clickhouse, sqlserver) do not participate in federation in this + version. They remain usable as standalone connections. diff --git a/docs-site/content/docs/configuration/ktx-yaml.mdx b/docs-site/content/docs/configuration/ktx-yaml.mdx index 2ff54cbd..0f67877d 100644 --- a/docs-site/content/docs/configuration/ktx-yaml.mdx +++ b/docs-site/content/docs/configuration/ktx-yaml.mdx @@ -109,6 +109,7 @@ context-source drivers share the map. | `postgres` | Warehouse | `driver` | `url`, `enabled_tables`, `historicSql`, `context.queryHistory` | | `mysql` | Warehouse | `driver` | `url`, `enabled_tables` | | `sqlite` | Warehouse | `driver` | `url` or `path`, `enabled_tables` | +| `duckdb` | Warehouse | `driver` | `url` or `path`, `enabled_tables` | | `sqlserver` | Warehouse | `driver` | `url`, `enabled_tables` | | `bigquery` | Warehouse | `driver` | `credentials_json`, `dataset_ids`, `enabled_tables`, `historicSql` | | `snowflake` | Warehouse | `driver` | `schema_names`, `enabled_tables`, `historicSql` | diff --git a/docs-site/content/docs/getting-started/quickstart.mdx b/docs-site/content/docs/getting-started/quickstart.mdx index 2b8cbdfb..16669794 100644 --- a/docs-site/content/docs/getting-started/quickstart.mdx +++ b/docs-site/content/docs/getting-started/quickstart.mdx @@ -218,7 +218,8 @@ The wizard walks you through everything **ktx** needs in one pass: 3. **Embeddings** - picks an embeddings backend. Choose OpenAI for hosted embeddings or `sentence-transformers` to run locally without an API key. 4. **Database** - adds at least one primary connection. Supported drivers: - SQLite, PostgreSQL, MySQL, SQL Server, BigQuery, and Snowflake. + PostgreSQL, Snowflake, BigQuery, MySQL, ClickHouse, SQL Server, SQLite, and + DuckDB. 5. **Context sources** - optionally adds dbt, MetricFlow, LookML, Looker, Metabase, or Notion. You can skip and add them later. 6. **Build** - offers to run the first ingest so semantic sources and wiki diff --git a/docs-site/content/docs/integrations/primary-sources.mdx b/docs-site/content/docs/integrations/primary-sources.mdx index 40af30e0..270275f1 100644 --- a/docs-site/content/docs/integrations/primary-sources.mdx +++ b/docs-site/content/docs/integrations/primary-sources.mdx @@ -1,6 +1,6 @@ --- title: Primary Sources -description: Connect ktx to PostgreSQL, Snowflake, BigQuery, MySQL, ClickHouse, SQL Server, SQLite, or MongoDB. +description: Connect ktx to PostgreSQL, Snowflake, BigQuery, MySQL, ClickHouse, SQL Server, SQLite, DuckDB, or MongoDB. --- **ktx** connects to your data warehouse or database to build schema context, @@ -26,14 +26,14 @@ Agents should prefer environment or file references over literal secrets. | Field | Required | Applies to | Description | |-------|----------|------------|-------------| -| `driver` | Yes | all connections | Connector driver such as `postgres`, `snowflake`, `bigquery`, `mysql`, `clickhouse`, `sqlserver`, `sqlite`, or `mongodb` | +| `driver` | Yes | all connections | Connector driver such as `postgres`, `snowflake`, `bigquery`, `mysql`, `clickhouse`, `sqlserver`, `sqlite`, `duckdb`, or `mongodb` | | `url` | One of the connection methods | URL-style connectors | Database URL, `env:NAME`, or `file:/path/to/secret` | | `host`, `port`, `database`, `username`, `password` | One of the connection methods | PostgreSQL, MySQL, SQL Server | Field-by-field connection values | | `schema` or `schemas` | No | schema-aware warehouses | Single schema or list of schemas to scan | | `databases` | No | ClickHouse, MongoDB | List of databases to scan | | `sample_size`, `order_by` | No | MongoDB | Schema-inference sampling controls (recent documents, sort field) | | `context.queryHistory` | No | PostgreSQL, Snowflake, BigQuery | Enables query-history ingestion when the warehouse supports it | -| `path` | Yes for path-style SQLite | SQLite | Local SQLite database path or `env:NAME` reference | +| `path` | Yes for path-style SQLite/DuckDB | SQLite, DuckDB | Local SQLite or DuckDB database path or `env:NAME` reference | | `max_bytes_billed` | No | BigQuery | Maximum bytes billed per query job | | `query_timeout_ms` | No | all warehouses | Maximum execution time for a single read-only query, in milliseconds (default 30000). A query exceeding it is cancelled server-side (or, for SQLite, by terminating the off-process executor) and returns a `query exceeded Ns` error so the agent can revise. | | `project_id` | No | BigQuery | Optional local descriptor and mapping metadata; not used for BigQuery authentication | @@ -545,6 +545,52 @@ No authentication required - SQLite is file-based. The file must be readable by --- +## DuckDB + +File-based connector using the DuckDB Node.js API. Ideal for local analytics, embedded warehouses, and cross-database federation. + +### Connection config + +```yaml title="ktx.yaml" +connections: + warehouse: + driver: duckdb + path: data/warehouse.duckdb +``` + +`path` is resolved relative to the project directory. The `.duckdb` file must already exist — **ktx** never creates a missing database file. + +### Authentication + +No authentication required — DuckDB is file-based. The `.duckdb` file must be readable by the process running **ktx**. + +### Features + +| Feature | Supported | Notes | +|---------|-----------|-------| +| Tables & views | Yes | Via `information_schema` on the `main` schema | +| Primary keys | Yes | Via `information_schema.table_constraints` | +| Foreign keys | Yes | Via DuckDB's `duckdb_constraints()` catalog function | +| Row count estimates | Yes | Exact count via `SELECT COUNT(*)` | +| Column statistics | No | - | +| Query history | No | - | +| Table sampling | Yes | - | +| Nested analysis | No | - | + +### Dialect notes + +- Introspection scans the `main` schema only +- Execution is read-only; **ktx** opens the file without write access +- Parameter binding uses positional `?` placeholders +- Uses `LIMIT X OFFSET Y` for pagination +- Database file must exist before `ktx connection test` or ingest runs + +### Cross-database federation + +When a project declares two or more attach-compatible connections — any combination of `postgres`, `mysql`, `sqlite`, and `duckdb` — **ktx** derives a cross-database federation connection. That connection can ATTACH a native `.duckdb` file, allowing semantic queries to join across sources without manually copying data. + +--- + ## MongoDB Connects to MongoDB as a primary context source. **ktx** treats each collection diff --git a/packages/cli/src/commands/setup-commands.ts b/packages/cli/src/commands/setup-commands.ts index ff8f015d..d838e472 100644 --- a/packages/cli/src/commands/setup-commands.ts +++ b/packages/cli/src/commands/setup-commands.ts @@ -38,6 +38,7 @@ function llmBackend(value: string): KtxSetupLlmBackend { function databaseDriver(value: string): KtxSetupDatabaseDriver { if ( value === 'sqlite' || + value === 'duckdb' || value === 'postgres' || value === 'mysql' || value === 'clickhouse' || diff --git a/packages/cli/src/connection-drivers.ts b/packages/cli/src/connection-drivers.ts index 511637f5..836c8458 100644 --- a/packages/cli/src/connection-drivers.ts +++ b/packages/cli/src/connection-drivers.ts @@ -3,6 +3,7 @@ import type { KtxProjectConnectionConfig } from './context/project/config.js'; /** @internal Canonical SQL-warehouse driver ids; the dialect-notes coverage test derives its required coverage from this set. */ export const KTX_DATABASE_DRIVER_IDS = [ 'sqlite', + 'duckdb', 'postgres', 'mysql', 'clickhouse', diff --git a/packages/cli/src/connection.ts b/packages/cli/src/connection.ts index f134a4a4..809eea96 100644 --- a/packages/cli/src/connection.ts +++ b/packages/cli/src/connection.ts @@ -51,6 +51,7 @@ export interface KtxConnectionDeps { const SUPPORTED_TEST_DRIVERS = [ 'sqlite', + 'duckdb', 'postgres', 'mysql', 'clickhouse', diff --git a/packages/cli/src/connectors/duckdb/connector.ts b/packages/cli/src/connectors/duckdb/connector.ts new file mode 100644 index 00000000..5b904c14 --- /dev/null +++ b/packages/cli/src/connectors/duckdb/connector.ts @@ -0,0 +1,395 @@ +import { DuckDBInstance, type DuckDBConnection } from '@duckdb/node-api'; +import { existsSync, statSync } from 'node:fs'; +import { isAbsolute, resolve } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { resolveStringReference } from '../shared/string-reference.js'; +import { getSqlDialectForDriver } from '../../context/connections/dialects.js'; +import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; +import { normalizeQueryRows } from '../../context/connections/query-executor.js'; +import { toJsonSafeRows } from '../shared/duckdb-json-safe.js'; +import { + connectorTestFailure, + createKtxConnectorCapabilities, + type KtxColumnSampleInput, + type KtxColumnSampleResult, + type KtxColumnStatsInput, + type KtxColumnStatsResult, + type KtxConnectorTestResult, + type KtxQueryResult, + type KtxReadOnlyQueryInput, + type KtxScanConnector, + type KtxScanContext, + type KtxScanInput, + type KtxSchemaForeignKey, + type KtxSchemaSnapshot, + type KtxSchemaTable, + type KtxTableListEntry, + type KtxTableRef, + type KtxTableSampleInput, + type KtxTableSampleResult, +} from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; + +const MAIN_SCHEMA = 'main'; + +export interface KtxDuckDbConnectionConfig { + driver?: string; + path?: string; + url?: string; + [key: string]: unknown; +} + +/** @internal */ +export interface DuckDbDatabasePathInput { + connectionId: string; + projectDir?: string; + connection: KtxDuckDbConnectionConfig | undefined; +} + +export interface KtxDuckDbScanConnectorOptions extends DuckDbDatabasePathInput { + now?: () => Date; +} + +export interface KtxDuckDbColumnDistinctValuesOptions { + maxCardinality: number; + limit: number; + sampleSize?: number; +} + +export interface KtxDuckDbColumnDistinctValuesResult { + values: string[] | null; + cardinality: number; +} + +interface InfoSchemaTableRow { + table_name: string; + table_type: string; +} + +interface InfoSchemaColumnRow { + column_name: string; + data_type: string; + is_nullable: string; +} + +// `path` may be an env:/file: reference; `url` resolves env: only, since file: +// on a url is a native URI form (handled by duckDbPathFromUrl), not a file read. +function stringConfigValue( + connection: KtxDuckDbConnectionConfig | undefined, + key: 'path' | 'url', +): string | undefined { + const value = connection?.[key]; + if (typeof value !== 'string' || value.trim().length === 0) { + return undefined; + } + const trimmed = value.trim(); + if (key === 'url') { + return trimmed.startsWith('env:') ? (process.env[trimmed.slice('env:'.length)] ?? '') : trimmed; + } + return resolveStringReference(trimmed, process.env); +} + +function duckDbPathFromUrl(url: string): string { + if (url.startsWith('file:')) { + return fileURLToPath(url); + } + if (url.startsWith('duckdb:')) { + const parsed = new URL(url); + return decodeURIComponent(parsed.pathname); + } + return url; +} + +export function isKtxDuckDbConnectionConfig( + connection: KtxDuckDbConnectionConfig | undefined, +): connection is KtxDuckDbConnectionConfig { + return String(connection?.driver ?? '').toLowerCase() === 'duckdb'; +} + +/** @internal */ +export function duckDbDatabasePathFromConfig(input: DuckDbDatabasePathInput): string { + const inputDriver = input.connection?.driver ?? 'unknown'; + if (!isKtxDuckDbConnectionConfig(input.connection)) { + throw new Error(`Native DuckDB connector cannot run driver "${inputDriver}"`); + } + const configuredPath = + stringConfigValue(input.connection, 'path') ?? duckDbPathFromUrl(stringConfigValue(input.connection, 'url') ?? ''); + if (!configuredPath) { + throw new Error(`Native DuckDB connector requires connections.${input.connectionId}.path or url`); + } + return isAbsolute(configuredPath) ? configuredPath : resolve(input.projectDir ?? process.cwd(), configuredPath); +} + +export class KtxDuckDbScanConnector implements KtxScanConnector { + readonly id: string; + readonly driver = 'duckdb' as const; + readonly capabilities = createKtxConnectorCapabilities({ + tableSampling: true, + columnSampling: true, + columnStats: false, + readOnlySql: true, + nestedAnalysis: false, + formalForeignKeys: true, + estimatedRowCounts: true, + }); + + private readonly connectionId: string; + private readonly dbPath: string; + private readonly now: () => Date; + private readonly dialect = getSqlDialectForDriver('duckdb'); + private instance: DuckDBInstance | null = null; + private connection: DuckDBConnection | null = null; + + constructor(options: KtxDuckDbScanConnectorOptions) { + this.connectionId = options.connectionId; + this.dbPath = duckDbDatabasePathFromConfig(options); + this.now = options.now ?? (() => new Date()); + this.id = `duckdb:${options.connectionId}`; + } + + async testConnection(): Promise { + try { + if (!existsSync(this.dbPath) || !statSync(this.dbPath).isFile()) { + return { success: false, error: `File not found: ${this.dbPath}` }; + } + await this.query('SELECT 1'); + return { success: true }; + } catch (error) { + return connectorTestFailure(error); + } + } + + async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: null }) : null; + const tableRows = await this.readTableRows(scopedNames); + const tables: KtxSchemaTable[] = []; + for (const row of tableRows) { + tables.push(await this.readTable(row)); + } + return { + connectionId: this.connectionId, + driver: 'duckdb' as const, + extractedAt: this.now().toISOString(), + scope: {}, + metadata: { + file_path: this.dbPath, + table_count: tables.length, + total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0), + }, + tables, + }; + } + + async listSchemas(): Promise { + return [MAIN_SCHEMA]; + } + + async listTables(_schemas?: string[]): Promise { + const rows = await this.readTableRows(null); + return rows.map((row) => ({ + catalog: null, + schema: MAIN_SCHEMA, + name: row.table_name, + kind: row.table_type === 'VIEW' ? ('view' as const) : ('table' as const), + })); + } + + async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query( + this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns), + ); + return { headers: result.headers, rows: result.rows, totalRows: result.totalRows }; + } + + async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query( + this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit), + ); + const values = result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]); + return { values, nullCount: null, distinctCount: null }; + } + + async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise { + return null; + } + + async executeReadOnly(input: KtxReadOnlyQueryInput, _ctx: KtxScanContext): Promise { + this.assertConnection(input.connectionId); + const result = await this.query(limitSqlForExecution(input.sql, input.maxRows)); + return { ...result, rowCount: result.rows.length }; + } + + async getColumnDistinctValues( + table: KtxTableRef, + columnName: string, + options: KtxDuckDbColumnDistinctValuesOptions, + ): Promise { + const sampleSize = options.sampleSize ?? 10000; + const tableName = this.qTableName(table); + const quotedColumn = this.dialect.quoteIdentifier(columnName); + const cardinalityResult = await this.query( + this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize), + ); + if (cardinalityResult.rows.length === 0) { + return null; + } + const cardinality = Number(cardinalityResult.rows[0][0]); + if (Number.isNaN(cardinality)) { + return null; + } + if (cardinality === 0) { + return { values: [], cardinality: 0 }; + } + if (cardinality > options.maxCardinality) { + return { values: null, cardinality }; + } + const valuesResult = await this.query( + this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit), + ); + return { + values: valuesResult.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => String(row[0])), + cardinality, + }; + } + + async getTableRowCount(tableName: string): Promise { + const result = await this.query(`SELECT COUNT(*) AS count FROM ${this.dialect.quoteIdentifier(tableName)}`); + return Number(result.rows[0]?.[0] ?? 0); + } + + qTableName(table: Pick): string { + return this.dialect.formatTableName(table); + } + + quoteIdentifier(identifier: string): string { + return this.dialect.quoteIdentifier(identifier); + } + + async cleanup(): Promise { + this.connection?.closeSync(); + this.instance?.closeSync(); + this.connection = null; + this.instance = null; + } + + private async db(): Promise { + if (!this.connection) { + // DuckDBInstance.create() creates the file if missing, so this pre-check + // enforces the never-create rule. Do not remove it. + if (!existsSync(this.dbPath) || !statSync(this.dbPath).isFile()) { + throw new Error(`File not found: ${this.dbPath}`); + } + this.instance = await DuckDBInstance.create(this.dbPath, { access_mode: 'read_only' }); + this.connection = await this.instance.connect(); + } + return this.connection; + } + + private async query(sql: string): Promise> { + const connection = await this.db(); + const reader = await connection.runAndReadAll(assertReadOnlySql(sql)); + const rows = toJsonSafeRows(normalizeQueryRows(reader.getRows())); + return { + headers: reader.columnNames(), + rows, + totalRows: rows.length, + }; + } + + private async readTableRows(scopedNames: string[] | null): Promise { + if (scopedNames && scopedNames.length === 0) { + return []; + } + const scopeClause = scopedNames + ? `AND table_name IN (${scopedNames.map((name) => `'${name.replaceAll("'", "''")}'`).join(', ')})` + : ''; + const result = await this.query( + `SELECT table_name, table_type + FROM information_schema.tables + WHERE table_schema = '${MAIN_SCHEMA}' ${scopeClause} + ORDER BY table_name`, + ); + return result.rows.map((row) => ({ table_name: String(row[0]), table_type: String(row[1]) })); + } + + private async readTable(table: InfoSchemaTableRow): Promise { + const columnsResult = await this.query( + `SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = '${MAIN_SCHEMA}' AND table_name = '${table.table_name.replaceAll("'", "''")}' + ORDER BY ordinal_position`, + ); + const columns = columnsResult.rows.map((row) => ({ + column_name: String(row[0]), + data_type: String(row[1]), + is_nullable: String(row[2]), + })); + const primaryKeys = await this.readPrimaryKeyColumns(table.table_name); + const isView = table.table_type === 'VIEW'; + const estimatedRows = isView ? null : await this.getTableRowCount(table.table_name); + return { + catalog: null, + db: null, + name: table.table_name, + kind: isView ? 'view' : 'table', + comment: null, + estimatedRows, + columns: columns.map((column) => ({ + name: column.column_name, + nativeType: column.data_type, + normalizedType: this.dialect.mapDataType(column.data_type), + dimensionType: this.dialect.mapToDimensionType(column.data_type), + nullable: column.is_nullable === 'YES' && !primaryKeys.has(column.column_name), + primaryKey: primaryKeys.has(column.column_name), + comment: null, + })), + foreignKeys: await this.readForeignKeys(table.table_name), + }; + } + + private async readPrimaryKeyColumns(tableName: string): Promise> { + const result = await this.query( + `SELECT kcu.column_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema + WHERE tc.table_schema = '${MAIN_SCHEMA}' + AND tc.table_name = '${tableName.replaceAll("'", "''")}' + AND tc.constraint_type = 'PRIMARY KEY'`, + ); + return new Set(result.rows.map((row) => String(row[0]))); + } + + private async readForeignKeys(tableName: string): Promise { + // information_schema.constraint_column_usage in DuckDB returns the constrained + // columns (source), not the referenced columns. Use duckdb_constraints() which + // exposes constraint_column_names and referenced_column_names directly. + const result = await this.query( + `SELECT unnest(constraint_column_names) AS from_column, + referenced_table, + unnest(referenced_column_names) AS to_column, + constraint_name + FROM duckdb_constraints() + WHERE schema_name = '${MAIN_SCHEMA}' + AND table_name = '${tableName.replaceAll("'", "''")}' + AND constraint_type = 'FOREIGN KEY'`, + ); + return result.rows.map((row) => ({ + fromColumn: String(row[0]), + toCatalog: null, + toDb: null, + toTable: String(row[1]), + toColumn: String(row[2]), + constraintName: row[3] === null ? null : String(row[3]), + })); + } + + private assertConnection(connectionId: string): void { + if (connectionId !== this.connectionId) { + throw new Error(`ktx DuckDB connector ${this.id} cannot serve connection ${connectionId}`); + } + } +} diff --git a/packages/cli/src/connectors/duckdb/dialect.ts b/packages/cli/src/connectors/duckdb/dialect.ts new file mode 100644 index 00000000..9fef67dd --- /dev/null +++ b/packages/cli/src/connectors/duckdb/dialect.ts @@ -0,0 +1,192 @@ +import type { KtxSqlDialect } from '../../context/connections/dialects.js'; +import { + columnDisplayPartCount, + formatDialectDisplayRef, + formatDialectTableName, + limitOffsetClause, + parseDialectDisplayRef, +} from '../../context/connections/dialect-helpers.js'; +import type { KtxSchemaDimensionType, KtxTableRef } from '../../context/scan/types.js'; + +type DuckDbTableNameRef = Pick & Partial>; + +/** @internal */ +export class KtxDuckDbDialect implements KtxSqlDialect { + readonly type = 'duckdb' as const; + + private readonly typeMappings: Record = { + TIMESTAMP: 'time', + 'TIMESTAMP WITH TIME ZONE': 'time', + TIMESTAMPTZ: 'time', + DATE: 'time', + TIME: 'time', + BIGINT: 'number', + INTEGER: 'number', + SMALLINT: 'number', + TINYINT: 'number', + HUGEINT: 'number', + UBIGINT: 'number', + UINTEGER: 'number', + DECIMAL: 'number', + NUMERIC: 'number', + REAL: 'number', + FLOAT: 'number', + DOUBLE: 'number', + VARCHAR: 'string', + CHAR: 'string', + TEXT: 'string', + BLOB: 'string', + UUID: 'string', + BOOLEAN: 'boolean', + BOOL: 'boolean', + }; + + quoteIdentifier(identifier: string): string { + return `"${identifier.replace(/"/g, '""')}"`; + } + + // v1 introspects the `main` schema only and sets db=null on every table, so + // refs are single-namespace like SQLite — use the matching display shape. + formatTableName(table: DuckDbTableNameRef): string { + return formatDialectTableName(table, this.quoteIdentifier.bind(this), 'sqlite'); + } + + formatDisplayRef(table: DuckDbTableNameRef): string { + return formatDialectDisplayRef(table, 'sqlite'); + } + + parseDisplayRef(display: string): KtxTableRef | null { + return parseDialectDisplayRef(display, 'sqlite'); + } + + columnDisplayTablePartCount(): 1 | 2 | 3 { + return columnDisplayPartCount('sqlite'); + } + + mapDataType(nativeType: string): string { + return nativeType; + } + + mapToDimensionType(nativeType: string): KtxSchemaDimensionType { + if (!nativeType) { + return 'string'; + } + let normalized = nativeType.toUpperCase().trim(); + if (normalized.includes('(')) { + normalized = normalized.split('(')[0].trim(); + } + if (this.typeMappings[normalized]) { + return this.typeMappings[normalized]; + } + if (normalized.includes('TIME') || normalized.includes('DATE')) { + return 'time'; + } + if ( + normalized.includes('INT') || + normalized.includes('DEC') || + normalized.includes('NUM') || + normalized.includes('REAL') || + normalized.includes('FLOAT') || + normalized.includes('DOUBLE') + ) { + return 'number'; + } + if (normalized.includes('BOOL')) { + return 'boolean'; + } + return 'string'; + } + + generateSampleQuery(tableName: string, limit: number, columns?: string[]): string { + const columnList = + columns && columns.length > 0 ? columns.map((column) => this.quoteIdentifier(column)).join(', ') : '*'; + return `SELECT ${columnList} FROM ${tableName} LIMIT ${limit}`; + } + + generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string { + const quoted = this.quoteIdentifier(columnName); + return `SELECT ${quoted} FROM ${tableName} WHERE ${quoted} IS NOT NULL AND TRIM(CAST(${quoted} AS VARCHAR)) != '' LIMIT ${limit}`; + } + + getRandomSampleFilter(samplePct: number): string { + if (samplePct <= 0 || samplePct >= 1) { + return ''; + } + return `RANDOM() < ${samplePct}`; + } + + getTableSampleClause(samplePct: number): string { + if (samplePct <= 0 || samplePct >= 1) { + return ''; + } + return `USING SAMPLE ${Math.round(samplePct * 100)} PERCENT (bernoulli)`; + } + + getLimitOffsetClause(limit: number, offset?: number): string { + return limitOffsetClause(limit, offset); + } + + getTopClause(_limit: number): string { + return ''; + } + + getNullCountExpression(column: string): string { + return `SUM(CASE WHEN ${column} IS NULL THEN 1 ELSE 0 END)`; + } + + getDistinctCountExpression(column: string): string { + return `COUNT(DISTINCT ${column})`; + } + + textLengthExpression(columnSql: string): string { + return `LENGTH(CAST(${columnSql} AS VARCHAR))`; + } + + castToText(columnSql: string): string { + return `CAST(${columnSql} AS VARCHAR)`; + } + + getSampleValueAggregation(innerSql: string): string { + return `(SELECT STRING_AGG(CAST(value AS VARCHAR), chr(31)) FROM (${innerSql}) AS relationship_profile_values)`; + } + + generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string { + return ` + WITH sampled AS ( + SELECT ${columnName} AS val + FROM ${tableName} + WHERE ${columnName} IS NOT NULL + LIMIT ${sampleSize} + ) + SELECT COUNT(DISTINCT val) AS cardinality + FROM sampled + `; + } + + generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string { + return ` + SELECT DISTINCT CAST(${columnName} AS VARCHAR) AS val + FROM ${tableName} + WHERE ${columnName} IS NOT NULL + ORDER BY val + LIMIT ${limit} + `; + } + + generateColumnStatisticsQuery(_schemaName: string, _tableName: string): string | null { + return null; + } + + generateRandomizedCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string { + return ` + WITH sampled AS ( + SELECT ${columnName} AS val + FROM ${tableName} + WHERE ${columnName} IS NOT NULL + USING SAMPLE ${sampleSize} ROWS + ) + SELECT COUNT(DISTINCT val) AS cardinality + FROM sampled + `; + } +} diff --git a/packages/cli/src/connectors/duckdb/federated-attach.ts b/packages/cli/src/connectors/duckdb/federated-attach.ts index edcb94eb..c0b9cda0 100644 --- a/packages/cli/src/connectors/duckdb/federated-attach.ts +++ b/packages/cli/src/connectors/duckdb/federated-attach.ts @@ -4,6 +4,7 @@ import { mysqlConnectionPoolConfigFromConfig, type KtxMysqlConnectionConfig, } from '../mysql/connector.js'; +import { duckDbDatabasePathFromConfig, type KtxDuckDbConnectionConfig } from '../../connectors/duckdb/connector.js'; import type { FederatedMember } from '../../context/connections/federation.js'; function kvKeyword(value: string): string { @@ -74,6 +75,12 @@ function mysqlAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): str */ export function federatedAttachTarget(member: FederatedMember, env: NodeJS.ProcessEnv): string { switch (member.driver.toLowerCase()) { + case 'duckdb': + return duckDbDatabasePathFromConfig({ + connectionId: member.connectionId, + projectDir: member.projectDir, + connection: member.connection as KtxDuckDbConnectionConfig, + }); case 'sqlite': return sqliteDatabasePathFromConfig({ connectionId: member.connectionId, diff --git a/packages/cli/src/connectors/duckdb/federated-executor.ts b/packages/cli/src/connectors/duckdb/federated-executor.ts index 7972166c..58894cc4 100644 --- a/packages/cli/src/connectors/duckdb/federated-executor.ts +++ b/packages/cli/src/connectors/duckdb/federated-executor.ts @@ -7,25 +7,12 @@ import type { import { normalizeQueryRows } from '../../context/connections/query-executor.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { attachTypeForDriver, type FederatedMember } from '../../context/connections/federation.js'; +import { toJsonSafeRows } from '../shared/duckdb-json-safe.js'; function quoteDuckdbIdentifier(id: string): string { return `"${id.replaceAll('"', '""')}"`; } -const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER); -const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER); - -// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values -// in Number's safe range become Number; larger magnitudes become strings so a -// BIGINT beyond 2^53 keeps its exact value instead of silently rounding. -function jsonSafeBigint(value: bigint): number | string { - return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? Number(value) : value.toString(); -} - -function toJsonSafeRows(rows: unknown[][]): unknown[][] { - return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell))); -} - /** @internal */ export function buildAttachStatements(members: FederatedMember[], env: NodeJS.ProcessEnv): string[] { const attachments = members.map((member) => ({ @@ -34,13 +21,12 @@ export function buildAttachStatements(members: FederatedMember[], env: NodeJS.Pr alias: member.connectionId, })); - const loadStatements = [...new Set(attachments.map((a) => a.type))].map( - (type) => `INSTALL ${type}; LOAD ${type};`, - ); - const attachStatements = attachments.map( - ({ type, url, alias }) => - `ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (TYPE ${type}, READ_ONLY);`, - ); + const loadTypes = [...new Set(attachments.map((a) => a.type).filter((type): type is string => type !== null))]; + const loadStatements = loadTypes.map((type) => `INSTALL ${type}; LOAD ${type};`); + const attachStatements = attachments.map(({ type, url, alias }) => { + const options = type === null ? 'READ_ONLY' : `TYPE ${type}, READ_ONLY`; + return `ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (${options});`; + }); return [...loadStatements, ...attachStatements]; } diff --git a/packages/cli/src/connectors/duckdb/live-database-introspection.ts b/packages/cli/src/connectors/duckdb/live-database-introspection.ts new file mode 100644 index 00000000..4edded29 --- /dev/null +++ b/packages/cli/src/connectors/duckdb/live-database-introspection.ts @@ -0,0 +1,40 @@ +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; +import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; +import { KtxDuckDbScanConnector, type KtxDuckDbConnectionConfig } from './connector.js'; + +export interface CreateDuckDbLiveDatabaseIntrospectionOptions { + projectDir?: string; + connections: Record; + now?: () => Date; +} + +export function createDuckDbLiveDatabaseIntrospection( + options: CreateDuckDbLiveDatabaseIntrospectionOptions, +): LiveDatabaseIntrospectionPort { + return { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { + const connection = options.connections[connectionId] as KtxDuckDbConnectionConfig | undefined; + const connector = new KtxDuckDbScanConnector({ + connectionId, + connection, + projectDir: options.projectDir, + now: options.now, + }); + try { + return await connector.introspect( + { + connectionId, + driver: 'duckdb', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, + { runId: `duckdb-${connectionId}` }, + ); + } finally { + await connector.cleanup(); + } + }, + }; +} diff --git a/packages/cli/src/connectors/shared/duckdb-json-safe.ts b/packages/cli/src/connectors/shared/duckdb-json-safe.ts new file mode 100644 index 00000000..d6ea046f --- /dev/null +++ b/packages/cli/src/connectors/shared/duckdb-json-safe.ts @@ -0,0 +1,14 @@ +const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER); +const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER); + +// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values +// in Number's safe range become Number; larger magnitudes become strings so a +// BIGINT beyond 2^53 keeps its exact value instead of silently rounding. +/** @internal */ +export function jsonSafeBigint(value: bigint): number | string { + return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? Number(value) : value.toString(); +} + +export function toJsonSafeRows(rows: unknown[][]): unknown[][] { + return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell))); +} diff --git a/packages/cli/src/context/connections/connection-type.ts b/packages/cli/src/context/connections/connection-type.ts index 6cd48042..d122e950 100644 --- a/packages/cli/src/context/connections/connection-type.ts +++ b/packages/cli/src/context/connections/connection-type.ts @@ -3,6 +3,7 @@ import { z } from 'zod'; export const connectionTypeSchema = z.enum([ 'POSTGRESQL', 'SQLITE', + 'DUCKDB', 'SQLSERVER', 'BIGQUERY', 'SNOWFLAKE', diff --git a/packages/cli/src/context/connections/dialects.ts b/packages/cli/src/context/connections/dialects.ts index 30608f3e..ad739d1d 100644 --- a/packages/cli/src/context/connections/dialects.ts +++ b/packages/cli/src/context/connections/dialects.ts @@ -1,5 +1,6 @@ import { KtxBigQueryDialect } from '../../connectors/bigquery/dialect.js'; import { KtxClickHouseDialect } from '../../connectors/clickhouse/dialect.js'; +import { KtxDuckDbDialect } from '../../connectors/duckdb/dialect.js'; import { KtxMongoDbDialect } from '../../connectors/mongodb/dialect.js'; import { KtxMysqlDialect } from '../../connectors/mysql/dialect.js'; import { KtxPostgresDialect } from '../../connectors/postgres/dialect.js'; @@ -55,6 +56,7 @@ type KtxSqlDriver = Exclude; const sqlDialectFactories: Record KtxSqlDialect> = { bigquery: () => new KtxBigQueryDialect(), clickhouse: () => new KtxClickHouseDialect(), + duckdb: () => new KtxDuckDbDialect(), mysql: () => new KtxMysqlDialect(), postgres: () => new KtxPostgresDialect(), sqlite: () => new KtxSqliteDialect(), diff --git a/packages/cli/src/context/connections/drivers.ts b/packages/cli/src/context/connections/drivers.ts index 76ac408e..1ba17f54 100644 --- a/packages/cli/src/context/connections/drivers.ts +++ b/packages/cli/src/context/connections/drivers.ts @@ -68,6 +68,27 @@ export const driverRegistrations: Record { + const m = await import('../../connectors/duckdb/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxDuckDbConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection, projectDir }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxDuckDbConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('duckdb'); + } + return new m.KtxDuckDbScanConnector({ connectionId, connection: typedConnection, projectDir }); + }, + }; + }, + }, mongodb: { driver: 'mongodb', scopeConfigKey: 'databases', diff --git a/packages/cli/src/context/connections/federation.ts b/packages/cli/src/context/connections/federation.ts index 74036e2f..21fb6f75 100644 --- a/packages/cli/src/context/connections/federation.ts +++ b/packages/cli/src/context/connections/federation.ts @@ -4,18 +4,19 @@ import type { KtxProjectConnectionConfig } from '../project/config.js'; export const FEDERATED_CONNECTION_ID = '_ktx_federated'; /** - * Drivers DuckDB can ATTACH for federation. The driver name doubles as the - * DuckDB extension/TYPE name, so this set is the single source of truth for - * both membership (a driver participates iff it appears here) and attach type. + * Drivers DuckDB can ATTACH for federation. Membership is governed by this set; + * the attach TYPE is governed by attachTypeForDriver, which returns the driver + * name for extension-backed engines and null for a native DuckDB file (attached + * with no INSTALL/LOAD and no TYPE). */ -const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite']); +const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite', 'duckdb']); -export function attachTypeForDriver(driver: string): string { +export function attachTypeForDriver(driver: string): string | null { const normalized = driver.toLowerCase(); if (!ATTACH_COMPATIBLE_DRIVERS.has(normalized)) { throw new Error(`Driver "${driver}" cannot be attached by DuckDB federation.`); } - return normalized; + return normalized === 'duckdb' ? null : normalized; } export interface FederatedMember { diff --git a/packages/cli/src/context/connections/local-warehouse-descriptor.ts b/packages/cli/src/context/connections/local-warehouse-descriptor.ts index 0e5d0b9d..b9674e09 100644 --- a/packages/cli/src/context/connections/local-warehouse-descriptor.ts +++ b/packages/cli/src/context/connections/local-warehouse-descriptor.ts @@ -23,6 +23,7 @@ export interface LocalConnectionInfo { const DRIVER_TO_CONNECTION_TYPE: Record = { postgres: 'POSTGRESQL', sqlite: 'SQLITE', + duckdb: 'DUCKDB', sqlserver: 'SQLSERVER', mysql: 'MYSQL', clickhouse: 'CLICKHOUSE', diff --git a/packages/cli/src/context/project/driver-schemas.ts b/packages/cli/src/context/project/driver-schemas.ts index a19fee0f..9f808d00 100644 --- a/packages/cli/src/context/project/driver-schemas.ts +++ b/packages/cli/src/context/project/driver-schemas.ts @@ -11,6 +11,7 @@ const warehouseDrivers = [ 'snowflake', 'bigquery', 'sqlite', + 'duckdb', 'clickhouse', 'sqlserver', ] as const; @@ -52,6 +53,7 @@ const warehouseConnectionSchemas = [ warehouseConnectionSchema('snowflake'), warehouseConnectionSchema('bigquery'), warehouseConnectionSchema('sqlite'), + warehouseConnectionSchema('duckdb'), warehouseConnectionSchema('clickhouse'), warehouseConnectionSchema('sqlserver'), ] as const; diff --git a/packages/cli/src/context/scan/local-scan.ts b/packages/cli/src/context/scan/local-scan.ts index e9644726..0966d15b 100644 --- a/packages/cli/src/context/scan/local-scan.ts +++ b/packages/cli/src/context/scan/local-scan.ts @@ -141,6 +141,7 @@ function normalizeDriver(driver: string | undefined): KtxConnectionDriver { if ( normalized === 'postgres' || normalized === 'sqlite' || + normalized === 'duckdb' || normalized === 'mysql' || normalized === 'clickhouse' || normalized === 'sqlserver' || @@ -151,7 +152,7 @@ function normalizeDriver(driver: string | undefined): KtxConnectionDriver { return normalized; } throw new Error( - `Standalone ktx scan supports postgres/sqlite/mysql/clickhouse/sqlserver/bigquery/snowflake/mongodb in this phase, received "${driver ?? 'unknown'}"`, + `Standalone ktx scan supports postgres/sqlite/duckdb/mysql/clickhouse/sqlserver/bigquery/snowflake/mongodb in this phase, received "${driver ?? 'unknown'}"`, ); } diff --git a/packages/cli/src/context/scan/types.ts b/packages/cli/src/context/scan/types.ts index 9c6010c5..9bd7d865 100644 --- a/packages/cli/src/context/scan/types.ts +++ b/packages/cli/src/context/scan/types.ts @@ -2,6 +2,7 @@ import type { KtxTableRefKey } from './table-ref.js'; export type KtxConnectionDriver = | 'sqlite' + | 'duckdb' | 'postgres' | 'sqlserver' | 'bigquery' diff --git a/packages/cli/src/context/sql-analysis/dialect-notes.ts b/packages/cli/src/context/sql-analysis/dialect-notes.ts index d0a7c634..f0a20a2e 100644 --- a/packages/cli/src/context/sql-analysis/dialect-notes.ts +++ b/packages/cli/src/context/sql-analysis/dialect-notes.ts @@ -6,8 +6,7 @@ import type { SqlAnalysisDialect } from './ports.js'; // dialect), served by the sql_dialect_notes MCP tool. They are package-internal: // copy-runtime-assets.mjs ships them to dist, and they are never installed onto an // agent target. The set covers every dialect reachable from a configured warehouse -// driver; duckdb/databricks are intentionally absent because no connector produces -// them. +// driver; databricks is intentionally absent because no connector produces it. /** @internal Dialects with an authored ./dialects/.md file. */ export const DIALECTS_WITH_NOTES = [ @@ -16,6 +15,7 @@ export const DIALECTS_WITH_NOTES = [ 'snowflake', 'bigquery', 'sqlite', + 'duckdb', 'clickhouse', 'tsql', ] as const; diff --git a/packages/cli/src/context/sql-analysis/dialects/duckdb.md b/packages/cli/src/context/sql-analysis/dialects/duckdb.md new file mode 100644 index 00000000..8569b0a0 --- /dev/null +++ b/packages/cli/src/context/sql-analysis/dialects/duckdb.md @@ -0,0 +1,10 @@ +**duckdb** SQL conventions: +- **FQTN:** `schema.table` within one database (e.g. `main.orders`); a bare `table` resolves against `main`. A second, attached database is `db.schema.table` (e.g. `sales.main.orders`). +- **Identifiers:** case-insensitive; double-quote (`"Name"`) to keep a name with spaces or a reserved word. +- **Date/time:** native `DATE`/`TIMESTAMP`. Bucket with `date_trunc('month', ts)`, pull parts with `EXTRACT(YEAR FROM ts)`, format with `strftime(ts, '%Y-%m')`, and use `CURRENT_DATE`; cast text with `col::DATE` (or `TRY_CAST(col AS DATE)` to null bad values). +- **Series:** `FROM generate_series(DATE '2023-01-01', DATE '2023-12-01', INTERVAL 1 MONTH) AS s(d)` builds a date spine (use `range(...)` for integers), then `LEFT JOIN` the aggregated facts onto it so empty periods still appear. +- **Rolling window over time:** a native calendar-range frame spans real dates and tolerates gaps — `AVG(amount) OVER (ORDER BY day RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW)` is a trailing 30-day average without a spine; guard minimum periods with `COUNT(*) OVER ()`. +- **Integer division:** unlike postgres, `/` is true division (`5 / 2` → `2.5`), so a ratio keeps its fraction; use `//` for floor division (`5 // 2` → `2`) when you want the integer quotient, and round only in the final projection. +- **Safe cast:** duckdb has `TRY_CAST` — `TRY_CAST(x AS DOUBLE)` yields `NULL` for a value that does not parse instead of raising, so counting residual `NULL`s among non-sentinel rows catches an encoding the sample missed without a regex guard. +- **Top-N / windows:** filter a window inline with `QUALIFY` — `SELECT ... QUALIFY ROW_NUMBER() OVER (PARTITION BY key ORDER BY x DESC) = 1` returns one row per key without a wrapping CTE; use `ORDER BY ... LIMIT n` for a global top-N. +- **JSON / semi-structured:** `col->'k'` returns JSON, `col->>'k'` returns text, deep path `json_extract(col, '$.a.b')`; duckdb also has native `STRUCT`, `LIST`, and `MAP` — read a struct field with `col.field` and a list element with `col[1]` (1-indexed). diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index d923a68b..03ff48d2 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -9,6 +9,8 @@ import { isKtxPostgresConnectionConfig, type KtxPostgresConnectionConfig } from import { KtxPostgresHistoricSqlQueryClient } from './connectors/postgres/historic-sql-query-client.js'; import { createSqliteLiveDatabaseIntrospection } from './connectors/sqlite/live-database-introspection.js'; import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js'; +import { createDuckDbLiveDatabaseIntrospection } from './connectors/duckdb/live-database-introspection.js'; +import { isKtxDuckDbConnectionConfig } from './connectors/duckdb/connector.js'; import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js'; import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js'; import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js'; @@ -104,6 +106,10 @@ function createKtxCliLiveDatabaseIntrospection( projectDir: project.projectDir, connections: project.config.connections, }); + const duckdb = createDuckDbLiveDatabaseIntrospection({ + projectDir: project.projectDir, + connections: project.config.connections, + }); const mysql = createMysqlLiveDatabaseIntrospection({ connections: project.config.connections, }); @@ -139,6 +145,9 @@ function createKtxCliLiveDatabaseIntrospection( if (isKtxSqliteConnectionConfig(connection)) { return sqlite.extractSchema(connectionId, options); } + if (isKtxDuckDbConnectionConfig(connection)) { + return duckdb.extractSchema(connectionId, options); + } if (isKtxMysqlConnectionConfig(connection)) { return mysql.extractSchema(connectionId, options); } diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 7388f74a..33e3b1e0 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -64,6 +64,7 @@ const execFileAsync = promisify(execFileCallback); export type KtxSetupDatabaseDriver = | 'sqlite' + | 'duckdb' | 'postgres' | 'mysql' | 'clickhouse' @@ -159,6 +160,7 @@ const DRIVER_OPTIONS: Array<{ value: KtxSetupDatabaseDriver; label: string }> = { value: 'sqlserver', label: 'SQL Server' }, { value: 'mongodb', label: 'MongoDB' }, { value: 'sqlite', label: 'SQLite' }, + { value: 'duckdb', label: 'DuckDB' }, ]; const DRIVER_LABELS = Object.fromEntries(DRIVER_OPTIONS.map((option) => [option.value, option.label])) as Record< @@ -174,6 +176,7 @@ const HISTORIC_SQL_DIALECT_BY_DRIVER: Partial = { sqlite: 'sqlite-local', + duckdb: 'duckdb-local', postgres: 'postgres-warehouse', mysql: 'mysql-warehouse', clickhouse: 'clickhouse-warehouse', @@ -813,6 +816,18 @@ async function buildConnectionConfig(input: { if (path === undefined) return 'back'; return path ? { driver: 'sqlite', path } : null; } + if (driver === 'duckdb') { + if (args.inputMode === 'disabled' && !args.databaseUrl) return null; + const path = + args.databaseUrl ?? + (await promptText( + prompts, + 'DuckDB database file\nEnter a relative or absolute path, for example ./warehouse.duckdb.', + stringConfigField(input.existingConnection, 'path'), + )); + if (path === undefined) return 'back'; + return path ? { driver: 'duckdb', path } : null; + } if (driver === 'postgres' || driver === 'mysql' || driver === 'clickhouse' || driver === 'sqlserver') { return await buildUrlConnectionConfig({ driver, @@ -1417,9 +1432,11 @@ async function maybeConfigureDatabaseScope(input: { const project = await loadKtxProject({ projectDir: input.projectDir }); const connection = project.config.connections[input.connectionId]; const driver = normalizeDriver(connection?.driver); - if (!driver || driver === 'sqlite') return okValidateResult(); + const spec = driver ? SCOPE_DISCOVERY_SPECS[driver] : undefined; + // Drivers with no scope spec are single-namespace (sqlite, duckdb): there is no + // schema to choose, so skip the scope picker and ingest every table. + if (!driver || !spec) return okValidateResult(); - const spec = SCOPE_DISCOVERY_SPECS[driver]; const existingTables = connection?.enabled_tables; const hasExistingTables = Array.isArray(existingTables) && existingTables.length > 0; const existingScope = spec ? configuredScopeValues(connection, spec) : []; diff --git a/packages/cli/src/status-project.ts b/packages/cli/src/status-project.ts index 6d52518b..2acd151a 100644 --- a/packages/cli/src/status-project.ts +++ b/packages/cli/src/status-project.ts @@ -412,6 +412,7 @@ function buildConnectionStatus( const hint = envHint((conn as Record).credentials_json); return warn(hint ? `credentials missing (env: ${hint})` : 'credentials not set', hint ? `Set ${hint}` : 'Rerun `ktx setup`'); } + case 'duckdb': case 'sqlite': { const path = (conn as Record).path; if (typeof path === 'string' && path.length > 0) return ok(`path: ${path}`); @@ -553,7 +554,7 @@ async function buildQueryHistoryStatus( } const ADAPTER_DRIVER_REQUIREMENT: Record = { - 'live-database': ['postgres', 'mysql', 'snowflake', 'bigquery', 'clickhouse', 'sqlite', 'sqlserver'], + 'live-database': ['postgres', 'mysql', 'snowflake', 'bigquery', 'clickhouse', 'sqlite', 'duckdb', 'sqlserver'], dbt: ['dbt', 'dbt-core', 'dbt-cloud'], notion: ['notion'], metabase: ['metabase'], diff --git a/packages/cli/test/connection.test.ts b/packages/cli/test/connection.test.ts index beeaad65..0fbc6a41 100644 --- a/packages/cli/test/connection.test.ts +++ b/packages/cli/test/connection.test.ts @@ -688,7 +688,7 @@ describe('runKtxConnection', () => { await initKtxProject({ projectDir }); await writeFile( join(projectDir, 'ktx.yaml'), - 'connections:\n mystery:\n driver: duckdb\n', + 'connections:\n mystery:\n driver: nonsense\n', 'utf-8', ); const io = makeIo(); diff --git a/packages/cli/test/connectors/duckdb/connector.test.ts b/packages/cli/test/connectors/duckdb/connector.test.ts new file mode 100644 index 00000000..9f1c3394 --- /dev/null +++ b/packages/cli/test/connectors/duckdb/connector.test.ts @@ -0,0 +1,280 @@ +import { DuckDBInstance } from '@duckdb/node-api'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { + KtxDuckDbScanConnector, + duckDbDatabasePathFromConfig, + isKtxDuckDbConnectionConfig, +} from '../../../src/connectors/duckdb/connector.js'; +import { tableRefSet } from '../../../src/context/scan/table-ref.js'; + +let dir: string; +let dbPath: string; + +beforeAll(async () => { + dir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-')); + dbPath = join(dir, 'warehouse.duckdb'); + const instance = await DuckDBInstance.create(dbPath); + const connection = await instance.connect(); + await connection.run('CREATE TABLE customers (id BIGINT PRIMARY KEY, name VARCHAR, big BIGINT)'); + await connection.run( + `INSERT INTO customers VALUES (1, 'Ada', 9223372036854775807), (2, 'Lin', 10)`, + ); + await connection.run('CREATE TABLE orders (id BIGINT, customer_id BIGINT REFERENCES customers(id))'); + await connection.run('INSERT INTO orders VALUES (1, 1), (2, 2)'); + // Composite primary key + composite foreign key, to exercise the parallel + // unnest() zip of constraint/referenced column names in readForeignKeys. + await connection.run('CREATE TABLE regions (country VARCHAR, code VARCHAR, PRIMARY KEY (country, code))'); + await connection.run( + 'CREATE TABLE stores (id BIGINT, country VARCHAR, code VARCHAR, FOREIGN KEY (country, code) REFERENCES regions(country, code))', + ); + await connection.run('CREATE TABLE empty_table (id BIGINT)'); + connection.closeSync(); + instance.closeSync(); +}); + +afterAll(async () => { + await rm(dir, { recursive: true, force: true }); +}); + +function connector(connection: Record = { driver: 'duckdb', path: dbPath }) { + return new KtxDuckDbScanConnector({ connectionId: 'warehouse', connection, projectDir: dir }); +} + +describe('isKtxDuckDbConnectionConfig', () => { + it('accepts duckdb driver, rejects others', () => { + expect(isKtxDuckDbConnectionConfig({ driver: 'duckdb' })).toBe(true); + expect(isKtxDuckDbConnectionConfig({ driver: 'sqlite' })).toBe(false); + }); +}); + +describe('duckDbDatabasePathFromConfig', () => { + it('resolves a relative path against projectDir', () => { + const resolved = duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + projectDir: dir, + connection: { driver: 'duckdb', path: 'warehouse.duckdb' }, + }); + expect(resolved).toBe(dbPath); + }); + + it('derives the path from a file: url', () => { + const resolved = duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + connection: { driver: 'duckdb', url: pathToFileURL(dbPath).href }, + }); + expect(resolved).toBe(dbPath); + }); + + it('derives the path from a duckdb: url', () => { + const resolved = duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + connection: { driver: 'duckdb', url: `duckdb://${dbPath}` }, + }); + expect(resolved).toBe(dbPath); + }); + + it('resolves an env: reference in path', () => { + process.env.KTX_TEST_DUCKDB_PATH = dbPath; + try { + const resolved = duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + connection: { driver: 'duckdb', path: 'env:KTX_TEST_DUCKDB_PATH' }, + }); + expect(resolved).toBe(dbPath); + } finally { + delete process.env.KTX_TEST_DUCKDB_PATH; + } + }); + + it('rejects a non-duckdb driver', () => { + expect(() => + duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + connection: { driver: 'sqlite', path: dbPath }, + }), + ).toThrow(/cannot run driver "sqlite"/); + }); + + it('requires a path or url', () => { + expect(() => + duckDbDatabasePathFromConfig({ + connectionId: 'warehouse', + connection: { driver: 'duckdb' }, + }), + ).toThrow(/requires connections\.warehouse\.path or url/); + }); +}); + +describe('KtxDuckDbScanConnector', () => { + it('testConnection succeeds for an existing file', async () => { + const c = connector(); + expect(await c.testConnection()).toEqual({ success: true }); + await c.cleanup(); + }); + + it('testConnection fails (never creating) for a missing file', async () => { + const c = connector({ driver: 'duckdb', path: join(dir, 'absent.duckdb') }); + const result = await c.testConnection(); + expect(result.success).toBe(false); + await c.cleanup(); + }); + + it('introspects main-schema tables, columns, and foreign keys', async () => { + const c = connector(); + const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' }, { runId: 't' }); + const names = snapshot.tables.map((t) => t.name).sort(); + expect(names).toEqual(['customers', 'empty_table', 'orders', 'regions', 'stores']); + const orders = snapshot.tables.find((t) => t.name === 'orders'); + expect(orders?.foreignKeys[0]).toMatchObject({ fromColumn: 'customer_id', toTable: 'customers', toColumn: 'id' }); + await c.cleanup(); + }); + + it('maps a composite foreign key column-for-column to the referenced table', async () => { + const c = connector(); + const snapshot = await c.introspect({ connectionId: 'warehouse', driver: 'duckdb' }, { runId: 't' }); + const stores = snapshot.tables.find((t) => t.name === 'stores'); + const fks = stores?.foreignKeys.map((fk) => ({ fromColumn: fk.fromColumn, toTable: fk.toTable, toColumn: fk.toColumn })); + expect(fks).toEqual([ + { fromColumn: 'country', toTable: 'regions', toColumn: 'country' }, + { fromColumn: 'code', toTable: 'regions', toColumn: 'code' }, + ]); + await c.cleanup(); + }); + + it('lists tables', async () => { + const c = connector(); + const tables = (await c.listTables()).map((t) => t.name).sort(); + expect(tables).toEqual(['customers', 'empty_table', 'orders', 'regions', 'stores']); + await c.cleanup(); + }); + + it('samples a table', async () => { + const c = connector(); + const sample = await c.sampleTable( + { connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, limit: 1 }, + { runId: 't' }, + ); + expect(sample.rows.length).toBe(1); + await c.cleanup(); + }); + + it('stringifies BIGINT beyond 2^53 in read-only results', async () => { + const c = connector(); + const result = await c.executeReadOnly( + { connectionId: 'warehouse', sql: 'SELECT big FROM customers WHERE id = 1', maxRows: 10 }, + { runId: 't' }, + ); + expect(result.rows[0][0]).toBe('9223372036854775807'); + await c.cleanup(); + }); + + it('rejects non-read-only SQL', async () => { + const c = connector(); + await expect( + c.executeReadOnly({ connectionId: 'warehouse', sql: 'DELETE FROM customers', maxRows: 10 }, { runId: 't' }), + ).rejects.toThrow(); + await c.cleanup(); + }); + + it('returns distinct values under the cardinality cap', async () => { + const c = connector(); + const distinct = await c.getColumnDistinctValues({ name: 'customers', catalog: null, db: null }, 'name', { + maxCardinality: 10, + limit: 10, + }); + expect(distinct?.values?.sort()).toEqual(['Ada', 'Lin']); + await c.cleanup(); + }); + + it('withholds values but reports the count when cardinality exceeds the cap', async () => { + const c = connector(); + const distinct = await c.getColumnDistinctValues({ name: 'customers', catalog: null, db: null }, 'name', { + maxCardinality: 1, + limit: 10, + }); + expect(distinct).toEqual({ values: null, cardinality: 2 }); + await c.cleanup(); + }); + + it('samples a single column, dropping null rows', async () => { + const c = connector(); + const sample = await c.sampleColumn( + { connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, column: 'name', limit: 10 }, + { runId: 't' }, + ); + expect(sample.values.sort()).toEqual(['Ada', 'Lin']); + expect(sample.nullCount).toBeNull(); + await c.cleanup(); + }); + + it('counts table rows', async () => { + const c = connector(); + expect(await c.getTableRowCount('customers')).toBe(2); + await c.cleanup(); + }); + + it('lists only the main schema and reports no column stats', async () => { + const c = connector(); + expect(await c.listSchemas()).toEqual(['main']); + expect(await c.columnStats({ connectionId: 'warehouse', table: { name: 'customers', catalog: null, db: null }, column: 'id' }, { runId: 't' })).toBeNull(); + await c.cleanup(); + }); + + it('rejects operations for a mismatched connection id', async () => { + const c = connector(); + await expect( + c.executeReadOnly({ connectionId: 'other', sql: 'SELECT 1', maxRows: 1 }, { runId: 't' }), + ).rejects.toThrow(/cannot serve connection other/); + await c.cleanup(); + }); + + it('exposes the dialect identifier quoting', () => { + expect(connector().quoteIdentifier('a"b')).toBe('"a""b"'); + }); + + // Opening a connection must never create the file: the db() guard throws + // rather than letting DuckDBInstance.create() materialize a missing path. + it('refuses to open (never creating) a missing file when a query runs', async () => { + const c = connector({ driver: 'duckdb', path: join(dir, 'absent.duckdb') }); + await expect(c.listTables()).rejects.toThrow(/File not found/); + await c.cleanup(); + }); + + it('returns no tables for an empty table scope', async () => { + const c = connector(); + const snapshot = await c.introspect( + { connectionId: 'warehouse', driver: 'duckdb', tableScope: new Set() }, + { runId: 't' }, + ); + expect(snapshot.tables).toEqual([]); + await c.cleanup(); + }); + + it('restricts introspection to the named tables in a non-empty scope', async () => { + const c = connector(); + const snapshot = await c.introspect( + { + connectionId: 'warehouse', + driver: 'duckdb', + tableScope: tableRefSet([{ catalog: null, db: null, name: 'customers' }]), + }, + { runId: 't' }, + ); + expect(snapshot.tables.map((t) => t.name)).toEqual(['customers']); + await c.cleanup(); + }); + + it('reports zero cardinality and an empty value list for an empty column', async () => { + const c = connector(); + const distinct = await c.getColumnDistinctValues({ name: 'empty_table', catalog: null, db: null }, 'id', { + maxCardinality: 10, + limit: 10, + }); + expect(distinct).toEqual({ values: [], cardinality: 0 }); + await c.cleanup(); + }); +}); diff --git a/packages/cli/test/connectors/duckdb/dialect.test.ts b/packages/cli/test/connectors/duckdb/dialect.test.ts new file mode 100644 index 00000000..6079dfab --- /dev/null +++ b/packages/cli/test/connectors/duckdb/dialect.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, it } from 'vitest'; +import { KtxDuckDbDialect } from '../../../src/connectors/duckdb/dialect.js'; + +describe('KtxDuckDbDialect', () => { + const dialect = new KtxDuckDbDialect(); + + it('quotes identifiers with double quotes and escapes embedded quotes', () => { + expect(dialect.quoteIdentifier('order"s')).toBe('"order""s"'); + }); + + it('maps integer types to number dimension', () => { + expect(dialect.mapToDimensionType('BIGINT')).toBe('number'); + expect(dialect.mapToDimensionType('DOUBLE')).toBe('number'); + }); + + it('maps timestamp types to time dimension', () => { + expect(dialect.mapToDimensionType('TIMESTAMP')).toBe('time'); + expect(dialect.mapToDimensionType('DATE')).toBe('time'); + }); + + it('maps text types to string dimension', () => { + expect(dialect.mapToDimensionType('VARCHAR')).toBe('string'); + }); + + it('maps boolean types to boolean dimension', () => { + expect(dialect.mapToDimensionType('BOOLEAN')).toBe('boolean'); + expect(dialect.mapToDimensionType('BOOL')).toBe('boolean'); + }); + + it('falls back to string for an empty or unknown native type', () => { + expect(dialect.mapToDimensionType('')).toBe('string'); + expect(dialect.mapToDimensionType('JSON')).toBe('string'); + }); + + // The precedence ladder strips parameters before substring rules fire, so a + // parameterized DECIMAL still resolves through the numeric branch rather than + // the string fallback. + it('strips type parameters before resolving the dimension', () => { + expect(dialect.mapToDimensionType('DECIMAL(10,2)')).toBe('number'); + expect(dialect.mapToDimensionType('VARCHAR(255)')).toBe('string'); + }); + + // Types absent from the exact-match table still resolve via substring rules: + // TIMESTAMP_NS (time), UINT128/HUGEINT-like (number), and lowercase input. + it('resolves unlisted types through substring matching, case-insensitively', () => { + expect(dialect.mapToDimensionType('timestamp_ns')).toBe('time'); + expect(dialect.mapToDimensionType('INT128')).toBe('number'); + expect(dialect.mapToDimensionType(' double ')).toBe('number'); + }); + + it('generates a limited sample query', () => { + expect(dialect.generateSampleQuery('"t"', 5)).toBe('SELECT * FROM "t" LIMIT 5'); + }); + + it('quotes selected columns in a sample query', () => { + expect(dialect.generateSampleQuery('"t"', 5, ['a', 'b'])).toBe('SELECT "a", "b" FROM "t" LIMIT 5'); + }); + + it('builds a non-null, non-blank column sample query', () => { + expect(dialect.generateColumnSampleQuery('"t"', 'email', 3)).toBe( + `SELECT "email" FROM "t" WHERE "email" IS NOT NULL AND TRIM(CAST("email" AS VARCHAR)) != '' LIMIT 3`, + ); + }); + + // A degenerate sample percentage (<=0 or >=1) means "no sampling", so both the + // random filter and the TABLESAMPLE clause must collapse to an empty string. + it('returns empty sample clauses outside the (0,1) range and real clauses inside it', () => { + expect(dialect.getRandomSampleFilter(0)).toBe(''); + expect(dialect.getRandomSampleFilter(1)).toBe(''); + expect(dialect.getRandomSampleFilter(0.25)).toBe('RANDOM() < 0.25'); + expect(dialect.getTableSampleClause(0)).toBe(''); + expect(dialect.getTableSampleClause(0.1)).toBe('USING SAMPLE 10 PERCENT (bernoulli)'); + }); + + // A type missing from the exact-match table but containing BOOL still resolves + // through the substring branch rather than the string fallback. + it('resolves a BOOL-substring type to boolean', () => { + expect(dialect.mapToDimensionType('MYBOOL')).toBe('boolean'); + }); + + it('builds limit/offset, sample-value aggregation, and randomized cardinality clauses', () => { + expect(dialect.getLimitOffsetClause(10, 5)).toContain('LIMIT 10'); + expect(dialect.getSampleValueAggregation('SELECT 1')).toContain('STRING_AGG'); + expect(dialect.generateRandomizedCardinalitySampleQuery('"t"', 'c', 100)).toContain('USING SAMPLE 100 ROWS'); + }); + + it('exposes profiling expressions and a null column-statistics query', () => { + expect(dialect.getNullCountExpression('c')).toBe('SUM(CASE WHEN c IS NULL THEN 1 ELSE 0 END)'); + expect(dialect.getDistinctCountExpression('c')).toBe('COUNT(DISTINCT c)'); + expect(dialect.textLengthExpression('c')).toBe('LENGTH(CAST(c AS VARCHAR))'); + expect(dialect.castToText('c')).toBe('CAST(c AS VARCHAR)'); + expect(dialect.mapDataType('BIGINT')).toBe('BIGINT'); + expect(dialect.getTopClause(5)).toBe(''); + expect(dialect.generateColumnStatisticsQuery('main', 't')).toBeNull(); + }); + + // Guards the single-namespace (db=null) display shape: v1 introspects only + // `main`, so a display ref must round-trip as a bare table name. An ANSI shape + // would emit a 1-part name it then refuses to parse, breaking column lookups. + it('round-trips a single-namespace display ref and reports a 1-part column shape', () => { + const table = { catalog: null, db: null, name: 'orders' }; + const display = dialect.formatDisplayRef(table); + expect(display).toBe('orders'); + expect(dialect.parseDisplayRef(display)).toMatchObject({ name: 'orders' }); + expect(dialect.columnDisplayTablePartCount()).toBe(1); + expect(dialect.formatTableName(table)).toBe('"orders"'); + }); +}); diff --git a/packages/cli/test/connectors/duckdb/federated-attach.test.ts b/packages/cli/test/connectors/duckdb/federated-attach.test.ts index 7d16fb47..e5c30090 100644 --- a/packages/cli/test/connectors/duckdb/federated-attach.test.ts +++ b/packages/cli/test/connectors/duckdb/federated-attach.test.ts @@ -135,6 +135,14 @@ describe('federatedAttachTarget', () => { expect(target).toContain('ssl_mode=REQUIRED'); }); + it('resolves a duckdb member to its database file path', () => { + const target = federatedAttachTarget( + { connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: 'a.duckdb' } }, + {}, + ); + expect(target).toBe('/p/a.duckdb'); + }); + it('throws for an unsupported driver', () => { expect(() => federatedAttachTarget(member({ driver: 'snowflake', connection: { driver: 'snowflake' } }), {})).toThrow( /cannot be attached/i, diff --git a/packages/cli/test/connectors/duckdb/federated-executor.test.ts b/packages/cli/test/connectors/duckdb/federated-executor.test.ts index 0cc07dc4..96638c08 100644 --- a/packages/cli/test/connectors/duckdb/federated-executor.test.ts +++ b/packages/cli/test/connectors/duckdb/federated-executor.test.ts @@ -67,4 +67,27 @@ describe('buildAttachStatements', () => { ); expect(stmts.at(-1)).toBe('ATTACH \'postgresql://u:it\'\'s@h/db\' AS "pg" (TYPE postgres, READ_ONLY);'); }); + + it('attaches a native duckdb member with no TYPE and no INSTALL/LOAD', () => { + const statements = buildAttachStatements( + [{ connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: '/p/a.duckdb' } }], + {}, + ); + expect(statements.some((s) => s.startsWith('INSTALL'))).toBe(false); + expect(statements.find((s) => s.startsWith('ATTACH'))).toContain('(READ_ONLY)'); + expect(statements.find((s) => s.startsWith('ATTACH'))).not.toContain('TYPE'); + }); + + it('mixes a duckdb member with a postgres member, loading only postgres', () => { + const statements = buildAttachStatements( + [ + { connectionId: 'dux', driver: 'duckdb', projectDir: '/p', connection: { driver: 'duckdb', path: '/p/a.duckdb' } }, + { connectionId: 'pg', driver: 'postgres', projectDir: '/p', connection: { driver: 'postgres', url: 'postgres://h/db' } }, + ], + {}, + ); + expect(statements).toContain('INSTALL postgres; LOAD postgres;'); + expect(statements.some((s) => s.includes('INSTALL duckdb'))).toBe(false); + expect(statements.filter((s) => s.startsWith('ATTACH')).length).toBe(2); + }); }); diff --git a/packages/cli/test/connectors/duckdb/live-database-introspection.test.ts b/packages/cli/test/connectors/duckdb/live-database-introspection.test.ts new file mode 100644 index 00000000..61d4d0e2 --- /dev/null +++ b/packages/cli/test/connectors/duckdb/live-database-introspection.test.ts @@ -0,0 +1,45 @@ +import { DuckDBInstance } from '@duckdb/node-api'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { createDuckDbLiveDatabaseIntrospection } from '../../../src/connectors/duckdb/live-database-introspection.js'; +import { tableRefSet } from '../../../src/context/scan/table-ref.js'; + +let dir: string; +let dbPath: string; + +beforeAll(async () => { + dir = await mkdtemp(join(tmpdir(), 'ktx-duckdb-live-')); + dbPath = join(dir, 'warehouse.duckdb'); + const instance = await DuckDBInstance.create(dbPath); + const connection = await instance.connect(); + await connection.run('CREATE TABLE customers (id BIGINT, name VARCHAR)'); + await connection.run('CREATE TABLE orders (id BIGINT)'); + connection.closeSync(); + instance.closeSync(); +}); + +afterAll(async () => { + await rm(dir, { recursive: true, force: true }); +}); + +function port() { + return createDuckDbLiveDatabaseIntrospection({ + projectDir: dir, + connections: { warehouse: { driver: 'duckdb', path: dbPath } }, + }); +} + +describe('createDuckDbLiveDatabaseIntrospection', () => { + it('extracts the full schema for a connection', async () => { + const snapshot = await port().extractSchema('warehouse'); + expect(snapshot.tables.map((t) => t.name).sort()).toEqual(['customers', 'orders']); + }); + + it('restricts extraction to a table scope', async () => { + const tableScope = tableRefSet([{ catalog: null, db: null, name: 'customers' }]); + const snapshot = await port().extractSchema('warehouse', { tableScope }); + expect(snapshot.tables.map((t) => t.name)).toEqual(['customers']); + }); +}); diff --git a/packages/cli/test/connectors/shared/duckdb-json-safe.test.ts b/packages/cli/test/connectors/shared/duckdb-json-safe.test.ts new file mode 100644 index 00000000..cd114a53 --- /dev/null +++ b/packages/cli/test/connectors/shared/duckdb-json-safe.test.ts @@ -0,0 +1,17 @@ +import { describe, expect, it } from 'vitest'; +import { jsonSafeBigint, toJsonSafeRows } from '../../../src/connectors/shared/duckdb-json-safe.js'; + +describe('duckdb json-safe bigint', () => { + it('keeps safe-range bigints as numbers', () => { + expect(jsonSafeBigint(42n)).toBe(42); + }); + + it('stringifies bigints beyond Number.MAX_SAFE_INTEGER', () => { + const big = BigInt(Number.MAX_SAFE_INTEGER) + 10n; + expect(jsonSafeBigint(big)).toBe(big.toString()); + }); + + it('converts only bigint cells in a row matrix', () => { + expect(toJsonSafeRows([[1n, 'a', null]])).toEqual([[1, 'a', null]]); + }); +}); diff --git a/packages/cli/test/context/connections/dialects.test.ts b/packages/cli/test/context/connections/dialects.test.ts index cc7eaa59..74f95cc2 100644 --- a/packages/cli/test/context/connections/dialects.test.ts +++ b/packages/cli/test/context/connections/dialects.test.ts @@ -305,7 +305,7 @@ describe('getDialectForDriver', () => { it('throws with a supported-driver list for unknown drivers', () => { expect(() => getDialectForDriver('oracle')).toThrow( - 'Unsupported driver "oracle". Supported drivers: bigquery, clickhouse, mongodb, mysql, postgres, snowflake, sqlite, sqlserver', + 'Unsupported driver "oracle". Supported drivers: bigquery, clickhouse, duckdb, mongodb, mysql, postgres, snowflake, sqlite, sqlserver', ); }); diff --git a/packages/cli/test/context/connections/drivers.test.ts b/packages/cli/test/context/connections/drivers.test.ts index 65bbca4b..38a0f48f 100644 --- a/packages/cli/test/context/connections/drivers.test.ts +++ b/packages/cli/test/context/connections/drivers.test.ts @@ -22,6 +22,7 @@ const connectionFixtures: Record = { schemas: ['public'], }), sqlite: () => ({ driver: 'sqlite', path: 'warehouse.db' }), + duckdb: (projectDir) => ({ driver: 'duckdb', path: join(projectDir, 'warehouse.duckdb') }), mongodb: () => ({ driver: 'mongodb', url: 'mongodb://localhost:27017/app', @@ -101,6 +102,7 @@ describe('driverRegistrations', () => { expect(listSupportedDrivers()).toEqual([ 'bigquery', 'clickhouse', + 'duckdb', 'mongodb', 'mysql', 'postgres', @@ -138,7 +140,7 @@ describe('driverRegistrations', () => { expect(connector.listTables).toEqual(expect.any(Function)); await connector.cleanup?.(); - if (registration.driver === 'sqlite') { + if (registration.driver === 'sqlite' || registration.driver === 'duckdb') { expect(registration.scopeConfigKey).toBeNull(); } else { expect(registration.scopeConfigKey).not.toBeNull(); diff --git a/packages/cli/test/context/mcp/dialect-notes.test.ts b/packages/cli/test/context/mcp/dialect-notes.test.ts index 27e9d922..7cf36d7f 100644 --- a/packages/cli/test/context/mcp/dialect-notes.test.ts +++ b/packages/cli/test/context/mcp/dialect-notes.test.ts @@ -33,8 +33,7 @@ describe('per-dialect SQL notes', () => { }); it('does not author notes for unreachable dialects', () => { - // duckdb/databricks appear in the resolver map but no connector produces them. - expect(DIALECTS_WITH_NOTES).not.toContain('duckdb'); + // databricks appears in the resolver map but no connector produces it. expect(DIALECTS_WITH_NOTES).not.toContain('databricks'); }); diff --git a/packages/cli/test/local-scan-connectors.test.ts b/packages/cli/test/local-scan-connectors.test.ts index 827b4ba1..6cb2d8cd 100644 --- a/packages/cli/test/local-scan-connectors.test.ts +++ b/packages/cli/test/local-scan-connectors.test.ts @@ -92,7 +92,7 @@ describe('createKtxCliScanConnector', () => { expect(bigQueryMock.constructorInputs[0]).not.toHaveProperty('maxBytesBilled'); }); - it('rejects daemon-only fallback driver configs at config parse time', async () => { + it('resolves a duckdb connection to the DuckDB scan connector', async () => { await initKtxProject({ projectDir: tempDir }); await writeFile( join(tempDir, 'ktx.yaml'), @@ -105,10 +105,12 @@ describe('createKtxCliScanConnector', () => { ].join('\n'), 'utf-8', ); + const project = await loadKtxProject({ projectDir: tempDir }); - await expect(loadKtxProject({ projectDir: tempDir })).rejects.toThrow( - /connections\.warehouse\.driver:.*Invalid discriminator value/, - ); + const connector = await createKtxCliScanConnector(project, 'warehouse'); + + expect(connector.id).toBe('duckdb:warehouse'); + expect(connector.driver).toBe('duckdb'); }); it('rejects connection blocks with no driver field at config parse time', async () => { diff --git a/packages/cli/test/setup-databases.test.ts b/packages/cli/test/setup-databases.test.ts index 3f363805..bcf1db79 100644 --- a/packages/cli/test/setup-databases.test.ts +++ b/packages/cli/test/setup-databases.test.ts @@ -245,6 +245,7 @@ describe('setup databases step', () => { { value: 'sqlserver', label: 'SQL Server' }, { value: 'mongodb', label: 'MongoDB' }, { value: 'sqlite', label: 'SQLite' }, + { value: 'duckdb', label: 'DuckDB' }, ], required: true, }); @@ -3515,4 +3516,72 @@ describe('setup databases step', () => { expect(io.stdout()).toContain('ktx cannot work until you add a database.'); expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).not.toContain('completed_steps:'); }); + + it('adds one non-interactive DuckDB connection from --database-url without prompting', async () => { + const io = makeIo(); + const prompts = makePromptAdapter({}); + const testConnection = vi.fn(async () => 0); + const scanConnection = vi.fn(async () => 0); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseDrivers: ['duckdb'], + databaseConnectionId: 'duckdb-local', + databaseUrl: './warehouse.duckdb', + databaseSchemas: [], + skipDatabases: false, + }, + io.io, + { prompts, testConnection, scanConnection }, + ); + + expect(result.status).toBe('ready'); + expect(prompts.text).not.toHaveBeenCalled(); + expect(testConnection).toHaveBeenCalledWith(tempDir, 'duckdb-local', expect.anything()); + expect(scanConnection).toHaveBeenCalledWith(tempDir, 'duckdb-local', expect.anything()); + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['duckdb-local']).toEqual({ + driver: 'duckdb', + path: './warehouse.duckdb', + }); + expect(config.setup).toEqual({ + database_connection_ids: ['duckdb-local'], + }); + expect((await readKtxSetupState(tempDir)).completed_steps).toContain('databases'); + }); + + it('adds an interactive DuckDB connection without prompting for a schema', async () => { + const prompts = makePromptAdapter({ + selectValues: ['no'], + textValues: ['', './warehouse.duckdb'], + }); + const pickers = makePickerStubs(); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'auto', + databaseDrivers: ['duckdb'], + databaseSchemas: [], + skipDatabases: false, + }, + makeIo().io, + { + prompts, + testConnection: vi.fn(async () => 0), + scanConnection: vi.fn(async () => 0), + pickDatabaseScope: pickers.pickDatabaseScope, + }, + ); + + expect(result.status).toBe('ready'); + expect(pickers.scopeCalls).toHaveLength(0); + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['duckdb-local']).toEqual({ + driver: 'duckdb', + path: './warehouse.duckdb', + }); + }); });