diff --git a/docs-site/content/docs/concepts/cross-database-federation.mdx b/docs-site/content/docs/concepts/cross-database-federation.mdx new file mode 100644 index 00000000..ee3884f2 --- /dev/null +++ b/docs-site/content/docs/concepts/cross-database-federation.mdx @@ -0,0 +1,154 @@ +--- +title: Cross-database federation +description: How ktx federates postgres, mysql, and sqlite connections so a single read-only SQL query can join across them without copying data. +--- + +Cross-database federation lets a single read-only SQL query join tables that +live in different databases. **ktx** achieves this by embedding DuckDB and +using its `ATTACH` mechanism to connect each member database read-only. The +join executes inside DuckDB at query time — live data, no ETL, no copy. + +You run federated queries as raw SQL against the `_ktx_federated` connection +(see [Querying the federated connection +directly](#querying-the-federated-connection-directly)). Semantic-layer queries +(`ktx sl query` / the `sl_query` tool) stay per-connection; pointing one at +`_ktx_federated` returns an error telling you to use raw SQL instead. + +Federation activates automatically when a `ktx.yaml` file declares two or more +attach-compatible connections. There is nothing to configure and no federation +block to add. With zero or one compatible connection the behavior is unchanged. + +## Which connections participate + +The v1 federation engine supports three drivers: + +| Driver | Participates in federation | +|--------|---------------------------| +| `postgres` | Yes | +| `mysql` | Yes | +| `sqlite` | Yes | +| `snowflake` | No — standalone connection | +| `bigquery` | No — standalone connection | +| `clickhouse` | No — standalone connection | +| `sqlserver` | No — standalone connection | + +Non-participating connections continue to work exactly as they did. They are +queried independently; they do not appear as federation members. 
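The participation rule described above can be sketched in TypeScript. This is an illustration only, not the code **ktx** ships: `ConnectionMap`, `PARTICIPATING_DRIVERS`, and `federationMembers` are names invented for this sketch. The only facts taken from the text are the three participating drivers and the two-or-more threshold; lowercasing the driver name is an assumption of the sketch.

```typescript
// Hypothetical sketch: these names are illustrative, not ktx's public API.
type ConnectionMap = Record<string, { driver: string }>;

// Drivers the table above lists as participating in federation.
const PARTICIPATING_DRIVERS = new Set(["postgres", "mysql", "sqlite"]);

// Returns the ids of participating connections, or null when fewer than
// two participate (federation stays inactive in that case).
function federationMembers(connections: ConnectionMap): string[] | null {
  const members = Object.entries(connections)
    .filter(([, c]) => PARTICIPATING_DRIVERS.has(c.driver.toLowerCase()))
    .map(([id]) => id);
  return members.length >= 2 ? members : null;
}
```

Under these assumptions, a project with one `postgres`, one `sqlite`, and one `snowflake` connection yields two members, and the `snowflake` connection stays standalone.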
+ +## How it activates + +**ktx** inspects the connections in `ktx.yaml` at startup. When it finds two or +more connections whose driver is `postgres`, `mysql`, or `sqlite`, it +instantiates the DuckDB federation engine and attaches each one read-only. +There is no `federation:` key, no opt-in flag, and no connection-level setting +to enable. The engine is derived entirely from what is already declared. + +A minimal `ktx.yaml` that triggers federation: + +```yaml +connections: + pg_books: + driver: postgres + url: "postgres://user:pass@localhost:5432/books" # pragma: allowlist secret + sqlite_reviews: + driver: sqlite + path: ./data/reviews.db +``` + +Two attach-compatible connections are present, so federation is active. + +## Table naming in federated queries + +Inside a federated query, postgres and mysql tables use a three-part name: +`connectionId.schema.table`. SQLite tables, which have no schema layer in +DuckDB, use the two-part form `connectionId.table`. In both cases the +connection's `id` field in `ktx.yaml` becomes the catalog name inside DuckDB. + +If a connection `id` is not a bare SQL identifier — for example it contains a +hyphen, like `books-db` — double-quote it in the query the same way DuckDB +quotes any identifier: `"books-db".public.books`. Writing it unquoted +(`books-db.public.books`) is a SQL syntax error, not a federation feature. + +For the example above: + +- `pg_books.public.books` — the `books` table in the `public` schema of the + postgres connection +- `sqlite_reviews.reviews` — the `reviews` table in the sqlite connection + +These fully qualified names are what you write when you query the federated +connection with raw SQL (see [Querying the federated connection +directly](#querying-the-federated-connection-directly)). A source file's own +`table:` field is not prefixed this way — see [Source files keep member-native +table refs](#source-files-keep-member-native-table-refs) below. 
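The naming rules above can be captured in a small helper. This is a hypothetical sketch, not part of **ktx**: `MemberDriver`, `quoteIfNeeded`, and `federatedTableName` are invented here, and the bare-identifier test is a simplification (it does not account for reserved words, which would also need quoting).

```typescript
// Hypothetical helper: not part of ktx; it only encodes the naming rules above.
type MemberDriver = "postgres" | "mysql" | "sqlite";

// Leaves simple identifiers alone and double-quotes anything else,
// doubling embedded double quotes as SQL identifier quoting requires.
function quoteIfNeeded(id: string): string {
  return /^[A-Za-z_][A-Za-z0-9_]*$/.test(id) ? id : `"${id.replace(/"/g, '""')}"`;
}

// Builds connectionId.schema.table for postgres/mysql and
// connectionId.table for sqlite.
function federatedTableName(
  connectionId: string,
  driver: MemberDriver,
  table: string,
  schema?: string,
): string {
  const catalog = quoteIfNeeded(connectionId);
  if (driver === "sqlite") {
    return `${catalog}.${quoteIfNeeded(table)}`;
  }
  if (!schema) {
    throw new Error(`A ${driver} table needs a schema for its three-part name.`);
  }
  return `${catalog}.${quoteIfNeeded(schema)}.${quoteIfNeeded(table)}`;
}
```

With this sketch, `federatedTableName("books-db", "postgres", "books", "public")` produces `"books-db".public.books`, matching the quoting rule for ids that are not bare identifiers.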
+ +## Source names in the federated view + +When you list or search semantic-layer sources under the federated connection, +each source's `name` is prefixed with its member connection id — for example +`pg_books.books` and `sqlite_reviews.reviews`. The prefix keeps names unique +when two members own a table with the same name: a `users` table in each of +`pg_app` and `sqlite_app` surfaces as `pg_app.users` and `sqlite_app.users` +rather than colliding on a bare `users`. + +## Source files keep member-native table refs + +A source file's physical `table:` field is not prefixed with the connection id. +It stays the member-native reference the connector uses on its own — +`public.books` for the postgres member, `reviews` for the sqlite member — +because the same file backs a per-connection semantic-layer query against that +member, which runs on the member's own driver where a `pg_books.` catalog prefix +would point at a database that does not exist. The connection-id prefix is a +DuckDB catalog name that appears only in raw federated SQL; the member prefix on +the source `name` (above) is independent of it. + +## Cross-database joins + +Write a cross-database join as raw SQL against `_ktx_federated` — see +[Querying the federated connection +directly](#querying-the-federated-connection-directly) below for a runnable +example. DuckDB attaches both members and resolves the join live at query time. + +Declaring the join in a source file's `joins:` block is not supported yet. The +semantic layer plans each connection on its own, so a `joins:` entry whose `to:` +points at a table in another member is not resolved across the federation +boundary. Until that lands, express cross-database joins as raw SQL. + +## Querying the federated connection directly + +The federated connection is addressable by its id, +`_ktx_federated`, anywhere **ktx** runs read-only SQL. 
The same id works for the +`ktx sql` command and for a data agent calling the `sql_execution` MCP tool, so +both surfaces can run a cross-database query without a source file: + +```bash +ktx sql -c _ktx_federated \ + "SELECT b.title, avg(r.rating) AS avg_rating + FROM pg_books.public.books b + JOIN sqlite_reviews.reviews r ON b.id = r.book_id + GROUP BY b.title" +``` + +Table names follow the rules from +[Table naming in federated queries](#table-naming-in-federated-queries): +three-part `connectionId.schema.table` for postgres and mysql, two-part +`connectionId.table` for sqlite. The `_ktx_federated` id is virtual — it is +never written to `ktx.yaml` and only exists when two or more attach-compatible +connections are declared. It surfaces in `ktx connection` and in the agent's +connection list so the id is discoverable. Querying a single member database +directly with its own connection id (`ktx sql -c pg_books ...`) is unchanged. + +## Federated queries are read-only + +DuckDB attaches every member database with read-only access. Federated queries +are `SELECT`/`WITH` only. No writes, no DDL, and no mutations reach any member +database through the federation engine. + +## Current limitations + +- **Raw SQL joins only.** Cross-database joins are written as raw SQL; declaring + them in a source's `joins:` block and automatic discovery of cross-database + relationships are not available yet. Intra-database relationship discovery for + each member connection is unchanged. +- **postgres, mysql, and sqlite only.** Other drivers (snowflake, bigquery, + clickhouse, sqlserver) do not participate in federation in this version. They + remain usable as standalone connections. 
diff --git a/docs-site/content/docs/concepts/meta.json b/docs-site/content/docs/concepts/meta.json index bf4de9d6..3936328a 100644 --- a/docs-site/content/docs/concepts/meta.json +++ b/docs-site/content/docs/concepts/meta.json @@ -1,5 +1,5 @@ { "title": "Concepts", "defaultOpen": true, - "pages": ["the-context-layer", "semantic-layer-internals", "wiki-retrieval"] + "pages": ["the-context-layer", "semantic-layer-internals", "cross-database-federation", "wiki-retrieval"] } diff --git a/packages/cli/package.json b/packages/cli/package.json index bcdf8c47..ed1b366e 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -55,6 +55,7 @@ "@clack/prompts": "1.4.0", "@clickhouse/client": "^1.18.5", "@commander-js/extra-typings": "14.0.0", + "@duckdb/node-api": "1.5.3-r.3", "@google-cloud/bigquery": "^8.3.1", "@looker/sdk": "^26.8.0", "@looker/sdk-node": "^26.8.0", diff --git a/packages/cli/src/connection.ts b/packages/cli/src/connection.ts index 1e267833..d12dccb7 100644 --- a/packages/cli/src/connection.ts +++ b/packages/cli/src/connection.ts @@ -6,6 +6,7 @@ import { type NotionBotInfo, NotionClient } from './context/ingest/adapters/noti import { createLocalLookerCredentialResolver } from './context/ingest/adapters/looker/local-looker.adapter.js'; import { metabaseRuntimeConfigFromLocalConnection } from './context/ingest/adapters/metabase/local-metabase.adapter.js'; import { testRepoConnection } from './context/ingest/repo-fetch.js'; +import { federatedConnectionListing } from './context/connections/federation.js'; import { getDriverRegistration } from './context/connections/drivers.js'; import { parseNotionConnectionConfig, resolveNotionConnectionAuthToken } from './context/connections/notion-config.js'; import { resolveKtxConfigReference } from './context/core/config-reference.js'; @@ -447,15 +448,23 @@ export async function runKtxConnection( io.stdout.write('No connections configured. 
Run `ktx setup` to add one.\n'); return 0; } - const idWidth = Math.max('ID'.length, ...entries.map(([id]) => id.length)); - const driverWidth = Math.max( - 'DRIVER'.length, + const federated = federatedConnectionListing(project.config.connections, args.projectDir); + const idCandidates = [...entries.map(([id]) => id), ...(federated ? [federated.id] : [])]; + const driverLengths = [ ...entries.map(([, c]) => (c.driver ?? 'unknown').length), - ); + ...(federated ? [federated.driver.length] : []), + ]; + const idWidth = Math.max('ID'.length, ...idCandidates.map((id) => id.length)); + const driverWidth = Math.max('DRIVER'.length, ...driverLengths); io.stdout.write(`${'ID'.padEnd(idWidth)} ${'DRIVER'.padEnd(driverWidth)}\n`); for (const [id, connection] of entries) { io.stdout.write(`${id.padEnd(idWidth)} ${(connection.driver ?? 'unknown').padEnd(driverWidth)}\n`); } + if (federated) { + io.stdout.write(`${federated.id.padEnd(idWidth)} ${federated.driver.padEnd(driverWidth)}\n`); + io.stdout.write(` federates: ${federated.members.join(', ')}\n`); + io.stdout.write(` ${federated.hint}\n`); + } return 0; } diff --git a/packages/cli/src/connectors/bigquery/connector.ts b/packages/cli/src/connectors/bigquery/connector.ts index eae0f2ed..0b30c025 100644 --- a/packages/cli/src/connectors/bigquery/connector.ts +++ b/packages/cli/src/connectors/bigquery/connector.ts @@ -26,9 +26,7 @@ import { type KtxTableSampleInput, type KtxTableSampleResult, } from '../../context/scan/types.js'; -import { readFileSync } from 'node:fs'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; +import { resolveStringReference } from '../shared/string-reference.js'; export interface KtxBigQueryConnectionConfig { driver?: string; @@ -138,18 +136,6 @@ class DefaultBigQueryClientFactory implements KtxBigQueryClientFactory { } } -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - return 
env[value.slice('env:'.length)] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} - function stringConfigValue( connection: KtxBigQueryConnectionConfig | undefined, key: keyof KtxBigQueryConnectionConfig, diff --git a/packages/cli/src/connectors/clickhouse/connector.ts b/packages/cli/src/connectors/clickhouse/connector.ts index c0d8c9a6..38a477e7 100644 --- a/packages/cli/src/connectors/clickhouse/connector.ts +++ b/packages/cli/src/connectors/clickhouse/connector.ts @@ -3,10 +3,8 @@ import { getDialectForDriver } from '../../context/connections/dialects.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { connectorTestFailure, createKtxConnectorCapabilities, type KtxConnectorTestResult, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js'; import { scopedTableNames } from '../../context/scan/table-ref.js'; -import { readFileSync } from 'node:fs'; +import { resolveStringReference } from '../shared/string-reference.js'; import { Agent as HttpsAgent } from 'node:https'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; export interface KtxClickHouseConnectionConfig { driver?: string; @@ -142,19 +140,6 @@ function stringConfigValue( return typeof value === 'string' && value.trim().length > 0 ? 
resolveStringReference(value.trim(), env) : undefined; } -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - const envName = value.slice('env:'.length); - return env[envName] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} - function maybeNumber(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) ? value : undefined; } diff --git a/packages/cli/src/connectors/duckdb/federated-attach.ts b/packages/cli/src/connectors/duckdb/federated-attach.ts new file mode 100644 index 00000000..edcb94eb --- /dev/null +++ b/packages/cli/src/connectors/duckdb/federated-attach.ts @@ -0,0 +1,90 @@ +import { sqliteDatabasePathFromConfig, type KtxSqliteConnectionConfig } from '../sqlite/connector.js'; +import { postgresPoolConfigFromConfig, type KtxPostgresConnectionConfig } from '../postgres/connector.js'; +import { + mysqlConnectionPoolConfigFromConfig, + type KtxMysqlConnectionConfig, +} from '../mysql/connector.js'; +import type { FederatedMember } from '../../context/connections/federation.js'; + +function kvKeyword(value: string): string { + // libpq/DuckDB key-value values quote with single quotes and backslash-escape. + return /[\s'\\]/.test(value) ? `'${value.replaceAll('\\', '\\\\').replaceAll("'", "\\'")}'` : value; +} + +function withRequiredSslMode(connectionString: string): string { + // DuckDB passes this libpq URL straight to the server, so an ssl:true member + // must carry sslmode in the URL itself; keep a stronger mode the URL already pins. 
+ const url = new URL(connectionString); + if (url.searchParams.has('sslmode')) { + return connectionString; + } + url.searchParams.set('sslmode', 'require'); + return url.toString(); +} + +function postgresAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): string { + const cfg = postgresPoolConfigFromConfig({ + connectionId: member.connectionId, + connection: member.connection as KtxPostgresConnectionConfig, + env, + }); + if (cfg.connectionString) { + return cfg.ssl ? withRequiredSslMode(cfg.connectionString) : cfg.connectionString; + } + const parts: string[] = []; + if (cfg.host) parts.push(`host=${kvKeyword(cfg.host)}`); + if (cfg.port) parts.push(`port=${cfg.port}`); + if (cfg.database) parts.push(`dbname=${kvKeyword(cfg.database)}`); + if (cfg.user) parts.push(`user=${kvKeyword(cfg.user)}`); + if (cfg.password) parts.push(`password=${kvKeyword(cfg.password)}`); + if (cfg.ssl) { + parts.push('sslmode=require'); + } + if (cfg.options) { + parts.push(`options=${kvKeyword(cfg.options)}`); + } + return parts.join(' '); +} + +function mysqlAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): string { + const cfg = mysqlConnectionPoolConfigFromConfig({ + connectionId: member.connectionId, + connection: member.connection as KtxMysqlConnectionConfig, + env, + }); + const parts: string[] = [ + `host=${kvKeyword(cfg.host)}`, + `port=${cfg.port}`, + `database=${kvKeyword(cfg.database)}`, + `user=${kvKeyword(cfg.user)}`, + ]; + if (cfg.password) { + parts.push(`password=${kvKeyword(cfg.password)}`); + } + if (cfg.ssl) { + parts.push('ssl_mode=REQUIRED'); + } + return parts.join(' '); +} + +/** + * Resolves a federated member's ktx.yaml config into the connection target + * DuckDB's ATTACH wants for that driver, reusing each connector's canonical + * resolver so federation and standalone scans agree on config interpretation. 
+ */ +export function federatedAttachTarget(member: FederatedMember, env: NodeJS.ProcessEnv): string { + switch (member.driver.toLowerCase()) { + case 'sqlite': + return sqliteDatabasePathFromConfig({ + connectionId: member.connectionId, + projectDir: member.projectDir, + connection: member.connection as KtxSqliteConnectionConfig, + }); + case 'postgres': + return postgresAttachString(member, env); + case 'mysql': + return mysqlAttachString(member, env); + default: + throw new Error(`Driver "${member.driver}" cannot be attached by DuckDB federation.`); + } +} diff --git a/packages/cli/src/connectors/duckdb/federated-executor.ts b/packages/cli/src/connectors/duckdb/federated-executor.ts new file mode 100644 index 00000000..7972166c --- /dev/null +++ b/packages/cli/src/connectors/duckdb/federated-executor.ts @@ -0,0 +1,78 @@ +import { DuckDBInstance } from '@duckdb/node-api'; +import { federatedAttachTarget } from './federated-attach.js'; +import type { + KtxSqlQueryExecutionInput, + KtxSqlQueryExecutionResult, +} from '../../context/connections/query-executor.js'; +import { normalizeQueryRows } from '../../context/connections/query-executor.js'; +import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; +import { attachTypeForDriver, type FederatedMember } from '../../context/connections/federation.js'; + +function quoteDuckdbIdentifier(id: string): string { + return `"${id.replaceAll('"', '""')}"`; +} + +const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER); +const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER); + +// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values +// in Number's safe range become Number; larger magnitudes become strings so a +// BIGINT beyond 2^53 keeps its exact value instead of silently rounding. +function jsonSafeBigint(value: bigint): number | string { + return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? 
Number(value) : value.toString(); +} + +function toJsonSafeRows(rows: unknown[][]): unknown[][] { + return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell))); +} + +/** @internal */ +export function buildAttachStatements(members: FederatedMember[], env: NodeJS.ProcessEnv): string[] { + const attachments = members.map((member) => ({ + type: attachTypeForDriver(member.driver), + url: federatedAttachTarget(member, env), + alias: member.connectionId, + })); + + const loadStatements = [...new Set(attachments.map((a) => a.type))].map( + (type) => `INSTALL ${type}; LOAD ${type};`, + ); + const attachStatements = attachments.map( + ({ type, url, alias }) => + `ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (TYPE ${type}, READ_ONLY);`, + ); + return [...loadStatements, ...attachStatements]; +} + +export async function executeFederatedQuery( + members: FederatedMember[], + input: KtxSqlQueryExecutionInput, + env: NodeJS.ProcessEnv = process.env, +): Promise<KtxSqlQueryExecutionResult> { + const sql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); + const attachStatements = buildAttachStatements(members, env); + + const instance = await DuckDBInstance.create(':memory:'); + try { + const connection = await instance.connect(); + try { + for (const statement of attachStatements) { + await connection.run(statement); + } + const reader = await connection.runAndReadAll(sql); + const rows = toJsonSafeRows(normalizeQueryRows(reader.getRows())); + const headers = reader.columnNames(); + return { + headers, + rows, + totalRows: rows.length, + command: 'SELECT', + rowCount: rows.length, + }; + } finally { + connection.closeSync(); + } + } finally { + instance.closeSync(); + } +} diff --git a/packages/cli/src/connectors/mysql/connector.ts b/packages/cli/src/connectors/mysql/connector.ts index 2675fa2c..5bddec53 100644 --- a/packages/cli/src/connectors/mysql/connector.ts +++ b/packages/cli/src/connectors/mysql/connector.ts @@ 
-1,8 +1,6 @@ import mysql, { type FieldPacket, type Pool, type RowDataPacket } from 'mysql2/promise'; -import { readFileSync } from 'node:fs'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; import { getDialectForDriver } from '../../context/connections/dialects.js'; +import { resolveStringReference } from '../shared/string-reference.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { constraintDiscoveryWarning, @@ -183,19 +181,6 @@ function stringConfigValue( return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined; } -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - const envName = value.slice('env:'.length); - return env[envName] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} - function maybeNumber(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) ? 
value : undefined; } diff --git a/packages/cli/src/connectors/postgres/connector.ts b/packages/cli/src/connectors/postgres/connector.ts index 1a956a3d..1a2fcd40 100644 --- a/packages/cli/src/connectors/postgres/connector.ts +++ b/packages/cli/src/connectors/postgres/connector.ts @@ -1,6 +1,4 @@ -import { readFileSync } from 'node:fs'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; +import { resolveStringReference } from '../shared/string-reference.js'; import { getDialectForDriver } from '../../context/connections/dialects.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js'; @@ -281,17 +279,6 @@ function stringConfigValue( return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined; } -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - return env[value.slice('env:'.length)] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} function numberValue(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) ? value : undefined; diff --git a/packages/cli/src/connectors/shared/string-reference.ts b/packages/cli/src/connectors/shared/string-reference.ts new file mode 100644 index 00000000..7f83736d --- /dev/null +++ b/packages/cli/src/connectors/shared/string-reference.ts @@ -0,0 +1,20 @@ +import { readFileSync } from 'node:fs'; +import { homedir } from 'node:os'; +import { resolve } from 'node:path'; + +/** + * Resolves a config string that may reference an environment variable + * (`env:NAME`) or a file (`file:/path`, `~` expands to the home dir). 
+ * Plain values pass through unchanged. + */ +export function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { + if (value.startsWith('env:')) { + return env[value.slice('env:'.length)] ?? ''; + } + if (value.startsWith('file:')) { + const rawPath = value.slice('file:'.length); + const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(rawPath[1] === '/' ? 2 : 1)) : rawPath; + return readFileSync(path, 'utf-8').trim(); + } + return value; +} diff --git a/packages/cli/src/connectors/snowflake/connector.ts b/packages/cli/src/connectors/snowflake/connector.ts index 56c3b2f3..5f016675 100644 --- a/packages/cli/src/connectors/snowflake/connector.ts +++ b/packages/cli/src/connectors/snowflake/connector.ts @@ -1,8 +1,6 @@ import { createPrivateKey } from 'node:crypto'; -import { readFileSync } from 'node:fs'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; import { getDialectForDriver } from '../../context/connections/dialects.js'; +import { resolveStringReference } from '../shared/string-reference.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js'; import { scopedTableNames } from '../../context/scan/table-ref.js'; @@ -135,18 +133,6 @@ export interface KtxSnowflakeColumnDistinctValuesResult { const DATE_TYPES = ['DATE', 'TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ', 'TIMESTAMP_TZ', 'TIME']; -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - return env[value.slice('env:'.length)] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? 
resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} - function stringConfigValue( connection: KtxSnowflakeConnectionConfig | undefined, key: keyof KtxSnowflakeConnectionConfig, diff --git a/packages/cli/src/connectors/sqlserver/connector.ts b/packages/cli/src/connectors/sqlserver/connector.ts index 0d0136be..116fdea7 100644 --- a/packages/cli/src/connectors/sqlserver/connector.ts +++ b/packages/cli/src/connectors/sqlserver/connector.ts @@ -25,10 +25,8 @@ import { type KtxTableSampleInput, type KtxTableSampleResult, } from '../../context/scan/types.js'; -import { readFileSync } from 'node:fs'; -import { homedir } from 'node:os'; -import { resolve } from 'node:path'; import sql from 'mssql'; +import { resolveStringReference } from '../shared/string-reference.js'; export interface KtxSqlServerConnectionConfig { driver?: string; @@ -208,18 +206,6 @@ function stringConfigValue( return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined; } -function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { - if (value.startsWith('env:')) { - return env[value.slice('env:'.length)] ?? ''; - } - if (value.startsWith('file:')) { - const rawPath = value.slice('file:'.length); - const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; - return readFileSync(path, 'utf-8').trim(); - } - return value; -} - function parseSqlServerUrl(url: string): Partial { const parsed = new URL(url); return { diff --git a/packages/cli/src/context/connections/federation.ts b/packages/cli/src/context/connections/federation.ts new file mode 100644 index 00000000..74036e2f --- /dev/null +++ b/packages/cli/src/context/connections/federation.ts @@ -0,0 +1,83 @@ +import type { KtxProjectConnectionConfig } from '../project/config.js'; + +/** Stable id for the runtime-derived federated connection. Never written to ktx.yaml. 
*/ +export const FEDERATED_CONNECTION_ID = '_ktx_federated'; + +/** + * Drivers DuckDB can ATTACH for federation. The driver name doubles as the + * DuckDB extension/TYPE name, so this set is the single source of truth for + * both membership (a driver participates iff it appears here) and attach type. + */ +const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite']); + +export function attachTypeForDriver(driver: string): string { + const normalized = driver.toLowerCase(); + if (!ATTACH_COMPATIBLE_DRIVERS.has(normalized)) { + throw new Error(`Driver "${driver}" cannot be attached by DuckDB federation.`); + } + return normalized; +} + +export interface FederatedMember { + connectionId: string; + driver: string; + projectDir: string; + connection: KtxProjectConnectionConfig; +} + +export interface FederatedConnectionDescriptor { + id: typeof FEDERATED_CONNECTION_ID; + driver: 'duckdb'; + members: FederatedMember[]; +} + +/** + * Derives a virtual federated connection when a project declares 2+ + * attach-compatible databases. Returns null otherwise — single-DB and + * incompatible projects are unaffected. + */ +export function deriveFederatedConnection( + connections: Record<string, KtxProjectConnectionConfig>, + projectDir: string, +): FederatedConnectionDescriptor | null { + const members: FederatedMember[] = Object.entries(connections) + .filter(([, config]) => ATTACH_COMPATIBLE_DRIVERS.has(config.driver.toLowerCase())) + .map(([connectionId, config]) => ({ + connectionId, + driver: config.driver.toLowerCase(), + projectDir, + connection: config, + })); + if (members.length < 2) { + return null; + } + return { id: FEDERATED_CONNECTION_ID, driver: 'duckdb', members }; +} + +export interface FederatedConnectionListing { + id: typeof FEDERATED_CONNECTION_ID; + driver: 'duckdb'; + members: string[]; + hint: string; +} + +/** + * Listing-facing view of the virtual federated connection for `ktx connection` + * and MCP `connection_list`. 
Derived from the same declared state as + * deriveFederatedConnection, so both surfaces describe one connection. + */ +export function federatedConnectionListing( + connections: Record<string, KtxProjectConnectionConfig>, + projectDir: string, +): FederatedConnectionListing | null { + const descriptor = deriveFederatedConnection(connections, projectDir); + if (!descriptor) { + return null; + } + return { + id: FEDERATED_CONNECTION_ID, + driver: 'duckdb', + members: descriptor.members.map((member) => member.connectionId), + hint: 'Cross-database queries run here. Name tables connectionId.schema.table (or connectionId.table for sqlite); double-quote any id that is not a bare SQL identifier, e.g. "books-db".public.books.', + }; +} diff --git a/packages/cli/src/context/connections/local-warehouse-descriptor.ts b/packages/cli/src/context/connections/local-warehouse-descriptor.ts index 4ad926df..0e5d0b9d 100644 --- a/packages/cli/src/context/connections/local-warehouse-descriptor.ts +++ b/packages/cli/src/context/connections/local-warehouse-descriptor.ts @@ -16,6 +16,8 @@ export interface LocalConnectionInfo { id: string; name: string; connectionType: string; + members?: string[]; + hint?: string; } const DRIVER_TO_CONNECTION_TYPE: Record = { diff --git a/packages/cli/src/context/connections/project-sql-executor.ts b/packages/cli/src/context/connections/project-sql-executor.ts new file mode 100644 index 00000000..0c2da04e --- /dev/null +++ b/packages/cli/src/context/connections/project-sql-executor.ts @@ -0,0 +1,58 @@ +import { executeFederatedQuery } from '../../connectors/duckdb/federated-executor.js'; +import type { KtxLocalProject } from '../project/project.js'; +import type { KtxScanConnector, KtxScanContext } from '../scan/types.js'; +import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from './federation.js'; +import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutionResult } from './query-executor.js'; + +export interface ExecuteProjectReadOnlySqlDeps { + project: KtxLocalProject; + 
input: KtxSqlQueryExecutionInput; + createConnector: (connectionId: string) => Promise<KtxScanConnector> | KtxScanConnector; + executeFederated?: typeof executeFederatedQuery; + runId?: string; +} + +/** + * Single resolve-and-execute path for project read-only SQL. The federated + * connection is derived from declared state here so every executor entry point + * routes `_ktx_federated` identically; standard connections go through the + * scan connector. + */ +export async function executeProjectReadOnlySql( + deps: ExecuteProjectReadOnlySqlDeps, +): Promise<KtxSqlQueryExecutionResult> { + const { project, input } = deps; + if (input.connectionId === FEDERATED_CONNECTION_ID) { + const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir); + if (!descriptor) { + throw new Error('Federated execution requested but fewer than 2 attach-compatible connections exist.'); + } + const runFederated = deps.executeFederated ?? executeFederatedQuery; + return runFederated(descriptor.members, input); + } + + let connector: KtxScanConnector | null = null; + try { + connector = await deps.createConnector(input.connectionId); + if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { + throw new Error( + `Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`, + ); + } + const ctx: KtxScanContext = { runId: deps.runId ?? 'sql-execution' }; + const result = await connector.executeReadOnly( + { connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows }, + ctx, + ); + return { + headers: result.headers, + ...(result.headerTypes ? 
{ headerTypes: result.headerTypes } : {}), + rows: result.rows, + totalRows: result.totalRows, + command: 'SELECT', + rowCount: result.rowCount, + }; + } finally { + await connector?.cleanup?.(); + } +} diff --git a/packages/cli/src/context/connections/query-executor.ts b/packages/cli/src/context/connections/query-executor.ts index a397dfc3..0f963c63 100644 --- a/packages/cli/src/context/connections/query-executor.ts +++ b/packages/cli/src/context/connections/query-executor.ts @@ -8,8 +8,9 @@ export interface KtxSqlQueryExecutionInput { maxRows?: number; } -interface KtxSqlQueryExecutionResult { +export interface KtxSqlQueryExecutionResult { headers: string[]; + headerTypes?: string[]; rows: unknown[][]; totalRows: number; command: string; diff --git a/packages/cli/src/context/ingest/adapters/live-database/manifest.ts b/packages/cli/src/context/ingest/adapters/live-database/manifest.ts index 3c35b463..2e864528 100644 --- a/packages/cli/src/context/ingest/adapters/live-database/manifest.ts +++ b/packages/cli/src/context/ingest/adapters/live-database/manifest.ts @@ -86,6 +86,9 @@ export interface BuildLiveDatabaseManifestShardsInput { existingPreservedJoins?: Map; existingDescriptions?: Map; existingUsage?: Map; + // Table refs owned by other federated members; declared cross-DB joins to + // these survive even though the target has no shard in this snapshot. 
+ federatedSiblingTargets?: Set; } export interface BuildLiveDatabaseManifestShardsResult { @@ -204,15 +207,20 @@ function joinCondition( .join(' AND '); } -function buildJoinsByTable( +/** @internal */ +export function buildJoinsByTable( tableNames: Set, joins: LiveDatabaseManifestJoinData[], preservedJoins: Map, + federatedSiblingTargets: Set = new Set(), ): Map { const joinsByTable = new Map(); for (const join of joins) { - if (!tableNames.has(join.fromTable) || !tableNames.has(join.toTable)) { + const fromLocal = tableNames.has(join.fromTable); + const toLocal = tableNames.has(join.toTable); + const toSibling = federatedSiblingTargets.has(join.toTable); + if (!fromLocal || (!toLocal && !toSibling)) { continue; } const relationship = RELATIONSHIP_MAP[join.relationship] ?? join.relationship; @@ -223,13 +231,17 @@ function buildJoinsByTable( source: join.source, }); - const reverseRelationship = RELATIONSHIP_INVERSE[relationship] ?? 'one_to_many'; - addJoinOnce(joinsByTable, join.toTable, { - to: join.fromTable, - on: joinCondition(join.toTable, join.toColumns, join.fromTable, join.fromColumns), - relationship: reverseRelationship, - source: join.source, - }); + // Reverse direction only when the target is a local table in THIS snapshot; + // a federated sibling has no shard here, so it gets no reverse entry. + if (toLocal) { + const reverseRelationship = RELATIONSHIP_INVERSE[relationship] ?? 
'one_to_many'; + addJoinOnce(joinsByTable, join.toTable, { + to: join.fromTable, + on: joinCondition(join.toTable, join.toColumns, join.fromTable, join.fromColumns), + relationship: reverseRelationship, + source: join.source, + }); + } } for (const [tableName, tableJoins] of preservedJoins) { @@ -237,7 +249,7 @@ function buildJoinsByTable( continue; } for (const join of tableJoins) { - if (tableNames.has(join.to)) { + if (tableNames.has(join.to) || federatedSiblingTargets.has(join.to)) { addJoinOnce(joinsByTable, tableName, join); } } @@ -250,7 +262,12 @@ export function buildLiveDatabaseManifestShards( input: BuildLiveDatabaseManifestShardsInput, ): BuildLiveDatabaseManifestShardsResult { const tableNames = new Set(input.tables.map((table) => table.name)); - const joinsByTable = buildJoinsByTable(tableNames, input.joins, input.existingPreservedJoins ?? new Map()); + const joinsByTable = buildJoinsByTable( + tableNames, + input.joins, + input.existingPreservedJoins ?? new Map(), + input.federatedSiblingTargets ?? new Set(), + ); const shards = new Map(); for (const table of input.tables) { diff --git a/packages/cli/src/context/mcp/context-tools.ts b/packages/cli/src/context/mcp/context-tools.ts index 28e7a9c7..94b889c4 100644 --- a/packages/cli/src/context/mcp/context-tools.ts +++ b/packages/cli/src/context/mcp/context-tools.ts @@ -56,7 +56,7 @@ const toolAnnotations = { const toolDescriptions = { connection_list: - 'List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses.', + 'List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses. 
A "_ktx_federated" entry (when present) queries all its member databases together; use its id for cross-database joins.', discover_data: 'Search across ktx wiki pages, semantic-layer sources, measures, dimensions, raw tables, and columns. Example: discover_data({ query: "monthly orders by customer", connectionId: "warehouse", kinds: ["sl_source", "table"] }).', wiki_search: @@ -227,6 +227,8 @@ const connectionListOutputSchema = z.object({ id: z.string(), name: z.string(), connectionType: z.string(), + members: z.array(z.string()).optional(), + hint: z.string().optional(), }), ), }); diff --git a/packages/cli/src/context/mcp/local-project-ports.ts b/packages/cli/src/context/mcp/local-project-ports.ts index 7c2863cb..bf1af94a 100644 --- a/packages/cli/src/context/mcp/local-project-ports.ts +++ b/packages/cli/src/context/mcp/local-project-ports.ts @@ -1,12 +1,16 @@ import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-executor.js'; -import { resolveConfiguredConnection } from '../../context/connections/resolve-connection.js'; import { KtxExpectedError, KtxQueryError, isNativeProgrammingFault } from '../../errors.js'; -import { localConnectionInfoFromConfig } from '../../context/connections/local-warehouse-descriptor.js'; +import { executeProjectReadOnlySql } from '../../context/connections/project-sql-executor.js'; +import { FEDERATED_CONNECTION_ID, federatedConnectionListing } from '../../context/connections/federation.js'; +import { resolveConfiguredConnection } from '../../context/connections/resolve-connection.js'; +import { + type LocalConnectionInfo, + localConnectionInfoFromConfig, +} from '../../context/connections/local-warehouse-descriptor.js'; import type { KtxEmbeddingPort } from '../../context/core/embedding.js'; import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js'; import type { KtxLocalProject } from '../../context/project/project.js'; import { createKtxEntityDetailsService } from 
'../../context/scan/entity-details.js'; -import type { KtxScanConnector } from '../../context/scan/types.js'; import type { LocalScanMcpOptions } from '../../context/scan/local-scan.js'; import { createKtxDiscoverDataService } from '../../context/search/discover.js'; import { sqlAnalysisDialectForDriver } from '../../context/sql-analysis/dialect.js'; @@ -26,12 +30,6 @@ interface CreateLocalProjectMcpContextPortsOptions { embeddingService: KtxEmbeddingPort | null; } -async function cleanupConnector(connector: KtxScanConnector | null): Promise { - if (connector?.cleanup) { - await connector.cleanup(); - } -} - async function executeValidatedReadOnlySql( project: KtxLocalProject, options: CreateLocalProjectMcpContextPortsOptions, @@ -39,60 +37,57 @@ async function executeValidatedReadOnlySql( onProgress?: KtxMcpProgressCallback, ): Promise { await onProgress?.({ progress: 0, message: 'Validating SQL' }); - const connectionId = assertSafeConnectionId(input.connectionId); - const connection = resolveConfiguredConnection(project.config, connectionId); if (!options.sqlAnalysis) { throw new Error('sql_execution requires parser-backed SQL validation.'); } - const validation = await options.sqlAnalysis.validateReadOnly(input.sql, sqlAnalysisDialectForDriver(connection.driver)); - if (!validation.ok) { - // A read-only guard rejecting the agent's SQL is an expected outcome, not a - // ktx fault: classify it so reportException keeps it out of Error Tracking. - throw new KtxQueryError(validation.error ?? 
'SQL is not read-only.'); - } const createConnector = options.localScan?.createConnector; if (!createConnector) { throw new Error('sql_execution requires a local scan connector factory.'); } - let connector: KtxScanConnector | null = null; - try { - connector = await createConnector(connectionId); - if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { - throw new Error(`Connection "${connectionId}" does not support read-only SQL execution.`); - } - await onProgress?.({ progress: 0.3, message: 'Executing' }); - const result = await connector - .executeReadOnly( - { - connectionId, - sql: input.sql, - maxRows: input.maxRows, - }, - { runId: 'mcp-sql-execution' }, - ) - .catch((error: unknown) => { - // A warehouse/driver rejection (e.g. the agent's SQL failed to compile) - // is a surfaced operational outcome, not a ktx fault: mark it expected - // while preserving the warehouse's own diagnostics. A native JS error - // (TypeError, etc.) signals a bug in connector code — let it propagate - // unchanged so Error Tracking still sees it. - if (isNativeProgrammingFault(error) || error instanceof KtxExpectedError) { - throw error; - } - throw new KtxQueryError(error instanceof Error ? error.message : String(error), { cause: error }); - }); - const response = { - headers: result.headers, - ...(result.headerTypes ? { headerTypes: result.headerTypes } : {}), - rows: result.rows, - rowCount: result.rowCount ?? result.rows.length, - }; - await onProgress?.({ progress: 1, message: `Fetched ${response.rowCount} rows` }); - return response; - } finally { - await cleanupConnector(connector); + const isFederated = input.connectionId === FEDERATED_CONNECTION_ID; + const connectionId = isFederated ? input.connectionId : assertSafeConnectionId(input.connectionId); + const connection = isFederated ? undefined : resolveConfiguredConnection(project.config, connectionId); + + const dialect = sqlAnalysisDialectForDriver(isFederated ? 
'duckdb' : connection!.driver); + const validation = await options.sqlAnalysis.validateReadOnly(input.sql, dialect); + if (!validation.ok) { + // A read-only guard rejecting the agent's SQL is an expected outcome, not a + // ktx fault: classify it so reportException keeps it out of Error Tracking. + throw new KtxQueryError(validation.error ?? 'SQL is not read-only.'); } + + await onProgress?.({ progress: 0.3, message: 'Executing' }); + const result = await executeProjectReadOnlySql({ + project, + input: { + connectionId, + projectDir: project.projectDir, + connection, + sql: input.sql, + maxRows: input.maxRows, + }, + createConnector, + runId: 'mcp-sql-execution', + }).catch((error: unknown) => { + // A warehouse/driver rejection (e.g. the agent's SQL failed to compile) is a + // surfaced operational outcome, not a ktx fault: mark it expected while + // preserving the warehouse's own diagnostics. A native JS error (TypeError, + // etc.) signals a bug in connector code — let it propagate unchanged so Error + // Tracking still sees it. + if (isNativeProgrammingFault(error) || error instanceof KtxExpectedError) { + throw error; + } + throw new KtxQueryError(error instanceof Error ? error.message : String(error), { cause: error }); + }); + const response = { + headers: result.headers, + ...(result.headerTypes ? { headerTypes: result.headerTypes } : {}), + rows: result.rows, + rowCount: result.rowCount ?? 
result.rows.length, + }; + await onProgress?.({ progress: 1, message: `Fetched ${response.rowCount} rows` }); + return response; } export function createLocalProjectMcpContextPorts( @@ -103,12 +98,21 @@ export function createLocalProjectMcpContextPorts( const ports: KtxMcpContextPorts = { connections: { async list() { - return Object.entries(project.config.connections) + const configured = Object.entries(project.config.connections) .map(([id, config]) => localConnectionInfoFromConfig(id, config)) - .filter( - (connection): connection is { id: string; name: string; connectionType: string } => connection !== null, - ) + .filter((connection): connection is LocalConnectionInfo => connection !== null) .sort((a, b) => a.id.localeCompare(b.id)); + const federated = federatedConnectionListing(project.config.connections, project.projectDir); + if (federated) { + configured.push({ + id: federated.id, + name: federated.id, + connectionType: 'DUCKDB', + members: federated.members, + hint: federated.hint, + }); + } + return configured; }, }, knowledge: { diff --git a/packages/cli/src/context/mcp/types.ts b/packages/cli/src/context/mcp/types.ts index 3694e3d6..e48d0975 100644 --- a/packages/cli/src/context/mcp/types.ts +++ b/packages/cli/src/context/mcp/types.ts @@ -78,6 +78,8 @@ interface KtxConnectionSummary { id: string; name: string; connectionType: string; + members?: string[]; + hint?: string; } interface KtxConnectionsMcpPort { diff --git a/packages/cli/src/context/scan/local-enrichment-artifacts.ts b/packages/cli/src/context/scan/local-enrichment-artifacts.ts index e2072d6b..798107b8 100644 --- a/packages/cli/src/context/scan/local-enrichment-artifacts.ts +++ b/packages/cli/src/context/scan/local-enrichment-artifacts.ts @@ -4,6 +4,7 @@ import type { TableUsageOutput } from '../../context/ingest/adapters/historic-sq import type { KtxScanRelationshipConfig } from '../project/config.js'; import type { KtxLocalProject } from '../../context/project/project.js'; import { 
isSlYamlPath } from '../../context/sl/source-files.js'; +import { deriveFederatedConnection } from '../connections/federation.js'; import type { KtxLocalScanEnrichmentResult } from './local-enrichment.js'; import { buildKtxRelationshipArtifacts, @@ -193,10 +194,43 @@ function joinReferencesExistingColumns( return true; } +async function federatedSiblingTargets( + project: KtxLocalProject, + connectionId: string, +): Promise<Set<string>> { + const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir); + if (!descriptor) { + return new Set(); + } + const siblings = descriptor.members.filter((member) => member.connectionId !== connectionId); + const perSibling = await Promise.all(siblings.map((sibling) => siblingJoinTargets(project, sibling.connectionId))); + return new Set(perSibling.flat()); +} + +async function siblingJoinTargets(project: KtxLocalProject, connectionId: string): Promise<string[]> { + const listed = await project.fileStore.listFiles(schemaDir(connectionId)).catch(() => ({ files: [] })); + const files = listed.files.filter(isSlYamlPath); + const perFile = await Promise.all( + files.map(async (file) => { + const shard = await project.fileStore + .readFile(file) + .then(({ content }) => YAML.parse(content) as LiveDatabaseManifestShard | null) + .catch(() => null); + // entry.table is buildTableRef's member-local ref (1-3 parts: + // table / schema.table / catalog.schema.table), never connectionId- + // prefixed — so prefixing with the member id yields the fully-qualified + // `to:` form authored in cross-DB joins. + return Object.values(shard?.tables ?? 
{}).map((entry) => `${connectionId}.${entry.table}`); + }), + ); + return perFile.flat(); +} + async function loadExistingManifestState( project: KtxLocalProject, connectionId: string, snapshot: KtxSchemaSnapshot, + siblingTargets: Set, ): Promise { const descriptions = new Map(); const preservedJoins = new Map(); @@ -236,7 +270,7 @@ async function loadExistingManifestState( const joins = (entry.joins ?? []).filter((join) => { return ( (join.source === 'manual' || join.source === 'inferred') && - validTableNames.has(join.to) && + (validTableNames.has(join.to) || siblingTargets.has(join.to)) && joinReferencesExistingColumns(join, columnsByTable) ); }); @@ -277,7 +311,13 @@ export async function writeLocalScanManifestShards( }; } - const existing = await loadExistingManifestState(input.project, input.connectionId, input.snapshot); + const siblingTargets = await federatedSiblingTargets(input.project, input.connectionId); + const existing = await loadExistingManifestState( + input.project, + input.connectionId, + input.snapshot, + siblingTargets, + ); const { shards } = buildLiveDatabaseManifestShards({ connectionType: input.driver.toUpperCase(), tables: snapshotTablesToManifestData(input.snapshot, input.descriptionUpdates), @@ -285,6 +325,7 @@ export async function writeLocalScanManifestShards( existingDescriptions: existing.descriptions, existingPreservedJoins: existing.preservedJoins, existingUsage: existing.usage, + federatedSiblingTargets: siblingTargets, mapColumnType: (dimensionType) => dimensionType, }); diff --git a/packages/cli/src/context/sl/local-query.ts b/packages/cli/src/context/sl/local-query.ts index 1622bd2d..384b4762 100644 --- a/packages/cli/src/context/sl/local-query.ts +++ b/packages/cli/src/context/sl/local-query.ts @@ -2,16 +2,23 @@ import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-ex import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js'; import type { KtxMcpProgressCallback 
} from '../mcp/types.js'; import type { KtxLocalProject } from '../../context/project/project.js'; +import { FEDERATED_CONNECTION_ID } from '../connections/federation.js'; import { resolveRequiredConnectionId } from '../connections/resolve-connection.js'; import { sqlAnalysisDialectForDriver } from '../sql-analysis/dialect.js'; import { loadLocalSlSourceRecords } from './local-sl.js'; import { toResolvedWire } from './semantic-layer.service.js'; import { assertSafeConnectionId } from './source-files.js'; -import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput } from './types.js'; +import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput, SemanticLayerSource } from './types.js'; const COMPILE_ONLY_REASON = 'Local semantic-layer query compiled SQL but no data-source execution adapter is configured.'; +const FEDERATED_SL_QUERY_UNSUPPORTED = + `Semantic-layer queries are per-connection and cannot target the federated connection '${FEDERATED_CONNECTION_ID}'. ` + + `Run a cross-database query as read-only SQL instead — ktx sql -c ${FEDERATED_CONNECTION_ID} "SELECT ..." or the sql_execution tool — ` + + 'using catalog-qualified table names (connectionId.schema.table, or connectionId.table for sqlite; ' + + 'double-quote ids that are not bare identifiers, e.g. "books-db".public.books).'; + export interface CompileLocalSlQueryOptions { connectionId?: string; query: SemanticLayerQueryInput; @@ -31,13 +38,33 @@ function resolveLocalConnectionId(project: KtxLocalProject, requested: string | return assertSafeConnectionId(resolveRequiredConnectionId(project.config, requested)); } +// The planner rejects a source set carrying a join whose `to` names a source +// outside that set, which would break every query for this connection. Keep only +// joins resolvable within the connection's own sources; a cross-database join +// (its `to` qualified by a sibling connection id) is just one such unresolvable +// target and runs as raw SQL instead. 
Membership is the test, not a connection-id +// prefix match, so a same-connection target whose name collides with a sibling +// connection id is preserved. +function withResolvableJoinsOnly( + source: SemanticLayerSource, + knownSourceNames: ReadonlySet, +): SemanticLayerSource { + if (source.joins.length === 0) { + return source; + } + const joins = source.joins.filter((join) => knownSourceNames.has(join.to)); + return joins.length === source.joins.length ? source : { ...source, joins }; +} + async function loadComputableSources( project: KtxLocalProject, connectionId: string, ): Promise[]> { - return (await loadLocalSlSourceRecords(project, { connectionId: assertSafeConnectionId(connectionId) })) - .filter((record) => record.source.table || record.source.sql) - .map((record) => toResolvedWire(record.source)); + const records = (await loadLocalSlSourceRecords(project, { connectionId })).filter( + (record) => record.source.table || record.source.sql, + ); + const knownSourceNames = new Set(records.map((record) => record.source.name)); + return records.map((record) => toResolvedWire(withResolvableJoinsOnly(record.source, knownSourceNames))); } function headersFromColumns(columns: Array>): string[] { @@ -50,9 +77,13 @@ export async function compileLocalSlQuery( project: KtxLocalProject, options: CompileLocalSlQueryOptions, ): Promise { + if (options.connectionId === FEDERATED_CONNECTION_ID) { + throw new Error(FEDERATED_SL_QUERY_UNSUPPORTED); + } await options.onProgress?.({ progress: 0, message: 'Compiling query' }); const connectionId = resolveLocalConnectionId(project, options.connectionId); - const dialect = sqlAnalysisDialectForDriver(project.config.connections[connectionId]?.driver); + const driver = project.config.connections[connectionId]?.driver; + const dialect = sqlAnalysisDialectForDriver(driver); const sources = await loadComputableSources(project, connectionId); await options.onProgress?.({ progress: 0.3, message: 'Generating SQL' }); @@ -107,7 +138,7 
@@ export async function compileLocalSlQuery( ...response.plan, execution: { mode: 'executed', - driver: project.config.connections[connectionId]?.driver ?? 'unknown', + driver: driver ?? 'unknown', maxRows, rowCount: execution.rowCount, }, diff --git a/packages/cli/src/context/sl/local-sl.ts b/packages/cli/src/context/sl/local-sl.ts index 1c12ef67..6b8509b4 100644 --- a/packages/cli/src/context/sl/local-sl.ts +++ b/packages/cli/src/context/sl/local-sl.ts @@ -2,6 +2,7 @@ import { join } from 'node:path'; import YAML from 'yaml'; import { z } from 'zod'; import type { KtxEmbeddingPort } from '../../context/core/embedding.js'; +import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from '../connections/federation.js'; import type { KtxLocalProject } from '../../context/project/project.js'; import { HybridSearchCore } from '../../context/search/hybrid-search-core.js'; import type { SearchCandidateGenerator } from '../../context/search/types.js'; @@ -169,7 +170,38 @@ export async function loadLocalSlSourceRecords( project: KtxLocalProject, input: { connectionId: string }, ): Promise { - const connectionId = assertSafeConnectionId(input.connectionId); + if (input.connectionId === FEDERATED_CONNECTION_ID) { + const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir); + if (!descriptor) { + return []; + } + const perMember = await Promise.all( + descriptor.members.map(async (member) => { + const records = await loadSingleConnectionSourceRecords(project, member.connectionId); + return records.map((record) => { + // The federated view is one virtual connection: rows carry its id and a + // member-prefixed name, so a listing/search row round-trips to + // `ktx sl -c _ktx_federated read `. Member origin lives in the name. 
+ const name = `${member.connectionId}.${record.name}`; + return { + ...record, + connectionId: FEDERATED_CONNECTION_ID, + name, + source: { ...record.source, name }, + }; + }); + }), + ); + return perMember.flat(); + } + return loadSingleConnectionSourceRecords(project, input.connectionId); +} + +async function loadSingleConnectionSourceRecords( + project: KtxLocalProject, + rawConnectionId: string, +): Promise { + const connectionId = assertSafeConnectionId(rawConnectionId); const dir = `semantic-layer/${connectionId}`; const schemaDir = `${dir}/_schema`; const listed = await project.fileStore.listFiles(dir); diff --git a/packages/cli/src/context/sl/source-files.ts b/packages/cli/src/context/sl/source-files.ts index ae44c683..6d2e361d 100644 --- a/packages/cli/src/context/sl/source-files.ts +++ b/packages/cli/src/context/sl/source-files.ts @@ -23,7 +23,15 @@ function assertSafePathToken(kind: string, value: string): string { return value; } +/** @internal */ +export function isReservedConnectionId(connectionId: string): boolean { + return connectionId.startsWith('_ktx_'); +} + export function assertSafeConnectionId(connectionId: string): string { + if (isReservedConnectionId(connectionId)) { + throw new Error(`Connection id "${connectionId}" uses the reserved "_ktx_" prefix.`); + } if (!isSafeConnectionId(connectionId)) { throw new Error(`Unsafe connection id: ${connectionId}`); } diff --git a/packages/cli/src/ingest-query-executor.ts b/packages/cli/src/ingest-query-executor.ts index f8b6880d..2671beee 100644 --- a/packages/cli/src/ingest-query-executor.ts +++ b/packages/cli/src/ingest-query-executor.ts @@ -1,16 +1,14 @@ +import { executeFederatedQuery } from './connectors/duckdb/federated-executor.js'; import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutorPort } from './context/connections/query-executor.js'; +import { executeProjectReadOnlySql } from './context/connections/project-sql-executor.js'; import type { KtxLocalProject } from 
'./context/project/project.js'; -import type { KtxScanConnector, KtxScanContext } from './context/scan/types.js'; import { createKtxCliScanConnector } from './local-scan-connectors.js'; type CreateConnector = typeof createKtxCliScanConnector; export interface KtxCliIngestQueryExecutorDeps { createConnector?: CreateConnector; -} - -async function cleanupConnector(connector: KtxScanConnector | null): Promise { - await connector?.cleanup?.(); + executeFederated?: typeof executeFederatedQuery; } export function createKtxCliIngestQueryExecutor( @@ -20,30 +18,13 @@ export function createKtxCliIngestQueryExecutor( const createConnector = deps.createConnector ?? createKtxCliScanConnector; return { async execute(input: KtxSqlQueryExecutionInput) { - let connector: KtxScanConnector | null = null; - try { - connector = await createConnector(project, input.connectionId); - if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { - throw new Error( - `Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`, - ); - } - - const ctx: KtxScanContext = { runId: 'ingest-sql-execution' }; - const result = await connector.executeReadOnly( - { connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows }, - ctx, - ); - return { - headers: result.headers, - rows: result.rows, - totalRows: result.totalRows, - command: 'SELECT', - rowCount: result.rowCount, - }; - } finally { - await cleanupConnector(connector); - } + return executeProjectReadOnlySql({ + project, + input, + createConnector: (connectionId) => createConnector(project, connectionId), + executeFederated: deps.executeFederated, + runId: 'ingest-sql-execution', + }); }, }; } diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 9b7aa189..9f9f828d 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -3,6 +3,7 @@ import { readFile, writeFile } from 'node:fs/promises'; 
import { delimiter, dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; import { promisify } from 'node:util'; +import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from './context/connections/federation.js'; import { getDriverRegistration } from './context/connections/drivers.js'; import { createLocalKtxLlmRuntimeFromConfig } from './context/llm/local-config.js'; import type { KtxLlmRuntimePort } from './context/llm/runtime-port.js'; @@ -1171,6 +1172,26 @@ async function writeConnectionConfig(input: { if (queryHistory?.enabled === true) { await ensureHistoricSqlIngestDefaults(input.projectDir); } + + if (input.io) { + const federationNotice = federationNoticeFor(config.connections, input.projectDir); + if (federationNotice) { + writeSetupSection(input.io, 'Federated connection available', [federationNotice]); + } + } +} + +/** @internal */ +export function federationNoticeFor( + connections: Record, + projectDir: string, +): string | null { + const descriptor = deriveFederatedConnection(connections, projectDir); + if (!descriptor) { + return null; + } + const names = descriptor.members.map((m) => m.connectionId).join(', '); + return `Detected ${descriptor.members.length} attach-compatible databases (${names}). 
Run a cross-database join as read-only SQL against \`${FEDERATED_CONNECTION_ID}\` (ktx sql -c ${FEDERATED_CONNECTION_ID} "SELECT ..."), using catalog-qualified table names.`; } async function disableConnectionQueryHistory(projectDir: string, connectionId: string): Promise { diff --git a/packages/cli/src/setup-sources.ts b/packages/cli/src/setup-sources.ts index a92dea6e..70f42a67 100644 --- a/packages/cli/src/setup-sources.ts +++ b/packages/cli/src/setup-sources.ts @@ -20,6 +20,7 @@ import type { KtxCliIo } from './cli-runtime.js'; import { createCliSpinner, errorMessage, writePrefixedLines } from './clack.js'; import { pickNotionRootPages } from './notion-page-picker.js'; import { runKtxSourceMapping } from './source-mapping.js'; +import { assertSafeConnectionId } from './context/sl/source-files.js'; import { runConnectionSetupWithRecovery, type ConfigureResult, @@ -206,12 +207,6 @@ async function promptText( return await prompts.text({ ...options, message: withTextInputNavigation(options.message) }); } -function assertSafeConnectionId(connectionId: string): void { - if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) { - throw new Error(`Unsafe connection id: ${connectionId}`); - } -} - function credentialRef(value: string | undefined, label: string): string { const ref = value?.trim(); if (!ref) { diff --git a/packages/cli/src/sql.ts b/packages/cli/src/sql.ts index cbfcbc47..8a5431eb 100644 --- a/packages/cli/src/sql.ts +++ b/packages/cli/src/sql.ts @@ -1,6 +1,10 @@ +import { executeFederatedQuery } from './connectors/duckdb/federated-executor.js'; +import { FEDERATED_CONNECTION_ID } from './context/connections/federation.js'; +import { executeProjectReadOnlySql } from './context/connections/project-sql-executor.js'; +import type { KtxSqlQueryExecutionResult } from './context/connections/query-executor.js'; import { resolveConfiguredConnection } from './context/connections/resolve-connection.js'; import { loadKtxProject, type KtxLocalProject } from 
'./context/project/project.js'; -import type { KtxQueryResult, KtxScanConnector } from './context/scan/types.js'; +import { sqlAnalysisDialectForDriver } from './context/sql-analysis/dialect.js'; import type { SqlAnalysisDialect, SqlAnalysisPort } from './context/sql-analysis/ports.js'; import type { KtxCliIo } from './cli-runtime.js'; import { type KtxOutputMode, resolveOutputMode } from './io/mode.js'; @@ -31,6 +35,7 @@ export interface KtxSqlDeps { loadProject?: typeof loadKtxProject; createSqlAnalysis?: () => SqlAnalysisPort; createScanConnector?: typeof createKtxCliScanConnector; + executeFederated?: typeof executeFederatedQuery; } interface SqlExecutionOutput { @@ -41,20 +46,6 @@ interface SqlExecutionOutput { rowCount: number; } -function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDialect { - const normalized = String(driver ?? '').trim().toLowerCase(); - const map: Record = { - postgres: 'postgres', - bigquery: 'bigquery', - snowflake: 'snowflake', - mysql: 'mysql', - sqlserver: 'tsql', - sqlite: 'sqlite', - clickhouse: 'clickhouse', - }; - return map[normalized] ?? 
'postgres'; -} - function queryVerb(sql: string): 'select' | 'explain' | 'show' | 'with' | 'other' { const first = sql.trim().split(/\s+/, 1)[0]?.toLowerCase(); if (first === 'select' || first === 'explain' || first === 'show' || first === 'with') { @@ -124,13 +115,7 @@ function printSqlResult(output: SqlExecutionOutput, mode: KtxSqlOutputMode, io: printPretty(output, io); } -async function cleanupConnector(connector: KtxScanConnector | null): Promise { - if (connector?.cleanup) { - await connector.cleanup(); - } -} - -function resultOutput(connectionId: string, result: KtxQueryResult): SqlExecutionOutput { +function resultOutput(connectionId: string, result: KtxSqlQueryExecutionResult): SqlExecutionOutput { return { connectionId, headers: result.headers, @@ -147,9 +132,10 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps: let project: KtxLocalProject | undefined; try { project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir }); - const connection = resolveConfiguredConnection(project.config, args.connectionId); - driver = String(connection.driver ?? 'unknown').toLowerCase(); - demoConnection = isDemoConnection(args.connectionId, connection); + const isFederated = args.connectionId === FEDERATED_CONNECTION_ID; + const connection = isFederated ? undefined : resolveConfiguredConnection(project.config, args.connectionId); + driver = isFederated ? 'duckdb' : String(connection?.driver ?? 'unknown').toLowerCase(); + demoConnection = isFederated ? false : isDemoConnection(args.connectionId, connection); const createSqlAnalysis = deps.createSqlAnalysis ?? @@ -161,7 +147,7 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps: io, })); const analysisPort = createSqlAnalysis(); - const dialect = sqlAnalysisDialectForDriver(connection.driver); + const dialect: SqlAnalysisDialect = isFederated ? 
'duckdb' : sqlAnalysisDialectForDriver(connection?.driver); const validation = await analysisPort.validateReadOnly(args.sql, dialect); if (!validation.ok) { throw new Error(validation.error ?? 'SQL is not read-only.'); @@ -169,39 +155,35 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps: const referencedTableCount = await safeReferencedTableCount(analysisPort, args.sql, dialect); const createScanConnector = deps.createScanConnector ?? createKtxCliScanConnector; - let connector: KtxScanConnector | null = null; - try { - connector = await createScanConnector(project, args.connectionId); - if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { - throw new Error(`Connection "${args.connectionId}" does not support read-only SQL execution.`); - } - const result = await connector.executeReadOnly( - { - connectionId: args.connectionId, - sql: args.sql, - maxRows: args.maxRows, - }, - { runId: 'cli-sql' }, - ); - const mode = resolveOutputMode({ explicit: args.output, json: args.json, io }); - printSqlResult(resultOutput(args.connectionId, result), mode, io); - await emitTelemetryEvent({ - name: 'sql_completed', + const result = await executeProjectReadOnlySql({ + project, + input: { + connectionId: args.connectionId, projectDir: args.projectDir, - io, - fields: { - driver, - isDemoConnection: demoConnection, - queryVerb: queryVerb(args.sql), - referencedTableCount, - durationMs: Math.max(0, performance.now() - startedAt), - outcome: 'ok', - }, - }); - return 0; - } finally { - await cleanupConnector(connector); - } + connection, + sql: args.sql, + maxRows: args.maxRows, + }, + createConnector: (connectionId) => createScanConnector(project!, connectionId), + executeFederated: deps.executeFederated, + runId: 'cli-sql', + }); + const mode = resolveOutputMode({ explicit: args.output, json: args.json, io }); + printSqlResult(resultOutput(args.connectionId, result), mode, io); + await emitTelemetryEvent({ + name: 'sql_completed', 
+ projectDir: args.projectDir, + io, + fields: { + driver, + isDemoConnection: demoConnection, + queryVerb: queryVerb(args.sql), + referencedTableCount, + durationMs: Math.max(0, performance.now() - startedAt), + outcome: 'ok', + }, + }); + return 0; } catch (error) { const errorClass = scrubErrorClass(error); await emitTelemetryEvent({ diff --git a/packages/cli/test/connection-list-federated.test.ts b/packages/cli/test/connection-list-federated.test.ts new file mode 100644 index 00000000..edb3c72b --- /dev/null +++ b/packages/cli/test/connection-list-federated.test.ts @@ -0,0 +1,66 @@ +import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { runKtxConnection } from '../src/connection.js'; +import { initKtxProject } from '../src/context/project/project.js'; +import { parseKtxProjectConfig, serializeKtxProjectConfig } from '../src/context/project/config.js'; +import type { KtxProjectConnectionConfig } from '../src/context/project/config.js'; + +function makeIo() { + const out: string[] = []; + return { + io: { + stdout: { isTTY: false, write: (c: string) => { out.push(c); return true; } }, + stderr: { write: () => true }, + }, + stdout: () => out.join(''), + }; +} + +async function writeConnections( + projectDir: string, + connections: Record<string, KtxProjectConnectionConfig>, +): Promise<void> { + const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + await writeFile(join(projectDir, 'ktx.yaml'), serializeKtxProjectConfig({ ...config, connections }), 'utf-8'); +} + +describe('ktx connection list federated entry', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-conn-fed-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('shows _ktx_federated when 2+ attach-compatible connections exist', async 
() => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir }); + await writeConnections(projectDir, { + books_db: { driver: 'sqlite' }, + reviews_db: { driver: 'sqlite' }, + }); + const io = makeIo(); + const code = await runKtxConnection({ command: 'list', projectDir }, io.io); + const printed = io.stdout(); + expect(code).toBe(0); + expect(printed).toContain('_ktx_federated'); + expect(printed).toContain('books_db, reviews_db'); + expect(printed).toContain('Cross-database queries run here'); + }); + + it('omits _ktx_federated with a single connection', async () => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir }); + await writeConnections(projectDir, { + books_db: { driver: 'sqlite' }, + }); + const io = makeIo(); + await runKtxConnection({ command: 'list', projectDir }, io.io); + expect(io.stdout()).not.toContain('_ktx_federated'); + }); +}); diff --git a/packages/cli/test/connectors/duckdb/federated-attach.test.ts b/packages/cli/test/connectors/duckdb/federated-attach.test.ts new file mode 100644 index 00000000..7d16fb47 --- /dev/null +++ b/packages/cli/test/connectors/duckdb/federated-attach.test.ts @@ -0,0 +1,143 @@ +import { mkdtempSync, writeFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it } from 'vitest'; +import { federatedAttachTarget } from '../../../src/connectors/duckdb/federated-attach.js'; +import type { FederatedMember } from '../../../src/context/connections/federation.js'; + +const member = (over: Partial<FederatedMember>): FederatedMember => ({ + connectionId: 'm', + driver: 'sqlite', + projectDir: '/proj', + connection: { driver: 'sqlite' }, + ...over, +}); + +describe('federatedAttachTarget', () => { + it('resolves a sqlite path: config to an absolute filesystem path against projectDir', () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-attach-')); + writeFileSync(join(dir, 'reviews.db'), ''); + try { 
+ const target = federatedAttachTarget( + member({ driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: './reviews.db' } }), + {}, + ); + expect(target).toBe(join(dir, 'reviews.db')); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('resolves a sqlite file:// url to a filesystem path', () => { + const target = federatedAttachTarget( + member({ driver: 'sqlite', connection: { driver: 'sqlite', url: 'file:///data/reviews.db' } }), + {}, + ); + expect(target).toBe('/data/reviews.db'); + }); + + it('builds a libpq connection string for postgres from host/database/user', () => { + const target = federatedAttachTarget( + member({ + driver: 'postgres', + connection: { driver: 'postgres', host: 'h', port: 5433, database: 'books', username: 'u', password: 'p' }, + }), + {}, + ); + expect(target).toContain('host=h'); + expect(target).toContain('port=5433'); + expect(target).toContain('dbname=books'); + expect(target).toContain('user=u'); + expect(target).toContain('password=p'); + }); + + it('passes a postgres url through as the connection string', () => { + const target = federatedAttachTarget( + member({ driver: 'postgres', connection: { driver: 'postgres', url: 'env:PG_URL' } }), + { PG_URL: 'postgresql://localhost/books' }, + ); + expect(target).toBe('postgresql://localhost/books'); + }); + + it('adds sslmode=require to a postgres url when the member sets ssl', () => { + const target = federatedAttachTarget( + member({ driver: 'postgres', connection: { driver: 'postgres', url: 'env:PG_URL', ssl: true } }), + { PG_URL: 'postgresql://localhost/books' }, + ); + expect(target).toContain('sslmode=require'); + }); + + it('keeps a stronger sslmode already pinned in a postgres url', () => { + const target = federatedAttachTarget( + member({ driver: 'postgres', connection: { driver: 'postgres', url: 'env:PG_URL', ssl: true } }), + { PG_URL: 'postgresql://localhost/books?sslmode=verify-full' }, + ); + 
expect(target).toContain('sslmode=verify-full'); + expect(target).not.toContain('sslmode=require'); + }); + + it('builds a mysql connection string from host/database/user', () => { + const target = federatedAttachTarget( + member({ + driver: 'mysql', + connection: { driver: 'mysql', host: 'h', port: 3307, database: 'app', username: 'u', password: 'p' }, + }), + {}, + ); + expect(target).toContain('host=h'); + expect(target).toContain('port=3307'); + expect(target).toContain('database=app'); + expect(target).toContain('user=u'); + expect(target).toContain('password=p'); + }); + + it('quotes mysql values containing spaces', () => { + const target = federatedAttachTarget( + member({ + driver: 'mysql', + connection: { driver: 'mysql', host: 'h', database: 'app', username: 'u', password: 'pass word' }, // pragma: allowlist secret + }), + {}, + ); + expect(target).toContain("password='pass word'"); // pragma: allowlist secret + }); + + it('emits sslmode=require for a postgres member configured with discrete fields and ssl', () => { + const target = federatedAttachTarget( + member({ + driver: 'postgres', + connection: { driver: 'postgres', host: 'h', database: 'db', username: 'u', ssl: true }, + }), + {}, + ); + expect(target).toContain('sslmode=require'); + }); + + it('passes through the postgres search_path as options', () => { + const target = federatedAttachTarget( + member({ + driver: 'postgres', + connection: { driver: 'postgres', host: 'h', database: 'db', username: 'u', schema: 'analytics' }, + }), + {}, + ); + expect(target).toContain('search_path=analytics'); + }); + + it('emits ssl_mode=REQUIRED for a mysql member with ssl', () => { + const target = federatedAttachTarget( + member({ + driver: 'mysql', + connection: { driver: 'mysql', host: 'h', database: 'db', username: 'u', ssl: true }, + }), + {}, + ); + expect(target).toContain('ssl_mode=REQUIRED'); + }); + + it('throws for an unsupported driver', () => { + expect(() => federatedAttachTarget(member({ driver: 
'snowflake', connection: { driver: 'snowflake' } }), {})).toThrow( + /cannot be attached/i, + ); + }); +}); diff --git a/packages/cli/test/connectors/duckdb/federated-executor.test.ts b/packages/cli/test/connectors/duckdb/federated-executor.test.ts new file mode 100644 index 00000000..0cc07dc4 --- /dev/null +++ b/packages/cli/test/connectors/duckdb/federated-executor.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, it } from 'vitest'; +import { buildAttachStatements } from '../../../src/connectors/duckdb/federated-executor.js'; +import { attachTypeForDriver, type FederatedMember } from '../../../src/context/connections/federation.js'; + +const member = ( + connectionId: string, + driver: string, + connection: FederatedMember['connection'], +): FederatedMember => ({ connectionId, driver, projectDir: '/proj', connection }); + +describe('attachTypeForDriver', () => { + it('maps drivers to DuckDB attach extension types', () => { + expect(attachTypeForDriver('postgres')).toBe('postgres'); + expect(attachTypeForDriver('mysql')).toBe('mysql'); + expect(attachTypeForDriver('sqlite')).toBe('sqlite'); + }); + + it('throws for an unsupported driver', () => { + expect(() => attachTypeForDriver('snowflake')).toThrow(/cannot be attached/i); + }); +}); + +describe('buildAttachStatements', () => { + it('loads each driver type once, then emits READ_ONLY ATTACH aliased by connectionId, resolving env refs', () => { + const stmts = buildAttachStatements( + [ + member('pg_books', 'postgres', { driver: 'postgres', url: 'env:PG_URL' }), + member('sqlite_reviews', 'sqlite', { driver: 'sqlite', path: '/data/reviews.db' }), + ], + { PG_URL: 'postgresql://localhost/books' }, + ); + expect(stmts).toEqual([ + 'INSTALL postgres; LOAD postgres;', + 'INSTALL sqlite; LOAD sqlite;', + 'ATTACH \'postgresql://localhost/books\' AS "pg_books" (TYPE postgres, READ_ONLY);', + 'ATTACH \'/data/reviews.db\' AS "sqlite_reviews" (TYPE sqlite, READ_ONLY);', + ]); + }); + + it('loads a shared driver type 
only once across members', () => { + const stmts = buildAttachStatements( + [ + member('pg_a', 'postgres', { driver: 'postgres', url: 'postgresql://h/a' }), + member('pg_b', 'postgres', { driver: 'postgres', url: 'postgresql://h/b' }), + ], + {}, + ); + expect(stmts).toEqual([ + 'INSTALL postgres; LOAD postgres;', + 'ATTACH \'postgresql://h/a\' AS "pg_a" (TYPE postgres, READ_ONLY);', + 'ATTACH \'postgresql://h/b\' AS "pg_b" (TYPE postgres, READ_ONLY);', + ]); + }); + + it('quotes a hyphenated connection id as a DuckDB identifier', () => { + const stmts = buildAttachStatements( + [member('postgres-warehouse', 'postgres', { driver: 'postgres', url: 'postgresql://h/db' })], + {}, + ); + expect(stmts.at(-1)).toBe(`ATTACH 'postgresql://h/db' AS "postgres-warehouse" (TYPE postgres, READ_ONLY);`); + }); + + it('escapes single quotes in a resolved attach target', () => { + const stmts = buildAttachStatements( + [member('pg', 'postgres', { driver: 'postgres', url: "postgresql://u:it's@h/db" })], + {}, + ); + expect(stmts.at(-1)).toBe('ATTACH \'postgresql://u:it\'\'s@h/db\' AS "pg" (TYPE postgres, READ_ONLY);'); + }); +}); diff --git a/packages/cli/test/connectors/duckdb/federated-join.integration.test.ts b/packages/cli/test/connectors/duckdb/federated-join.integration.test.ts new file mode 100644 index 00000000..222c9b7a --- /dev/null +++ b/packages/cli/test/connectors/duckdb/federated-join.integration.test.ts @@ -0,0 +1,147 @@ +import { describe, expect, it } from 'vitest'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { executeFederatedQuery } from '../../../src/connectors/duckdb/federated-executor.js'; +import type { FederatedMember } from '../../../src/context/connections/federation.js'; + +describe('federated cross-catalog join (live DuckDB)', () => { + it('joins two sqlite catalogs and enforces read-only', async () => { + const dir = 
mkdtempSync(join(tmpdir(), 'ktx-fed-')); + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune'), (2, 'Foundation');"); + books.close(); + + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 4), (2, 2);'); + reviews.close(); + + const members: FederatedMember[] = [ + { connectionId: 'books_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: booksPath } }, + { connectionId: 'reviews_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: reviewsPath } }, + ]; + + try { + const result = await executeFederatedQuery(members, { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT b.title, AVG(r.stars) AS avg_stars FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id GROUP BY b.title ORDER BY b.title', + }); + expect(result.headers).toEqual(['title', 'avg_stars']); + // ORDER BY title: Dune, Foundation + expect(result.rows.map((row) => row[0])).toEqual(['Dune', 'Foundation']); + expect(Number(result.rows[0][1])).toBeCloseTo(4.5); // Dune: (5+4)/2 + expect(Number(result.rows[1][1])).toBeCloseTo(2.0); // Foundation: 2/1 + + await expect( + executeFederatedQuery(members, { + connectionId: '_ktx_federated', + connection: undefined, + sql: "INSERT INTO books_db.books VALUES (2, 'Hack')", + }), + ).rejects.toThrow(/read-only/i); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('returns integer columns as JSON-safe numbers, not bigint', async () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-fed-bigint-')); + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title 
TEXT); INSERT INTO books VALUES (1, 'Dune'), (2, 'Foundation');"); + books.close(); + + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 4), (2, 2);'); + reviews.close(); + + const members: FederatedMember[] = [ + { connectionId: 'books_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: booksPath } }, + { connectionId: 'reviews_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: reviewsPath } }, + ]; + + try { + const result = await executeFederatedQuery(members, { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT b.id, count(*) AS n FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id GROUP BY b.id ORDER BY b.id', + }); + for (const row of result.rows) { + for (const cell of row) { + expect(typeof cell).not.toBe('bigint'); + } + } + expect(() => JSON.stringify(result)).not.toThrow(); + expect(result.rows[0][0]).toBe(1); + expect(Number(result.rows[0][1])).toBeGreaterThan(0); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('preserves a BIGINT beyond 2^53 as an exact string instead of rounding', async () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-fed-bigint-large-')); + const idsPath = join(dir, 'ids.db'); + const otherPath = join(dir, 'other.db'); + + const ids = new Database(idsPath); + // 9007199254740993 = 2^53 + 1, which rounds to ...992 as a JS number; the + // literal lives in SQL text so sqlite stores it exactly. 
+ ids.exec('CREATE TABLE ids (big_id INTEGER); INSERT INTO ids VALUES (9007199254740993);'); + ids.close(); + const other = new Database(otherPath); + other.exec('CREATE TABLE t (x INTEGER); INSERT INTO t VALUES (1);'); + other.close(); + + const members: FederatedMember[] = [ + { connectionId: 'ids_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: idsPath } }, + { connectionId: 'other_db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: otherPath } }, + ]; + + try { + const result = await executeFederatedQuery(members, { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT big_id FROM ids_db.ids', + }); + expect(result.rows[0][0]).toBe('9007199254740993'); + expect(() => JSON.stringify(result)).not.toThrow(); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('joins catalogs whose connection ids contain hyphens', async () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-fed-hyphen-')); + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune');"); + books.close(); + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 3);'); + reviews.close(); + const members: FederatedMember[] = [ + { connectionId: 'books-db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: booksPath } }, + { connectionId: 'reviews-db', driver: 'sqlite', projectDir: dir, connection: { driver: 'sqlite', path: reviewsPath } }, + ]; + try { + const result = await executeFederatedQuery(members, { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT b.title, AVG(r.stars) AS avg_stars FROM "books-db".books b JOIN "reviews-db".reviews r ON b.id = r.book_id GROUP BY b.title', + }); + 
expect(result.rows[0][0]).toBe('Dune'); + expect(Number(result.rows[0][1])).toBeCloseTo(4.0); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/cli/test/connectors/shared/string-reference.test.ts b/packages/cli/test/connectors/shared/string-reference.test.ts new file mode 100644 index 00000000..da2b6dc1 --- /dev/null +++ b/packages/cli/test/connectors/shared/string-reference.test.ts @@ -0,0 +1,41 @@ +import { describe, expect, it } from 'vitest'; +import { mkdtempSync, writeFileSync, rmSync } from 'node:fs'; +import { homedir, tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { resolveStringReference } from '../../../src/connectors/shared/string-reference.js'; + +describe('resolveStringReference', () => { + it('returns plain values unchanged', () => { + expect(resolveStringReference('postgres://localhost/db', {})).toBe('postgres://localhost/db'); + }); + + it('resolves env: references from the provided env', () => { + expect(resolveStringReference('env:MY_URL', { MY_URL: 'resolved-url' })).toBe('resolved-url'); + }); + + it('returns empty string for a missing env var', () => { + expect(resolveStringReference('env:NOPE', {})).toBe(''); + }); + + it('resolves file: references and trims whitespace', () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-strref-')); + const file = join(dir, 'secret.txt'); + writeFileSync(file, ' hunter2\n'); + try { + expect(resolveStringReference(`file:${file}`, {})).toBe('hunter2'); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it('expands ~ in file: references to the home directory', () => { + const name = `.ktx-strref-test-${process.pid}.txt`; + const abs = join(homedir(), name); + writeFileSync(abs, 'tilde-secret\n'); + try { + expect(resolveStringReference(`file:~/${name}`, {})).toBe('tilde-secret'); + } finally { + rmSync(abs, { force: true }); + } + }); +}); diff --git a/packages/cli/test/context/connections/federation.test.ts 
b/packages/cli/test/context/connections/federation.test.ts new file mode 100644 index 00000000..68d82898 --- /dev/null +++ b/packages/cli/test/context/connections/federation.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, it } from 'vitest'; +import { + deriveFederatedConnection, + federatedConnectionListing, + FEDERATED_CONNECTION_ID, +} from '../../../src/context/connections/federation.js'; + +const conns = (entries: Record) => entries as never; + +describe('deriveFederatedConnection', () => { + it('returns null with zero compatible members', () => { + expect(deriveFederatedConnection(conns({ snow: { driver: 'snowflake' } }), '/proj')).toBeNull(); + }); + + it('returns null with exactly one compatible member', () => { + expect(deriveFederatedConnection(conns({ pg: { driver: 'postgres' } }), '/proj')).toBeNull(); + }); + + it('derives a descriptor with two compatible members', () => { + const result = deriveFederatedConnection( + conns({ pg: { driver: 'postgres' }, lite: { driver: 'sqlite' } }), + '/proj', + ); + expect(result).not.toBeNull(); + expect(result?.id).toBe(FEDERATED_CONNECTION_ID); + expect(result?.driver).toBe('duckdb'); + expect(result?.members.map((m) => m.connectionId).sort()).toEqual(['lite', 'pg']); + }); + + it('carries each member connection config and projectDir', () => { + const result = deriveFederatedConnection( + conns({ pg: { driver: 'postgres', host: 'h' }, lite: { driver: 'sqlite', path: './a.db' } }), + '/proj', + ); + const pg = result?.members.find((m) => m.connectionId === 'pg'); + expect(pg?.connection).toEqual({ driver: 'postgres', host: 'h' }); + expect(pg?.projectDir).toBe('/proj'); + }); + + it('excludes incompatible members from the group', () => { + const result = deriveFederatedConnection( + conns({ pg: { driver: 'postgres' }, my: { driver: 'mysql' }, snow: { driver: 'snowflake' } }), + '/proj', + ); + expect(result?.members.map((m) => m.connectionId).sort()).toEqual(['my', 'pg']); + }); + + it('is case-insensitive on 
driver names', () => { + const result = deriveFederatedConnection( + conns({ pg: { driver: 'POSTGRES' }, lite: { driver: 'SQLite' } }), + '/proj', + ); + expect(result?.members).toHaveLength(2); + }); +}); + +describe('federatedConnectionListing', () => { + it('returns null with fewer than 2 attach-compatible connections', () => { + expect( + federatedConnectionListing({ books_db: { driver: 'sqlite', path: './b.db' } }, '/tmp/p'), + ).toBeNull(); + }); + + it('returns id, driver, member ids and a usage hint with 2+ members', () => { + const listing = federatedConnectionListing( + { + books_db: { driver: 'sqlite', path: './b.db' }, + reviews_db: { driver: 'sqlite', path: './r.db' }, + snow: { driver: 'snowflake', account: 'x' }, + }, + '/tmp/p', + ); + expect(listing).not.toBeNull(); + expect(listing!.id).toBe(FEDERATED_CONNECTION_ID); + expect(listing!.driver).toBe('duckdb'); + expect(listing!.members).toEqual(['books_db', 'reviews_db']); + expect(listing!.hint).toContain('Cross-database'); + expect(listing!.hint).toContain('connectionId.table'); + }); +}); diff --git a/packages/cli/test/context/connections/project-sql-executor.integration.test.ts b/packages/cli/test/context/connections/project-sql-executor.integration.test.ts new file mode 100644 index 00000000..b6322f43 --- /dev/null +++ b/packages/cli/test/context/connections/project-sql-executor.integration.test.ts @@ -0,0 +1,57 @@ +import { describe, expect, it } from 'vitest'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { executeProjectReadOnlySql } from '../../../src/context/connections/project-sql-executor.js'; +import type { KtxLocalProject } from '../../../src/context/project/project.js'; + +function fakeProject(projectDir: string, connections: Record): KtxLocalProject { + return { + projectDir, + configPath: join(projectDir, 'ktx.yaml'), + config: { connections } as unknown as 
KtxLocalProject['config'], + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as KtxLocalProject['git'], + fileStore: {} as KtxLocalProject['fileStore'], + }; +} + +describe('executeProjectReadOnlySql — federated integration (real DuckDB)', () => { + it('runs a federated cross-catalog join through the default executeFederatedQuery', async () => { + const dir = mkdtempSync(join(tmpdir(), 'ktx-fed-exec-')); + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune'), (2, 'Foundation');"); + books.close(); + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 4), (2, 2);'); + reviews.close(); + + const project = fakeProject(dir, { + books_db: { driver: 'sqlite', path: booksPath }, + reviews_db: { driver: 'sqlite', path: reviewsPath }, + }); + + try { + const result = await executeProjectReadOnlySql({ + project, + input: { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT b.title, AVG(r.stars) AS avg_stars FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id GROUP BY b.title ORDER BY b.title', + maxRows: 100, + }, + createConnector: () => { + throw new Error('federated path must not create a scan connector'); + }, + }); + expect(result.rows.map((row) => row[0])).toEqual(['Dune', 'Foundation']); + expect(Number(result.rows[0][1])).toBeCloseTo(4.5); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/cli/test/context/connections/project-sql-executor.test.ts b/packages/cli/test/context/connections/project-sql-executor.test.ts new file mode 100644 index 00000000..899875a8 --- /dev/null +++ b/packages/cli/test/context/connections/project-sql-executor.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it, vi } 
from 'vitest'; +import type { executeFederatedQuery } from '../../../src/connectors/duckdb/federated-executor.js'; +import { executeProjectReadOnlySql } from '../../../src/context/connections/project-sql-executor.js'; +import type { KtxLocalProject } from '../../../src/context/project/project.js'; +import type { KtxScanConnector } from '../../../src/context/scan/types.js'; + +function fakeProject(connections: Record): KtxLocalProject { + return { + projectDir: '/tmp/proj', + configPath: '/tmp/proj/ktx.yaml', + config: { connections } as unknown as KtxLocalProject['config'], + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as KtxLocalProject['git'], + fileStore: {} as KtxLocalProject['fileStore'], + }; +} + +describe('executeProjectReadOnlySql — federated routing', () => { + it('routes _ktx_federated through the federated executor with derived members', async () => { + const project = fakeProject({ pg: { driver: 'postgres' }, lite: { driver: 'sqlite' } }); + const executeFederated = vi.fn(async () => ({ + headers: ['x'], + rows: [[1]], + totalRows: 1, + command: 'SELECT', + rowCount: 1, + })); + const createConnector = vi.fn(); + + const result = await executeProjectReadOnlySql({ + project, + input: { connectionId: '_ktx_federated', connection: undefined, sql: 'SELECT 1', maxRows: 100 }, + createConnector: createConnector as never, + executeFederated, + }); + + expect(result.rows).toEqual([[1]]); + expect(executeFederated).toHaveBeenCalledOnce(); + const members = executeFederated.mock.calls[0][0]; + expect(members.map((m) => m.connectionId).sort()).toEqual(['lite', 'pg']); + expect(createConnector).not.toHaveBeenCalled(); + }); + + it('throws when _ktx_federated requested but fewer than 2 compatible members', async () => { + const project = fakeProject({ pg: { driver: 'postgres' } }); + await expect( + executeProjectReadOnlySql({ + project, + input: { connectionId: '_ktx_federated', connection: undefined, sql: 'SELECT 1', maxRows: 100 }, + 
createConnector: (() => { + throw new Error('should not be called'); + }) as never, + executeFederated: vi.fn(), + }), + ).rejects.toThrow(/fewer than 2/i); + }); + + it('routes a normal connection through the scan connector', async () => { + const project = fakeProject({ pg: { driver: 'postgres' } }); + const connector = { + driver: 'postgres', + capabilities: { readOnlySql: true }, + executeReadOnly: vi.fn(async () => ({ headers: ['a'], rows: [['v']], totalRows: 1, rowCount: 1 })), + cleanup: vi.fn(async () => {}), + }; + const result = await executeProjectReadOnlySql({ + project, + input: { connectionId: 'pg', connection: { driver: 'postgres' }, sql: 'SELECT a', maxRows: 50 }, + createConnector: (async () => connector) as never, + executeFederated: vi.fn(), + }); + expect(result.rows).toEqual([['v']]); + expect(connector.executeReadOnly).toHaveBeenCalledOnce(); + expect(connector.cleanup).toHaveBeenCalledOnce(); + }); +}); + +function connectorReturning(result: { + headers: string[]; + headerTypes?: string[]; + rows: unknown[][]; + totalRows: number; + rowCount: number | null; +}): KtxScanConnector { + return { + driver: 'sqlite', + capabilities: { readOnlySql: true }, + async executeReadOnly() { + return result; + }, + } as unknown as KtxScanConnector; +} + +describe('executeProjectReadOnlySql headerTypes', () => { + it('forwards connector headerTypes on the non-federated branch', async () => { + const project = { + projectDir: '/tmp/p', + config: { connections: { books_db: { driver: 'sqlite', path: './b.db' } } }, + } as never; + + const result = await executeProjectReadOnlySql({ + project, + input: { connectionId: 'books_db', connection: undefined, sql: 'SELECT 1', maxRows: 10 }, + createConnector: () => + connectorReturning({ + headers: ['id'], + headerTypes: ['INTEGER'], + rows: [[1]], + totalRows: 1, + rowCount: 1, + }), + }); + + expect(result.headerTypes).toEqual(['INTEGER']); + }); +}); diff --git 
a/packages/cli/test/context/ingest/manifest-federated-join.test.ts b/packages/cli/test/context/ingest/manifest-federated-join.test.ts new file mode 100644 index 00000000..0595d321 --- /dev/null +++ b/packages/cli/test/context/ingest/manifest-federated-join.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, it } from 'vitest'; +import { buildJoinsByTable, buildLiveDatabaseManifestShards } from '../../../src/context/ingest/adapters/live-database/manifest.js'; + +const joinData = (toTable: string) => ({ + fromTable: 'books', + fromColumns: ['id'], + toTable, + toColumns: ['book_id'], + relationship: 'one_to_many', + source: 'manual' as const, +}); + +describe('buildJoinsByTable federated siblings', () => { + it('keeps a forward join whose target is a federated sibling table', () => { + const result = buildJoinsByTable( + new Set(['books']), // current snapshot + [joinData('sqlite_reviews.reviews')], // target NOT local + new Map(), + new Set(['sqlite_reviews.reviews']), // federated sibling targets + ); + expect(result.get('books')?.map((j) => j.to)).toEqual(['sqlite_reviews.reviews']); + // The sibling target must NOT get a reverse entry (it has no shard in this snapshot) + expect(result.get('sqlite_reviews.reviews')).toBeUndefined(); + }); + + it('still drops a join whose target is neither local nor a sibling', () => { + const result = buildJoinsByTable(new Set(['books']), [joinData('ghost')], new Map(), new Set()); + expect(result.get('books')).toBeUndefined(); + }); + + it('keeps both directions for a fully-local join (unchanged behavior)', () => { + const result = buildJoinsByTable(new Set(['books', 'authors']), [joinData('authors')], new Map(), new Set()); + expect(result.get('books')?.map((j) => j.to)).toEqual(['authors']); + expect(result.get('authors')?.map((j) => j.to)).toEqual(['books']); // reverse still added for local joins + }); +}); + +describe('buildLiveDatabaseManifestShards federated preserved joins', () => { + it('keeps a preserved manual join 
whose target is a federated sibling', () => { + const result = buildLiveDatabaseManifestShards({ + connectionType: 'POSTGRES', + tables: [{ name: 'books', catalog: null, db: 'public', columns: [{ name: 'id', type: 'int' }] }], + joins: [], + existingPreservedJoins: new Map([ + [ + 'books', + [{ to: 'sqlite_reviews.reviews', on: 'books.id = reviews.book_id', relationship: 'one_to_many', source: 'manual' }], + ], + ]), + federatedSiblingTargets: new Set(['sqlite_reviews.reviews']), + mapColumnType: (t) => t, + }); + const shard = result.shards.get('public'); + expect(shard?.tables.books?.joins?.map((j) => j.to)).toEqual(['sqlite_reviews.reviews']); + }); + + it('still drops a preserved join whose target is neither local nor a sibling', () => { + const result = buildLiveDatabaseManifestShards({ + connectionType: 'POSTGRES', + tables: [{ name: 'books', catalog: null, db: 'public', columns: [{ name: 'id', type: 'int' }] }], + joins: [], + existingPreservedJoins: new Map([ + ['books', [{ to: 'ghost', on: 'books.id = ghost.id', relationship: 'one_to_many', source: 'manual' }]], + ]), + federatedSiblingTargets: new Set(), + mapColumnType: (t) => t, + }); + expect(result.shards.get('public')?.tables.books?.joins).toBeUndefined(); + }); +}); diff --git a/packages/cli/test/context/mcp/__snapshots__/mcp-tools-list.json b/packages/cli/test/context/mcp/__snapshots__/mcp-tools-list.json index 3ffca96b..8a78009f 100644 --- a/packages/cli/test/context/mcp/__snapshots__/mcp-tools-list.json +++ b/packages/cli/test/context/mcp/__snapshots__/mcp-tools-list.json @@ -2,7 +2,7 @@ { "name": "connection_list", "title": "Connection List", - "description": "List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses.", + "description": "List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses. 
A \"_ktx_federated\" entry (when present) queries all its member databases together; use its id for cross-database joins.", "inputSchema": { "type": "object", "properties": {}, @@ -24,6 +24,15 @@ }, "connectionType": { "type": "string" + }, + "members": { + "type": "array", + "items": { + "type": "string" + } + }, + "hint": { + "type": "string" } }, "required": [ diff --git a/packages/cli/test/context/mcp/connection-list-federated.test.ts b/packages/cli/test/context/mcp/connection-list-federated.test.ts new file mode 100644 index 00000000..f09b11c9 --- /dev/null +++ b/packages/cli/test/context/mcp/connection-list-federated.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from 'vitest'; +import { createLocalProjectMcpContextPorts } from '../../../src/context/mcp/local-project-ports.js'; + +const project = { + projectDir: '/tmp/p', + config: { + connections: { + books_db: { driver: 'sqlite', path: './b.db' }, + reviews_db: { driver: 'sqlite', path: './r.db' }, + }, + }, +} as never; + +describe('MCP connection_list federated entry', () => { + it('includes _ktx_federated with members and hint', async () => { + const ports = createLocalProjectMcpContextPorts(project, { embeddingService: null }); + const list = await ports.connections!.list(); + const federated = list.find((c) => c.id === '_ktx_federated'); + expect(federated).toBeDefined(); + expect(federated!.connectionType).toBe('DUCKDB'); + expect(federated!.members).toEqual(['books_db', 'reviews_db']); + expect(federated!.hint).toContain('Cross-database'); + }); + + it('omits _ktx_federated with a single connection', async () => { + const single = { + projectDir: '/tmp/p', + config: { connections: { books_db: { driver: 'sqlite', path: './b.db' } } }, + } as never; + const ports = createLocalProjectMcpContextPorts(single, { embeddingService: null }); + const list = await ports.connections!.list(); + expect(list.find((c) => c.id === '_ktx_federated')).toBeUndefined(); + }); +}); diff --git 
a/packages/cli/test/context/mcp/local-project-ports-federated.integration.test.ts b/packages/cli/test/context/mcp/local-project-ports-federated.integration.test.ts new file mode 100644 index 00000000..362406df --- /dev/null +++ b/packages/cli/test/context/mcp/local-project-ports-federated.integration.test.ts @@ -0,0 +1,99 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { describe, expect, it, vi } from 'vitest'; +import { createLocalProjectMcpContextPorts } from '../../../src/context/mcp/local-project-ports.js'; +import { initKtxProject } from '../../../src/context/project/project.js'; + +describe('MCP sql_execution — federated routing (live DuckDB)', () => { + it('routes _ktx_federated through the shared federated executor, validating with the duckdb dialect', async () => { + const dir = await mkdtemp(join(tmpdir(), 'ktx-mcp-fed-')); + try { + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune');"); + books.close(); + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 3);'); + reviews.close(); + + const project = await initKtxProject({ projectDir: dir }); + project.config.connections.books_db = { driver: 'sqlite', path: booksPath }; + project.config.connections.reviews_db = { driver: 'sqlite', path: reviewsPath }; + + const validateReadOnly = vi.fn(async () => ({ ok: true, error: null })); + const ports = createLocalProjectMcpContextPorts(project, { + sqlAnalysis: { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(), + validateReadOnly, + } as never, + localScan: { + createConnector: () => { + throw new Error('federated path must not create a scan connector'); + }, + 
}, + embeddingService: null, + }); + + const result = await ports.sqlExecution?.execute({ + connectionId: '_ktx_federated', + sql: 'SELECT b.title, AVG(r.stars) AS avg_stars FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id GROUP BY b.title', + maxRows: 100, + }); + + expect(result?.rows?.[0]?.[0]).toBe('Dune'); + // Federated validation uses the duckdb dialect, not a member driver. + expect(validateReadOnly).toHaveBeenCalledWith(expect.any(String), 'duckdb'); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it('serializes integer columns from a federated query without throwing on bigint', async () => { + const dir = await mkdtemp(join(tmpdir(), 'ktx-mcp-fed-int-')); + try { + const booksPath = join(dir, 'books.db'); + const reviewsPath = join(dir, 'reviews.db'); + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune');"); + books.close(); + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); INSERT INTO reviews VALUES (1, 5), (1, 3);'); + reviews.close(); + + const project = await initKtxProject({ projectDir: dir }); + project.config.connections.books_db = { driver: 'sqlite', path: booksPath }; + project.config.connections.reviews_db = { driver: 'sqlite', path: reviewsPath }; + + const validateReadOnly = vi.fn(async () => ({ ok: true, error: null })); + const ports = createLocalProjectMcpContextPorts(project, { + sqlAnalysis: { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(), + validateReadOnly, + } as never, + localScan: { + createConnector: () => { + throw new Error('federated path must not create a scan connector'); + }, + }, + embeddingService: null, + }); + + const result = await ports.sqlExecution?.execute({ + connectionId: '_ktx_federated', + sql: 'SELECT b.title, count(*) AS n FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id GROUP BY 
b.title', + maxRows: 100, + }); + + expect(() => JSON.stringify(result)).not.toThrow(); + expect(result?.rows?.[0]?.[0]).toBe('Dune'); + expect(result?.rows?.[0]?.[1]).toBe(2); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/cli/test/context/scan/local-enrichment-federated-join.test.ts b/packages/cli/test/context/scan/local-enrichment-federated-join.test.ts new file mode 100644 index 00000000..ba1c6cae --- /dev/null +++ b/packages/cli/test/context/scan/local-enrichment-federated-join.test.ts @@ -0,0 +1,130 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import YAML from 'yaml'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { buildDefaultKtxProjectConfig } from '../../../src/context/project/config.js'; +import type { GitService } from '../../../src/context/core/git.service.js'; +import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js'; +import type { KtxLocalProject } from '../../../src/context/project/project.js'; +import { writeLocalScanManifestShards } from '../../../src/context/scan/local-enrichment-artifacts.js'; +import type { KtxSchemaSnapshot } from '../../../src/context/scan/types.js'; + +// `writeLocalScanManifestShards` commits its output via git; the file is +// already on disk before the commit call, so the stub only returns commit info. 
+const stubGitCommitFile: Pick<GitService, 'commitFile'> = { + commitFile: async () => ({ + commitHash: 'stub', + shortHash: 'stub', + message: 'stub', + author: 'ktx', + authorEmail: 'ktx@example.com', + timestamp: new Date().toISOString(), + committedDate: new Date().toISOString(), + created: true, + }), +}; +const stubGit = stubGitCommitFile as GitService; + +function fakeProject(projectDir: string, connections: KtxLocalProject['config']['connections']): KtxLocalProject { + const fileStore = new LocalGitFileStore({ rootDir: projectDir, git: stubGit }); + return { + projectDir, + configPath: join(projectDir, 'ktx.yaml'), + config: { ...buildDefaultKtxProjectConfig(), connections }, + coreConfig: {} as KtxLocalProject['coreConfig'], + git: stubGit, + fileStore, + }; +} + +const EXISTING_BOOKS_SHARD = `tables: + books: + table: public.books + columns: + - name: id + type: number + pk: true + joins: + - to: sqlite_reviews.reviews + on: books.id = reviews.book_id + relationship: one_to_many + source: manual +`; + +const booksSnapshot: KtxSchemaSnapshot = { + connectionId: 'pg_books', + driver: 'postgres', + extractedAt: new Date().toISOString(), + scope: {}, + metadata: {}, + tables: [ + { + name: 'books', + catalog: null, + db: 'public', + kind: 'table', + comment: null, + estimatedRows: null, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: null, + }, + ], + foreignKeys: [], + }, + ], +}; + +describe('writeLocalScanManifestShards federated cross-DB joins', () => { + let tempDir: string; + let project: KtxLocalProject; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-enrich-fed-')); + project = fakeProject(join(tempDir, 'project'), { + pg_books: { driver: 'postgres' }, + sqlite_reviews: { driver: 'sqlite' }, + }); + await project.fileStore.writeFile( + 'semantic-layer/pg_books/_schema/public.yaml', + EXISTING_BOOKS_SHARD, + 'ktx', + 'ktx@example.com', 
'seed', + { skipLock: true }, + ); + await project.fileStore.writeFile( + 'semantic-layer/sqlite_reviews/_schema/main.yaml', + 'tables:\n reviews:\n table: reviews\n columns:\n - name: book_id\n type: number\n', + 'ktx', + 'ktx@example.com', + 'seed', + { skipLock: true }, + ); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('preserves a manual cross-DB join to a sqlite sibling across a re-scan', async () => { + await writeLocalScanManifestShards({ + project, + connectionId: 'pg_books', + syncId: 'sync1', + driver: 'postgres', + snapshot: booksSnapshot, + dryRun: false, + }); + const { content } = await project.fileStore.readFile('semantic-layer/pg_books/_schema/public.yaml'); + const shard = YAML.parse(content) as { tables: Record<string, { joins?: Array<{ to: string }> }> }; + expect(shard.tables.books?.joins?.map((j) => j.to)).toEqual(['sqlite_reviews.reviews']); + }); +}); diff --git a/packages/cli/test/context/sl/local-query-federated.integration.test.ts b/packages/cli/test/context/sl/local-query-federated.integration.test.ts new file mode 100644 index 00000000..b0ff1f5e --- /dev/null +++ b/packages/cli/test/context/sl/local-query-federated.integration.test.ts @@ -0,0 +1,111 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { buildDefaultKtxProjectConfig } from '../../../src/context/project/config.js'; +import { executeProjectReadOnlySql } from '../../../src/context/connections/project-sql-executor.js'; +import type { GitService } from '../../../src/context/core/git.service.js'; +import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js'; +import type { KtxLocalProject } from '../../../src/context/project/project.js'; +import { loadLocalSlSourceRecords } from '../../../src/context/sl/local-sl.js'; + +const BOOKS_MANIFEST = 
`tables: + books: + table: main.books + columns: + - name: id + type: number + pk: true + - name: title + type: string +`; + +const REVIEWS_MANIFEST = `tables: + reviews: + table: main.reviews + columns: + - name: book_id + type: number + pk: true + - name: stars + type: number +`; + +// On-disk file store only (no git init/commit) so manifest seeding never hits +// the gpg-signing path; connections also carry real sqlite paths so the +// federated executor can attach them. +function fakeProject(projectDir: string, connections: KtxLocalProject['config']['connections']): KtxLocalProject { + const fileStore = new LocalGitFileStore({ rootDir: projectDir, git: {} as GitService }); + const config = { ...buildDefaultKtxProjectConfig(), connections }; + return { + projectDir, + configPath: join(projectDir, 'ktx.yaml'), + config, + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as GitService, + fileStore, + }; +} + +async function seedManifest(project: KtxLocalProject, path: string, content: string): Promise<void> { + await project.fileStore.writeFile(path, content, 'ktx', 'ktx@example.com', 'seed manifest', { skipLock: true }); +} + +describe('federated SL source loading and physical execution (real DuckDB)', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-local-query-fed-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('namespaces source names while keeping physical table refs, and executes against them', async () => { + const projectDir = join(tempDir, 'project'); + const booksPath = join(tempDir, 'books.db'); + const reviewsPath = join(tempDir, 'reviews.db'); + + const books = new Database(booksPath); + books.exec("CREATE TABLE books (id INTEGER, title TEXT); INSERT INTO books VALUES (1, 'Dune'), (2, 'Foundation');"); + books.close(); + const reviews = new Database(reviewsPath); + reviews.exec('CREATE TABLE reviews (book_id INTEGER, stars INTEGER); 
INSERT INTO reviews VALUES (1, 5), (1, 4), (2, 2);'); + reviews.close(); + + const project = fakeProject(projectDir, { + sqlite_books: { driver: 'sqlite', path: booksPath }, + sqlite_reviews: { driver: 'sqlite', path: reviewsPath }, + }); + await seedManifest(project, 'semantic-layer/sqlite_books/_schema/main.yaml', BOOKS_MANIFEST); + await seedManifest(project, 'semantic-layer/sqlite_reviews/_schema/main.yaml', REVIEWS_MANIFEST); + + // (a) Name-vs-physical separation: federated loading namespaces source.name + // by member id while source.table stays the unprefixed physical ref. + const records = await loadLocalSlSourceRecords(project, { connectionId: '_ktx_federated' }); + const byName = new Map(records.map((record) => [record.source.name, record.source.table])); + expect([...byName.keys()].sort()).toEqual(['sqlite_books.books', 'sqlite_reviews.reviews']); + expect(byName.get('sqlite_books.books')).toBe('main.books'); + expect(byName.get('sqlite_reviews.reviews')).toBe('main.reviews'); + + // (b) Physical targeting end-to-end: a federated query joining the two + // attached catalogs by their connectionId-prefixed physical refs returns + // the correct joined rows through live DuckDB. 
+ const result = await executeProjectReadOnlySql({ + project, + input: { + connectionId: '_ktx_federated', + connection: undefined, + sql: 'SELECT b.title, AVG(r.stars) AS avg_stars FROM sqlite_books.books b JOIN sqlite_reviews.reviews r ON b.id = r.book_id GROUP BY b.title ORDER BY b.title', + maxRows: 100, + }, + createConnector: () => { + throw new Error('federated path must not create a scan connector'); + }, + }); + expect(result.rows.map((row) => row[0])).toEqual(['Dune', 'Foundation']); + expect(Number(result.rows[0][1])).toBeCloseTo(4.5); + }); +}); diff --git a/packages/cli/test/context/sl/local-query-federated.test.ts b/packages/cli/test/context/sl/local-query-federated.test.ts new file mode 100644 index 00000000..a8a8a239 --- /dev/null +++ b/packages/cli/test/context/sl/local-query-federated.test.ts @@ -0,0 +1,207 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { KtxSemanticLayerComputePort } from '../../../src/context/daemon/semantic-layer-compute.js'; +import type { KtxLocalProject } from '../../../src/context/project/project.js'; +import { compileLocalSlQuery } from '../../../src/context/sl/local-query.js'; + +function makeFakeProject(): KtxLocalProject { + const fileStore = { + listFiles: vi.fn(async () => ({ files: [] })), + readFile: vi.fn(async () => ({ content: '' })), + writeFile: vi.fn(async () => ({})), + deleteFile: vi.fn(async () => ({})), + fileHistory: vi.fn(async () => []), + headCommit: vi.fn(async () => null), + } as unknown as KtxLocalProject['fileStore']; + + return { + projectDir: '/tmp/fake-ktx-project', + configPath: '/tmp/fake-ktx-project/ktx.yaml', + config: { + connections: { + pg_books: { driver: 'postgres' }, + sqlite_reviews: { driver: 'sqlite' }, + }, + storage: { state: 'sqlite', search: 'sqlite-fts5', git: {} }, + llm: {}, + ingest: {}, + agent: {}, + scan: {}, + } as unknown as KtxLocalProject['config'], + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as KtxLocalProject['git'], + 
fileStore, + }; +} + +function makeFakeProjectWithFiles( + connections: Record<string, { driver: string }>, + files: Record<string, string>, +): KtxLocalProject { + const fileStore = { + listFiles: vi.fn(async (dir: string) => ({ + files: Object.keys(files).filter((path) => path.startsWith(`${dir}/`)), + })), + readFile: vi.fn(async (path: string) => ({ content: files[path] ?? '' })), + writeFile: vi.fn(async () => ({})), + deleteFile: vi.fn(async () => ({})), + fileHistory: vi.fn(async () => []), + headCommit: vi.fn(async () => null), + } as unknown as KtxLocalProject['fileStore']; + + return { + projectDir: '/tmp/fake-ktx-project', + configPath: '/tmp/fake-ktx-project/ktx.yaml', + config: { + connections, + storage: { state: 'sqlite', search: 'sqlite-fts5', git: {} }, + llm: {}, + ingest: {}, + agent: {}, + scan: {}, + } as unknown as KtxLocalProject['config'], + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as KtxLocalProject['git'], + fileStore, + }; +} + +function makeFakeCompute(): KtxSemanticLayerComputePort & { + lastDialect: string | undefined; + lastSources: Array<{ name: string; joins?: Array<{ to: string }> }> | undefined; +} { + const fake = { + lastDialect: undefined as string | undefined, + lastSources: undefined as Array<{ name: string; joins?: Array<{ to: string }> }> | undefined, + query: vi.fn(async (input: { dialect: string; query: unknown; sources: unknown[] }) => { + fake.lastDialect = input.dialect; + fake.lastSources = input.sources as Array<{ name: string; joins?: Array<{ to: string }> }>; + return { + sql: 'select 1', + dialect: input.dialect, + columns: [], + plan: { measures: [], dimensions: [] }, + }; + }), + validateSources: vi.fn(), + generateSources: vi.fn(), + }; + return fake; +} + +describe('compileLocalSlQuery — federated connection', () => { + it('rejects federated queries and points to raw SQL', async () => { + const project = makeFakeProject(); + const compute = makeFakeCompute(); + + await expect( + compileLocalSlQuery(project, { + connectionId: 
'_ktx_federated', + query: { measures: [], dimensions: [] }, + compute, + execute: false, + }), + ).rejects.toThrow(/_ktx_federated[\s\S]*ktx sql/); + // The compute adapter must never be invoked for a federated query. + expect(compute.query).not.toHaveBeenCalled(); + }); + + it('still uses the driver dialect for a normal connection', async () => { + const project = makeFakeProject(); + const compute = makeFakeCompute(); + + await compileLocalSlQuery(project, { + connectionId: 'pg_books', + query: { measures: [], dimensions: [] }, + compute, + execute: false, + }); + + expect(compute.lastDialect).toBe('postgres'); + }); + + it('drops a cross-connection join target so a member query is not poisoned', async () => { + // A preserved cross-DB join (to: sqlite_reviews.reviews) would otherwise be + // an orphan target the planner rejects, breaking every pg_books SL query. + const manifest = `tables: + books: + table: public.books + columns: + - name: id + type: number + pk: true + - name: author_id + type: number + joins: + - to: sqlite_reviews.reviews + on: books.id = reviews.book_id + relationship: one_to_many + - to: authors + on: books.author_id = authors.id + relationship: many_to_one + authors: + table: public.authors + columns: + - name: id + type: number + pk: true +`; + const project = makeFakeProjectWithFiles( + { pg_books: { driver: 'postgres' }, sqlite_reviews: { driver: 'sqlite' } }, + { 'semantic-layer/pg_books/_schema/public.yaml': manifest }, + ); + const compute = makeFakeCompute(); + + await compileLocalSlQuery(project, { + connectionId: 'pg_books', + query: { measures: [], dimensions: [] }, + compute, + execute: false, + }); + + expect(compute.query).toHaveBeenCalledTimes(1); + const books = compute.lastSources?.find((source) => source.name === 'books'); + // The same-connection join survives; only the federated-sibling target is dropped. 
+ expect(books?.joins?.map((join) => join.to)).toEqual(['authors']); + }); + + it('keeps a same-connection join whose target name collides with another connection id', async () => { + // Connection ids and source names share a vocabulary, so a sibling connection + // can be named `authors` while a same-connection source is also `authors`. The + // join target resolves within the connection and must not be pruned. + const manifest = `tables: + books: + table: public.books + columns: + - name: id + type: number + pk: true + - name: author_id + type: number + joins: + - to: authors + on: books.author_id = authors.id + relationship: many_to_one + authors: + table: public.authors + columns: + - name: id + type: number + pk: true +`; + const project = makeFakeProjectWithFiles( + { pg_books: { driver: 'postgres' }, authors: { driver: 'postgres' } }, + { 'semantic-layer/pg_books/_schema/public.yaml': manifest }, + ); + const compute = makeFakeCompute(); + + await compileLocalSlQuery(project, { + connectionId: 'pg_books', + query: { measures: [], dimensions: [] }, + compute, + execute: false, + }); + + const books = compute.lastSources?.find((source) => source.name === 'books'); + expect(books?.joins?.map((join) => join.to)).toEqual(['authors']); + }); +}); diff --git a/packages/cli/test/context/sl/local-sl-federated.test.ts b/packages/cli/test/context/sl/local-sl-federated.test.ts new file mode 100644 index 00000000..0da8c8bf --- /dev/null +++ b/packages/cli/test/context/sl/local-sl-federated.test.ts @@ -0,0 +1,108 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { buildDefaultKtxProjectConfig } from '../../../src/context/project/config.js'; +import type { GitService } from '../../../src/context/core/git.service.js'; +import { LocalGitFileStore } from '../../../src/context/project/local-git-file-store.js'; +import type { 
KtxLocalProject } from '../../../src/context/project/project.js'; +import { loadLocalSlSourceRecords } from '../../../src/context/sl/local-sl.js'; + +const BOOKS_MANIFEST = `tables: + books: + table: public.books + columns: + - name: book_id + type: number + pk: true + - name: title + type: string +`; + +const REVIEWS_MANIFEST = `tables: + reviews: + table: main.reviews + columns: + - name: review_id + type: number + pk: true + - name: rating + type: number +`; + +// Build a project backed only by an on-disk file store (no git init, no +// commit), so the fixture never hits the gpg-signing path during init. +function fakeProject(projectDir: string, connections: KtxLocalProject['config']['connections']): KtxLocalProject { + const fileStore = new LocalGitFileStore({ rootDir: projectDir, git: {} as GitService }); + const config = { ...buildDefaultKtxProjectConfig(), connections }; + return { + projectDir, + configPath: join(projectDir, 'ktx.yaml'), + config, + coreConfig: {} as KtxLocalProject['coreConfig'], + git: {} as GitService, + fileStore, + }; +} + +// `skipLock: true` writes the file to disk without committing, avoiding git. 
+async function seedManifest(project: KtxLocalProject, path: string, content: string): Promise<void> { + await project.fileStore.writeFile(path, content, 'ktx', 'ktx@example.com', 'seed manifest', { skipLock: true }); +} + +describe('federated semantic-layer source loading', () => { + let tempDir: string; + let project: KtxLocalProject; + let singleMemberProject: KtxLocalProject; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-local-sl-fed-')); + + project = fakeProject(join(tempDir, 'project'), { + pg_books: { driver: 'postgres' }, + sqlite_reviews: { driver: 'sqlite' }, + }); + await seedManifest(project, 'semantic-layer/pg_books/_schema/public.yaml', BOOKS_MANIFEST); + await seedManifest(project, 'semantic-layer/sqlite_reviews/_schema/main.yaml', REVIEWS_MANIFEST); + + singleMemberProject = fakeProject(join(tempDir, 'single'), { + pg_books: { driver: 'postgres' }, + }); + await seedManifest(singleMemberProject, 'semantic-layer/pg_books/_schema/public.yaml', BOOKS_MANIFEST); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('namespaces member source records by connection id for _ktx_federated', async () => { + const records = await loadLocalSlSourceRecords(project, { connectionId: '_ktx_federated' }); + const names = records.map((r) => r.source.name).sort(); + expect(names).toEqual(['pg_books.books', 'sqlite_reviews.reviews']); + }); + + it('keeps colliding member table names distinct via namespacing', async () => { + const collide = fakeProject(join(tempDir, 'collide'), { + pg_a: { driver: 'postgres' }, + sqlite_b: { driver: 'sqlite' }, + }); + const usersManifest = `tables:\n users:\n table: public.users\n columns:\n - name: id\n type: number\n`; + await seedManifest(collide, 'semantic-layer/pg_a/_schema/public.yaml', usersManifest); + await seedManifest(collide, 'semantic-layer/sqlite_b/_schema/main.yaml', usersManifest); + const records = await loadLocalSlSourceRecords(collide, { 
connectionId: '_ktx_federated' }); + expect(records.map((r) => r.source.name).sort()).toEqual(['pg_a.users', 'sqlite_b.users']); + }); + + it('tags member records with the virtual federated connection id so reads round-trip', async () => { + const records = await loadLocalSlSourceRecords(project, { connectionId: '_ktx_federated' }); + // The federated connection owns no directory and is addressed by one virtual + // id; the member-prefixed names (asserted above) prove the union read from + // member dirs, so the (connectionId, name) pair resolves back via `sl read`. + expect(records.map((r) => r.connectionId)).toEqual(['_ktx_federated', '_ktx_federated']); + }); + + it('returns empty for _ktx_federated when fewer than 2 compatible members', async () => { + const records = await loadLocalSlSourceRecords(singleMemberProject, { connectionId: '_ktx_federated' }); + expect(records).toEqual([]); + }); +}); diff --git a/packages/cli/test/context/sl/source-files-reserved.test.ts b/packages/cli/test/context/sl/source-files-reserved.test.ts new file mode 100644 index 00000000..535549fd --- /dev/null +++ b/packages/cli/test/context/sl/source-files-reserved.test.ts @@ -0,0 +1,22 @@ +import { describe, expect, it } from 'vitest'; +import { assertSafeConnectionId, isReservedConnectionId } from '../../../src/context/sl/source-files.js'; + +describe('reserved connection ids', () => { + it('flags _ktx_ prefixed ids as reserved', () => { + expect(isReservedConnectionId('_ktx_federated')).toBe(true); + expect(isReservedConnectionId('_ktx_anything')).toBe(true); + }); + + it('does not flag normal ids', () => { + expect(isReservedConnectionId('pg_books')).toBe(false); + expect(isReservedConnectionId('sqlite_reviews')).toBe(false); + }); + + it('rejects a user-supplied reserved id', () => { + expect(() => assertSafeConnectionId('_ktx_federated')).toThrow(/reserved/i); + }); + + it('still accepts normal ids', () => { + expect(assertSafeConnectionId('pg_books')).toBe('pg_books'); + }); 
+}); diff --git a/packages/cli/test/ingest-query-executor-federated.test.ts b/packages/cli/test/ingest-query-executor-federated.test.ts new file mode 100644 index 00000000..cc7cb871 --- /dev/null +++ b/packages/cli/test/ingest-query-executor-federated.test.ts @@ -0,0 +1,36 @@ +import { describe, expect, it, vi } from 'vitest'; +import { createKtxCliIngestQueryExecutor } from '../src/ingest-query-executor.js'; + +describe('federated query executor routing', () => { + it('routes _ktx_federated to the DuckDB federated executor, not a single connector', async () => { + const project = { + projectDir: '/tmp/x', + config: { connections: { pg: { driver: 'postgres', url: 'env:PG' }, lite: { driver: 'sqlite', url: '/x.db' } } }, + } as never; + + const federatedSpy = vi.fn(async () => ({ + headers: ['n'], rows: [[1]], totalRows: 1, command: 'SELECT', rowCount: 1, + })); + + const executor = createKtxCliIngestQueryExecutor(project, { executeFederated: federatedSpy }); + const result = await executor.execute({ + connectionId: '_ktx_federated', + connection: undefined, + sql: 'select 1 as n', + }); + + expect(federatedSpy).toHaveBeenCalledOnce(); + expect(result.totalRows).toBe(1); + }); + + it('throws if _ktx_federated requested but fewer than 2 compatible members', async () => { + const project = { + projectDir: '/tmp/x', + config: { connections: { pg: { driver: 'postgres', url: 'env:PG' } } }, + } as never; + const executor = createKtxCliIngestQueryExecutor(project, { executeFederated: vi.fn() }); + await expect( + executor.execute({ connectionId: '_ktx_federated', connection: undefined, sql: 'select 1' }), + ).rejects.toThrow(/2 attach-compatible/i); + }); +}); diff --git a/packages/cli/test/setup-databases-federation-notice.test.ts b/packages/cli/test/setup-databases-federation-notice.test.ts new file mode 100644 index 00000000..4fe7cab2 --- /dev/null +++ b/packages/cli/test/setup-databases-federation-notice.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from 
'vitest'; +import { federationNoticeFor } from '../src/setup-databases.js'; + +describe('federationNoticeFor', () => { + it('returns a notice naming members when 2+ compatible exist', () => { + const notice = federationNoticeFor({ + pg_books: { driver: 'postgres' }, + sqlite_reviews: { driver: 'sqlite' }, + } as never, '/proj'); + expect(notice).toMatch(/pg_books/); + expect(notice).toMatch(/sqlite_reviews/); + expect(notice).toMatch(/cross-database/i); + // Cross-DB joins via a source's `joins:` list are unsupported; the notice + // must steer users to raw SQL against the federated connection instead. + expect(notice).toMatch(/_ktx_federated/); + expect(notice).not.toMatch(/joins:/); + }); + + it('returns null with fewer than 2 compatible', () => { + expect(federationNoticeFor({ pg: { driver: 'postgres' } } as never, '/proj')).toBeNull(); + }); + + it('returns null when the second db is incompatible', () => { + expect( + federationNoticeFor({ pg: { driver: 'postgres' }, snow: { driver: 'snowflake' } } as never, '/proj'), + ).toBeNull(); + }); +}); diff --git a/packages/cli/test/sql-federated.integration.test.ts b/packages/cli/test/sql-federated.integration.test.ts new file mode 100644 index 00000000..c9190a9b --- /dev/null +++ b/packages/cli/test/sql-federated.integration.test.ts @@ -0,0 +1,90 @@ +import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { parseKtxProjectConfig, serializeKtxProjectConfig } from '../src/context/project/config.js'; +import { initKtxProject } from '../src/context/project/project.js'; +import type { SqlAnalysisPort } from '../src/context/sql-analysis/ports.js'; +import type { KtxCliIo } from '../src/cli-runtime.js'; +import { runKtxSql } from '../src/sql.js'; + +function fakeIo(): { io: KtxCliIo; out: () => string; err: () => string } { + 
let out = ''; + let err = ''; + return { + io: { + stdout: { write: (chunk: string) => ((out += chunk), true) }, + stderr: { write: (chunk: string) => ((err += chunk), true) }, + } as unknown as KtxCliIo, + out: () => out, + err: () => err, + }; +} + +// Validation needs the Python daemon, unavailable in unit tests; execution is real. +const stubSqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: async () => ({ fingerprint: '', normalizedSql: '', tablesTouched: [], literalSlots: [] }), + analyzeBatch: async () => new Map([['cli-sql', { tablesTouched: [], columnsByClause: {} }]]), + validateReadOnly: async () => ({ ok: true, error: null }), +}; + +describe('ktx sql federated integration', () => { + let dir: string; + + beforeEach(async () => { + dir = await mkdtemp(join(tmpdir(), 'ktx-fed-int-')); + }); + + afterEach(async () => { + await rm(dir, { recursive: true, force: true }); + }); + + it('joins books and reviews across two sqlite files', async () => { + const projectDir = join(dir, 'project'); + await initKtxProject({ projectDir }); + + const books = new Database(join(projectDir, 'books.db')); + books.exec("CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT); INSERT INTO books VALUES (1, 'Clean Code');"); + books.close(); + const reviews = new Database(join(projectDir, 'reviews.db')); + reviews.exec('CREATE TABLE reviews (id INTEGER PRIMARY KEY, book_id INTEGER, rating INTEGER); INSERT INTO reviews VALUES (1, 1, 5);'); + reviews.close(); + + const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8')); + await writeFile( + join(projectDir, 'ktx.yaml'), + serializeKtxProjectConfig({ + ...config, + connections: { + books_db: { driver: 'sqlite', path: 'books.db' }, + reviews_db: { driver: 'sqlite', path: 'reviews.db' }, + }, + }), + 'utf-8', + ); + + const { io, out, err } = fakeIo(); + const code = await runKtxSql( + { + command: 'execute', + projectDir, + connectionId: '_ktx_federated', + sql: 'SELECT b.title, r.rating 
FROM books_db.books b JOIN reviews_db.reviews r ON b.id = r.book_id', + maxRows: 100, + json: true, + cliVersion: 'test', + }, + io, + { createSqlAnalysis: () => stubSqlAnalysis }, + ); + + expect(code, err()).toBe(0); + const payload = JSON.parse(out()) as { connectionId: string; headers: string[]; rows: unknown[][] }; + expect(payload.connectionId).toBe('_ktx_federated'); + expect(payload.headers).toEqual(['title', 'rating']); + expect(payload.rows).toHaveLength(1); + expect(payload.rows[0][0]).toBe('Clean Code'); + expect(Number(payload.rows[0][1])).toBe(5); + }); +}); diff --git a/packages/cli/test/sql.test.ts b/packages/cli/test/sql.test.ts index 067f76a2..e1222e47 100644 --- a/packages/cli/test/sql.test.ts +++ b/packages/cli/test/sql.test.ts @@ -345,6 +345,58 @@ describe('runKtxSql', () => { expect(connector.executeReadOnly).not.toHaveBeenCalled(); expect(connector.cleanup).toHaveBeenCalledTimes(1); - expect(io.stderr()).toContain('Connection "warehouse" does not support read-only SQL execution.'); + expect(io.stderr()).toContain('does not support read-only SQL execution.'); + }); + + it('routes _ktx_federated through the shared federated executor', async () => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir }); + await writeConnections(projectDir, { + books_db: { driver: 'sqlite', path: 'books.db' }, + reviews_db: { driver: 'sqlite', path: 'reviews.db' }, + }); + const executeFederated = vi.fn(async () => ({ + headers: ['title', 'rating'], + rows: [['Clean Code', 5]], + totalRows: 1, + command: 'SELECT', + rowCount: 1, + })); + const memberConnector = makeConnector({ + executeReadOnly: vi.fn(async () => { + throw new Error('member connector must not be used for federated id'); + }), + }); + const io = makeIo(); + + await expect( + runKtxSql( + { + command: 'execute', + projectDir, + connectionId: '_ktx_federated', + sql: 'select 1', + maxRows: 100, + output: 'json', + json: true, + cliVersion: '0.0.0-test', + }, + io.io, 
+ { + createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }), + createScanConnector: vi.fn(async () => memberConnector), + executeFederated, + }, + ), + ).resolves.toBe(0); + + expect(executeFederated).toHaveBeenCalledTimes(1); + expect(memberConnector.executeReadOnly).not.toHaveBeenCalled(); + expect(JSON.parse(io.stdout())).toEqual({ + connectionId: '_ktx_federated', + headers: ['title', 'rating'], + rows: [['Clean Code', 5]], + rowCount: 1, + }); }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 54401eff..6c1eae07 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -143,6 +143,9 @@ importers: '@commander-js/extra-typings': specifier: 14.0.0 version: 14.0.0(commander@14.0.3) + '@duckdb/node-api': + specifier: 1.5.3-r.3 + version: 1.5.3-r.3 '@google-cloud/bigquery': specifier: ^8.3.1 version: 8.3.1 @@ -747,6 +750,56 @@ packages: '@dabh/diagnostics@2.0.8': resolution: {integrity: sha512-R4MSXTVnuMzGD7bzHdW2ZhhdPC/igELENcq5IjEverBvq5hn1SXCWcsi6eSsdWP0/Ur+SItRRjAktmdoX/8R/Q==} + '@duckdb/node-api@1.5.3-r.3': + resolution: {integrity: sha512-FzuL6sevuFfEFwkgiUMRMUAj4TaVqV//L0oo2FVZ9s9oYpLpALF9qZyQv2ucclTNQZwDCkm8+e6yLMc6t8IjlA==} + + '@duckdb/node-bindings-darwin-arm64@1.5.3-r.3': + resolution: {integrity: sha512-ttD8QBesgzHu7Sc4qouuIGLM7PWedLW8GvFbnZEyMqk24mQz1HWFgaT0ivw6nDRaDPUQLB9QnAOq8MZUh1zWHQ==} + cpu: [arm64] + os: [darwin] + + '@duckdb/node-bindings-darwin-x64@1.5.3-r.3': + resolution: {integrity: sha512-Vp9MYtoYf6zUWHdCmHXwUcJlHq3YaaIeULWeSiPUM1hsDflLiZKXtz5i250Ulz03VsfWBjpO4wdM99sjjrYKkg==} + cpu: [x64] + os: [darwin] + + '@duckdb/node-bindings-linux-arm64-musl@1.5.3-r.3': + resolution: {integrity: sha512-IadRyx+98FEynKLXAk2MzReinFgduiDXgNd5Z8c5VKch+8FgBfqkEUYGOnBMMUPT8kuheKdLj23vpWXaCzOgoQ==} + cpu: [arm64] + os: [linux] + libc: [musl] + + '@duckdb/node-bindings-linux-arm64@1.5.3-r.3': + resolution: {integrity: sha512-3HLcrzQE83947JS51UVR7C9qnXQMltCOk4Dnhiz1CD+9u32DGLMgPTIIxclk7O+Q7EwfqzD8JV86Ud+LT1crcQ==} + cpu: [arm64] + os: [linux] 
+ libc: [glibc] + + '@duckdb/node-bindings-linux-x64-musl@1.5.3-r.3': + resolution: {integrity: sha512-5bulS16YhftXcarki4tvCufVslntpQDLOEF6RZ+FSMOGiv5d7SDXqklmVRy4DKW3C5ekgN7S2oYzuGL/ss9BuA==} + cpu: [x64] + os: [linux] + libc: [musl] + + '@duckdb/node-bindings-linux-x64@1.5.3-r.3': + resolution: {integrity: sha512-TXndAL0ZoETq17Df6wB+SUZjLGDmOsKuDSySxB+wy6sHfpRtbDgQibyXRlajVeUkRDwSzBFC5ymy16YG0Fl4iw==} + cpu: [x64] + os: [linux] + libc: [glibc] + + '@duckdb/node-bindings-win32-arm64@1.5.3-r.3': + resolution: {integrity: sha512-55Vu13S0jUudiAGlNWJd7UvlW1iKjwWehD8s93jBCNm0AdE/EJN4nz5rQ0IuWzPWXpMjAYuKu00yE7NdtbTyug==} + cpu: [arm64] + os: [win32] + + '@duckdb/node-bindings-win32-x64@1.5.3-r.3': + resolution: {integrity: sha512-rlOc9ltWQNHuDq99Ah8XaD80nN1ucrSK5AcH/7ibSp9ogX/jswPYlRVE7ODFJAjnQNf8bVvs++Mp+wyGvuG7ag==} + cpu: [x64] + os: [win32] + + '@duckdb/node-bindings@1.5.3-r.3': + resolution: {integrity: sha512-Dphw1a9kKXZnCiWX1YCEAJsQ7WJQO2Ikgxy7m8jy0QVXqAwB9esr5NGsuEL3vMKL7velZHeZCjGOMnHZEcIsdg==} + '@electric-sql/pglite-socket@0.1.5': resolution: {integrity: sha512-/RAye+3EPKfO9nY4tljzxXmkT7yIpFDm0L3F+c28b+Z6uxPOjy/Zz/QEHYHXcrfuUC88/a9S72EO0+3E0j97wQ==} hasBin: true @@ -6755,6 +6808,47 @@ snapshots: enabled: 2.0.0 kuler: 2.0.0 + '@duckdb/node-api@1.5.3-r.3': + dependencies: + '@duckdb/node-bindings': 1.5.3-r.3 + + '@duckdb/node-bindings-darwin-arm64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-darwin-x64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-linux-arm64-musl@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-linux-arm64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-linux-x64-musl@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-linux-x64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-win32-arm64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings-win32-x64@1.5.3-r.3': + optional: true + + '@duckdb/node-bindings@1.5.3-r.3': + dependencies: + detect-libc: 2.1.2 + optionalDependencies: + 
'@duckdb/node-bindings-darwin-arm64': 1.5.3-r.3 + '@duckdb/node-bindings-darwin-x64': 1.5.3-r.3 + '@duckdb/node-bindings-linux-arm64': 1.5.3-r.3 + '@duckdb/node-bindings-linux-arm64-musl': 1.5.3-r.3 + '@duckdb/node-bindings-linux-x64': 1.5.3-r.3 + '@duckdb/node-bindings-linux-x64-musl': 1.5.3-r.3 + '@duckdb/node-bindings-win32-arm64': 1.5.3-r.3 + '@duckdb/node-bindings-win32-x64': 1.5.3-r.3 + '@electric-sql/pglite-socket@0.1.5(@electric-sql/pglite@0.4.5)': dependencies: '@electric-sql/pglite': 0.4.5