mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(connections): add execute-only warehouses; stop silent full-project scans
A configured warehouse was always a scan/ingest target. The only way to use a connection purely for SQL execution (ktx sql / sql_execution) was the leaky workaround of an empty setup.database_connection_ids — which actually re-includes every warehouse via the 'fall back to all' branch — so e.g. a BigQuery connection meant only for read-only queries triggered a full-billing-project scan.

- Add a per-connection scan_enabled flag (default true) to warehouse connections. scan_enabled: false registers the connection for execution only and never as a scan target.
- Route every scan-target selection path through one predicate (isScanTargetWarehouse): both ingest (primaryWarehouseConnectionIds, including the all-warehouses fallback) and setup (configuredPrimaryConnectionIds) now exclude execute-only connections. Setup validates the credential but skips scope discovery and scan for them. Execution paths are untouched — the warehouse descriptor still resolves, so ktx sql / sql_execution keep working.
- Scripted setup with no --database-schema no longer silently scopes the scan to every discovered schema/dataset: it warns with the count and names how to narrow (--database-schema) or opt out (scan_enabled: false).
This commit is contained in:
parent
a02fcab487
commit
ece0dfb2c8
10 changed files with 187 additions and 3 deletions
|
|
@ -15,6 +15,10 @@ Use `ktx sql` with a required connection id and positional SQL text.
|
|||
ktx sql --connection <id> [options] <sql...>
|
||||
```
|
||||
|
||||
`ktx sql` runs against any configured connection, whether or not it is a scan or
|
||||
ingest target. Connections marked `scan_enabled: false` (execute-only) work here
|
||||
too — see [execute-only connections](/docs/configuration/ktx-yaml#execute-only-connections).
|
||||
|
||||
## Options
|
||||
|
||||
Use output flags to choose between terminal display, TSV rows, and structured
|
||||
|
|
|
|||
|
|
@ -158,6 +158,29 @@ connections:
|
|||
dataset_ids: [analytics, mart]
|
||||
```
|
||||
|
||||
#### Execute-only connections
|
||||
|
||||
Set `scan_enabled: false` to register a warehouse for SQL execution only. The
|
||||
connection is usable by `ktx sql` and the agent `sql_execution` tool, but **ktx**
|
||||
never introspects, scans, or ingests it — and `ktx setup` validates the
|
||||
credential without discovering or scanning its schemas. This is the supported way
|
||||
to run read-only queries against shared or public data (for example a BigQuery
|
||||
billing project full of unrelated datasets) without making it a context source.
|
||||
|
||||
```yaml
|
||||
connections:
|
||||
public_bq:
|
||||
driver: bigquery
|
||||
credentials_json: file:./service-account.json
|
||||
scan_enabled: false
|
||||
```
|
||||
|
||||
When `scan_enabled` is omitted or set to `true`, a warehouse is a scan target. In scripted setup
|
||||
(`--no-input`) with no `--database-schema` and no `dataset_ids`/`schemas`, **ktx**
|
||||
scopes the scan to every schema or dataset the credential can see and prints a
|
||||
warning naming the count; pass `--database-schema` to narrow it, or
|
||||
set `scan_enabled: false` to register it for execution only.
|
||||
|
||||
For Postgres, MySQL, SQL Server, and Snowflake connections, set
|
||||
`maxConnections` when scan or ingest work needs to stay below the target's
|
||||
connection cap. Postgres, MySQL, and SQL Server default to `10`; Snowflake
|
||||
|
|
|
|||
|
|
@ -72,6 +72,24 @@ export function localConnectionToWarehouseDescriptor(
|
|||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* True when the connection is registered for SQL execution only (`scan_enabled: false`) and
|
||||
* must never be used as a scan/ingest target. Execution paths (`ktx sql`, `sql_execution`) are
|
||||
* unaffected — they resolve the warehouse via {@link localConnectionToWarehouseDescriptor}.
|
||||
*/
|
||||
export function isExecuteOnlyConnection(connection: KtxProjectConnectionConfig | undefined): boolean {
|
||||
return (connection as { scan_enabled?: boolean } | undefined)?.scan_enabled === false;
|
||||
}
|
||||
|
||||
/**
|
||||
* True when the connection is a warehouse AND eligible to be scanned/ingested. This is the single
|
||||
* predicate every scan-target selection path routes through, so execute-only connections are
|
||||
* excluded consistently — including the "fall back to all warehouses" path.
|
||||
*/
|
||||
export function isScanTargetWarehouse(id: string, connection: KtxProjectConnectionConfig | undefined): boolean {
|
||||
return localConnectionToWarehouseDescriptor(id, connection) !== null && !isExecuteOnlyConnection(connection);
|
||||
}
|
||||
|
||||
export function localConnectionTypeForConfig(id: string, connection: KtxProjectConnectionConfig | undefined): string {
|
||||
const descriptor = localConnectionToWarehouseDescriptor(id, connection);
|
||||
if (descriptor) {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { join } from 'node:path';
|
||||
import { localConnectionToWarehouseDescriptor } from '../../context/connections/local-warehouse-descriptor.js';
|
||||
import { isScanTargetWarehouse, localConnectionToWarehouseDescriptor } from '../../context/connections/local-warehouse-descriptor.js';
|
||||
import { notionConnectionToPullConfig, parseNotionConnectionConfig } from '../../context/connections/notion-config.js';
|
||||
import { resolveKtxConfigReference } from '../core/config-reference.js';
|
||||
import { ktxLocalStateDbPath } from '../../context/project/local-state-db.js';
|
||||
|
|
@ -147,14 +147,14 @@ export function createDefaultLocalIngestAdapters(
|
|||
function primaryWarehouseConnectionIds(project: KtxLocalProject): string[] {
|
||||
const configuredPrimaryIds = project.config.setup?.database_connection_ids ?? [];
|
||||
const configured = configuredPrimaryIds.filter((connectionId) =>
|
||||
Boolean(localConnectionToWarehouseDescriptor(connectionId, project.config.connections[connectionId])),
|
||||
isScanTargetWarehouse(connectionId, project.config.connections[connectionId]),
|
||||
);
|
||||
if (configured.length > 0) {
|
||||
return [...new Set(configured)];
|
||||
}
|
||||
|
||||
return Object.entries(project.config.connections)
|
||||
.filter(([connectionId, connection]) => Boolean(localConnectionToWarehouseDescriptor(connectionId, connection)))
|
||||
.filter(([connectionId, connection]) => isScanTargetWarehouse(connectionId, connection))
|
||||
.map(([connectionId]) => connectionId)
|
||||
.sort((left, right) => left.localeCompare(right));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,12 @@ function warehouseConnectionSchema<const Driver extends WarehouseDriver>(driver:
|
|||
.describe(
|
||||
'Optional allowlist of fully-qualified table names ("schema.table") to ingest. When set, live-database ingest discards any table whose schema-qualified name is not in this list. Useful for smoke-testing ingest on a single table.',
|
||||
),
|
||||
scan_enabled: z
|
||||
.boolean()
|
||||
.optional()
|
||||
.describe(
|
||||
'When false, this connection is registered for SQL execution only (ktx sql / sql_execution) and is never used as a scan/ingest target. Omit (or true) to scan and ingest it as a primary warehouse.',
|
||||
),
|
||||
})
|
||||
.describe(
|
||||
`${driver} warehouse connection. Additional driver-tunable fields (e.g. context.queryHistory) are accepted and passed through.`,
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import { delimiter, dirname, join } from 'node:path';
|
|||
import { fileURLToPath } from 'node:url';
|
||||
import { promisify } from 'node:util';
|
||||
import { getDriverRegistration } from './context/connections/drivers.js';
|
||||
import { isExecuteOnlyConnection } from './context/connections/local-warehouse-descriptor.js';
|
||||
import { createLocalKtxLlmRuntimeFromConfig } from './context/llm/local-config.js';
|
||||
import type { KtxLlmRuntimePort } from './context/llm/runtime-port.js';
|
||||
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
|
||||
|
|
@ -459,12 +460,14 @@ function configuredPrimaryConnectionIds(
|
|||
const configuredIds =
|
||||
setupConnectionIds
|
||||
?.filter((connectionId) => normalizeDriver(connections[connectionId]?.driver) !== null)
|
||||
.filter((connectionId) => !isExecuteOnlyConnection(connections[connectionId]))
|
||||
.filter((connectionId, index, ids) => ids.indexOf(connectionId) === index) ?? [];
|
||||
if (configuredIds.length > 0) {
|
||||
return configuredIds;
|
||||
}
|
||||
return Object.entries(connections)
|
||||
.filter(([, connection]) => normalizeDriver(connection.driver) !== null)
|
||||
.filter(([, connection]) => !isExecuteOnlyConnection(connection))
|
||||
.map(([connectionId]) => connectionId)
|
||||
.sort((left, right) => left.localeCompare(right));
|
||||
}
|
||||
|
|
@ -1384,11 +1387,13 @@ async function maybeConfigureDatabaseScope(input: {
|
|||
if (input.args.inputMode === 'disabled') {
|
||||
if (spec) {
|
||||
let scopeToWrite: string[] = cliSchemas;
|
||||
let scopedFromDiscovery = false;
|
||||
if (scopeToWrite.length === 0) {
|
||||
try {
|
||||
scopeToWrite = unique(
|
||||
await (input.deps.listSchemas ?? defaultListSchemas)(input.projectDir, input.connectionId),
|
||||
);
|
||||
scopedFromDiscovery = true;
|
||||
} catch (error) {
|
||||
const detail = error instanceof Error ? error.message : String(error);
|
||||
input.io.stderr.write(
|
||||
|
|
@ -1397,6 +1402,18 @@ async function maybeConfigureDatabaseScope(input: {
|
|||
return okValidateResult();
|
||||
}
|
||||
}
|
||||
// Scripted setup with no explicit scope would otherwise silently scan every discovered
|
||||
// schema/dataset the credential can see — including unrelated ones on a shared billing
|
||||
// account. Surface that so the operator can narrow it or register the connection as
|
||||
// execute-only instead of discovering it as a silent full-warehouse scan.
|
||||
if (scopedFromDiscovery && scopeToWrite.length > 1) {
|
||||
input.io.stderr.write(
|
||||
`No --database-schema given for ${input.connectionId}; scanning all ${scopeToWrite.length} ` +
|
||||
`discovered ${spec.nounPlural} (${scopeToWrite.join(', ')}). Pass --database-schema to narrow ` +
|
||||
'the scan, or set connections.' +
|
||||
`${input.connectionId}.scan_enabled: false to register it for SQL execution only.\n`,
|
||||
);
|
||||
}
|
||||
if (scopeToWrite.length > 0) {
|
||||
await writeScopeConfig({
|
||||
projectDir: input.projectDir,
|
||||
|
|
@ -1894,6 +1911,15 @@ async function validateAndScanConnection(input: {
|
|||
const testLines = ['✓ Connection test passed', `Driver: ${driverDisplay}`];
|
||||
writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines);
|
||||
|
||||
// Execute-only connections (scan_enabled: false) are registered for SQL execution only:
|
||||
// the credential is validated above, but ktx never introspects/scans the warehouse.
|
||||
if (isExecuteOnlyConnection(project.config.connections[input.connectionId])) {
|
||||
writeSetupSection(input.io, `Registering ${input.connectionId}`, [
|
||||
'Execute-only connection (scan_enabled: false) — skipping schema scan.',
|
||||
]);
|
||||
return okValidateResult();
|
||||
}
|
||||
|
||||
const scopeStatus = await maybeConfigureDatabaseScope({ ...input, forcePrompt: input.forceScopeAndTables });
|
||||
if (scopeStatus.status !== 'ok') {
|
||||
return scopeStatus;
|
||||
|
|
|
|||
|
|
@ -1,10 +1,32 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
import {
|
||||
isExecuteOnlyConnection,
|
||||
isScanTargetWarehouse,
|
||||
localConnectionInfoFromConfig,
|
||||
localConnectionToWarehouseDescriptor,
|
||||
localConnectionTypeForConfig,
|
||||
} from '../../../src/context/connections/local-warehouse-descriptor.js';
|
||||
|
||||
describe('execute-only warehouse connections', () => {
|
||||
it('treats a warehouse without scan_enabled as a scan target', () => {
|
||||
const connection = { driver: 'postgres', url: 'postgresql://db/a' } as const;
|
||||
expect(isExecuteOnlyConnection(connection)).toBe(false);
|
||||
expect(isScanTargetWarehouse('w', connection)).toBe(true);
|
||||
});
|
||||
|
||||
it('excludes a warehouse with scan_enabled: false from scan targets but still resolves it as a warehouse', () => {
|
||||
const connection = { driver: 'postgres', url: 'postgresql://db/a', scan_enabled: false } as const;
|
||||
expect(isExecuteOnlyConnection(connection)).toBe(true);
|
||||
expect(isScanTargetWarehouse('w', connection)).toBe(false);
|
||||
// Execution paths must still see it as a warehouse so `ktx sql` works.
|
||||
expect(localConnectionToWarehouseDescriptor('w', connection)).not.toBeNull();
|
||||
});
|
||||
|
||||
it('does not treat non-warehouse connections as scan targets', () => {
|
||||
expect(isScanTargetWarehouse('n', { driver: 'notion', auth_token: 'x' } as never)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('localConnectionToWarehouseDescriptor', () => {
|
||||
it('maps local Postgres URLs to canonical warehouse descriptors', () => {
|
||||
expect(
|
||||
|
|
|
|||
|
|
@ -634,6 +634,21 @@ describe('local ingest adapters', () => {
|
|||
await expect(adapter?.listTargetConnectionIds?.('/tmp/staged-dbt')).resolves.toEqual(['warehouse']);
|
||||
});
|
||||
|
||||
it('excludes execute-only (scan_enabled: false) warehouses from primary scan targets', async () => {
|
||||
const adapters = createDefaultLocalIngestAdapters(
|
||||
projectWithConnections({
|
||||
scannable: { driver: 'postgres', url: 'postgresql://db/a' },
|
||||
executeonly: { driver: 'postgres', url: 'postgresql://db/b', scan_enabled: false },
|
||||
docs: { driver: 'dbt', source_dir: './dbt' },
|
||||
} as never),
|
||||
);
|
||||
|
||||
// No setup.database_connection_ids → falls back to "all warehouses", which must now
|
||||
// skip the execute-only connection rather than re-including it.
|
||||
const dbt = adapters.find((adapter) => adapter.source === 'dbt');
|
||||
await expect(dbt?.listTargetConnectionIds?.('/tmp/staged-dbt')).resolves.toEqual(['scannable']);
|
||||
});
|
||||
|
||||
it('passes primary warehouse connection ids to the local Notion adapter', async () => {
|
||||
const adapters = createDefaultLocalIngestAdapters(
|
||||
projectWithConnections({
|
||||
|
|
|
|||
|
|
@ -129,6 +129,18 @@ connections:
|
|||
expect(serialized).not.toContain('completed_steps:');
|
||||
});
|
||||
|
||||
it('parses and serializes a warehouse connection marked execute-only (scan_enabled: false)', () => {
|
||||
const config = parseKtxProjectConfig(`
|
||||
connections:
|
||||
public_bq:
|
||||
driver: bigquery
|
||||
scan_enabled: false
|
||||
`);
|
||||
|
||||
expect(config.connections.public_bq).toMatchObject({ driver: 'bigquery', scan_enabled: false });
|
||||
expect(serializeKtxProjectConfig(config)).toContain('scan_enabled: false');
|
||||
});
|
||||
|
||||
it('parses global direct Anthropic LLM config', () => {
|
||||
const config = parseKtxProjectConfig(`
|
||||
llm:
|
||||
|
|
|
|||
|
|
@ -1586,6 +1586,64 @@ describe('setup databases step', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('registers an execute-only connection (scan_enabled: false) without scanning it', async () => {
|
||||
await writeFile(
|
||||
join(tempDir, 'ktx.yaml'),
|
||||
['connections:', ' public_bq:', ' driver: bigquery', ' scan_enabled: false', ''].join('\n'),
|
||||
'utf-8',
|
||||
);
|
||||
const io = makeIo();
|
||||
const testConnection = vi.fn(async () => 0);
|
||||
const scanConnection = vi.fn(async () => 0);
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
databaseConnectionIds: ['public_bq'],
|
||||
databaseSchemas: [],
|
||||
skipDatabases: false,
|
||||
},
|
||||
io.io,
|
||||
{ testConnection, scanConnection, listSchemas: vi.fn(async () => ['a', 'b', 'c']) },
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
// The credential is validated, but the warehouse is never introspected/scanned.
|
||||
expect(testConnection).toHaveBeenCalledWith(tempDir, 'public_bq', expect.anything());
|
||||
expect(scanConnection).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('warns instead of silently scanning every discovered dataset when scripted setup has no scope', async () => {
|
||||
await writeFile(
|
||||
join(tempDir, 'ktx.yaml'),
|
||||
['connections:', ' warehouse:', ' driver: bigquery', ''].join('\n'),
|
||||
'utf-8',
|
||||
);
|
||||
const io = makeIo();
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
databaseConnectionIds: ['warehouse'],
|
||||
databaseSchemas: [],
|
||||
skipDatabases: false,
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
listSchemas: vi.fn(async () => ['stripe', 'posthog', 'linear']),
|
||||
listTables: vi.fn(async () => []),
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(io.stderr()).toContain('No --database-schema given for warehouse');
|
||||
expect(io.stderr()).toContain('scan_enabled: false');
|
||||
});
|
||||
|
||||
it('keeps scripted database ids fail-fast even when input mode is auto', async () => {
|
||||
await writeFile(
|
||||
join(tempDir, 'ktx.yaml'),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue