feat(duckdb): cross-database federation via derived DuckDB connection (#295)

* feat(duckdb): add @duckdb/node-api dependency for federation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(connectors): extract resolveStringReference to shared module

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(connectors): route all identical connectors through shared resolveStringReference

Collapse the 5 remaining private copies in bigquery, clickhouse, mysql,
snowflake, and sqlserver into the shared module. Fix a latent bug in the
shared module where `~/path` was incorrectly sliced (dropping only `~`,
leaving the leading `/` and making resolve() ignore homedir). Add a
tilde-expansion test that caught the bug and now covers that branch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
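
The `~/path` slicing bug described above can be reproduced in isolation. This is a minimal sketch, not the shared module itself: `expandTildeBuggy` and `expandTildeFixed` are hypothetical names, the home directory is passed in instead of read from `homedir()`, and `posix.resolve` is used so the result does not depend on the host OS.

```typescript
import { posix } from 'node:path';

// Pre-fix slicing: only `~` is dropped, so `~/keys/sa.json` becomes the
// absolute path `/keys/sa.json` and resolve() discards the home directory.
function expandTildeBuggy(rawPath: string, home: string): string {
  return rawPath.startsWith('~') ? posix.resolve(home, rawPath.slice(1)) : rawPath;
}

// Post-fix slicing: drop `~/` together when a slash follows the tilde.
function expandTildeFixed(rawPath: string, home: string): string {
  if (!rawPath.startsWith('~')) return rawPath;
  return posix.resolve(home, rawPath.slice(rawPath[1] === '/' ? 2 : 1));
}

const home = '/home/ada'; // placeholder home directory
console.log(expandTildeBuggy('~/keys/sa.json', home)); // /keys/sa.json
console.log(expandTildeFixed('~/keys/sa.json', home)); // /home/ada/keys/sa.json
```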

* feat(sl): reserve _ktx_ connection-id prefix for virtual connections

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(connections): derive virtual federated connection from compatible members

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(duckdb): federated executor builds READ_ONLY attaches and runs SQL

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(duckdb): close federated DuckDB instance and escape quotes in attach url

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(sl): union member source directories for _ktx_federated

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(query): route _ktx_federated through DuckDB executor

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(sl): use duckdb dialect for federated query compilation

Bypass assertSafeConnectionId for _ktx_federated in resolveLocalConnectionId
and loadComputableSources, and resolve the compute dialect to 'duckdb' when
connectionId is FEDERATED_CONNECTION_ID instead of falling through to the
default postgres lookup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
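
A rough sketch of the bypass and dialect resolution described above. The real `assertSafeConnectionId` and dialect lookup are not shown in this commit, so every function name below, the id pattern, and the `postgres` fallback are assumptions made for illustration.

```typescript
const FEDERATED_CONNECTION_ID = '_ktx_federated';
const RESERVED_PREFIX = '_ktx_';

// Hypothetical stand-in for the project's id validation (the pattern is assumed).
function assertSafeIdSketch(id: string): string {
  if (!/^[A-Za-z0-9_-]+$/.test(id)) {
    throw new Error(`Unsafe connection id "${id}"`);
  }
  if (id.startsWith(RESERVED_PREFIX)) {
    throw new Error(`Connection id "${id}" uses the reserved ${RESERVED_PREFIX} prefix`);
  }
  return id;
}

// The virtual connection skips validation; every other id is still checked.
function resolveConnectionIdSketch(id: string): string {
  return id === FEDERATED_CONNECTION_ID ? id : assertSafeIdSketch(id);
}

// The federated id compiles with the duckdb dialect; other ids use their
// configured driver, with an assumed postgres fallback.
function computeDialectSketch(id: string, driverFor: (id: string) => string | undefined): string {
  if (id === FEDERATED_CONNECTION_ID) return 'duckdb';
  return driverFor(id) ?? 'postgres';
}

console.log(resolveConnectionIdSketch('_ktx_federated')); // _ktx_federated
console.log(computeDialectSketch('_ktx_federated', () => undefined)); // duckdb
```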

* test(duckdb): end-to-end cross-catalog federated join

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test(duckdb): harden federated join test with multi-book join-key coverage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(ingest): keep declared cross-DB joins to federated siblings

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(setup): surface federated connection availability after adding a member

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* chore(setup): mark federationNoticeFor @internal for dead-code gate

Also marks attachTypeForDriver, buildAttachStatements, and
isReservedConnectionId @internal — all three are exported solely for
unit-test access with no production cross-file consumer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(concepts): document cross-database federation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(concepts): correct sqlite two-part naming in federation doc

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(duckdb): quote federated catalog alias so hyphenated connection ids attach

* refactor(duckdb): single-source federation driver list, dedup attach loads

Collapse the parallel ATTACH_COMPATIBLE_DRIVERS set and ATTACH_TYPE_BY_DRIVER
map into one map in federation.ts whose keys are the membership rule. Replace
FederatedMember.config (read only via a type-erasing cast) with a typed url
field extracted at derive time. Emit INSTALL/LOAD once per distinct driver
type instead of once per member.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(duckdb): close federated DuckDB instance on connect failure; dedup id validation

Wrap the federated DuckDB instance in its own try/finally so a failing
connect() or a throwing connection.closeSync() no longer leaks the native
instance. Route setup-sources connection-id validation through the canonical
assertSafeConnectionId so the reserved _ktx_ prefix guard applies there too.
Derive the federated dialect through sqlAnalysisDialectForDriver instead of a
hardcoded literal.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
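
The nested try/finally shape can be checked without DuckDB. The sketch below is synchronous for brevity and uses fake objects; `FakeInstance`, `FakeConnection`, and `runWithCleanup` are hypothetical names, not part of the executor.

```typescript
interface FakeConnection {
  closeSync(): void;
}
interface FakeInstance {
  connect(): FakeConnection;
  closeSync(): void;
}

function runWithCleanup<T>(instance: FakeInstance, use: (connection: FakeConnection) => T): T {
  try {
    const connection = instance.connect(); // if this throws, the outer finally still runs
    try {
      return use(connection);
    } finally {
      connection.closeSync(); // if this throws, the outer finally still runs
    }
  } finally {
    instance.closeSync();
  }
}

const events: string[] = [];
const failingInstance: FakeInstance = {
  connect() {
    throw new Error('connect failed');
  },
  closeSync() {
    events.push('instance closed');
  },
};

try {
  runWithCleanup(failingInstance, () => 'unused');
} catch (error) {
  events.push((error as Error).message);
}
console.log(events); // [ 'instance closed', 'connect failed' ]
```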

* refactor(federation): carry member connection config and projectDir on FederatedMember

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(federation): resolve per-member attach targets via canonical connector resolvers

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): quote mysql attach-string values like postgres

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): resolve member attach targets via canonical resolvers, supporting sqlite path:

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* refactor(federation): thread projectDir through deriveFederatedConnection callers

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(federation): add shared project read-only SQL executor that routes _ktx_federated

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test(federation): exercise shared executor default federated path with real DuckDB

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* refactor(federation): route ingest query executor through shared executor

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): route MCP sql_execution _ktx_federated through shared executor

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): preserve cross-DB joins to federated siblings in manifest re-emit

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): preserve declared cross-DB joins through scan re-ingest

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* refactor(federation): document sibling-ref invariant, drop unsafe casts in test

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): namespace federated source names by member to avoid collisions

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* docs(federation): document member-namespaced federated source names

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): preserve member SSL/search_path in attach, classify federated MCP errors

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* refactor(federation): simplify federated dispatch and parallelize sibling reads

Dedup the federated driver ternary in local-query, derive the prefixed
source.name from the already-built name, drop the duplicated error in
federatedAttachTarget's exhaustive switch, inline the one-line
cleanupConnector wrapper, and parallelize federatedSiblingTargets' shard
reads (was sequential await-in-for on the scan hot path).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
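
For the shard-read change, the difference between await-in-for and a parallel read looks roughly like this. `readShard` is a hypothetical placeholder for whatever reads one shard; the real `federatedSiblingTargets` is not shown here.

```typescript
// Hypothetical shard reader; a real one would hit the filesystem.
async function readShard(path: string): Promise<string> {
  return `contents of ${path}`;
}

// Sequential: each read waits for the previous one to finish.
async function readSequential(paths: string[]): Promise<string[]> {
  const results: string[] = [];
  for (const path of paths) {
    results.push(await readShard(path));
  }
  return results;
}

// Parallel: all reads start at once; Promise.all preserves input order.
async function readParallel(paths: string[]): Promise<string[]> {
  return Promise.all(paths.map((path) => readShard(path)));
}
```

Both functions return results in input order, so callers that rely on ordering should not notice the swap.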

* feat(federation): carry headerTypes through shared SQL executor

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(federation): add shared federated connection listing builder

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): route ktx sql through shared executor for _ktx_federated parity

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(federation): show _ktx_federated in ktx connection list

Surfaces the virtual federated connection in the output of
`ktx connection list` so agents and users can discover cross-database
querying when 2+ attach-compatible connections are configured.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(federation): surface _ktx_federated in MCP connection_list

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test(federation): ktx sql federated cross-file join end-to-end

Drive runKtxSql with the real federated DuckDB executor against two on-disk
sqlite files, stubbing only SQL validation. The test surfaced that the JSON
output path could not serialize bigint values DuckDB returns for integer
columns; printJson now coerces bigint to JSON numbers, matching the
plain/pretty paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* docs(federation): document direct _ktx_federated query surface

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): coerce DuckDB bigint to number in shared federated executor

DuckDB returns integer columns as JS bigint, which JSON.stringify cannot
serialize. The CLI --json path worked around this with a replacer, but the
MCP sql_execution tool serializes via plain JSON.stringify and crashed on
any federated query selecting an integer column. Coerce bigint to Number
once in executeFederatedQuery so every consumer (CLI, MCP, ingest, SL)
gets a JSON-safe result, and remove the now-redundant CLI replacer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
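
The failure mode and the coercion can be shown with plain JavaScript values. This sketch mirrors the safe-range rule from the executor's `jsonSafeBigint` (values outside Number's safe range become strings, per the later BIGINT fix in this PR); `jsonSafe` and the sample row are made up for illustration.

```typescript
const MIN_SAFE = BigInt(Number.MIN_SAFE_INTEGER);
const MAX_SAFE = BigInt(Number.MAX_SAFE_INTEGER);

// Safe-range bigint becomes a number; anything larger keeps its exact digits as a string.
function jsonSafe(cell: unknown): unknown {
  if (typeof cell !== 'bigint') return cell;
  return cell >= MIN_SAFE && cell <= MAX_SAFE ? Number(cell) : cell.toString();
}

const row: unknown[] = [BigInt(42), 'Dune', BigInt('9007199254740993')];

// Plain JSON.stringify rejects bigint values outright.
let failure = '';
try {
  JSON.stringify(row);
} catch (error) {
  failure = (error as Error).name;
}
console.log(failure); // TypeError

// Blind Number() coercion would round 2^53 + 1 down to 2^53.
console.log(Number(BigInt('9007199254740993'))); // 9007199254740992

console.log(JSON.stringify(row.map(jsonSafe))); // [42,"Dune","9007199254740993"]
```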

* refactor(federation): simplify driver map and collapse forked MCP SQL path

- Replace the identity-valued ATTACH_TYPE_BY_DRIVER record with an
  ATTACH_COMPATIBLE_DRIVERS Set; the driver name doubles as the attach
  type, so the map encoded nothing beyond membership.
- Switch federatedAttachTarget directly on the driver with a default
  throw, dropping the unreachable post-switch throw and its comment.
- Route the MCP sql_execution standard-connection case through the
  shared executeProjectReadOnlySql instead of reimplementing the
  connector create/capability-check/execute/cleanup ceremony, so
  federated and standard connections share one execution path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* chore(federation): allowlist placeholder credentials for detect-secrets

The federation doc example URL and the federated-attach test fixtures use
literal placeholder credentials that trip detect-secrets. Mark them with
line-scoped pragma allowlist comments so a real secret added later is still
caught.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(federation): correct SL addressing, join pruning, and id-quoting guidance

- Federated SL list/search records carry the virtual `_ktx_federated`
  connection id (member origin stays in the prefixed source name), so rows
  round-trip to `ktx sl -c _ktx_federated read` and the fts index no longer
  clobbers per-connection partitions.
- Prune semantic-layer joins by membership in the connection's own source set
  instead of matching the target's first dotted segment against other
  connection ids; a same-connection join whose target name collides with a
  sibling connection id is preserved, and orphan targets that would poison the
  planner are dropped.
- Document double-quoting for connection ids that are not bare SQL identifiers
  (e.g. "books-db".public.books) in the federated naming hint, the sl-query
  rejection error, and the federation docs.
- Preserve exact federated BIGINT values beyond 2^53 as strings instead of
  rounding, and steer the setup federation notice to raw SQL against
  `_ktx_federated`.
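
The join-pruning change in the list above can be illustrated with a toy model. Everything here is hypothetical: `JoinSketch`, both prune functions, and the sample names are invented to contrast the two rules and do not reflect the planner's real types.

```typescript
interface JoinSketch {
  from: string;
  target: string;
}

// Old rule (as described): drop a join when the target's first dotted segment
// matches another connection's id.
function pruneByFirstSegment(joins: JoinSketch[], siblingIds: Set<string>): JoinSketch[] {
  return joins.filter((join) => !siblingIds.has(join.target.split('.', 1)[0] ?? ''));
}

// New rule (as described): keep a join only when its target is one of the
// connection's own sources.
function pruneByMembership(joins: JoinSketch[], ownSources: Set<string>): JoinSketch[] {
  return joins.filter((join) => ownSources.has(join.target));
}

const ownSources = new Set(['orders', 'analytics.events']);
const siblingIds = new Set(['analytics']); // a sibling connection that happens to share the name

const joins: JoinSketch[] = [
  { from: 'orders', target: 'analytics.events' }, // same-connection target colliding with a sibling id
  { from: 'orders', target: 'ghost.table' }, // orphan target
];

console.log(pruneByFirstSegment(joins, siblingIds).map((j) => j.target)); // [ 'ghost.table' ]
console.log(pruneByMembership(joins, ownSources).map((j) => j.target)); // [ 'analytics.events' ]
```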

* fix(federation): carry ssl:true into postgres URL attach target

A postgres member configured with `url` plus `ssl: true` resolved to both a
connectionString and an ssl flag, but the federated attach builder early-returned
the bare URL and dropped the ssl intent. DuckDB then handed libpq a URL with no
sslmode, so the URL path silently diverged from the discrete-field path (which
emits sslmode=require) and from the direct scan path (which enforces TLS).

Append sslmode=require to the URL when the member sets ssl, unless the URL
already pins a stronger sslmode.
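
A usage sketch of the URL rewrite, using the same logic as `withRequiredSslMode` in the attach builder. The host, user, and database names are placeholders. Note that the check is on the presence of `sslmode`, so whatever mode the URL already pins is kept as written.

```typescript
function withRequiredSslMode(connectionString: string): string {
  const url = new URL(connectionString);
  if (url.searchParams.has('sslmode')) {
    return connectionString; // leave an explicitly pinned mode alone
  }
  url.searchParams.set('sslmode', 'require');
  return url.toString();
}

const bare = 'postgres://reader@db.example.com:5432/books';
const pinned = 'postgres://reader@db.example.com:5432/books?sslmode=verify-full';

console.log(withRequiredSslMode(bare)); // gains ?sslmode=require
console.log(withRequiredSslMode(pinned) === pinned); // true
```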

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Andrey Avtomonov <andreybavt@gmail.com>
Kevin Messiaen 2026-06-15 22:01:39 +07:00 committed by GitHub
parent b81391cd9f
commit 6c815ef529
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 2608 additions and 271 deletions


@ -6,6 +6,7 @@ import { type NotionBotInfo, NotionClient } from './context/ingest/adapters/noti
import { createLocalLookerCredentialResolver } from './context/ingest/adapters/looker/local-looker.adapter.js';
import { metabaseRuntimeConfigFromLocalConnection } from './context/ingest/adapters/metabase/local-metabase.adapter.js';
import { testRepoConnection } from './context/ingest/repo-fetch.js';
import { federatedConnectionListing } from './context/connections/federation.js';
import { getDriverRegistration } from './context/connections/drivers.js';
import { parseNotionConnectionConfig, resolveNotionConnectionAuthToken } from './context/connections/notion-config.js';
import { resolveKtxConfigReference } from './context/core/config-reference.js';
@ -447,15 +448,23 @@ export async function runKtxConnection(
io.stdout.write('No connections configured. Run `ktx setup` to add one.\n');
return 0;
}
const idWidth = Math.max('ID'.length, ...entries.map(([id]) => id.length));
const driverWidth = Math.max(
'DRIVER'.length,
const federated = federatedConnectionListing(project.config.connections, args.projectDir);
const idCandidates = [...entries.map(([id]) => id), ...(federated ? [federated.id] : [])];
const driverLengths = [
...entries.map(([, c]) => (c.driver ?? 'unknown').length),
);
...(federated ? [federated.driver.length] : []),
];
const idWidth = Math.max('ID'.length, ...idCandidates.map((id) => id.length));
const driverWidth = Math.max('DRIVER'.length, ...driverLengths);
io.stdout.write(`${'ID'.padEnd(idWidth)} ${'DRIVER'.padEnd(driverWidth)}\n`);
for (const [id, connection] of entries) {
io.stdout.write(`${id.padEnd(idWidth)} ${(connection.driver ?? 'unknown').padEnd(driverWidth)}\n`);
}
if (federated) {
io.stdout.write(`${federated.id.padEnd(idWidth)} ${federated.driver.padEnd(driverWidth)}\n`);
io.stdout.write(` federates: ${federated.members.join(', ')}\n`);
io.stdout.write(` ${federated.hint}\n`);
}
return 0;
}
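
The width calculation in this hunk can be sketched on its own. `renderConnectionTable` and the sample rows are hypothetical, and the two-space column separator is an assumption, since whitespace in this diff may have been collapsed.

```typescript
interface ListingRow {
  id: string;
  driver: string;
}

// Pads both columns to the widest value, counting the federated row as well.
function renderConnectionTable(rows: ListingRow[]): string[] {
  const idWidth = Math.max('ID'.length, ...rows.map((row) => row.id.length));
  const driverWidth = Math.max('DRIVER'.length, ...rows.map((row) => row.driver.length));
  const lines = [`${'ID'.padEnd(idWidth)}  ${'DRIVER'.padEnd(driverWidth)}`];
  for (const row of rows) {
    lines.push(`${row.id.padEnd(idWidth)}  ${row.driver.padEnd(driverWidth)}`);
  }
  return lines;
}

const tableLines = renderConnectionTable([
  { id: 'books', driver: 'sqlite' },
  { id: 'reviews', driver: 'postgres' },
  { id: '_ktx_federated', driver: 'duckdb' }, // virtual row appended after the declared ones
]);
console.log(tableLines.join('\n'));
```

Including the federated id in the width calculation matters because `_ktx_federated` is usually longer than declared ids; without it the last row would overflow the ID column.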


@ -26,9 +26,7 @@ import {
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { resolveStringReference } from '../shared/string-reference.js';
export interface KtxBigQueryConnectionConfig {
driver?: string;
@ -138,18 +136,6 @@ class DefaultBigQueryClientFactory implements KtxBigQueryClientFactory {
}
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function stringConfigValue(
connection: KtxBigQueryConnectionConfig | undefined,
key: keyof KtxBigQueryConnectionConfig,


@ -3,10 +3,8 @@ import { getDialectForDriver } from '../../context/connections/dialects.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { connectorTestFailure, createKtxConnectorCapabilities, type KtxConnectorTestResult, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
import { readFileSync } from 'node:fs';
import { resolveStringReference } from '../shared/string-reference.js';
import { Agent as HttpsAgent } from 'node:https';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
export interface KtxClickHouseConnectionConfig {
driver?: string;
@ -142,19 +140,6 @@ function stringConfigValue(
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
const envName = value.slice('env:'.length);
return env[envName] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function maybeNumber(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}


@ -0,0 +1,90 @@
import { sqliteDatabasePathFromConfig, type KtxSqliteConnectionConfig } from '../sqlite/connector.js';
import { postgresPoolConfigFromConfig, type KtxPostgresConnectionConfig } from '../postgres/connector.js';
import {
mysqlConnectionPoolConfigFromConfig,
type KtxMysqlConnectionConfig,
} from '../mysql/connector.js';
import type { FederatedMember } from '../../context/connections/federation.js';
function kvKeyword(value: string): string {
// libpq/DuckDB key-value values quote with single quotes and backslash-escape.
return /[\s'\\]/.test(value) ? `'${value.replaceAll('\\', '\\\\').replaceAll("'", "\\'")}'` : value;
}
function withRequiredSslMode(connectionString: string): string {
// DuckDB hands this URL to libpq as-is, so an ssl:true member must carry
// sslmode in the URL itself; any sslmode the URL already pins is left untouched.
const url = new URL(connectionString);
if (url.searchParams.has('sslmode')) {
return connectionString;
}
url.searchParams.set('sslmode', 'require');
return url.toString();
}
function postgresAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): string {
const cfg = postgresPoolConfigFromConfig({
connectionId: member.connectionId,
connection: member.connection as KtxPostgresConnectionConfig,
env,
});
if (cfg.connectionString) {
return cfg.ssl ? withRequiredSslMode(cfg.connectionString) : cfg.connectionString;
}
const parts: string[] = [];
if (cfg.host) parts.push(`host=${kvKeyword(cfg.host)}`);
if (cfg.port) parts.push(`port=${cfg.port}`);
if (cfg.database) parts.push(`dbname=${kvKeyword(cfg.database)}`);
if (cfg.user) parts.push(`user=${kvKeyword(cfg.user)}`);
if (cfg.password) parts.push(`password=${kvKeyword(cfg.password)}`);
if (cfg.ssl) {
parts.push('sslmode=require');
}
if (cfg.options) {
parts.push(`options=${kvKeyword(cfg.options)}`);
}
return parts.join(' ');
}
function mysqlAttachString(member: FederatedMember, env: NodeJS.ProcessEnv): string {
const cfg = mysqlConnectionPoolConfigFromConfig({
connectionId: member.connectionId,
connection: member.connection as KtxMysqlConnectionConfig,
env,
});
const parts: string[] = [
`host=${kvKeyword(cfg.host)}`,
`port=${cfg.port}`,
`database=${kvKeyword(cfg.database)}`,
`user=${kvKeyword(cfg.user)}`,
];
if (cfg.password) {
parts.push(`password=${kvKeyword(cfg.password)}`);
}
if (cfg.ssl) {
parts.push('ssl_mode=REQUIRED');
}
return parts.join(' ');
}
/**
* Resolves a federated member's ktx.yaml config into the connection target
* DuckDB's ATTACH wants for that driver, reusing each connector's canonical
* resolver so federation and standalone scans agree on config interpretation.
*/
export function federatedAttachTarget(member: FederatedMember, env: NodeJS.ProcessEnv): string {
switch (member.driver.toLowerCase()) {
case 'sqlite':
return sqliteDatabasePathFromConfig({
connectionId: member.connectionId,
projectDir: member.projectDir,
connection: member.connection as KtxSqliteConnectionConfig,
});
case 'postgres':
return postgresAttachString(member, env);
case 'mysql':
return mysqlAttachString(member, env);
default:
throw new Error(`Driver "${member.driver}" cannot be attached by DuckDB federation.`);
}
}
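
To see what the key-value quoting produces, here is a small sketch using the same rule as `kvKeyword` above (quote only when the value contains whitespace, a single quote, or a backslash). It uses regex `replace` instead of `replaceAll` purely for portability, and the host, database, and user values are invented.

```typescript
function kvKeyword(value: string): string {
  // Quote with single quotes and backslash-escape only when needed.
  return /[\s'\\]/.test(value)
    ? `'${value.replace(/\\/g, '\\\\').replace(/'/g, "\\'")}'`
    : value;
}

const attachString = [
  `host=${kvKeyword('db.internal')}`,
  `dbname=${kvKeyword('book store')}`,
  `user=${kvKeyword("o'brien")}`,
].join(' ');

console.log(attachString); // host=db.internal dbname='book store' user='o\'brien'
```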


@ -0,0 +1,78 @@
import { DuckDBInstance } from '@duckdb/node-api';
import { federatedAttachTarget } from './federated-attach.js';
import type {
KtxSqlQueryExecutionInput,
KtxSqlQueryExecutionResult,
} from '../../context/connections/query-executor.js';
import { normalizeQueryRows } from '../../context/connections/query-executor.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { attachTypeForDriver, type FederatedMember } from '../../context/connections/federation.js';
function quoteDuckdbIdentifier(id: string): string {
return `"${id.replaceAll('"', '""')}"`;
}
const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER);
const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER);
// DuckDB returns integer columns as JS bigint (unserializable by JSON). Values
// in Number's safe range become Number; larger magnitudes become strings so a
// BIGINT beyond 2^53 keeps its exact value instead of silently rounding.
function jsonSafeBigint(value: bigint): number | string {
return value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT ? Number(value) : value.toString();
}
function toJsonSafeRows(rows: unknown[][]): unknown[][] {
return rows.map((row) => row.map((cell) => (typeof cell === 'bigint' ? jsonSafeBigint(cell) : cell)));
}
/** @internal */
export function buildAttachStatements(members: FederatedMember[], env: NodeJS.ProcessEnv): string[] {
const attachments = members.map((member) => ({
type: attachTypeForDriver(member.driver),
url: federatedAttachTarget(member, env),
alias: member.connectionId,
}));
const loadStatements = [...new Set(attachments.map((a) => a.type))].map(
(type) => `INSTALL ${type}; LOAD ${type};`,
);
const attachStatements = attachments.map(
({ type, url, alias }) =>
`ATTACH '${url.replaceAll("'", "''")}' AS ${quoteDuckdbIdentifier(alias)} (TYPE ${type}, READ_ONLY);`,
);
return [...loadStatements, ...attachStatements];
}
export async function executeFederatedQuery(
members: FederatedMember[],
input: KtxSqlQueryExecutionInput,
env: NodeJS.ProcessEnv = process.env,
): Promise<KtxSqlQueryExecutionResult> {
const sql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const attachStatements = buildAttachStatements(members, env);
const instance = await DuckDBInstance.create(':memory:');
try {
const connection = await instance.connect();
try {
for (const statement of attachStatements) {
await connection.run(statement);
}
const reader = await connection.runAndReadAll(sql);
const rows = toJsonSafeRows(normalizeQueryRows(reader.getRows()));
const headers = reader.columnNames();
return {
headers,
rows,
totalRows: rows.length,
command: 'SELECT',
rowCount: rows.length,
};
} finally {
connection.closeSync();
}
} finally {
instance.closeSync();
}
}
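
The statements `buildAttachStatements` emits can be previewed with a simplified version that takes already-resolved attach targets. `AttachmentSketch`, `attachStatementsSketch`, and the file paths are hypothetical; the quoting and the once-per-type INSTALL/LOAD follow the code above.

```typescript
interface AttachmentSketch {
  type: string;
  url: string;
  alias: string;
}

function quoteIdentifier(id: string): string {
  return `"${id.replace(/"/g, '""')}"`;
}

function attachStatementsSketch(attachments: AttachmentSketch[]): string[] {
  // One INSTALL/LOAD per distinct extension type, not per member.
  const loads = Array.from(new Set(attachments.map((a) => a.type))).map(
    (type) => `INSTALL ${type}; LOAD ${type};`,
  );
  // Single quotes in the target are doubled; the alias is always double-quoted.
  const attaches = attachments.map(
    ({ type, url, alias }) =>
      `ATTACH '${url.replace(/'/g, "''")}' AS ${quoteIdentifier(alias)} (TYPE ${type}, READ_ONLY);`,
  );
  return [...loads, ...attaches];
}

const statements = attachStatementsSketch([
  { type: 'sqlite', url: '/data/books.db', alias: 'books-db' },
  { type: 'sqlite', url: "/data/reader's reviews.db", alias: 'reviews' },
]);
console.log(statements.join('\n'));
// INSTALL sqlite; LOAD sqlite;
// ATTACH '/data/books.db' AS "books-db" (TYPE sqlite, READ_ONLY);
// ATTACH '/data/reader''s reviews.db' AS "reviews" (TYPE sqlite, READ_ONLY);
```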


@ -1,8 +1,6 @@
import mysql, { type FieldPacket, type Pool, type RowDataPacket } from 'mysql2/promise';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { getDialectForDriver } from '../../context/connections/dialects.js';
import { resolveStringReference } from '../shared/string-reference.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import {
constraintDiscoveryWarning,
@ -183,19 +181,6 @@ function stringConfigValue(
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
const envName = value.slice('env:'.length);
return env[envName] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function maybeNumber(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}


@ -1,6 +1,4 @@
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { resolveStringReference } from '../shared/string-reference.js';
import { getDialectForDriver } from '../../context/connections/dialects.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js';
@ -281,17 +279,6 @@ function stringConfigValue(
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;


@ -0,0 +1,20 @@
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
/**
* Resolves a config string that may reference an environment variable
* (`env:NAME`) or a file (`file:/path`, `~` expands to the home dir).
* Plain values pass through unchanged.
*/
export function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(rawPath[1] === '/' ? 2 : 1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}


@ -1,8 +1,6 @@
import { createPrivateKey } from 'node:crypto';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { getDialectForDriver } from '../../context/connections/dialects.js';
import { resolveStringReference } from '../shared/string-reference.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
@ -135,18 +133,6 @@ export interface KtxSnowflakeColumnDistinctValuesResult {
const DATE_TYPES = ['DATE', 'TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ', 'TIMESTAMP_TZ', 'TIME'];
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function stringConfigValue(
connection: KtxSnowflakeConnectionConfig | undefined,
key: keyof KtxSnowflakeConnectionConfig,


@ -25,10 +25,8 @@ import {
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import sql from 'mssql';
import { resolveStringReference } from '../shared/string-reference.js';
export interface KtxSqlServerConnectionConfig {
driver?: string;
@ -208,18 +206,6 @@ function stringConfigValue(
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function parseSqlServerUrl(url: string): Partial<KtxSqlServerConnectionConfig> {
const parsed = new URL(url);
return {


@ -0,0 +1,83 @@
import type { KtxProjectConnectionConfig } from '../project/config.js';
/** Stable id for the runtime-derived federated connection. Never written to ktx.yaml. */
export const FEDERATED_CONNECTION_ID = '_ktx_federated';
/**
* Drivers DuckDB can ATTACH for federation. The driver name doubles as the
* DuckDB extension/TYPE name, so this set is the single source of truth for
* both membership (a driver participates iff it appears here) and attach type.
*/
const ATTACH_COMPATIBLE_DRIVERS = new Set(['postgres', 'mysql', 'sqlite']);
export function attachTypeForDriver(driver: string): string {
const normalized = driver.toLowerCase();
if (!ATTACH_COMPATIBLE_DRIVERS.has(normalized)) {
throw new Error(`Driver "${driver}" cannot be attached by DuckDB federation.`);
}
return normalized;
}
export interface FederatedMember {
connectionId: string;
driver: string;
projectDir: string;
connection: KtxProjectConnectionConfig;
}
export interface FederatedConnectionDescriptor {
id: typeof FEDERATED_CONNECTION_ID;
driver: 'duckdb';
members: FederatedMember[];
}
/**
* Derives a virtual federated connection when a project declares 2+
* attach-compatible databases. Returns null otherwise, so single-DB and
* incompatible projects are unaffected.
*/
export function deriveFederatedConnection(
connections: Record<string, KtxProjectConnectionConfig>,
projectDir: string,
): FederatedConnectionDescriptor | null {
const members: FederatedMember[] = Object.entries(connections)
.filter(([, config]) => ATTACH_COMPATIBLE_DRIVERS.has(config.driver.toLowerCase()))
.map(([connectionId, config]) => ({
connectionId,
driver: config.driver.toLowerCase(),
projectDir,
connection: config,
}));
if (members.length < 2) {
return null;
}
return { id: FEDERATED_CONNECTION_ID, driver: 'duckdb', members };
}
export interface FederatedConnectionListing {
id: typeof FEDERATED_CONNECTION_ID;
driver: 'duckdb';
members: string[];
hint: string;
}
/**
* Listing-facing view of the virtual federated connection for `ktx connection`
* and MCP `connection_list`. Derived from the same declared state as
* deriveFederatedConnection, so both surfaces describe one connection.
*/
export function federatedConnectionListing(
connections: Record<string, KtxProjectConnectionConfig>,
projectDir: string,
): FederatedConnectionListing | null {
const descriptor = deriveFederatedConnection(connections, projectDir);
if (!descriptor) {
return null;
}
return {
id: FEDERATED_CONNECTION_ID,
driver: 'duckdb',
members: descriptor.members.map((member) => member.connectionId),
hint: 'Cross-database queries run here. Name tables connectionId.schema.table (or connectionId.table for sqlite); double-quote any id that is not a bare SQL identifier, e.g. "books-db".public.books.',
};
}
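
The membership rule is easy to exercise with a reduced version that returns only member ids. `federatedMemberIds`, the connection names, and the table and column names in the query string are all hypothetical; the query only illustrates the naming convention from the listing hint.

```typescript
const ATTACHABLE = new Set(['postgres', 'mysql', 'sqlite']);

// Returns the ids that would be federated, or null when fewer than two qualify.
function federatedMemberIds(connections: Record<string, { driver: string }>): string[] | null {
  const ids = Object.entries(connections)
    .filter(([, config]) => ATTACHABLE.has(config.driver.toLowerCase()))
    .map(([id]) => id);
  return ids.length < 2 ? null : ids;
}

const mixed = federatedMemberIds({
  'books-db': { driver: 'postgres' },
  reviews: { driver: 'sqlite' },
  warehouse: { driver: 'snowflake' }, // not attach-compatible, so not a member
});
console.log(mixed); // [ 'books-db', 'reviews' ]

const single = federatedMemberIds({
  books: { driver: 'postgres' },
  warehouse: { driver: 'bigquery' },
});
console.log(single); // null

// Against the virtual connection, tables are addressed through the member id:
// three-part for postgres, two-part for sqlite, quoted when the id has a hyphen.
const exampleSql = `
  SELECT b.title, r.rating
  FROM "books-db".public.books AS b
  JOIN reviews.reviews AS r ON r.book_id = b.id
`;
console.log(exampleSql.trim());
```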


@ -16,6 +16,8 @@ export interface LocalConnectionInfo {
id: string;
name: string;
connectionType: string;
members?: string[];
hint?: string;
}
const DRIVER_TO_CONNECTION_TYPE: Record<string, ConnectionType> = {


@ -0,0 +1,58 @@
import { executeFederatedQuery } from '../../connectors/duckdb/federated-executor.js';
import type { KtxLocalProject } from '../project/project.js';
import type { KtxScanConnector, KtxScanContext } from '../scan/types.js';
import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from './federation.js';
import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutionResult } from './query-executor.js';
export interface ExecuteProjectReadOnlySqlDeps {
project: KtxLocalProject;
input: KtxSqlQueryExecutionInput;
createConnector: (connectionId: string) => Promise<KtxScanConnector> | KtxScanConnector;
executeFederated?: typeof executeFederatedQuery;
runId?: string;
}
/**
* Single resolve-and-execute path for project read-only SQL. The federated
* connection is derived from declared state here so every executor entry point
* routes `_ktx_federated` identically; standard connections go through the
* scan connector.
*/
export async function executeProjectReadOnlySql(
deps: ExecuteProjectReadOnlySqlDeps,
): Promise<KtxSqlQueryExecutionResult> {
const { project, input } = deps;
if (input.connectionId === FEDERATED_CONNECTION_ID) {
const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir);
if (!descriptor) {
throw new Error('Federated execution requested but fewer than 2 attach-compatible connections exist.');
}
const runFederated = deps.executeFederated ?? executeFederatedQuery;
return runFederated(descriptor.members, input);
}
let connector: KtxScanConnector | null = null;
try {
connector = await deps.createConnector(input.connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(
`Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`,
);
}
const ctx: KtxScanContext = { runId: deps.runId ?? 'sql-execution' };
const result = await connector.executeReadOnly(
{ connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows },
ctx,
);
return {
headers: result.headers,
...(result.headerTypes ? { headerTypes: result.headerTypes } : {}),
rows: result.rows,
totalRows: result.totalRows,
command: 'SELECT',
rowCount: result.rowCount,
};
} finally {
await connector?.cleanup?.();
}
}

@ -8,8 +8,9 @@ export interface KtxSqlQueryExecutionInput {
maxRows?: number;
}
interface KtxSqlQueryExecutionResult {
export interface KtxSqlQueryExecutionResult {
headers: string[];
headerTypes?: string[];
rows: unknown[][];
totalRows: number;
command: string;

@ -86,6 +86,9 @@ export interface BuildLiveDatabaseManifestShardsInput {
existingPreservedJoins?: Map<string, LiveDatabaseManifestJoinEntry[]>;
existingDescriptions?: Map<string, LiveDatabaseManifestExistingDescriptions>;
existingUsage?: Map<string, TableUsageOutput>;
// Table refs owned by other federated members; declared cross-DB joins to
// these survive even though the target has no shard in this snapshot.
federatedSiblingTargets?: Set<string>;
}
export interface BuildLiveDatabaseManifestShardsResult {
@ -204,15 +207,20 @@ function joinCondition(
.join(' AND ');
}
function buildJoinsByTable(
/** @internal */
export function buildJoinsByTable(
tableNames: Set<string>,
joins: LiveDatabaseManifestJoinData[],
preservedJoins: Map<string, LiveDatabaseManifestJoinEntry[]>,
federatedSiblingTargets: Set<string> = new Set(),
): Map<string, LiveDatabaseManifestJoinEntry[]> {
const joinsByTable = new Map<string, LiveDatabaseManifestJoinEntry[]>();
for (const join of joins) {
if (!tableNames.has(join.fromTable) || !tableNames.has(join.toTable)) {
const fromLocal = tableNames.has(join.fromTable);
const toLocal = tableNames.has(join.toTable);
const toSibling = federatedSiblingTargets.has(join.toTable);
if (!fromLocal || (!toLocal && !toSibling)) {
continue;
}
const relationship = RELATIONSHIP_MAP[join.relationship] ?? join.relationship;
@ -223,13 +231,17 @@ function buildJoinsByTable(
source: join.source,
});
const reverseRelationship = RELATIONSHIP_INVERSE[relationship] ?? 'one_to_many';
addJoinOnce(joinsByTable, join.toTable, {
to: join.fromTable,
on: joinCondition(join.toTable, join.toColumns, join.fromTable, join.fromColumns),
relationship: reverseRelationship,
source: join.source,
});
// Reverse direction only when the target is a local table in THIS snapshot;
// a federated sibling has no shard here, so it gets no reverse entry.
if (toLocal) {
const reverseRelationship = RELATIONSHIP_INVERSE[relationship] ?? 'one_to_many';
addJoinOnce(joinsByTable, join.toTable, {
to: join.fromTable,
on: joinCondition(join.toTable, join.toColumns, join.fromTable, join.fromColumns),
relationship: reverseRelationship,
source: join.source,
});
}
}
for (const [tableName, tableJoins] of preservedJoins) {
@ -237,7 +249,7 @@ function buildJoinsByTable(
continue;
}
for (const join of tableJoins) {
if (tableNames.has(join.to)) {
if (tableNames.has(join.to) || federatedSiblingTargets.has(join.to)) {
addJoinOnce(joinsByTable, tableName, join);
}
}
@ -250,7 +262,12 @@ export function buildLiveDatabaseManifestShards(
input: BuildLiveDatabaseManifestShardsInput,
): BuildLiveDatabaseManifestShardsResult {
const tableNames = new Set(input.tables.map((table) => table.name));
const joinsByTable = buildJoinsByTable(tableNames, input.joins, input.existingPreservedJoins ?? new Map());
const joinsByTable = buildJoinsByTable(
tableNames,
input.joins,
input.existingPreservedJoins ?? new Map(),
input.federatedSiblingTargets ?? new Set(),
);
const shards = new Map<string, LiveDatabaseManifestShard>();
for (const table of input.tables) {
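
The join-retention rule that `buildJoinsByTable` applies in this diff can be summarised as a three-way decision. The sketch below restates only that decision with reduced types; `JoinSketch` and `classifyJoin` are hypothetical names for illustration and are not part of the change.

```typescript
// Hypothetical reduction of the filter in buildJoinsByTable, for illustration.
interface JoinSketch {
  fromTable: string;
  toTable: string;
}

type JoinDecision = 'skip' | 'forward-only' | 'forward-and-reverse';

function classifyJoin(
  join: JoinSketch,
  localTables: ReadonlySet<string>,
  federatedSiblingTargets: ReadonlySet<string>,
): JoinDecision {
  const fromLocal = localTables.has(join.fromTable);
  const toLocal = localTables.has(join.toTable);
  const toSibling = federatedSiblingTargets.has(join.toTable);
  // The source table must be in this snapshot, and the target must be either
  // local or owned by another federated member.
  if (!fromLocal || (!toLocal && !toSibling)) {
    return 'skip';
  }
  // A sibling target has no shard in this snapshot, so no reverse entry.
  return toLocal ? 'forward-and-reverse' : 'forward-only';
}
```

A join from a local table to a sibling-owned ref such as `crm.public.accounts` is kept in one direction only, while a join whose source table is not in the snapshot is dropped.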

@ -56,7 +56,7 @@ const toolAnnotations = {
const toolDescriptions = {
connection_list:
'List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses.',
'List configured read-only data connections available to this ktx project. Use this before connection-scoped tools when the project may have multiple warehouses. A "_ktx_federated" entry (when present) queries all its member databases together; use its id for cross-database joins.',
discover_data:
'Search across ktx wiki pages, semantic-layer sources, measures, dimensions, raw tables, and columns. Example: discover_data({ query: "monthly orders by customer", connectionId: "warehouse", kinds: ["sl_source", "table"] }).',
wiki_search:
@ -227,6 +227,8 @@ const connectionListOutputSchema = z.object({
id: z.string(),
name: z.string(),
connectionType: z.string(),
members: z.array(z.string()).optional(),
hint: z.string().optional(),
}),
),
});

@ -1,12 +1,16 @@
import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-executor.js';
import { resolveConfiguredConnection } from '../../context/connections/resolve-connection.js';
import { KtxExpectedError, KtxQueryError, isNativeProgrammingFault } from '../../errors.js';
import { localConnectionInfoFromConfig } from '../../context/connections/local-warehouse-descriptor.js';
import { executeProjectReadOnlySql } from '../../context/connections/project-sql-executor.js';
import { FEDERATED_CONNECTION_ID, federatedConnectionListing } from '../../context/connections/federation.js';
import { resolveConfiguredConnection } from '../../context/connections/resolve-connection.js';
import {
type LocalConnectionInfo,
localConnectionInfoFromConfig,
} from '../../context/connections/local-warehouse-descriptor.js';
import type { KtxEmbeddingPort } from '../../context/core/embedding.js';
import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { createKtxEntityDetailsService } from '../../context/scan/entity-details.js';
import type { KtxScanConnector } from '../../context/scan/types.js';
import type { LocalScanMcpOptions } from '../../context/scan/local-scan.js';
import { createKtxDiscoverDataService } from '../../context/search/discover.js';
import { sqlAnalysisDialectForDriver } from '../../context/sql-analysis/dialect.js';
@ -26,12 +30,6 @@ interface CreateLocalProjectMcpContextPortsOptions {
embeddingService: KtxEmbeddingPort | null;
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
if (connector?.cleanup) {
await connector.cleanup();
}
}
async function executeValidatedReadOnlySql(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
@ -39,60 +37,57 @@ async function executeValidatedReadOnlySql(
onProgress?: KtxMcpProgressCallback,
): Promise<KtxSqlExecutionResponse> {
await onProgress?.({ progress: 0, message: 'Validating SQL' });
const connectionId = assertSafeConnectionId(input.connectionId);
const connection = resolveConfiguredConnection(project.config, connectionId);
if (!options.sqlAnalysis) {
throw new Error('sql_execution requires parser-backed SQL validation.');
}
const validation = await options.sqlAnalysis.validateReadOnly(input.sql, sqlAnalysisDialectForDriver(connection.driver));
if (!validation.ok) {
// A read-only guard rejecting the agent's SQL is an expected outcome, not a
// ktx fault: classify it so reportException keeps it out of Error Tracking.
throw new KtxQueryError(validation.error ?? 'SQL is not read-only.');
}
const createConnector = options.localScan?.createConnector;
if (!createConnector) {
throw new Error('sql_execution requires a local scan connector factory.');
}
let connector: KtxScanConnector | null = null;
try {
connector = await createConnector(connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(`Connection "${connectionId}" does not support read-only SQL execution.`);
}
await onProgress?.({ progress: 0.3, message: 'Executing' });
const result = await connector
.executeReadOnly(
{
connectionId,
sql: input.sql,
maxRows: input.maxRows,
},
{ runId: 'mcp-sql-execution' },
)
.catch((error: unknown) => {
// A warehouse/driver rejection (e.g. the agent's SQL failed to compile)
// is a surfaced operational outcome, not a ktx fault: mark it expected
// while preserving the warehouse's own diagnostics. A native JS error
// (TypeError, etc.) signals a bug in connector code — let it propagate
// unchanged so Error Tracking still sees it.
if (isNativeProgrammingFault(error) || error instanceof KtxExpectedError) {
throw error;
}
throw new KtxQueryError(error instanceof Error ? error.message : String(error), { cause: error });
});
const response = {
headers: result.headers,
...(result.headerTypes ? { headerTypes: result.headerTypes } : {}),
rows: result.rows,
rowCount: result.rowCount ?? result.rows.length,
};
await onProgress?.({ progress: 1, message: `Fetched ${response.rowCount} rows` });
return response;
} finally {
await cleanupConnector(connector);
const isFederated = input.connectionId === FEDERATED_CONNECTION_ID;
const connectionId = isFederated ? input.connectionId : assertSafeConnectionId(input.connectionId);
const connection = isFederated ? undefined : resolveConfiguredConnection(project.config, connectionId);
const dialect = sqlAnalysisDialectForDriver(isFederated ? 'duckdb' : connection!.driver);
const validation = await options.sqlAnalysis.validateReadOnly(input.sql, dialect);
if (!validation.ok) {
// A read-only guard rejecting the agent's SQL is an expected outcome, not a
// ktx fault: classify it so reportException keeps it out of Error Tracking.
throw new KtxQueryError(validation.error ?? 'SQL is not read-only.');
}
await onProgress?.({ progress: 0.3, message: 'Executing' });
const result = await executeProjectReadOnlySql({
project,
input: {
connectionId,
projectDir: project.projectDir,
connection,
sql: input.sql,
maxRows: input.maxRows,
},
createConnector,
runId: 'mcp-sql-execution',
}).catch((error: unknown) => {
// A warehouse/driver rejection (e.g. the agent's SQL failed to compile) is a
// surfaced operational outcome, not a ktx fault: mark it expected while
// preserving the warehouse's own diagnostics. A native JS error (TypeError,
// etc.) signals a bug in connector code — let it propagate unchanged so Error
// Tracking still sees it.
if (isNativeProgrammingFault(error) || error instanceof KtxExpectedError) {
throw error;
}
throw new KtxQueryError(error instanceof Error ? error.message : String(error), { cause: error });
});
const response = {
headers: result.headers,
...(result.headerTypes ? { headerTypes: result.headerTypes } : {}),
rows: result.rows,
rowCount: result.rowCount ?? result.rows.length,
};
await onProgress?.({ progress: 1, message: `Fetched ${response.rowCount} rows` });
return response;
}
export function createLocalProjectMcpContextPorts(
@ -103,12 +98,21 @@ export function createLocalProjectMcpContextPorts(
const ports: KtxMcpContextPorts = {
connections: {
async list() {
return Object.entries(project.config.connections)
const configured = Object.entries(project.config.connections)
.map(([id, config]) => localConnectionInfoFromConfig(id, config))
.filter(
(connection): connection is { id: string; name: string; connectionType: string } => connection !== null,
)
.filter((connection): connection is LocalConnectionInfo => connection !== null)
.sort((a, b) => a.id.localeCompare(b.id));
const federated = federatedConnectionListing(project.config.connections, project.projectDir);
if (federated) {
configured.push({
id: federated.id,
name: federated.id,
connectionType: 'DUCKDB',
members: federated.members,
hint: federated.hint,
});
}
return configured;
},
},
knowledge: {

@ -78,6 +78,8 @@ interface KtxConnectionSummary {
id: string;
name: string;
connectionType: string;
members?: string[];
hint?: string;
}
interface KtxConnectionsMcpPort {

@ -4,6 +4,7 @@ import type { TableUsageOutput } from '../../context/ingest/adapters/historic-sq
import type { KtxScanRelationshipConfig } from '../project/config.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { isSlYamlPath } from '../../context/sl/source-files.js';
import { deriveFederatedConnection } from '../connections/federation.js';
import type { KtxLocalScanEnrichmentResult } from './local-enrichment.js';
import {
buildKtxRelationshipArtifacts,
@ -193,10 +194,43 @@ function joinReferencesExistingColumns(
return true;
}
async function federatedSiblingTargets(
project: KtxLocalProject,
connectionId: string,
): Promise<Set<string>> {
const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir);
if (!descriptor) {
return new Set();
}
const siblings = descriptor.members.filter((member) => member.connectionId !== connectionId);
const perSibling = await Promise.all(siblings.map((sibling) => siblingJoinTargets(project, sibling.connectionId)));
return new Set(perSibling.flat());
}
async function siblingJoinTargets(project: KtxLocalProject, connectionId: string): Promise<string[]> {
const listed = await project.fileStore.listFiles(schemaDir(connectionId)).catch(() => ({ files: [] }));
const files = listed.files.filter(isSlYamlPath);
const perFile = await Promise.all(
files.map(async (file) => {
const shard = await project.fileStore
.readFile(file)
.then(({ content }) => YAML.parse(content) as LiveDatabaseManifestShard | null)
.catch(() => null);
// entry.table is buildTableRef's member-local ref (1-3 parts:
// table / schema.table / catalog.schema.table), never connectionId-
// prefixed — so prefixing with the member id yields the fully-qualified
// `to:` form authored in cross-DB joins.
return Object.values(shard?.tables ?? {}).map((entry) => `${connectionId}.${entry.table}`);
}),
);
return perFile.flat();
}
async function loadExistingManifestState(
project: KtxLocalProject,
connectionId: string,
snapshot: KtxSchemaSnapshot,
siblingTargets: Set<string>,
): Promise<ExistingManifestState> {
const descriptions = new Map<string, LiveDatabaseManifestExistingDescriptions>();
const preservedJoins = new Map<string, LiveDatabaseManifestJoinEntry[]>();
@ -236,7 +270,7 @@ async function loadExistingManifestState(
const joins = (entry.joins ?? []).filter((join) => {
return (
(join.source === 'manual' || join.source === 'inferred') &&
validTableNames.has(join.to) &&
(validTableNames.has(join.to) || siblingTargets.has(join.to)) &&
joinReferencesExistingColumns(join, columnsByTable)
);
});
@ -277,7 +311,13 @@ export async function writeLocalScanManifestShards(
};
}
const existing = await loadExistingManifestState(input.project, input.connectionId, input.snapshot);
const siblingTargets = await federatedSiblingTargets(input.project, input.connectionId);
const existing = await loadExistingManifestState(
input.project,
input.connectionId,
input.snapshot,
siblingTargets,
);
const { shards } = buildLiveDatabaseManifestShards({
connectionType: input.driver.toUpperCase(),
tables: snapshotTablesToManifestData(input.snapshot, input.descriptionUpdates),
@ -285,6 +325,7 @@ export async function writeLocalScanManifestShards(
existingDescriptions: existing.descriptions,
existingPreservedJoins: existing.preservedJoins,
existingUsage: existing.usage,
federatedSiblingTargets: siblingTargets,
mapColumnType: (dimensionType) => dimensionType,
});
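
The comment in `siblingJoinTargets` describes how member-local table refs become fully-qualified join targets. A minimal sketch of that prefixing step, using a reduced shard shape (`ShardEntrySketch` and `siblingTargetRefs` are hypothetical names; the sample table refs are made up for illustration):

```typescript
// Hypothetical reduction of the mapping inside siblingJoinTargets.
interface ShardEntrySketch {
  // Member-local ref with 1 to 3 parts: table, schema.table, or catalog.schema.table.
  table: string;
}

function siblingTargetRefs(connectionId: string, tables: Record<string, ShardEntrySketch>): string[] {
  // Prefix each member-local ref with the owning connection id.
  return Object.values(tables).map((entry) => `${connectionId}.${entry.table}`);
}

// Illustrative data only.
const exampleRefs = siblingTargetRefs('crm', {
  accounts: { table: 'public.accounts' },
  notes: { table: 'notes' },
});
```

Here `exampleRefs` holds `crm.public.accounts` and `crm.notes`, the same shape a cross-database join would use in its `to:` field.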

@ -2,16 +2,23 @@ import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-ex
import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js';
import type { KtxMcpProgressCallback } from '../mcp/types.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { FEDERATED_CONNECTION_ID } from '../connections/federation.js';
import { resolveRequiredConnectionId } from '../connections/resolve-connection.js';
import { sqlAnalysisDialectForDriver } from '../sql-analysis/dialect.js';
import { loadLocalSlSourceRecords } from './local-sl.js';
import { toResolvedWire } from './semantic-layer.service.js';
import { assertSafeConnectionId } from './source-files.js';
import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput } from './types.js';
import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput, SemanticLayerSource } from './types.js';
const COMPILE_ONLY_REASON =
'Local semantic-layer query compiled SQL but no data-source execution adapter is configured.';
const FEDERATED_SL_QUERY_UNSUPPORTED =
`Semantic-layer queries are per-connection and cannot target the federated connection '${FEDERATED_CONNECTION_ID}'. ` +
`Run a cross-database query as read-only SQL instead — ktx sql -c ${FEDERATED_CONNECTION_ID} "SELECT ..." or the sql_execution tool — ` +
'using catalog-qualified table names (connectionId.schema.table, or connectionId.table for sqlite; ' +
'double-quote ids that are not bare identifiers, e.g. "books-db".public.books).';
export interface CompileLocalSlQueryOptions {
connectionId?: string;
query: SemanticLayerQueryInput;
@ -31,13 +38,33 @@ function resolveLocalConnectionId(project: KtxLocalProject, requested: string |
return assertSafeConnectionId(resolveRequiredConnectionId(project.config, requested));
}
// The planner rejects a source set carrying a join whose `to` names a source
// outside that set, which would break every query for this connection. Keep only
// joins resolvable within the connection's own sources; a cross-database join
// (its `to` qualified by a sibling connection id) is just one such unresolvable
// target and runs as raw SQL instead. Membership is the test, not a connection-id
// prefix match, so a same-connection target whose name collides with a sibling
// connection id is preserved.
function withResolvableJoinsOnly(
source: SemanticLayerSource,
knownSourceNames: ReadonlySet<string>,
): SemanticLayerSource {
if (source.joins.length === 0) {
return source;
}
const joins = source.joins.filter((join) => knownSourceNames.has(join.to));
return joins.length === source.joins.length ? source : { ...source, joins };
}
async function loadComputableSources(
project: KtxLocalProject,
connectionId: string,
): Promise<ReturnType<typeof toResolvedWire>[]> {
return (await loadLocalSlSourceRecords(project, { connectionId: assertSafeConnectionId(connectionId) }))
.filter((record) => record.source.table || record.source.sql)
.map((record) => toResolvedWire(record.source));
const records = (await loadLocalSlSourceRecords(project, { connectionId })).filter(
(record) => record.source.table || record.source.sql,
);
const knownSourceNames = new Set(records.map((record) => record.source.name));
return records.map((record) => toResolvedWire(withResolvableJoinsOnly(record.source, knownSourceNames)));
}
function headersFromColumns(columns: Array<Record<string, unknown>>): string[] {
@ -50,9 +77,13 @@ export async function compileLocalSlQuery(
project: KtxLocalProject,
options: CompileLocalSlQueryOptions,
): Promise<CompileLocalSlQueryResult> {
if (options.connectionId === FEDERATED_CONNECTION_ID) {
throw new Error(FEDERATED_SL_QUERY_UNSUPPORTED);
}
await options.onProgress?.({ progress: 0, message: 'Compiling query' });
const connectionId = resolveLocalConnectionId(project, options.connectionId);
const dialect = sqlAnalysisDialectForDriver(project.config.connections[connectionId]?.driver);
const driver = project.config.connections[connectionId]?.driver;
const dialect = sqlAnalysisDialectForDriver(driver);
const sources = await loadComputableSources(project, connectionId);
await options.onProgress?.({ progress: 0.3, message: 'Generating SQL' });
@ -107,7 +138,7 @@ export async function compileLocalSlQuery(
...response.plan,
execution: {
mode: 'executed',
driver: project.config.connections[connectionId]?.driver ?? 'unknown',
driver: driver ?? 'unknown',
maxRows,
rowCount: execution.rowCount,
},
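
The membership test used by `withResolvableJoinsOnly` earlier in this file can be exercised in isolation. The following is a re-declaration with reduced types, not the code from the change; `JoinSketch`, `SourceSketch`, and `keepResolvableJoins` are hypothetical names, and the sample sources are invented.

```typescript
// Hypothetical, reduced re-declaration for illustration.
interface JoinSketch {
  to: string;
}

interface SourceSketch {
  name: string;
  joins: JoinSketch[];
}

function keepResolvableJoins(source: SourceSketch, knownSourceNames: ReadonlySet<string>): SourceSketch {
  // Membership in the connection's own source set is the only test.
  const joins = source.joins.filter((join) => knownSourceNames.has(join.to));
  // Return the original object when nothing was dropped.
  return joins.length === source.joins.length ? source : { ...source, joins };
}

const knownNames = new Set(['orders', 'customers']);
const ordersSource: SourceSketch = {
  name: 'orders',
  joins: [{ to: 'customers' }, { to: 'crm.public.accounts' }],
};
const trimmedOrders = keepResolvableJoins(ordersSource, knownNames);
```

In this sketch `trimmedOrders` keeps only the join to `customers`; the cross-database target is dropped from the planner input and would be queried as raw SQL against the federated connection instead.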

@ -2,6 +2,7 @@ import { join } from 'node:path';
import YAML from 'yaml';
import { z } from 'zod';
import type { KtxEmbeddingPort } from '../../context/core/embedding.js';
import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from '../connections/federation.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { HybridSearchCore } from '../../context/search/hybrid-search-core.js';
import type { SearchCandidateGenerator } from '../../context/search/types.js';
@ -169,7 +170,38 @@ export async function loadLocalSlSourceRecords(
project: KtxLocalProject,
input: { connectionId: string },
): Promise<LocalSlSourceRecord[]> {
const connectionId = assertSafeConnectionId(input.connectionId);
if (input.connectionId === FEDERATED_CONNECTION_ID) {
const descriptor = deriveFederatedConnection(project.config.connections, project.projectDir);
if (!descriptor) {
return [];
}
const perMember = await Promise.all(
descriptor.members.map(async (member) => {
const records = await loadSingleConnectionSourceRecords(project, member.connectionId);
return records.map((record) => {
// The federated view is one virtual connection: rows carry its id and a
// member-prefixed name, so a listing/search row round-trips to
// `ktx sl -c _ktx_federated read <name>`. Member origin lives in the name.
const name = `${member.connectionId}.${record.name}`;
return {
...record,
connectionId: FEDERATED_CONNECTION_ID,
name,
source: { ...record.source, name },
};
});
}),
);
return perMember.flat();
}
return loadSingleConnectionSourceRecords(project, input.connectionId);
}
async function loadSingleConnectionSourceRecords(
project: KtxLocalProject,
rawConnectionId: string,
): Promise<LocalSlSourceRecord[]> {
const connectionId = assertSafeConnectionId(rawConnectionId);
const dir = `semantic-layer/${connectionId}`;
const schemaDir = `${dir}/_schema`;
const listed = await project.fileStore.listFiles(dir);

@ -23,7 +23,15 @@ function assertSafePathToken(kind: string, value: string): string {
return value;
}
/** @internal */
export function isReservedConnectionId(connectionId: string): boolean {
return connectionId.startsWith('_ktx_');
}
export function assertSafeConnectionId(connectionId: string): string {
if (isReservedConnectionId(connectionId)) {
throw new Error(`Connection id "${connectionId}" uses the reserved "_ktx_" prefix.`);
}
if (!isSafeConnectionId(connectionId)) {
throw new Error(`Unsafe connection id: ${connectionId}`);
}
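
The order of the two checks in `assertSafeConnectionId` determines which error a reserved id produces. The sketch below is a stand-alone approximation: `checkConnectionId`, `errorFor`, and `SAFE_ID` are hypothetical names, and the pattern is assumed to match the CLI-local copy that this change removes, since `isSafeConnectionId` itself is not shown in the diff.

```typescript
const RESERVED_PREFIX = '_ktx_';
// Assumption: same pattern as the CLI-local assertSafeConnectionId removed in this change.
const SAFE_ID = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/;

function checkConnectionId(connectionId: string): string {
  // Reserved ids are rejected first so the caller sees the specific reason.
  if (connectionId.startsWith(RESERVED_PREFIX)) {
    throw new Error(`Connection id "${connectionId}" uses the reserved "${RESERVED_PREFIX}" prefix.`);
  }
  if (!SAFE_ID.test(connectionId)) {
    throw new Error(`Unsafe connection id: ${connectionId}`);
  }
  return connectionId;
}

// Small wrapper that returns the error message instead of throwing.
function errorFor(connectionId: string): string | null {
  try {
    checkConnectionId(connectionId);
    return null;
  } catch (error) {
    return error instanceof Error ? error.message : String(error);
  }
}
```

Under the assumed pattern, an id with a leading underscore would fail the generic check anyway; testing the reserved prefix first only changes the message from "Unsafe connection id" to the more specific reserved-prefix error.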

@ -1,16 +1,14 @@
import { executeFederatedQuery } from './connectors/duckdb/federated-executor.js';
import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutorPort } from './context/connections/query-executor.js';
import { executeProjectReadOnlySql } from './context/connections/project-sql-executor.js';
import type { KtxLocalProject } from './context/project/project.js';
import type { KtxScanConnector, KtxScanContext } from './context/scan/types.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
type CreateConnector = typeof createKtxCliScanConnector;
export interface KtxCliIngestQueryExecutorDeps {
createConnector?: CreateConnector;
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
await connector?.cleanup?.();
executeFederated?: typeof executeFederatedQuery;
}
export function createKtxCliIngestQueryExecutor(
@ -20,30 +18,13 @@ export function createKtxCliIngestQueryExecutor(
const createConnector = deps.createConnector ?? createKtxCliScanConnector;
return {
async execute(input: KtxSqlQueryExecutionInput) {
let connector: KtxScanConnector | null = null;
try {
connector = await createConnector(project, input.connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(
`Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`,
);
}
const ctx: KtxScanContext = { runId: 'ingest-sql-execution' };
const result = await connector.executeReadOnly(
{ connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows },
ctx,
);
return {
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
command: 'SELECT',
rowCount: result.rowCount,
};
} finally {
await cleanupConnector(connector);
}
return executeProjectReadOnlySql({
project,
input,
createConnector: (connectionId) => createConnector(project, connectionId),
executeFederated: deps.executeFederated,
runId: 'ingest-sql-execution',
});
},
};
}

@ -3,6 +3,7 @@ import { readFile, writeFile } from 'node:fs/promises';
import { delimiter, dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { promisify } from 'node:util';
import { deriveFederatedConnection, FEDERATED_CONNECTION_ID } from './context/connections/federation.js';
import { getDriverRegistration } from './context/connections/drivers.js';
import { createLocalKtxLlmRuntimeFromConfig } from './context/llm/local-config.js';
import type { KtxLlmRuntimePort } from './context/llm/runtime-port.js';
@ -1171,6 +1172,26 @@ async function writeConnectionConfig(input: {
if (queryHistory?.enabled === true) {
await ensureHistoricSqlIngestDefaults(input.projectDir);
}
if (input.io) {
const federationNotice = federationNoticeFor(config.connections, input.projectDir);
if (federationNotice) {
writeSetupSection(input.io, 'Federated connection available', [federationNotice]);
}
}
}
/** @internal */
export function federationNoticeFor(
connections: Record<string, KtxProjectConnectionConfig>,
projectDir: string,
): string | null {
const descriptor = deriveFederatedConnection(connections, projectDir);
if (!descriptor) {
return null;
}
const names = descriptor.members.map((m) => m.connectionId).join(', ');
return `Detected ${descriptor.members.length} attach-compatible databases (${names}). Run a cross-database join as read-only SQL against \`${FEDERATED_CONNECTION_ID}\` (ktx sql -c ${FEDERATED_CONNECTION_ID} "SELECT ..."), using catalog-qualified table names.`;
}
async function disableConnectionQueryHistory(projectDir: string, connectionId: string): Promise<void> {

@ -20,6 +20,7 @@ import type { KtxCliIo } from './cli-runtime.js';
import { createCliSpinner, errorMessage, writePrefixedLines } from './clack.js';
import { pickNotionRootPages } from './notion-page-picker.js';
import { runKtxSourceMapping } from './source-mapping.js';
import { assertSafeConnectionId } from './context/sl/source-files.js';
import {
runConnectionSetupWithRecovery,
type ConfigureResult,
@ -206,12 +207,6 @@ async function promptText(
return await prompts.text({ ...options, message: withTextInputNavigation(options.message) });
}
function assertSafeConnectionId(connectionId: string): void {
if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(connectionId)) {
throw new Error(`Unsafe connection id: ${connectionId}`);
}
}
function credentialRef(value: string | undefined, label: string): string {
const ref = value?.trim();
if (!ref) {

@ -1,6 +1,10 @@
import { executeFederatedQuery } from './connectors/duckdb/federated-executor.js';
import { FEDERATED_CONNECTION_ID } from './context/connections/federation.js';
import { executeProjectReadOnlySql } from './context/connections/project-sql-executor.js';
import type { KtxSqlQueryExecutionResult } from './context/connections/query-executor.js';
import { resolveConfiguredConnection } from './context/connections/resolve-connection.js';
import { loadKtxProject, type KtxLocalProject } from './context/project/project.js';
import type { KtxQueryResult, KtxScanConnector } from './context/scan/types.js';
import { sqlAnalysisDialectForDriver } from './context/sql-analysis/dialect.js';
import type { SqlAnalysisDialect, SqlAnalysisPort } from './context/sql-analysis/ports.js';
import type { KtxCliIo } from './cli-runtime.js';
import { type KtxOutputMode, resolveOutputMode } from './io/mode.js';
@ -31,6 +35,7 @@ export interface KtxSqlDeps {
loadProject?: typeof loadKtxProject;
createSqlAnalysis?: () => SqlAnalysisPort;
createScanConnector?: typeof createKtxCliScanConnector;
executeFederated?: typeof executeFederatedQuery;
}
interface SqlExecutionOutput {
@ -41,20 +46,6 @@ interface SqlExecutionOutput {
rowCount: number;
}
function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDialect {
const normalized = String(driver ?? '').trim().toLowerCase();
const map: Record<string, SqlAnalysisDialect> = {
postgres: 'postgres',
bigquery: 'bigquery',
snowflake: 'snowflake',
mysql: 'mysql',
sqlserver: 'tsql',
sqlite: 'sqlite',
clickhouse: 'clickhouse',
};
return map[normalized] ?? 'postgres';
}
function queryVerb(sql: string): 'select' | 'explain' | 'show' | 'with' | 'other' {
const first = sql.trim().split(/\s+/, 1)[0]?.toLowerCase();
if (first === 'select' || first === 'explain' || first === 'show' || first === 'with') {
@ -124,13 +115,7 @@ function printSqlResult(output: SqlExecutionOutput, mode: KtxSqlOutputMode, io:
printPretty(output, io);
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
if (connector?.cleanup) {
await connector.cleanup();
}
}
function resultOutput(connectionId: string, result: KtxQueryResult): SqlExecutionOutput {
function resultOutput(connectionId: string, result: KtxSqlQueryExecutionResult): SqlExecutionOutput {
return {
connectionId,
headers: result.headers,
@ -147,9 +132,10 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
let project: KtxLocalProject | undefined;
try {
project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir });
const connection = resolveConfiguredConnection(project.config, args.connectionId);
driver = String(connection.driver ?? 'unknown').toLowerCase();
demoConnection = isDemoConnection(args.connectionId, connection);
const isFederated = args.connectionId === FEDERATED_CONNECTION_ID;
const connection = isFederated ? undefined : resolveConfiguredConnection(project.config, args.connectionId);
driver = isFederated ? 'duckdb' : String(connection?.driver ?? 'unknown').toLowerCase();
demoConnection = isFederated ? false : isDemoConnection(args.connectionId, connection);
const createSqlAnalysis =
deps.createSqlAnalysis ??
@ -161,7 +147,7 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
io,
}));
const analysisPort = createSqlAnalysis();
const dialect = sqlAnalysisDialectForDriver(connection.driver);
const dialect: SqlAnalysisDialect = isFederated ? 'duckdb' : sqlAnalysisDialectForDriver(connection?.driver);
const validation = await analysisPort.validateReadOnly(args.sql, dialect);
if (!validation.ok) {
throw new Error(validation.error ?? 'SQL is not read-only.');
@ -169,39 +155,35 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
const referencedTableCount = await safeReferencedTableCount(analysisPort, args.sql, dialect);
const createScanConnector = deps.createScanConnector ?? createKtxCliScanConnector;
let connector: KtxScanConnector | null = null;
try {
connector = await createScanConnector(project, args.connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(`Connection "${args.connectionId}" does not support read-only SQL execution.`);
}
const result = await connector.executeReadOnly(
{
connectionId: args.connectionId,
sql: args.sql,
maxRows: args.maxRows,
},
{ runId: 'cli-sql' },
);
const mode = resolveOutputMode({ explicit: args.output, json: args.json, io });
printSqlResult(resultOutput(args.connectionId, result), mode, io);
await emitTelemetryEvent({
name: 'sql_completed',
const result = await executeProjectReadOnlySql({
project,
input: {
connectionId: args.connectionId,
projectDir: args.projectDir,
io,
fields: {
driver,
isDemoConnection: demoConnection,
queryVerb: queryVerb(args.sql),
referencedTableCount,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
return 0;
} finally {
await cleanupConnector(connector);
}
connection,
sql: args.sql,
maxRows: args.maxRows,
},
createConnector: (connectionId) => createScanConnector(project!, connectionId),
executeFederated: deps.executeFederated,
runId: 'cli-sql',
});
const mode = resolveOutputMode({ explicit: args.output, json: args.json, io });
printSqlResult(resultOutput(args.connectionId, result), mode, io);
await emitTelemetryEvent({
name: 'sql_completed',
projectDir: args.projectDir,
io,
fields: {
driver,
isDemoConnection: demoConnection,
queryVerb: queryVerb(args.sql),
referencedTableCount,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
return 0;
} catch (error) {
const errorClass = scrubErrorClass(error);
await emitTelemetryEvent({