mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
* fix: read semantic sources safely
* test: retarget reindex per-scope error case to a broken manifest
Reading a broken standalone source was made non-fatal in de1f1a8d (it is
surfaced for repair instead of throwing), so the reindex per-scope error
test no longer captured an error. Point it at a corrupt manifest shard,
which is the remaining fatal read failure the per-scope catch must
isolate, and assert the captured error names the offending file.
* fix(sl): decouple semantic-layer file names from warehouse naming rules
The in-file `name:` field is now the sole source identity; the filename is
a derived label that never participates in identity. This removes the
"Unsafe semantic-layer source name" failure class entirely: any warehouse
identifier (Snowflake's uppercase SIGNED_UP, EVENT$LOG, dotted names) can
be read, overlaid, edited, and deleted.
- New `source-files.ts`: one total filename derivation (safe lowercase
names verbatim; otherwise slug + sha256-hash suffix, immune to
case-insensitive-filesystem collisions) and one by-name file resolver.
- Reads resolve by name everywhere; the path-from-name fast path and
`assertSafeSourceName` are gone.
- Writes resolve-then-write: rewrites land on the file that declares the
name (human renames survive); new sources get a derived filename; a
derived path occupied by a different source fails instead of clobbering.
- `readSourceFile` returns null for missing files instead of forcing every
caller to launder IO errors; `deleteSource` distinguishes manifest-backed
sources from not-found instead of silently succeeding.
- `sl_write_source` accepts verbatim warehouse identifiers (snake_case is
now a recommendation for new sources) and rejects sourceName/source.name
mismatches; `sl_edit_source` rejects name-changing edits.
- Ingest projection commits, gate-repair allowlists, and touched-source
derivation use resolved paths / in-file names instead of interpolating
`<connId>/<name>.yaml`.
- Collapsed the five parallel path derivations and duplicated path-token
helpers onto the shared module; dropped dead service methods.
* fix(sl): resolve sources by declared name end-to-end and gate warehouse SQL with the parser-backed validator
- Key broken/renamed semantic-layer files by their recoverable in-file
name (slSourceNameForFile) so mid-edit sources stay reachable under
their real identity in reads, listings, and search
- Derive finalization touched sources from composed-source diffs and
recover deleted files' declared names from the pre-change commit
instead of parsing hash-derived filenames
- Resolve revert/rollback paths against history (listFilesAtCommit) so
human-renamed files are restored where they lived at preHead
- Validate ingest sql_execution through the daemon's sqlglot
validateReadOnly in the connection's dialect, sharing one
driver-to-dialect map (sql-analysis/dialect.ts) across MCP and ingest
- Harden the local read-only SQL backstop: accept leading comments,
reject smuggled second statements, and strip trailing
semicolons/comments before row-limit wrapping
127 lines
5.2 KiB
TypeScript
127 lines
5.2 KiB
TypeScript
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
|
import type { SlConnectionCatalogPort } from '../../../../../src/context/sl/ports.js';
|
|
import type { SqlAnalysisPort } from '../../../../../src/context/sql-analysis/ports.js';
|
|
import type { ToolContext } from '../../../../../src/context/tools/base-tool.js';
|
|
import { SqlExecutionTool } from '../../../../../src/context/ingest/tools/warehouse-verification/sql-execution.tool.js';
|
|
|
|
/**
 * Spec for `SqlExecutionTool`.
 *
 * The tool is exercised through `vi.fn()` doubles standing in for the
 * connection catalog and the SQL-analysis port, so every case asserts on
 * what the tool passes to (and withholds from) those collaborators and on
 * the `markdown` / `structured` fields of the result it returns.
 */
describe('SqlExecutionTool', () => {
  type MockFn = ReturnType<typeof vi.fn>;
  type MockedCatalog = SlConnectionCatalogPort & { executeQuery: MockFn; getConnectionById: MockFn };
  type MockedAnalysis = SqlAnalysisPort & { validateReadOnly: MockFn };

  // Connection record served by the mocked catalog; only the type differs between cases.
  const warehouseConnection = (connectionType: string) => ({
    id: 'warehouse',
    name: 'warehouse',
    connectionType,
  });

  // Verdict the mocked validator returns when it lets a query through.
  const approvedVerdict = () => ({ ok: true, error: null });

  const catalog = {
    executeQuery: vi.fn(),
    getConnectionById: vi.fn(async () => warehouseConnection('POSTGRESQL')),
  } as unknown as MockedCatalog;

  const analysis = {
    validateReadOnly: vi.fn(async () => approvedVerdict()),
  } as unknown as MockedAnalysis;

  // One tool instance is shared by all cases; per-test isolation comes from the mock resets below.
  const sqlTool = new SqlExecutionTool(catalog, analysis);

  const toolContext: ToolContext = {
    sourceId: 'ingest',
    messageId: 'm1',
    userId: 'system',
    session: { allowedConnectionNames: new Set(['warehouse']) } as any,
  };

  beforeEach(() => {
    // Wipe recorded calls and per-test overrides, then restore the happy-path defaults:
    // a PostgreSQL connection and an approving validator.
    for (const double of [catalog.executeQuery, catalog.getConnectionById, analysis.validateReadOnly]) {
      double.mockReset();
    }
    catalog.getConnectionById.mockResolvedValue(warehouseConnection('POSTGRESQL'));
    analysis.validateReadOnly.mockResolvedValue(approvedVerdict());
  });

  it('validates with the parser-backed validator in the connection dialect, then wraps with a capped row limit', async () => {
    const sql = 'select status from public.orders';
    catalog.executeQuery.mockResolvedValue({ headers: ['status'], rows: [['paid']], totalRows: 1 });

    const outcome = await sqlTool.call({ connectionId: 'warehouse', sql, rowLimit: 5 }, toolContext);

    // The validator receives the caller's SQL untouched, together with the mapped dialect.
    expect(analysis.validateReadOnly).toHaveBeenCalledWith(sql, 'postgres');
    // The connector receives the query wrapped in a row-limited outer select.
    expect(catalog.executeQuery).toHaveBeenCalledWith(
      'warehouse',
      'select * from (select status from public.orders) as ktx_query_result limit 5',
    );
    expect(outcome.markdown).toContain('| status |');
    expect(outcome.structured.wrappedSql).toContain('limit 5');
  });

  it('maps connection types to sqlglot dialects', async () => {
    catalog.getConnectionById.mockResolvedValue(warehouseConnection('SNOWFLAKE'));
    catalog.executeQuery.mockResolvedValue({ headers: [], rows: [], totalRows: 0 });

    await sqlTool.call({ connectionId: 'warehouse', sql: 'select 1' }, toolContext);

    expect(analysis.validateReadOnly).toHaveBeenCalledWith('select 1', 'snowflake');
  });

  it('returns the validator error without executing when validation fails', async () => {
    const rejection = 'SQL contains read/write operation: Insert';
    analysis.validateReadOnly.mockResolvedValue({ ok: false, error: rejection });

    const outcome = await sqlTool.call(
      { connectionId: 'warehouse', sql: 'with x as (insert into t values (1) returning *) select * from x' },
      toolContext,
    );

    expect(outcome.markdown).toContain(rejection);
    expect(outcome.structured.error).toContain(rejection);
    expect(catalog.executeQuery).not.toHaveBeenCalled();
  });

  it('throws when no parser-backed validator is configured', async () => {
    // Built without the second constructor argument, i.e. with no SQL-analysis port.
    const toolWithoutValidator = new SqlExecutionTool(catalog);
    const pendingCall = toolWithoutValidator.call({ connectionId: 'warehouse', sql: 'select 1' }, toolContext);

    await expect(pendingCall).rejects.toThrow('sql_execution requires parser-backed SQL validation.');
    expect(catalog.executeQuery).not.toHaveBeenCalled();
  });

  // The validator mock approves everything here (see beforeEach), so any refusal
  // observed below must come from the tool's own local check.
  it.each(['insert into x values (1)', 'drop table x', 'vacuum'])(
    'keeps the local backstop even when the validator approves: %s',
    async (statement) => {
      const outcome = await sqlTool.call({ connectionId: 'warehouse', sql: statement }, toolContext);

      expect(outcome.markdown).toContain('Only read-only SELECT/WITH queries can be executed locally.');
      expect(catalog.executeQuery).not.toHaveBeenCalled();
    },
  );

  it('surfaces connector errors verbatim', async () => {
    const connectorMessage = 'relation "orbit_analytics.customer" does not exist';
    catalog.executeQuery.mockRejectedValue(new Error(connectorMessage));

    const outcome = await sqlTool.call(
      { connectionId: 'warehouse', sql: 'select 1 from orbit_analytics.customer', rowLimit: 1 },
      toolContext,
    );

    expect(outcome.markdown).toContain(connectorMessage);
    expect(outcome.structured.error).toContain(connectorMessage);
  });

  it('uses connectionId as the public input field', () => {
    // The legacy key is assembled at runtime, exactly as before, rather than written out as one literal.
    const legacyKey = ['connection', 'Name'].join('');
    const queryFields = { sql: 'select 1', rowLimit: 5 };

    const parsed = sqlTool.parseInput({ connectionId: 'warehouse', ...queryFields });
    expect(parsed).toEqual({ connectionId: 'warehouse', ...queryFields });

    const parseWithLegacyKey = () => sqlTool.parseInput({ [legacyKey]: 'warehouse', ...queryFields });
    expect(parseWithLegacyKey).toThrow();
  });
});
|