mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
feat: add sql analysis batch port
This commit is contained in:
parent
ffbbaf417a
commit
f4021da969
10 changed files with 216 additions and 0 deletions
|
|
@ -154,6 +154,37 @@ describe('managed daemon ingest ports', () => {
|
|||
});
|
||||
});
|
||||
|
||||
// Verifies that the managed-daemon SQL analysis port posts the batch to
// '/sql/analyze-batch' and converts the snake_case response into a Map of
// camelCase results keyed by item id.
it('routes SQL batch analysis through the managed daemon runner', async () => {
  // Fake runner: resolves with a single successful result for the id 'orders'.
  const requestJson = vi.fn(async () => ({
    results: {
      orders: {
        tables_touched: ['public.orders'],
        columns_by_clause: { select: ['status'] },
        error: null,
      },
    },
  }));
  const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ requestJson });

  await expect(sqlAnalysis.analyzeBatch([{ id: 'orders', sql: 'select status from public.orders' }], 'postgres'))
    .resolves.toEqual(
      new Map([
        [
          'orders',
          {
            tablesTouched: ['public.orders'],
            columnsByClause: { select: ['status'] },
            error: null,
          },
        ],
      ]),
    );
  // The request payload carries the dialect and the items unchanged.
  expect(requestJson).toHaveBeenCalledWith('/sql/analyze-batch', {
    dialect: 'postgres',
    items: [{ id: 'orders', sql: 'select status from public.orders' }],
  });
});
|
||||
|
||||
it('returns live-database daemon request options backed by the managed runner', async () => {
|
||||
const requestJson = vi.fn(async () => ({
|
||||
connection_id: 'warehouse',
|
||||
|
|
|
|||
|
|
@ -26,6 +26,9 @@ const sqlAnalysis: SqlAnalysisPort = {
|
|||
literalSlots: [{ position: 1, type: 'string', exampleValue: 'paid' }],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
const reader: HistoricSqlQueryHistoryReader = {
|
||||
|
|
|
|||
|
|
@ -83,6 +83,9 @@ function fixtureSqlAnalysis(fixture: GoldenFixture): SqlAnalysisPort {
|
|||
}
|
||||
return result;
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -89,6 +89,9 @@ const sqlAnalysis: SqlAnalysisPort = {
|
|||
literalSlots: [],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
function postgresPullConfig(maxTemplatesPerRun = 5000) {
|
||||
|
|
|
|||
|
|
@ -51,6 +51,9 @@ const fakeSqlAnalysis: SqlAnalysisPort = {
|
|||
literalSlots: [{ position: 1, type: 'string', exampleValue: 'complete' }],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
const categoricalSqlAnalysis: SqlAnalysisPort = {
|
||||
|
|
@ -63,6 +66,9 @@ const categoricalSqlAnalysis: SqlAnalysisPort = {
|
|||
literalSlots: [{ position: 1, type: 'string', exampleValue: status }],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
function categoricalRows(): HistoricSqlRawQueryRow[] {
|
||||
|
|
@ -146,6 +152,9 @@ const diverseSqlAnalysis: SqlAnalysisPort = {
|
|||
literalSlots: [{ position: 1, type: 'string', exampleValue: value }],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
const classificationMatrixSqlAnalysis: SqlAnalysisPort = {
|
||||
|
|
@ -177,6 +186,9 @@ const classificationMatrixSqlAnalysis: SqlAnalysisPort = {
|
|||
],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
function classificationMatrixRows(): HistoricSqlRawQueryRow[] {
|
||||
|
|
@ -699,6 +711,9 @@ describe('stageHistoricSqlTemplates', () => {
|
|||
literalSlots: [],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
|
||||
await stageHistoricSqlTemplates({
|
||||
|
|
@ -775,6 +790,9 @@ describe('stageHistoricSqlTemplates', () => {
|
|||
literalSlots: [{ position: 1, type: 'string', exampleValue: 'analyst@example.com' }],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
},
|
||||
pullConfig: {
|
||||
dialect: 'snowflake',
|
||||
|
|
|
|||
|
|
@ -92,6 +92,9 @@ describe('local ingest adapters', () => {
|
|||
literalSlots: [],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
};
|
||||
const adapters = createDefaultLocalIngestAdapters(project, {
|
||||
historicSql: {
|
||||
|
|
@ -121,6 +124,9 @@ describe('local ingest adapters', () => {
|
|||
literalSlots: [],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
},
|
||||
postgresQueryClient: {
|
||||
async executeQuery() {
|
||||
|
|
@ -166,6 +172,9 @@ describe('local ingest adapters', () => {
|
|||
literalSlots: [],
|
||||
};
|
||||
},
|
||||
// Stub: ignores its arguments and always resolves to an empty Map.
async analyzeBatch() {
  return new Map();
},
|
||||
},
|
||||
postgresQueryClient: {
|
||||
async executeQuery() {
|
||||
|
|
|
|||
|
|
@ -45,6 +45,85 @@ describe('createHttpSqlAnalysisPort', () => {
|
|||
});
|
||||
});
|
||||
|
||||
// Verifies the HTTP port's batch path end to end against a fake runner:
// one successful result and one result carrying an analyzer error are both
// mapped to camelCase entries, and the request payload is passed through.
it('calls the SQL batch endpoint and maps snake_case response fields into a Map', async () => {
  const requestJson = vi.fn(async () => ({
    results: {
      orders: {
        tables_touched: ['public.orders', 'public.customers'],
        columns_by_clause: {
          select: ['status'],
          where: ['created_at'],
          join: ['customer_id', 'id'],
        },
        error: null,
      },
      // A per-item failure is reported inside the result, not as a rejection.
      broken: {
        tables_touched: [],
        columns_by_clause: {},
        error: 'Invalid expression / Unexpected token',
      },
    },
  }));
  const port = createHttpSqlAnalysisPort({ baseUrl: 'http://python.test', requestJson });

  await expect(
    port.analyzeBatch(
      [
        { id: 'orders', sql: 'select status from public.orders' },
        { id: 'broken', sql: 'select * from where' },
      ],
      'postgres',
    ),
  ).resolves.toEqual(
    new Map([
      [
        'orders',
        {
          tablesTouched: ['public.orders', 'public.customers'],
          columnsByClause: {
            select: ['status'],
            where: ['created_at'],
            join: ['customer_id', 'id'],
          },
          error: null,
        },
      ],
      [
        'broken',
        {
          tablesTouched: [],
          columnsByClause: {},
          error: 'Invalid expression / Unexpected token',
        },
      ],
    ]),
  );

  expect(requestJson).toHaveBeenCalledWith('/sql/analyze-batch', {
    dialect: 'postgres',
    items: [
      { id: 'orders', sql: 'select status from public.orders' },
      { id: 'broken', sql: 'select * from where' },
    ],
  });
});
|
||||
|
||||
// Verifies that a clause whose column list contains a non-string element
// (here the number 42 under `where`) makes analyzeBatch reject with a message
// naming the offending clause.
it('rejects malformed SQL batch responses instead of inventing defaults', async () => {
  const requestJson = vi.fn(async () => ({
    results: {
      orders: {
        tables_touched: ['public.orders'],
        columns_by_clause: { select: ['status'], where: [42] },
        error: null,
      },
    },
  }));
  const port = createHttpSqlAnalysisPort({ baseUrl: 'http://python.test', requestJson });

  await expect(port.analyzeBatch([{ id: 'orders', sql: 'select status from public.orders' }], 'postgres')).rejects
    .toThrow('sql analysis response is missing string[] field columns_by_clause.where');
});
|
||||
|
||||
it('rejects malformed daemon responses instead of inventing defaults', async () => {
|
||||
const requestJson = vi.fn(async () => ({
|
||||
fingerprint: 'abc',
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ import { request as httpRequest } from 'node:http';
|
|||
import { request as httpsRequest } from 'node:https';
|
||||
import { URL } from 'node:url';
|
||||
import type {
|
||||
SqlAnalysisBatchItem,
|
||||
SqlAnalysisBatchResult,
|
||||
SqlAnalysisDialect,
|
||||
SqlAnalysisFingerprintResult,
|
||||
SqlAnalysisLiteralSlot,
|
||||
|
|
@ -94,6 +96,14 @@ function requiredStringArray(raw: Record<string, unknown>, field: string): strin
|
|||
return value;
|
||||
}
|
||||
|
||||
/**
 * Reads `raw[field]` and returns it as a string-keyed record.
 *
 * @param raw - Parsed response object to read from.
 * @param field - Name of the property that must hold an object.
 * @returns The property value, typed as a record of unknown values.
 * @throws Error when the property is absent, `null`, a primitive, or an array.
 */
function requiredObject(raw: Record<string, unknown>, field: string): Record<string, unknown> {
  const value = raw[field];
  // `typeof null` is 'object', so null needs its own check; arrays are
  // objects too and are rejected explicitly.
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    throw new Error(`sql analysis response is missing object field ${field}`);
  }
  return value as Record<string, unknown>;
}
|
||||
|
||||
function isLiteralSlotType(value: unknown): value is SqlAnalysisLiteralSlotType {
|
||||
return (
|
||||
value === 'string' ||
|
||||
|
|
@ -144,6 +154,39 @@ function mapResult(raw: Record<string, unknown>): SqlAnalysisFingerprintResult {
|
|||
};
|
||||
}
|
||||
|
||||
/**
 * Validates the `columns_by_clause` object of a raw batch result and copies it
 * into a fresh object, keeping the clause keys exactly as received.
 *
 * @param raw - One raw batch result as returned by the analysis service.
 * @returns A new object mapping each clause key to its array of column names.
 * @throws Error when `columns_by_clause` is not an object, or when any clause
 *   value is not an array consisting only of strings.
 */
function mapColumnsByClause(raw: Record<string, unknown>): SqlAnalysisBatchResult['columnsByClause'] {
  const value = requiredObject(raw, 'columns_by_clause');
  const result: SqlAnalysisBatchResult['columnsByClause'] = {};
  for (const [clause, columns] of Object.entries(value)) {
    if (!Array.isArray(columns) || columns.some((item) => typeof item !== 'string')) {
      throw new Error(`sql analysis response is missing string[] field columns_by_clause.${clause}`);
    }
    // Define the entry as an own data property instead of assigning it:
    // a plain `result[clause] = columns` with the key "__proto__" would invoke
    // the prototype setter, replacing the result's prototype and silently
    // dropping the entry. The descriptor matches what assignment creates.
    Object.defineProperty(result, clause, {
      value: columns,
      enumerable: true,
      writable: true,
      configurable: true,
    });
  }
  return result;
}
|
||||
|
||||
/**
 * Converts one raw snake_case batch result into a `SqlAnalysisBatchResult`.
 *
 * Fields are read in the order `error`, `tables_touched`, `columns_by_clause`,
 * so when several are malformed the first one in that order determines the
 * error that is thrown.
 *
 * @param raw - One raw batch result as returned by the analysis service.
 * @returns The camelCase result; `error` is included only when the helper
 *   returned something other than `undefined`.
 * @throws Error propagated from the field helpers when a field is malformed.
 */
function mapBatchResult(raw: Record<string, unknown>): SqlAnalysisBatchResult {
  // NOTE(review): optionalString is defined outside this view — presumably it
  // yields undefined for an absent field and passes null through; confirm.
  const error = optionalString(raw, 'error');
  return {
    tablesTouched: requiredStringArray(raw, 'tables_touched'),
    columnsByClause: mapColumnsByClause(raw),
    ...(error !== undefined ? { error } : {}),
  };
}
|
||||
|
||||
/**
 * Converts the raw batch response into a Map of results keyed by the ids found
 * under `results`, in the order `Object.entries` yields them.
 *
 * @param raw - Parsed response body of the batch endpoint.
 * @returns Map from result id to its mapped `SqlAnalysisBatchResult`.
 * @throws Error when `results` is not an object, when any entry under it is
 *   not a non-array object, or when an entry fails field validation.
 */
function mapBatchResponse(raw: Record<string, unknown>): Map<string, SqlAnalysisBatchResult> {
  const results = requiredObject(raw, 'results');
  const mapped = new Map<string, SqlAnalysisBatchResult>();
  for (const [id, value] of Object.entries(results)) {
    // Each entry must itself be a plain (non-array) object before its fields
    // can be read.
    if (!value || typeof value !== 'object' || Array.isArray(value)) {
      throw new Error(`sql analysis response contains invalid batch result ${id}`);
    }
    mapped.set(id, mapBatchResult(value as Record<string, unknown>));
  }
  return mapped;
}
|
||||
|
||||
export function createHttpSqlAnalysisPort(options: HttpSqlAnalysisPortOptions): SqlAnalysisPort {
|
||||
const requestJson = options.requestJson ?? postJson(options.baseUrl);
|
||||
|
||||
|
|
@ -155,5 +198,12 @@ export function createHttpSqlAnalysisPort(options: HttpSqlAnalysisPortOptions):
|
|||
});
|
||||
return mapResult(raw);
|
||||
},
|
||||
// Posts the whole batch to '/sql/analyze-batch' in a single request and maps
// the response into a Map of results; rejects if the response is malformed.
async analyzeBatch(items: SqlAnalysisBatchItem[], dialect: SqlAnalysisDialect) {
  const raw = await requestJson('/sql/analyze-batch', {
    dialect,
    items,
  });
  return mapBatchResponse(raw);
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
export { createHttpSqlAnalysisPort } from './http-sql-analysis-port.js';
|
||||
export type { HttpSqlAnalysisPortOptions, KtxSqlAnalysisHttpJsonRunner } from './http-sql-analysis-port.js';
|
||||
export type {
|
||||
SqlAnalysisBatchItem,
|
||||
SqlAnalysisBatchResult,
|
||||
SqlAnalysisClause,
|
||||
SqlAnalysisDialect,
|
||||
SqlAnalysisFingerprintResult,
|
||||
SqlAnalysisLiteralSlot,
|
||||
|
|
|
|||
|
|
@ -25,6 +25,23 @@ export interface SqlAnalysisFingerprintResult {
|
|||
error?: string | null;
|
||||
}
|
||||
|
||||
/**
 * Name of a SQL clause under which columns are grouped.
 *
 * The listed literals are the known clause names; `(string & {})` keeps the
 * type open to any other string while preserving editor completion for the
 * known ones.
 */
export type SqlAnalysisClause = 'select' | 'where' | 'join' | 'groupBy' | 'having' | 'orderBy' | (string & {});
|
||||
|
||||
/** One SQL statement submitted for batch analysis. */
export interface SqlAnalysisBatchItem {
  /** Caller-chosen identifier for this statement. */
  id: string;
  /** The SQL text to analyze. */
  sql: string;
}
|
||||
|
||||
/** Analysis outcome for a single statement of a batch. */
export interface SqlAnalysisBatchResult {
  /** Names of the tables the statement references. */
  tablesTouched: string[];
  /** Column names grouped by the clause they appear in; clauses may be absent. */
  columnsByClause: Partial<Record<SqlAnalysisClause, string[]>>;
  /**
   * Analyzer error message for this statement, if any.
   * NOTE(review): presumably `null` or an absent field means the statement
   * was analyzed successfully — confirm against the analysis service.
   */
  error?: string | null;
}
|
||||
|
||||
/** Port through which callers request SQL analysis. */
export interface SqlAnalysisPort {
  /**
   * Analyzes a single statement for fingerprinting.
   *
   * @param sql - The SQL text to analyze.
   * @param dialect - SQL dialect the statement is written in.
   */
  analyzeForFingerprint(sql: string, dialect: SqlAnalysisDialect): Promise<SqlAnalysisFingerprintResult>;
  /**
   * Analyzes several statements in one call.
   *
   * @param items - Statements to analyze, each with its own id.
   * @param dialect - SQL dialect shared by every statement in the batch.
   * @returns Map of results keyed by string — presumably each item's `id`;
   *   confirm against the implementations.
   */
  analyzeBatch(
    items: SqlAnalysisBatchItem[],
    dialect: SqlAnalysisDialect,
  ): Promise<Map<string, SqlAnalysisBatchResult>>;
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue