diff --git a/packages/cli/src/managed-python-http.test.ts b/packages/cli/src/managed-python-http.test.ts index c0153c45..7bab7ea5 100644 --- a/packages/cli/src/managed-python-http.test.ts +++ b/packages/cli/src/managed-python-http.test.ts @@ -154,6 +154,37 @@ describe('managed daemon ingest ports', () => { }); }); + /* Checks that analyzeBatch on the port built by createManagedDaemonSqlAnalysisPort sends the dialect and items to /sql/analyze-batch through the injected requestJson, and resolves to a Map keyed by item id with camelCase fields. */ it('routes SQL batch analysis through the managed daemon runner', async () => { + const requestJson = vi.fn(async () => ({ + results: { + orders: { + tables_touched: ['public.orders'], + columns_by_clause: { select: ['status'] }, + error: null, + }, + }, + })); + const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ requestJson }); + + await expect(sqlAnalysis.analyzeBatch([{ id: 'orders', sql: 'select status from public.orders' }], 'postgres')) + .resolves.toEqual( + new Map([ + [ + 'orders', + { + tablesTouched: ['public.orders'], + columnsByClause: { select: ['status'] }, + error: null, + }, + ], + ]), + ); + expect(requestJson).toHaveBeenCalledWith('/sql/analyze-batch', { + dialect: 'postgres', + items: [{ id: 'orders', sql: 'select status from public.orders' }], + }); + }); + it('returns live-database daemon request options backed by the managed runner', async () => { const requestJson = vi.fn(async () => ({ connection_id: 'warehouse', diff --git a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts index 40926965..2c038feb 100644 --- a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts @@ -26,6 +26,9 @@ const sqlAnalysis: SqlAnalysisPort = { literalSlots: [{ position: 1, type: 'string', exampleValue: 'paid' }], }; }, + /* Stub batch analysis: always resolves to an empty Map. */ async analyzeBatch() { + return new Map(); + }, }; const reader: HistoricSqlQueryHistoryReader = { diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-pgss-golden.test.ts 
b/packages/context/src/ingest/adapters/historic-sql/stage-pgss-golden.test.ts index 98a907fd..14450bd3 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-pgss-golden.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-pgss-golden.test.ts @@ -83,6 +83,9 @@ function fixtureSqlAnalysis(fixture: GoldenFixture): SqlAnalysisPort { } return result; }, + /* Stub batch analysis: always resolves to an empty Map. */ async analyzeBatch() { + return new Map(); + }, }; } diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-pgss.test.ts b/packages/context/src/ingest/adapters/historic-sql/stage-pgss.test.ts index 901a0ae2..ebc1f13c 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-pgss.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-pgss.test.ts @@ -89,6 +89,9 @@ const sqlAnalysis: SqlAnalysisPort = { literalSlots: [], }; }, + /* Stub batch analysis: always resolves to an empty Map. */ async analyzeBatch() { + return new Map(); + }, }; function postgresPullConfig(maxTemplatesPerRun = 5000) { diff --git a/packages/context/src/ingest/adapters/historic-sql/stage.test.ts b/packages/context/src/ingest/adapters/historic-sql/stage.test.ts index dfaed511..21a1f850 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage.test.ts @@ -51,6 +51,9 @@ const fakeSqlAnalysis: SqlAnalysisPort = { literalSlots: [{ position: 1, type: 'string', exampleValue: 'complete' }], }; }, + /* Stub batch analysis: always resolves to an empty Map; the same stub is added to every SqlAnalysisPort fake in this file. */ async analyzeBatch() { + return new Map(); + }, }; const categoricalSqlAnalysis: SqlAnalysisPort = { @@ -63,6 +66,9 @@ const categoricalSqlAnalysis: SqlAnalysisPort = { literalSlots: [{ position: 1, type: 'string', exampleValue: status }], }; }, + async analyzeBatch() { + return new Map(); + }, }; function categoricalRows(): HistoricSqlRawQueryRow[] { @@ -146,6 +152,9 @@ const diverseSqlAnalysis: SqlAnalysisPort = { literalSlots: [{ position: 1, type: 'string', exampleValue: value }], }; }, + async analyzeBatch() { + return new Map(); + }, }; const 
classificationMatrixSqlAnalysis: SqlAnalysisPort = { @@ -177,6 +186,9 @@ const classificationMatrixSqlAnalysis: SqlAnalysisPort = { ], }; }, + async analyzeBatch() { + return new Map(); + }, }; function classificationMatrixRows(): HistoricSqlRawQueryRow[] { @@ -699,6 +711,9 @@ describe('stageHistoricSqlTemplates', () => { literalSlots: [], }; }, + async analyzeBatch() { + return new Map(); + }, }; await stageHistoricSqlTemplates({ @@ -775,6 +790,9 @@ describe('stageHistoricSqlTemplates', () => { literalSlots: [{ position: 1, type: 'string', exampleValue: 'analyst@example.com' }], }; }, + async analyzeBatch() { + return new Map(); + }, }, pullConfig: { dialect: 'snowflake', diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index 009cdda2..ae1aeb5c 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -92,6 +92,9 @@ describe('local ingest adapters', () => { literalSlots: [], }; }, + /* Stub batch analysis: always resolves to an empty Map. */ async analyzeBatch() { + return new Map(); + }, }; const adapters = createDefaultLocalIngestAdapters(project, { historicSql: { @@ -121,6 +124,9 @@ describe('local ingest adapters', () => { literalSlots: [], }; }, + async analyzeBatch() { + return new Map(); + }, }, postgresQueryClient: { async executeQuery() { @@ -166,6 +172,9 @@ describe('local ingest adapters', () => { literalSlots: [], }; }, + async analyzeBatch() { + return new Map(); + }, }, postgresQueryClient: { async executeQuery() { diff --git a/packages/context/src/sql-analysis/http-sql-analysis-port.test.ts b/packages/context/src/sql-analysis/http-sql-analysis-port.test.ts index f9bf513b..6e22fd47 100644 --- a/packages/context/src/sql-analysis/http-sql-analysis-port.test.ts +++ b/packages/context/src/sql-analysis/http-sql-analysis-port.test.ts @@ -45,6 +45,85 @@ describe('createHttpSqlAnalysisPort', () => { }); }); + /* Two batch tests are added here. This one feeds a two-entry results object (one clean entry, one entry carrying an error string) and expects a Map with camelCase fields plus a POST to /sql/analyze-batch. The one after it puts a number inside columns_by_clause.where and expects analyzeBatch to reject with the missing string[] field message. */ it('calls the SQL batch endpoint and maps snake_case response fields into a 
Map', async () => { + const requestJson = vi.fn(async () => ({ + results: { + orders: { + tables_touched: ['public.orders', 'public.customers'], + columns_by_clause: { + select: ['status'], + where: ['created_at'], + join: ['customer_id', 'id'], + }, + error: null, + }, + broken: { + tables_touched: [], + columns_by_clause: {}, + error: 'Invalid expression / Unexpected token', + }, + }, + })); + const port = createHttpSqlAnalysisPort({ baseUrl: 'http://python.test', requestJson }); + + await expect( + port.analyzeBatch( + [ + { id: 'orders', sql: 'select status from public.orders' }, + { id: 'broken', sql: 'select * from where' }, + ], + 'postgres', + ), + ).resolves.toEqual( + new Map([ + [ + 'orders', + { + tablesTouched: ['public.orders', 'public.customers'], + columnsByClause: { + select: ['status'], + where: ['created_at'], + join: ['customer_id', 'id'], + }, + error: null, + }, + ], + [ + 'broken', + { + tablesTouched: [], + columnsByClause: {}, + error: 'Invalid expression / Unexpected token', + }, + ], + ]), + ); + + expect(requestJson).toHaveBeenCalledWith('/sql/analyze-batch', { + dialect: 'postgres', + items: [ + { id: 'orders', sql: 'select status from public.orders' }, + { id: 'broken', sql: 'select * from where' }, + ], + }); + }); + + it('rejects malformed SQL batch responses instead of inventing defaults', async () => { + const requestJson = vi.fn(async () => ({ + results: { + orders: { + tables_touched: ['public.orders'], + columns_by_clause: { select: ['status'], where: [42] }, + error: null, + }, + }, + })); + const port = createHttpSqlAnalysisPort({ baseUrl: 'http://python.test', requestJson }); + + await expect(port.analyzeBatch([{ id: 'orders', sql: 'select status from public.orders' }], 'postgres')).rejects + .toThrow('sql analysis response is missing string[] field columns_by_clause.where'); + }); + it('rejects malformed daemon responses instead of inventing defaults', async () => { const requestJson = vi.fn(async () => ({ fingerprint: 
'abc', diff --git a/packages/context/src/sql-analysis/http-sql-analysis-port.ts b/packages/context/src/sql-analysis/http-sql-analysis-port.ts index a26d69e4..9da37556 100644 --- a/packages/context/src/sql-analysis/http-sql-analysis-port.ts +++ b/packages/context/src/sql-analysis/http-sql-analysis-port.ts @@ -2,6 +2,8 @@ import { request as httpRequest } from 'node:http'; import { request as httpsRequest } from 'node:https'; import { URL } from 'node:url'; import type { + SqlAnalysisBatchItem, + SqlAnalysisBatchResult, SqlAnalysisDialect, SqlAnalysisFingerprintResult, SqlAnalysisLiteralSlot, @@ -94,6 +96,14 @@ function requiredStringArray(raw: Record, field: string): strin return value; } +/** Returns raw[field] when it is a truthy, non-array object; otherwise throws an Error naming the missing object field. NOTE(review): Record, Map, Promise and Partial appear without type arguments throughout this text, presumably lost in extraction; confirm against the real files. */ function requiredObject(raw: Record, field: string): Record { + const value = raw[field]; + if (!value || typeof value !== 'object' || Array.isArray(value)) { + throw new Error(`sql analysis response is missing object field ${field}`); + } + return value as Record; +} + function isLiteralSlotType(value: unknown): value is SqlAnalysisLiteralSlotType { return ( value === 'string' || @@ -144,6 +154,39 @@ function mapResult(raw: Record): SqlAnalysisFingerprintResult { }; } +/** Reads the columns_by_clause object from raw and copies every clause entry into a fresh object, throwing when an entry is not an array made only of strings. NOTE(review): result is an ordinary object literal, so if the response carries an own key named __proto__ the assignment below would hit the prototype setter instead of creating a property; consider a null-prototype object or skipping that key. */ function mapColumnsByClause(raw: Record): SqlAnalysisBatchResult['columnsByClause'] { + const value = requiredObject(raw, 'columns_by_clause'); + const result: SqlAnalysisBatchResult['columnsByClause'] = {}; + for (const [clause, columns] of Object.entries(value)) { + if (!Array.isArray(columns) || columns.some((item) => typeof item !== 'string')) { + throw new Error(`sql analysis response is missing string[] field columns_by_clause.${clause}`); + } + result[clause] = columns; + } + return result; +} + +/** Maps one snake_case batch entry to its camelCase result. The error key is copied only when optionalString does not return undefined, so a null error is preserved rather than dropped. */ function mapBatchResult(raw: Record): SqlAnalysisBatchResult { + const error = optionalString(raw, 'error'); + return { + tablesTouched: requiredStringArray(raw, 'tables_touched'), + columnsByClause: mapColumnsByClause(raw), + ...(error !== undefined ? 
{ error } : {}), + }; +} + +/** Builds a Map from the entries of the results object, keyed by entry id, throwing on any entry that is falsy, not an object, or an array. Only ids present in the response end up in the Map; nothing here compares them with the requested items. */ function mapBatchResponse(raw: Record): Map { + const results = requiredObject(raw, 'results'); + return new Map( + Object.entries(results).map(([id, value]) => { + if (!value || typeof value !== 'object' || Array.isArray(value)) { + throw new Error(`sql analysis response contains invalid batch result ${id}`); + } + return [id, mapBatchResult(value as Record)]; + }), + ); +} + export function createHttpSqlAnalysisPort(options: HttpSqlAnalysisPortOptions): SqlAnalysisPort { const requestJson = options.requestJson ?? postJson(options.baseUrl); @@ -155,5 +198,12 @@ export function createHttpSqlAnalysisPort(options: HttpSqlAnalysisPortOptions): }); return mapResult(raw); }, + /* Sends the dialect and items to /sql/analyze-batch via requestJson and converts the raw response with mapBatchResponse. */ async analyzeBatch(items: SqlAnalysisBatchItem[], dialect: SqlAnalysisDialect) { + const raw = await requestJson('/sql/analyze-batch', { + dialect, + items, + }); + return mapBatchResponse(raw); + }, }; } diff --git a/packages/context/src/sql-analysis/index.ts b/packages/context/src/sql-analysis/index.ts index 89e3ada9..8338b822 100644 --- a/packages/context/src/sql-analysis/index.ts +++ b/packages/context/src/sql-analysis/index.ts @@ -1,6 +1,9 @@ export { createHttpSqlAnalysisPort } from './http-sql-analysis-port.js'; export type { HttpSqlAnalysisPortOptions, KtxSqlAnalysisHttpJsonRunner } from './http-sql-analysis-port.js'; export type { + SqlAnalysisBatchItem, + SqlAnalysisBatchResult, + SqlAnalysisClause, SqlAnalysisDialect, SqlAnalysisFingerprintResult, SqlAnalysisLiteralSlot, diff --git a/packages/context/src/sql-analysis/ports.ts b/packages/context/src/sql-analysis/ports.ts index 69b15780..3361a7c4 100644 --- a/packages/context/src/sql-analysis/ports.ts +++ b/packages/context/src/sql-analysis/ports.ts @@ -25,6 +25,23 @@ export interface SqlAnalysisFingerprintResult { error?: string | null; } +/** Named clause keys, kept open to any other string through the (string & {}) member. */ export type SqlAnalysisClause = 'select' | 'where' | 'join' | 'groupBy' | 'having' | 'orderBy' | (string & {}); + +/** One SQL statement submitted for batch analysis, tagged with an id. */ export interface SqlAnalysisBatchItem { + id: string; + 
sql: string; +} + +/** Analysis output for one batch item: the tables touched, column names grouped by clause, and an optional error that may be a string or null. */ export interface SqlAnalysisBatchResult { + tablesTouched: string[]; + columnsByClause: Partial>; + error?: string | null; +} + export interface SqlAnalysisPort { analyzeForFingerprint(sql: string, dialect: SqlAnalysisDialect): Promise; + /** Analyzes a list of SQL items for one dialect and resolves to a Map of results. */ analyzeBatch( + items: SqlAnalysisBatchItem[], + dialect: SqlAnalysisDialect, + ): Promise>; }