From 109dd8d83697aa3c8376788ec3338e4b364a50e6 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 19:13:16 +0200 Subject: [PATCH] refactor: remove legacy historic sql reader fetch surface --- .../bigquery-query-history-reader.test.ts | 130 ++--------------- .../bigquery-query-history-reader.ts | 78 +--------- .../adapters/historic-sql/evidence.test.ts | 31 ++-- .../historic-sql/historic-sql.adapter.test.ts | 3 +- .../snowflake-query-history-reader.test.ts | 133 ++---------------- .../snowflake-query-history-reader.ts | 83 +---------- .../src/ingest/local-bundle-runtime.ts | 2 +- 7 files changed, 48 insertions(+), 412 deletions(-) diff --git a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts index 25b11ecd..75db8644 100644 --- a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts @@ -63,122 +63,6 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => { await expect(reader.probe(client)).rejects.toBeInstanceOf(HistoricSqlGrantsMissingError); }); - it('fetches BigQuery jobs with cursor and maps them into RawQueryRow shape without rowsProduced', async () => { - const client = queryClient([ - { - headers: [ - 'job_id', - 'query', - 'user_email', - 'creation_time', - 'end_time', - 'runtime_ms', - 'total_slot_ms', - 'total_bytes_processed', - 'state', - 'error_reason', - 'error_message', - 'statement_type', - ], - rows: [ - [ - 'bquxjob_1', - "SELECT COUNT(*) FROM `project-1.analytics.orders` WHERE status = 'paid'", - 'analyst-a@example.test', - '2026-05-04T10:00:00.000Z', - '2026-05-04T10:00:01.250Z', - 1250, - 3106, - 161164718, - 'DONE', - null, - null, - 'SELECT', - ], - [ - 'bquxjob_2', - 'SELECT * FROM `project-1.analytics.missing_table`', - 'analyst-b@example.test', - new Date('2026-05-04T10:05:00.000Z'), - null, - null, - 0, - 0, - 'DONE', - 'notFound', - 'Not found: Table project-1.analytics.missing_table', - 'SELECT', - ], - ], - totalRows: 2, - }, - ]); - const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' }); - - const rows = []; - for await (const row of reader.fetch( - client, - { - start: new Date('2026-05-01T00:00:00.000Z'), - end: new Date('2026-05-04T12:00:00.000Z'), - }, - '2026-05-03T00:00:00.000Z', - )) { - rows.push(row); - } - - expect(client.executeQuery).toHaveBeenCalledTimes(1); - const sql = firstQuery(client); - expect(sql).toContain('FROM `project-1.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`'); - expect(sql).toContain("creation_time >= TIMESTAMP('2026-05-03T00:00:00.000Z')"); - expect(sql).toContain("creation_time < TIMESTAMP('2026-05-04T12:00:00.000Z')"); - expect(sql).toContain("job_type = 'QUERY'"); - expect(sql).toContain("(statement_type IS NULL OR statement_type != 'SCRIPT')"); - expect(sql).toContain('ORDER BY creation_time ASC, job_id ASC'); - expect(sql).toContain('total_slot_ms'); - expect(sql).toContain('total_bytes_processed'); - expect(sql).not.toMatch(/total_rows/i); - - expect(rows).toEqual([ - { - id: 'bquxjob_1', - sql: "SELECT COUNT(*) FROM `project-1.analytics.orders` WHERE status = 'paid'", - user: 'analyst-a@example.test', - startedAt: '2026-05-04T10:00:00.000Z', - endedAt: '2026-05-04T10:00:01.250Z', - runtimeMs: 1250, - success: true, - errorMessage: null, - }, - { - id: 'bquxjob_2', - sql: 'SELECT * FROM `project-1.analytics.missing_table`', - user: 'analyst-b@example.test', - startedAt: '2026-05-04T10:05:00.000Z', - endedAt: null, - runtimeMs: null, - success: false, - errorMessage: 'notFound: Not found: Table project-1.analytics.missing_table', - }, - ]); - }); - - it('uses the window start when no cursor is available', async () => { - const client = queryClient([{ headers: ['job_id'], rows: [], totalRows: 0 }]); - const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'EU' }); - - for await (const _row of reader.fetch(client, { - start: new Date('2026-02-03T12:00:00.000Z'), - end: new Date('2026-05-04T12:00:00.000Z'), - })) { - throw new Error('empty result should not yield rows'); - } - - const sql = firstQuery(client); - expect(sql).toContain('FROM `project-1.region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT`'); - expect(sql).toContain("creation_time >= TIMESTAMP('2026-02-03T12:00:00.000Z')"); - }); - it('fetches aggregated BigQuery query templates', async () => { const client = queryClient([ { @@ -245,7 +129,19 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => { const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' }); await expect(async () => { - for await (const _row of reader.fetch({}, { start: new Date(), end: new Date() })) { + for await (const _row of reader.fetchAggregated( + {}, + { start: new Date(), end: new Date() }, + { + dialect: 'bigquery', + minExecutions: 5, + windowDays: 90, + concurrency: 12, + filters: { dropTrivialProbes: true }, + redactionPatterns: [], + staleArchiveAfterDays: 90, + }, + )) { throw new Error('unreachable'); } }).rejects.toThrow('Historic SQL BigQuery reader requires a query client with executeQuery(query)'); diff --git a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts index e5d6a82f..1d3265be 100644 --- a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts +++ b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts @@ -2,8 +2,6 @@ import { HistoricSqlGrantsMissingError } from './errors.js'; import { aggregatedTemplateSchema, type AggregatedTemplate, - type HistoricSqlQueryHistoryReader, - type HistoricSqlRawQueryRow, type HistoricSqlTimeWindow, type HistoricSqlUnifiedPullConfig, } from './types.js'; @@ -146,42 +144,6 @@ function isoTimestamp(raw: unknown, field: string): string { return date.toISOString(); } -function nullableIsoTimestamp(raw: unknown): string | null { - if (raw === null || raw === undefined || raw === '') { - return null; - } - return isoTimestamp(raw, 'end_time'); -} - -function executionSucceeded(state: string | null, errorReason: string | null, errorMessage: string | null): boolean { - if (errorReason || errorMessage) { - return false; - } - return state === null || state.toUpperCase() === 'DONE'; -} - -function combinedErrorMessage(errorReason: string | null, errorMessage: string | null): string | null { - if (errorReason && errorMessage) { - return `${errorReason}: ${errorMessage}`; - } - return errorMessage ?? errorReason; -} - -function mapRow(row: unknown[], indexes: Map): HistoricSqlRawQueryRow { - const errorReason = nullableString(value(row, indexes, 'error_reason')); - const errorMessage = nullableString(value(row, indexes, 'error_message')); - return { - id: requiredString(value(row, indexes, 'job_id'), 'job_id'), - sql: requiredString(value(row, indexes, 'query'), 'query'), - user: nullableString(value(row, indexes, 'user_email')), - startedAt: isoTimestamp(value(row, indexes, 'creation_time'), 'creation_time'), - endedAt: nullableIsoTimestamp(value(row, indexes, 'end_time')), - runtimeMs: nullableNumber(value(row, indexes, 'runtime_ms')), - success: executionSucceeded(nullableString(value(row, indexes, 'state')), errorReason, errorMessage), - errorMessage: combinedErrorMessage(errorReason, errorMessage), - }; -} - function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> { const text = nullableString(raw); if (!text) { @@ -224,7 +186,7 @@ function mapAggregatedRow(row: unknown[], indexes: Map): Aggrega }); } -export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader { +export class BigQueryHistoricSqlQueryHistoryReader { private readonly viewPath: string; constructor(options: BigQueryHistoricSqlQueryHistoryReaderOptions) { @@ -245,44 +207,6 @@ export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHi } } - async *fetch( - client: unknown, - window: HistoricSqlTimeWindow, - cursor?: string | null, - ): AsyncIterable { - const start = timestampExpression(cursor ?? window.start); - const end = timestampExpression(window.end); - const sql = ` -SELECT - job_id, - query, - user_email, - creation_time, - end_time, - TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND) AS runtime_ms, - total_slot_ms, - total_bytes_processed, - state, - error_result.reason AS error_reason, - error_result.message AS error_message, - statement_type -FROM ${this.viewPath} -WHERE creation_time >= ${start} - AND creation_time < ${end} - AND job_type = 'QUERY' - AND query IS NOT NULL - AND (statement_type IS NULL OR statement_type != 'SCRIPT') -ORDER BY creation_time ASC, job_id ASC`.trim(); - const result = await queryClient(client).executeQuery(sql); - if (result.error) { - throw grantsError(result.error); - } - const indexes = indexByHeader(result.headers); - for (const row of result.rows) { - yield mapRow(row, indexes); - } - } - async *fetchAggregated( client: unknown, window: HistoricSqlTimeWindow, diff --git a/packages/context/src/ingest/adapters/historic-sql/evidence.test.ts b/packages/context/src/ingest/adapters/historic-sql/evidence.test.ts index 2d6b7556..8858ed37 100644 --- a/packages/context/src/ingest/adapters/historic-sql/evidence.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/evidence.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from 'vitest'; import { historicSqlEvidenceEnvelopeSchema, historicSqlEvidencePath, + historicSqlPatternEvidenceSchema, historicSqlTableUsageEvidenceSchema, } from './evidence.js'; @@ -27,20 +28,22 @@ describe('historic-sql evidence contracts', () => { }); it('validates pattern evidence emitted by the patterns WorkUnit', () => { - const parsed = historicSqlEvidenceEnvelopeSchema.parse({ - kind: 'pattern', - connectionId: 'warehouse', - rawPath: 'patterns-input.json', - pattern: { - slug: 'order-lifecycle-analysis', - title: 'Order Lifecycle Analysis', - narrative: 'Analysts compare order status changes by customer segment.', - definitionSql: 'select status, count(*) from public.orders group by status', - tablesInvolved: ['public.orders', 'public.customers'], - slRefs: ['orders', 'customers'], - constituentTemplateIds: ['pg:1', 'pg:2'], - }, - }); + const parsed = historicSqlPatternEvidenceSchema.parse( + historicSqlEvidenceEnvelopeSchema.parse({ + kind: 'pattern', + connectionId: 'warehouse', + rawPath: 'patterns-input.json', + pattern: { + slug: 'order-lifecycle-analysis', + title: 'Order Lifecycle Analysis', + narrative: 'Analysts compare order status changes by customer segment.', + definitionSql: 'select status, count(*) from public.orders group by status', + tablesInvolved: ['public.orders', 'public.customers'], + slRefs: ['orders', 'customers'], + constituentTemplateIds: ['pg:1', 'pg:2'], + }, + }), + ); expect(parsed.kind).toBe('pattern'); expect(parsed.pattern.slug).toBe('order-lifecycle-analysis'); diff --git a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts index a8015179..b388f71e 100644 --- a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts @@ -3,6 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; +import type { SourceAdapter } from '../../types.js'; import { HistoricSqlSourceAdapter } from './historic-sql.adapter.js'; import type { HistoricSqlReader } from './types.js'; @@ -33,7 +34,7 @@ describe('HistoricSqlSourceAdapter', () => { expect(adapter.source).toBe('historic-sql'); expect(adapter.skillNames).toEqual(['historic_sql_table_digest', 'historic_sql_patterns']); expect(adapter.reconcileSkillNames).toEqual([]); - expect(adapter.evidenceIndexing).toBeUndefined(); + expect((adapter as SourceAdapter).evidenceIndexing).toBeUndefined(); expect(adapter.triageSupported).toBe(false); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts index e68e4b49..8b9c5fee 100644 --- a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts @@ -62,125 +62,6 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => { await expect(reader.probe(client)).rejects.toBeInstanceOf(HistoricSqlGrantsMissingError); }); - it('fetches query-history rows with cursor and maps them into RawQueryRow shape', async () => { - const client = queryClient([ - { - headers: [ - 'QUERY_ID', - 'QUERY_TEXT', - 'USER_NAME', - 'ROLE_NAME', - 'WAREHOUSE_NAME', - 'DATABASE_NAME', - 'SCHEMA_NAME', - 'START_TIME', - 'END_TIME', - 'TOTAL_ELAPSED_TIME', - 'ROWS_PRODUCED', - 'EXECUTION_STATUS', - 'ERROR_CODE', - 'ERROR_MESSAGE', - ], - rows: [ - [ - '01a', - "SELECT count(*) FROM ANALYTICS.ORDERS WHERE STATUS = 'paid'", - 'ANALYST_A', - 'ANALYST_ROLE', - 'WH_XS', - 'ANALYTICS', - 'PUBLIC', - '2026-05-04T10:00:00.000Z', - '2026-05-04T10:00:01.250Z', - 1250, - 12, - 'SUCCESS', - null, - null, - ], - [ - '01b', - 'SELECT * FROM MISSING_TABLE', - 'ANALYST_B', - 'ANALYST_ROLE', - 'WH_XS', - 'ANALYTICS', - 'PUBLIC', - new Date('2026-05-04T10:05:00.000Z'), - null, - null, - null, - 'FAILED_WITH_ERROR', - '002003', - 'SQL compilation error', - ], - ], - totalRows: 2, - }, - ]); - const reader = new SnowflakeHistoricSqlQueryHistoryReader(); - - const rows = []; - for await (const row of reader.fetch( - client, - { - start: new Date('2026-05-01T00:00:00.000Z'), - end: new Date('2026-05-04T12:00:00.000Z'), - }, - '2026-05-03T00:00:00.000Z', - )) { - rows.push(row); - } - - expect(client.executeQuery).toHaveBeenCalledTimes(1); - const sql = firstQuery(client); - expect(sql).toContain('FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); - expect(sql).toContain("START_TIME >= '2026-05-03T00:00:00.000Z'::TIMESTAMP_TZ"); - expect(sql).toContain("START_TIME < '2026-05-04T12:00:00.000Z'::TIMESTAMP_TZ"); - expect(sql).toContain('ORDER BY START_TIME ASC, QUERY_ID ASC'); - expect(sql).toContain('ROWS_PRODUCED'); - - expect(rows).toEqual([ - { - id: '01a', - sql: "SELECT count(*) FROM ANALYTICS.ORDERS WHERE STATUS = 'paid'", - user: 'ANALYST_A', - startedAt: '2026-05-04T10:00:00.000Z', - endedAt: '2026-05-04T10:00:01.250Z', - runtimeMs: 1250, - rowsProduced: 12, - success: true, - errorMessage: null, - }, - { - id: '01b', - sql: 'SELECT * FROM MISSING_TABLE', - user: 'ANALYST_B', - startedAt: '2026-05-04T10:05:00.000Z', - endedAt: null, - runtimeMs: null, - rowsProduced: null, - success: false, - errorMessage: '002003: SQL compilation error', - }, - ]); - }); - - it('uses the window start when no cursor is available', async () => { - const client = queryClient([{ headers: ['QUERY_ID'], rows: [], totalRows: 0 }]); - const reader = new SnowflakeHistoricSqlQueryHistoryReader(); - - for await (const _row of reader.fetch(client, { - start: new Date('2026-02-03T12:00:00.000Z'), - end: new Date('2026-05-04T12:00:00.000Z'), - })) { - throw new Error('empty result should not yield rows'); - } - - const sql = firstQuery(client); - expect(sql).toContain("START_TIME >= '2026-02-03T12:00:00.000Z'::TIMESTAMP_TZ"); - }); - it('fetches aggregated Snowflake query templates', async () => { const client = queryClient([ { @@ -247,7 +128,19 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => { const reader = new SnowflakeHistoricSqlQueryHistoryReader(); await expect(async () => { - for await (const _row of reader.fetch({}, { start: new Date(), end: new Date() })) { + for await (const _row of reader.fetchAggregated( + {}, + { start: new Date(), end: new Date() }, + { + dialect: 'snowflake', + minExecutions: 5, + windowDays: 90, + concurrency: 12, + filters: { dropTrivialProbes: true }, + redactionPatterns: [], + staleArchiveAfterDays: 90, + }, + )) { throw new Error('unreachable'); } }).rejects.toThrow('Historic SQL Snowflake reader requires a query client with executeQuery(query)'); diff --git a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts index 6f3d02d2..4d8417b8 100644 --- a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts +++ b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts @@ -2,8 +2,6 @@ import { HistoricSqlGrantsMissingError } from './errors.js'; import { aggregatedTemplateSchema, type AggregatedTemplate, - type HistoricSqlQueryHistoryReader, - type HistoricSqlRawQueryRow, type HistoricSqlTimeWindow, type HistoricSqlUnifiedPullConfig, } from './types.js'; @@ -59,32 +57,6 @@ function timestampLiteral(value: Date | string): string { return `'${date.toISOString().replace(/'/g, "''")}'::TIMESTAMP_TZ`; } -function queryHistorySql(window: HistoricSqlTimeWindow, cursor?: string | null): string { - const start = timestampLiteral(cursor ?? window.start); - const end = timestampLiteral(window.end); - return ` -SELECT - QUERY_ID, - QUERY_TEXT, - USER_NAME, - ROLE_NAME, - WAREHOUSE_NAME, - DATABASE_NAME, - SCHEMA_NAME, - START_TIME, - END_TIME, - TOTAL_ELAPSED_TIME, - ROWS_PRODUCED, - EXECUTION_STATUS, - ERROR_CODE, - ERROR_MESSAGE -FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY -WHERE START_TIME >= ${start} - AND START_TIME < ${end} - AND QUERY_TEXT IS NOT NULL -ORDER BY START_TIME ASC, QUERY_ID ASC`.trim(); -} - function indexByHeader(headers: string[]): Map { const out = new Map(); headers.forEach((header, index) => { @@ -154,44 +126,6 @@ function isoTimestamp(raw: unknown, field: string): string { return date.toISOString(); } -function nullableIsoTimestamp(raw: unknown): string | null { - if (raw === null || raw === undefined || raw === '') { - return null; - } - return isoTimestamp(raw, 'END_TIME'); -} - -function executionSucceeded(status: string | null, errorCode: string | null, errorMessage: string | null): boolean { - if (errorCode || errorMessage) { - return false; - } - return status === null || status.toUpperCase().startsWith('SUCCESS'); -} - -function combinedErrorMessage(errorCode: string | null, errorMessage: string | null): string | null { - if (errorCode && errorMessage) { - return `${errorCode}: ${errorMessage}`; - } - return errorMessage ?? errorCode; -} - -function mapRow(row: unknown[], indexes: Map): HistoricSqlRawQueryRow { - const errorCode = nullableString(value(row, indexes, 'ERROR_CODE')); - const errorMessage = nullableString(value(row, indexes, 'ERROR_MESSAGE')); - const rowsProduced = nullableInteger(value(row, indexes, 'ROWS_PRODUCED')); - return { - id: requiredString(value(row, indexes, 'QUERY_ID'), 'QUERY_ID'), - sql: requiredString(value(row, indexes, 'QUERY_TEXT'), 'QUERY_TEXT'), - user: nullableString(value(row, indexes, 'USER_NAME')), - startedAt: isoTimestamp(value(row, indexes, 'START_TIME'), 'START_TIME'), - endedAt: nullableIsoTimestamp(value(row, indexes, 'END_TIME')), - runtimeMs: nullableNumber(value(row, indexes, 'TOTAL_ELAPSED_TIME')), - rowsProduced, - success: executionSucceeded(nullableString(value(row, indexes, 'EXECUTION_STATUS')), errorCode, errorMessage), - errorMessage: combinedErrorMessage(errorCode, errorMessage), - }; -} - function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> { const text = nullableString(raw); if (!text) { @@ -234,7 +168,7 @@ function mapAggregatedRow(row: unknown[], indexes: Map): Aggrega }); } -export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader { +export class SnowflakeHistoricSqlQueryHistoryReader { async probe(client: unknown): Promise { let result: QueryResultLike; try { @@ -247,21 +181,6 @@ export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryH } } - async *fetch( - client: unknown, - window: HistoricSqlTimeWindow, - cursor?: string | null, - ): AsyncIterable { - const result = await queryClient(client).executeQuery(queryHistorySql(window, cursor)); - if (result.error) { - throw grantsError(result.error); - } - const indexes = indexByHeader(result.headers); - for (const row of result.rows) { - yield mapRow(row, indexes); - } - } - async *fetchAggregated( client: unknown, window: HistoricSqlTimeWindow, diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index f8c7099e..43d0247b 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -508,7 +508,7 @@ class LocalIngestToolsetFactory implements IngestToolsetFactoryPort { } createIngestWuToolset(session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike { - const sourceTools = + const sourceTools: Record = session.ingest?.sourceKey === 'historic-sql' ? { emit_historic_sql_evidence: createEmitHistoricSqlEvidenceTool({