feat(cli): add read-only sql command (#126)

* feat(cli): add read-only sql command

* fix(cli): rename sql connection flag
This commit is contained in:
Andrey Avtomonov 2026-05-17 10:29:07 +02:00 committed by GitHub
parent c89af7733a
commit 33a142f769
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 742 additions and 1 deletions

View file

@ -70,6 +70,7 @@ Agent integration ready: yes (codex:project)
| `ktx sl search "revenue"` | Search semantic-layer sources | | `ktx sl search "revenue"` | Search semantic-layer sources |
| `ktx sl validate <source> --connection-id <id>` | Validate a semantic source | | `ktx sl validate <source> --connection-id <id>` | Validate a semantic source |
| `ktx sl query --measure <measure> --format sql` | Compile semantic-layer SQL | | `ktx sl query --measure <measure> --format sql` | Compile semantic-layer SQL |
| `ktx sql --connection <id> "select 1"` | Execute read-only SQL |
| `ktx wiki search "revenue definition"` | Search local wiki context | | `ktx wiki search "revenue definition"` | Search local wiki context |
| `ktx mcp start` | Start the local MCP server for agent clients | | `ktx mcp start` | Start the local MCP server for agent clients |

View file

@ -25,6 +25,7 @@ ktx
search <query> search <query>
validate <sourceName> validate <sourceName>
query query
sql
status status
mcp mcp
start start
@ -79,6 +80,9 @@ ktx ingest --all
ktx sl search "revenue" ktx sl search "revenue"
ktx wiki search "revenue recognition" ktx wiki search "revenue recognition"
# Execute read-only SQL
ktx sql --connection warehouse "select count(*) from public.orders"
# Start the local MCP server for agent clients # Start the local MCP server for agent clients
ktx mcp start ktx mcp start
``` ```

View file

@ -0,0 +1,103 @@
---
title: "ktx sql"
description: "Execute parser-validated read-only SQL against a configured connection."
---
Run read-only SQL against a database connection in your KTX project. The command
validates the statement before execution and only accepts a single `SELECT` or
`WITH` query.
## Command signature
Use `ktx sql` with a required connection id and positional SQL text.
```bash
ktx sql --connection <id> [options] <sql...>
```
## Options
Use output flags to choose between terminal display, TSV rows, and structured
JSON.
| Flag | Description | Default |
|------|-------------|---------|
| `-c`, `--connection <id>` | KTX database connection id. Required. | - |
| `--max-rows <n>` | Maximum rows to return. Must be between `1` and `10000`. | `1000` |
| `--output <mode>` | Output mode: `pretty`, `plain` (TSV), or `json`. | `pretty` |
| `--json` | Shortcut for `--output=json` (overrides `--output`). | `false` |
## Examples
Quote SQL in shell scripts and when the query contains spaces or punctuation.
```bash
# Count rows in a table
ktx sql --connection warehouse "select count(*) from public.orders"
# Return a small result set
ktx sql \
--connection warehouse \
--max-rows 25 \
"select id, status from public.orders order by created_at desc"
# Print JSON for agents or scripts
ktx sql \
--connection warehouse \
--json \
"select status, count(*) from public.orders group by status"
# Print TSV rows
ktx sql \
-c warehouse \
--output plain \
"select id, status from public.orders"
```
## Output
Pretty output prints aligned columns and a final row count.
```text
status count
------ -----
paid 42
open 7
2 rows
```
Plain output prints a TSV header row followed by TSV data rows.
```text
status count
paid 42
open 7
```
JSON output preserves connection id, headers, optional header types, rows, and
row count.
```json
{
"connectionId": "warehouse",
"headers": ["status", "count"],
"headerTypes": ["text", "bigint"],
"rows": [
["paid", 42],
["open", 7]
],
"rowCount": 2
}
```
## Common errors
Use the error text to distinguish validation failures from connection failures.
| Error | Cause | Recovery |
|-------|-------|----------|
| `Only one SQL statement can be executed.` | The SQL text contains multiple statements. | Run one query at a time. |
| `SQL contains read/write operation` | The statement is not read-only. | Use a single `SELECT` or `WITH` query. |
| `Connection "<id>" is not configured in ktx.yaml` | The connection id is wrong or missing from the project. | Run `ktx connection list` and retry with an exact id. |
| `does not support read-only SQL execution` | The connection type has no local SQL executor. | Use a supported database connection or query through MCP where available. |

View file

@ -7,6 +7,7 @@
"ktx-connection", "ktx-connection",
"ktx-ingest", "ktx-ingest",
"ktx-sl", "ktx-sl",
"ktx-sql",
"ktx-wiki", "ktx-wiki",
"ktx-status", "ktx-status",
"ktx-mcp", "ktx-mcp",

View file

@ -8,6 +8,7 @@ import { registerWikiCommands } from './commands/knowledge-commands.js';
import { registerMcpCommands } from './commands/mcp-commands.js'; import { registerMcpCommands } from './commands/mcp-commands.js';
import { registerSetupCommands } from './commands/setup-commands.js'; import { registerSetupCommands } from './commands/setup-commands.js';
import { registerSlCommands } from './commands/sl-commands.js'; import { registerSlCommands } from './commands/sl-commands.js';
import { registerSqlCommands } from './commands/sql-commands.js';
import { registerStatusCommands } from './commands/status-commands.js'; import { registerStatusCommands } from './commands/status-commands.js';
import { registerDevCommands } from './dev.js'; import { registerDevCommands } from './dev.js';
import { renderMissingProjectMessage } from './doctor.js'; import { renderMissingProjectMessage } from './doctor.js';
@ -56,7 +57,7 @@ type CommandPathNode = CommandWithGlobalOptions & {
parent?: CommandPathNode | null; parent?: CommandPathNode | null;
}; };
const PROJECT_AWARE_ROOT_COMMANDS = new Set(['setup', 'connection', 'ingest', 'wiki', 'sl', 'status', 'mcp']); const PROJECT_AWARE_ROOT_COMMANDS = new Set(['setup', 'connection', 'ingest', 'wiki', 'sl', 'sql', 'status', 'mcp']);
const COMMANDS_THAT_CREATE_PROJECT = new Set(['setup', 'ktx dev init']); const COMMANDS_THAT_CREATE_PROJECT = new Set(['setup', 'ktx dev init']);
const COMMANDS_WITH_OWN_MISSING_PROJECT_HANDLING = new Set(['status']); const COMMANDS_WITH_OWN_MISSING_PROJECT_HANDLING = new Set(['status']);
const GLOBAL_OPTIONS_WITH_VALUE = new Set(['--project-dir']); const GLOBAL_OPTIONS_WITH_VALUE = new Set(['--project-dir']);
@ -416,6 +417,7 @@ export function buildKtxProgram(options: BuildKtxProgramOptions): Command {
}); });
registerWikiCommands(program, context); registerWikiCommands(program, context);
registerSlCommands(program, context); registerSlCommands(program, context);
registerSqlCommands(program, context);
registerStatusCommands(program, context); registerStatusCommands(program, context);
registerMcpCommands(program, context); registerMcpCommands(program, context);
registerDevCommands(program, context); registerDevCommands(program, context);

View file

@ -7,6 +7,7 @@ import type { KtxPublicIngestArgs } from './public-ingest.js';
import type { KtxRuntimeArgs } from './runtime.js'; import type { KtxRuntimeArgs } from './runtime.js';
import type { KtxSetupArgs } from './setup.js'; import type { KtxSetupArgs } from './setup.js';
import type { KtxSlArgs } from './sl.js'; import type { KtxSlArgs } from './sl.js';
import type { KtxSqlArgs } from './sql.js';
import { profileMark, profileSpan } from './startup-profile.js'; import { profileMark, profileSpan } from './startup-profile.js';
import type { KtxTextIngestArgs } from './text-ingest.js'; import type { KtxTextIngestArgs } from './text-ingest.js';
@ -34,6 +35,7 @@ export interface KtxCliDeps {
runtime?: (args: KtxRuntimeArgs, io: KtxCliIo) => Promise<number>; runtime?: (args: KtxRuntimeArgs, io: KtxCliIo) => Promise<number>;
knowledge?: (args: KtxKnowledgeArgs, io: KtxCliIo) => Promise<number>; knowledge?: (args: KtxKnowledgeArgs, io: KtxCliIo) => Promise<number>;
sl?: (args: KtxSlArgs, io: KtxCliIo) => Promise<number>; sl?: (args: KtxSlArgs, io: KtxCliIo) => Promise<number>;
sql?: (args: KtxSqlArgs, io: KtxCliIo) => Promise<number>;
mcp?: { mcp?: {
startDaemon?: typeof import('./managed-mcp-daemon.js').startKtxMcpDaemon; startDaemon?: typeof import('./managed-mcp-daemon.js').startKtxMcpDaemon;
stopDaemon?: typeof import('./managed-mcp-daemon.js').stopKtxMcpDaemon; stopDaemon?: typeof import('./managed-mcp-daemon.js').stopKtxMcpDaemon;

View file

@ -0,0 +1,99 @@
import { Command } from '@commander-js/extra-typings';
import { describe, expect, it, vi } from 'vitest';
import type { KtxCliCommandContext } from '../cli-program.js';
import { registerSqlCommands } from './sql-commands.js';
function makeContext(overrides: Partial<KtxCliCommandContext> = {}): KtxCliCommandContext {
let exitCode = 0;
return {
io: {
stdout: { write: vi.fn() },
stderr: { write: vi.fn() },
},
deps: {},
packageInfo: { name: '@ktx/cli', version: '0.0.0-test', contextPackageName: '@ktx/context' },
setExitCode: (code) => {
exitCode = code;
},
runInit: vi.fn(),
writeDebug: vi.fn(),
...overrides,
get exitCode() {
return exitCode;
},
} as KtxCliCommandContext;
}
describe('registerSqlCommands', () => {
it('routes positional SQL through the sql runner', async () => {
const program = new Command().exitOverride().option('--project-dir <path>');
const sql = vi.fn(async () => 0);
const context = makeContext({ deps: { sql } });
registerSqlCommands(program, context);
await expect(
program.parseAsync(
['--project-dir', '/tmp/ktx-sql', 'sql', '--connection', 'warehouse', 'select', '1'],
{ from: 'user' },
),
).resolves.toBe(program);
expect(sql).toHaveBeenCalledWith(
{
command: 'execute',
projectDir: '/tmp/ktx-sql',
connectionId: 'warehouse',
sql: 'select 1',
maxRows: 1000,
output: undefined,
json: false,
cliVersion: '0.0.0-test',
},
context.io,
);
});
it('supports the short connection flag', async () => {
const program = new Command().exitOverride().option('--project-dir <path>');
const sql = vi.fn(async () => 0);
const context = makeContext({ deps: { sql } });
registerSqlCommands(program, context);
await expect(
program.parseAsync(['--project-dir', '/tmp/ktx-sql', 'sql', '-c', 'warehouse', 'select 1'], {
from: 'user',
}),
).resolves.toBe(program);
expect(sql).toHaveBeenCalledWith(expect.objectContaining({ connectionId: 'warehouse', sql: 'select 1' }), context.io);
});
it('rejects missing SQL before invoking the runner', async () => {
const program = new Command().exitOverride().option('--project-dir <path>');
const sql = vi.fn(async () => 0);
registerSqlCommands(program, makeContext({ deps: { sql } }));
await expect(
program.parseAsync(['--project-dir', '/tmp/ktx-sql', 'sql', '--connection', 'warehouse'], {
from: 'user',
}),
).rejects.toThrow('missing required argument');
expect(sql).not.toHaveBeenCalled();
});
it('rejects maxRows above the CLI cap', async () => {
const program = new Command().exitOverride().option('--project-dir <path>');
const sql = vi.fn(async () => 0);
registerSqlCommands(program, makeContext({ deps: { sql } }));
await expect(
program.parseAsync(
['--project-dir', '/tmp/ktx-sql', 'sql', '--connection', 'warehouse', '--max-rows', '10001', 'select 1'],
{ from: 'user' },
),
).rejects.toThrow('must be an integer between 1 and 10000');
expect(sql).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,62 @@
import { type Command, InvalidArgumentError, Option } from '@commander-js/extra-typings';
import { type KtxCliCommandContext, resolveCommandProjectDir } from '../cli-program.js';
import type { KtxSqlArgs } from '../sql.js';
import { profileMark } from '../startup-profile.js';
profileMark('module:commands/sql-commands');
const DEFAULT_MAX_ROWS = 1000;
const MAX_ROWS_CAP = 10_000;
function parseSqlMaxRowsOption(value: string): number {
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed < 1 || parsed > MAX_ROWS_CAP) {
throw new InvalidArgumentError(`must be an integer between 1 and ${MAX_ROWS_CAP}`);
}
return parsed;
}
async function runSqlArgs(context: KtxCliCommandContext, args: KtxSqlArgs): Promise<void> {
const runner = context.deps.sql ?? (await import('../sql.js')).runKtxSql;
context.setExitCode(await runner(args, context.io));
}
export function registerSqlCommands(program: Command, context: KtxCliCommandContext): void {
program
.command('sql')
.description('Execute parser-validated read-only SQL against a configured connection')
.argument('<sql...>', 'SQL query to execute')
.requiredOption('-c, --connection <id>', 'KTX connection id')
.option('--max-rows <n>', 'Maximum rows to return', parseSqlMaxRowsOption, DEFAULT_MAX_ROWS)
.addOption(
new Option('--output <mode>', 'Output mode: pretty (default), plain (TSV), or json').choices([
'pretty',
'plain',
'json',
]),
)
.option('--json', 'Shortcut for --output=json (overrides --output)', false)
.action(
async (
sqlParts: string[],
options: {
connection: string;
maxRows: number;
output?: 'pretty' | 'plain' | 'json';
json?: boolean;
},
command,
) => {
await runSqlArgs(context, {
command: 'execute',
projectDir: resolveCommandProjectDir(command),
connectionId: options.connection,
sql: sqlParts.join(' '),
maxRows: options.maxRows,
output: options.output,
json: options.json === true,
cliVersion: context.packageInfo.version,
});
},
);
}

View file

@ -33,6 +33,7 @@ export type {
} from './setup-sources.js'; } from './setup-sources.js';
export { runKtxSetupSourcesStep } from './setup-sources.js'; export { runKtxSetupSourcesStep } from './setup-sources.js';
export { runKtxRuntime, type KtxRuntimeArgs, type KtxRuntimeDeps } from './runtime.js'; export { runKtxRuntime, type KtxRuntimeArgs, type KtxRuntimeDeps } from './runtime.js';
export { runKtxSql, type KtxSqlArgs, type KtxSqlDeps } from './sql.js';
export { export {
allocateDaemonPort, allocateDaemonPort,
readManagedPythonDaemonStatus, readManagedPythonDaemonStatus,

View file

@ -0,0 +1,295 @@
import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { initKtxProject, parseKtxProjectConfig, serializeKtxProjectConfig } from '@ktx/context/project';
import type { KtxScanConnector } from '@ktx/context/scan';
import type { SqlAnalysisPort } from '@ktx/context/sql-analysis';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { runKtxSql } from './sql.js';
function makeIo() {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
function makeSqlAnalysis(result: Awaited<ReturnType<SqlAnalysisPort['validateReadOnly']>>): SqlAnalysisPort {
return {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(),
validateReadOnly: vi.fn(async () => result),
};
}
function makeConnector(overrides: Partial<KtxScanConnector> = {}): KtxScanConnector {
return {
id: 'sqlite:warehouse',
driver: 'sqlite',
capabilities: {
structuralIntrospection: true,
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
nestedAnalysis: false,
eventStreamDiscovery: false,
formalForeignKeys: true,
estimatedRowCounts: true,
},
introspect: vi.fn(),
executeReadOnly: vi.fn(async () => ({
headers: ['id', 'status'],
headerTypes: ['integer', 'text'],
rows: [
[1, 'paid'],
[2, 'open'],
],
totalRows: 2,
rowCount: 2,
})),
cleanup: vi.fn(async () => undefined),
...overrides,
};
}
describe('runKtxSql', () => {
let tempDir: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-sql-'));
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
async function writeConnections(
projectDir: string,
connections: ReturnType<typeof parseKtxProjectConfig>['connections'],
): Promise<void> {
const config = parseKtxProjectConfig(await readFile(join(projectDir, 'ktx.yaml'), 'utf-8'));
await writeFile(join(projectDir, 'ktx.yaml'), serializeKtxProjectConfig({ ...config, connections }), 'utf-8');
}
it('validates SQL, executes through the scan connector, and prints a pretty table', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
const sqlAnalysis = makeSqlAnalysis({ ok: true, error: null });
const connector = makeConnector();
const createScanConnector = vi.fn(async () => connector);
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'select id, status from orders',
maxRows: 1000,
output: 'pretty',
json: false,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => sqlAnalysis,
createScanConnector,
},
),
).resolves.toBe(0);
expect(sqlAnalysis.validateReadOnly).toHaveBeenCalledWith('select id, status from orders', 'sqlite');
expect(createScanConnector).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), 'warehouse');
expect(connector.executeReadOnly).toHaveBeenCalledWith(
{ connectionId: 'warehouse', sql: 'select id, status from orders', maxRows: 1000 },
{ runId: 'cli-sql' },
);
expect(connector.cleanup).toHaveBeenCalledTimes(1);
expect(io.stdout()).toContain('id status');
expect(io.stdout()).toContain('1 paid');
expect(io.stdout()).toContain('2 open');
expect(io.stdout()).toContain('2 rows');
expect(io.stderr()).toBe('');
});
it('prints JSON output', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'select id from orders',
maxRows: 10,
output: undefined,
json: true,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }),
createScanConnector: vi.fn(async () => makeConnector()),
},
),
).resolves.toBe(0);
expect(JSON.parse(io.stdout())).toEqual({
connectionId: 'warehouse',
headers: ['id', 'status'],
headerTypes: ['integer', 'text'],
rows: [
[1, 'paid'],
[2, 'open'],
],
rowCount: 2,
});
});
it('prints plain TSV output', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'select id from orders',
maxRows: 10,
output: 'plain',
json: false,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }),
createScanConnector: vi.fn(async () => makeConnector()),
},
),
).resolves.toBe(0);
expect(io.stdout()).toBe('id\tstatus\n1\tpaid\n2\topen\n');
expect(io.stderr()).toBe('');
});
it('rejects non-read-only SQL before executing connector SQL', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
const connector = makeConnector();
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'delete from orders',
maxRows: 1000,
output: 'pretty',
json: false,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => makeSqlAnalysis({ ok: false, error: 'SQL contains read/write operation: Delete' }),
createScanConnector: vi.fn(async () => connector),
},
),
).resolves.toBe(1);
expect(connector.executeReadOnly).not.toHaveBeenCalled();
expect(connector.cleanup).not.toHaveBeenCalled();
expect(io.stderr()).toContain('SQL contains read/write operation: Delete');
});
it('rejects missing connections', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'select 1',
maxRows: 1000,
output: 'pretty',
json: false,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }),
},
),
).resolves.toBe(1);
expect(io.stderr()).toContain('Connection "warehouse" is not configured in ktx.yaml');
});
it('rejects connectors without read-only SQL support and still cleans up', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });
await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
const connector = makeConnector({
capabilities: {
...makeConnector().capabilities,
readOnlySql: false,
},
});
const io = makeIo();
await expect(
runKtxSql(
{
command: 'execute',
projectDir,
connectionId: 'warehouse',
sql: 'select 1',
maxRows: 1000,
output: 'pretty',
json: false,
cliVersion: '0.0.0-test',
},
io.io,
{
createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }),
createScanConnector: vi.fn(async () => connector),
},
),
).resolves.toBe(1);
expect(connector.executeReadOnly).not.toHaveBeenCalled();
expect(connector.cleanup).toHaveBeenCalledTimes(1);
expect(io.stderr()).toContain('Connection "warehouse" does not support read-only SQL execution.');
});
});

171
packages/cli/src/sql.ts Normal file
View file

@ -0,0 +1,171 @@
import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project';
import type { KtxQueryResult, KtxScanConnector } from '@ktx/context/scan';
import type { SqlAnalysisDialect, SqlAnalysisPort } from '@ktx/context/sql-analysis';
import type { KtxCliIo } from './cli-runtime.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import { createManagedDaemonSqlAnalysisPort } from './managed-python-http.js';
import { profileMark } from './startup-profile.js';
profileMark('module:sql');
type KtxSqlOutputMode = 'pretty' | 'plain' | 'json';
export type KtxSqlArgs = {
command: 'execute';
projectDir: string;
connectionId: string;
sql: string;
maxRows: number;
output?: KtxSqlOutputMode;
json?: boolean;
cliVersion: string;
};
export interface KtxSqlDeps {
loadProject?: typeof loadKtxProject;
createSqlAnalysis?: () => SqlAnalysisPort;
createScanConnector?: typeof createKtxCliScanConnector;
}
interface SqlExecutionOutput {
connectionId: string;
headers: string[];
headerTypes?: string[];
rows: unknown[][];
rowCount: number;
}
function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDialect {
const normalized = String(driver ?? '').trim().toLowerCase();
const map: Record<string, SqlAnalysisDialect> = {
postgres: 'postgres',
postgresql: 'postgres',
bigquery: 'bigquery',
snowflake: 'snowflake',
mysql: 'mysql',
sqlserver: 'tsql',
mssql: 'tsql',
sqlite: 'sqlite',
sqlite3: 'sqlite',
clickhouse: 'clickhouse',
redshift: 'redshift',
};
return map[normalized] ?? 'postgres';
}
function resolveOutputMode(args: KtxSqlArgs): KtxSqlOutputMode {
if (args.json === true) return 'json';
return args.output ?? 'pretty';
}
function formatValue(value: unknown): string {
if (value === null || value === undefined) return '';
if (typeof value === 'string') return value;
if (typeof value === 'number' || typeof value === 'boolean' || typeof value === 'bigint') return String(value);
return JSON.stringify(value);
}
function printJson(output: SqlExecutionOutput, io: KtxCliIo): void {
io.stdout.write(`${JSON.stringify(output, null, 2)}\n`);
}
function printPlain(output: SqlExecutionOutput, io: KtxCliIo): void {
io.stdout.write(`${output.headers.join('\t')}\n`);
for (const row of output.rows) {
io.stdout.write(`${row.map(formatValue).join('\t')}\n`);
}
}
function printPretty(output: SqlExecutionOutput, io: KtxCliIo): void {
const rows = output.rows.map((row) => row.map(formatValue));
const widths = output.headers.map((header, index) =>
Math.max(header.length, ...rows.map((row) => row[index]?.length ?? 0)),
);
const renderRow = (cells: string[]): string =>
cells.map((cell, index) => cell.padEnd(widths[index] ?? cell.length)).join(' ').trimEnd();
if (output.headers.length > 0) {
io.stdout.write(`${renderRow(output.headers)}\n`);
io.stdout.write(`${renderRow(widths.map((width) => '-'.repeat(width)))}\n`);
}
for (const row of rows) {
io.stdout.write(`${renderRow(row)}\n`);
}
io.stdout.write(`\n${output.rowCount} ${output.rowCount === 1 ? 'row' : 'rows'}\n`);
}
function printSqlResult(output: SqlExecutionOutput, mode: KtxSqlOutputMode, io: KtxCliIo): void {
if (mode === 'json') {
printJson(output, io);
return;
}
if (mode === 'plain') {
printPlain(output, io);
return;
}
printPretty(output, io);
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
if (connector?.cleanup) {
await connector.cleanup();
}
}
function resultOutput(connectionId: string, result: KtxQueryResult): SqlExecutionOutput {
return {
connectionId,
headers: result.headers,
...(result.headerTypes ? { headerTypes: result.headerTypes } : {}),
rows: result.rows,
rowCount: result.rowCount ?? result.rows.length,
};
}
export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps: KtxSqlDeps = {}): Promise<number> {
try {
const project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir });
const connection = project.config.connections[args.connectionId];
if (!connection) {
throw new Error(`Connection "${args.connectionId}" is not configured in ktx.yaml`);
}
const sqlAnalysis =
deps.createSqlAnalysis ??
(() =>
createManagedDaemonSqlAnalysisPort({
cliVersion: args.cliVersion,
projectDir: args.projectDir,
installPolicy: 'auto',
io,
}));
const validation = await sqlAnalysis().validateReadOnly(args.sql, sqlAnalysisDialectForDriver(connection.driver));
if (!validation.ok) {
throw new Error(validation.error ?? 'SQL is not read-only.');
}
const createScanConnector = deps.createScanConnector ?? createKtxCliScanConnector;
let connector: KtxScanConnector | null = null;
try {
connector = await createScanConnector(project as KtxLocalProject, args.connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(`Connection "${args.connectionId}" does not support read-only SQL execution.`);
}
const result = await connector.executeReadOnly(
{
connectionId: args.connectionId,
sql: args.sql,
maxRows: args.maxRows,
},
{ runId: 'cli-sql' },
);
printSqlResult(resultOutput(args.connectionId, result), resolveOutputMode(args), io);
return 0;
} finally {
await cleanupConnector(connector);
}
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}
}