From 25952e44bc0ee41266938a5d28069b67510a77c4 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 18:07:13 +0200 Subject: [PATCH] feat: add historic sql aggregate readers --- .../bigquery-query-history-reader.test.ts | 62 ++++++ .../bigquery-query-history-reader.ts | 105 +++++++++- .../historic-sql/postgres-pgss-reader.test.ts | 58 ++++++ .../historic-sql/postgres-pgss-reader.ts | 191 ++++++++++++++++++ .../snowflake-query-history-reader.test.ts | 62 ++++++ .../snowflake-query-history-reader.ts | 101 ++++++++- packages/context/src/ingest/index.ts | 1 + 7 files changed, 577 insertions(+), 3 deletions(-) create mode 100644 packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts create mode 100644 packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts diff --git a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts index e0a5e07d..cac20cd5 100644 --- a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts @@ -179,6 +179,68 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => { expect(sql).toContain("creation_time >= TIMESTAMP('2026-02-03T12:00:00.000Z')"); }); + it('fetches aggregated BigQuery query templates', async () => { + const client = queryClient([ + { + headers: [ + 'template_id', + 'canonical_sql', + 'executions', + 'distinct_users', + 'first_seen', + 'last_seen', + 'p50_ms', + 'p95_ms', + 'error_rate', + 'rows_produced', + 'top_users', + ], + rows: [ + [ + 'hash-1', + 'select status from orders', + 42, + 3, + '2026-05-01T00:00:00.000Z', + '2026-05-11T00:00:00.000Z', + 12, + 40, + 0.05, + null, + JSON.stringify([{ user: 'analyst@example.test', executions: 1 }]), + ], + ], + totalRows: 1, + }, + ]); + const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'demo', region: 'us' }); + + const rows = []; + for await (const row of reader.fetchAggregated( + client, + { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, + { dialect: 'bigquery', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 }, + )) { + rows.push(row); + } + + const sql = firstQuery(client); + expect(sql).toContain('COUNT(*) AS executions'); + expect(sql).toContain('COUNT(DISTINCT user_email) AS distinct_users'); + expect(sql).toContain('GROUP BY query_hash'); + expect(sql).toContain('HAVING COUNT(*) >= 5'); + expect(rows).toMatchObject([ + { + templateId: 'hash-1', + stats: { + executions: 42, + errorRate: 0.05, + }, + topUsers: [{ user: 'analyst@example.test', executions: 1 }], + }, + ]); + }); + it('throws a clear error when the query client cannot execute SQL', async () => { const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' }); diff --git a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts index ea8fb00e..e5d6a82f 100644 --- a/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts +++ b/packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts @@ -1,5 +1,12 @@ import { HistoricSqlGrantsMissingError } from './errors.js'; -import type { HistoricSqlQueryHistoryReader, HistoricSqlRawQueryRow, HistoricSqlTimeWindow } from './types.js'; +import { + aggregatedTemplateSchema, + type AggregatedTemplate, + type HistoricSqlQueryHistoryReader, + type HistoricSqlRawQueryRow, + type HistoricSqlTimeWindow, + type HistoricSqlUnifiedPullConfig, +} from './types.js'; interface QueryResultLike { headers: string[]; @@ -110,6 +117,23 @@ function nullableNumber(raw: unknown): number | null { return Math.max(0, number); } +function requiredNumber(raw: unknown, field: string): number { + const number = nullableNumber(raw); + if (number === null) { + throw new Error(`BigQuery JOBS_BY_PROJECT row has invalid ${field}: ${String(raw)}`); + } + return number; +} + +function requiredInteger(raw: unknown, field: string): number { + return Math.trunc(requiredNumber(raw, field)); +} + +function nullableInteger(raw: unknown): number | null { + const number = nullableNumber(raw); + return number === null ? null : Math.trunc(number); +} + function isoTimestamp(raw: unknown, field: string): string { if (raw instanceof Date) { return raw.toISOString(); @@ -158,6 +182,48 @@ function mapRow(row: unknown[], indexes: Map): HistoricSqlRawQue }; } +function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> { + const text = nullableString(raw); + if (!text) { + return []; + } + try { + const parsed = JSON.parse(text) as unknown; + if (!Array.isArray(parsed)) { + return []; + } + return parsed.flatMap((entry) => { + if (!entry || typeof entry !== 'object') { + return []; + } + const user = nullableString((entry as { user?: unknown }).user); + const executions = nullableInteger((entry as { executions?: unknown }).executions); + return executions === null ? [] : [{ user, executions }]; + }); + } catch { + return []; + } +} + +function mapAggregatedRow(row: unknown[], indexes: Map): AggregatedTemplate { + return aggregatedTemplateSchema.parse({ + templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'), + canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'), + dialect: 'bigquery', + stats: { + executions: requiredInteger(value(row, indexes, 'executions'), 'executions'), + distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'), + firstSeen: isoTimestamp(value(row, indexes, 'first_seen'), 'first_seen'), + lastSeen: isoTimestamp(value(row, indexes, 'last_seen'), 'last_seen'), + p50RuntimeMs: nullableNumber(value(row, indexes, 'p50_ms')), + p95RuntimeMs: nullableNumber(value(row, indexes, 'p95_ms')), + errorRate: requiredNumber(value(row, indexes, 'error_rate'), 'error_rate'), + rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')), + }, + topUsers: parseTopUsers(value(row, indexes, 'top_users')), + }); +} + export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader { private readonly viewPath: string; @@ -216,4 +282,41 @@ ORDER BY creation_time ASC, job_id ASC`.trim(); yield mapRow(row, indexes); } } + + async *fetchAggregated( + client: unknown, + window: HistoricSqlTimeWindow, + config: HistoricSqlUnifiedPullConfig, + ): AsyncIterable { + const sql = ` +SELECT + query_hash AS template_id, + MIN(query) AS canonical_sql, + COUNT(*) AS executions, + COUNT(DISTINCT user_email) AS distinct_users, + MIN(creation_time) AS first_seen, + MAX(creation_time) AS last_seen, + APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(50)] AS p50_ms, + APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(95)] AS p95_ms, + SAFE_DIVIDE(COUNTIF(error_result IS NOT NULL), COUNT(*)) AS error_rate, + CAST(NULL AS INT64) AS rows_produced, + TO_JSON_STRING(ARRAY_AGG(STRUCT(user_email AS user, 1 AS executions) ORDER BY creation_time DESC LIMIT 5)) AS top_users +FROM ${this.viewPath} +WHERE job_type = 'QUERY' + AND statement_type IN ('SELECT', 'MERGE') + AND creation_time >= ${timestampExpression(window.start)} + AND creation_time < ${timestampExpression(window.end)} + AND query IS NOT NULL +GROUP BY query_hash +HAVING COUNT(*) >= ${config.minExecutions} +ORDER BY executions DESC`.trim(); + const result = await queryClient(client).executeQuery(sql); + if (result.error) { + throw grantsError(result.error); + } + const indexes = indexByHeader(result.headers); + for (const row of result.rows) { + yield mapAggregatedRow(row, indexes); + } + } } diff --git a/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts new file mode 100644 index 00000000..a1b710b6 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it, vi } from 'vitest'; +import { PostgresPgssReader } from './postgres-pgss-reader.js'; + +describe('PostgresPgssReader aggregate path', () => { + it('aggregates pg_stat_statements rows by queryid and query', async () => { + const executeQuery = vi.fn(async (sql: string, params?: unknown[]) => { + if (sql.includes('pg_stat_statements_info')) { + return { headers: ['stats_reset', 'dealloc'], rows: [['2026-05-01T00:00:00.000Z', 1]] }; + } + expect(sql).toContain('GROUP BY queryid, query'); + expect(sql).toContain('HAVING SUM(calls) >= $1'); + expect(params).toEqual([5]); + return { + headers: ['template_id', 'canonical_sql', 'executions', 'distinct_users', 'mean_ms', 'rows_produced', 'top_users'], + rows: [ + [ + '123', + 'select status from public.orders', + '42', + '3', + '11.5', + '100', + JSON.stringify([{ user: 'analyst', executions: 40 }]), + ], + ], + }; + }); + + const reader = new PostgresPgssReader(); + const rows = []; + for await (const row of reader.fetchAggregated( + { executeQuery }, + { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, + { dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 }, + )) { + rows.push(row); + } + + expect(rows).toEqual([ + { + templateId: '123', + canonicalSql: 'select status from public.orders', + dialect: 'postgres', + stats: { + executions: 42, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 11.5, + p95RuntimeMs: 11.5, + errorRate: 0, + rowsProduced: 100, + }, + topUsers: [{ user: 'analyst', executions: 40 }], + }, + ]); + }); +}); diff --git a/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts new file mode 100644 index 00000000..02e0522c --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts @@ -0,0 +1,191 @@ +import { PostgresPgssQueryHistoryReader } from './postgres-pgss-query-history-reader.js'; +import { + aggregatedTemplateSchema, + type AggregatedTemplate, + type HistoricSqlTimeWindow, + type HistoricSqlUnifiedPullConfig, + type KtxPostgresQueryClient, + type PostgresPgssProbeResult, +} from './types.js'; + +interface QueryResultLike { + headers: string[]; + rows: unknown[][]; + totalRows?: number; + error?: string; +} + +const STATS_INFO_SQL = 'SELECT stats_reset, dealloc FROM pg_stat_statements_info'; + +const AGGREGATE_SQL = ` +SELECT queryid::text AS template_id, + query AS canonical_sql, + SUM(calls)::bigint AS executions, + COUNT(DISTINCT userid) AS distinct_users, + SUM(total_exec_time) / NULLIF(SUM(calls), 0) AS mean_ms, + SUM(rows)::bigint AS rows_produced, + COALESCE( + json_agg(json_build_object('user', rolname, 'executions', calls) ORDER BY calls DESC) + FILTER (WHERE userid IS NOT NULL), + '[]'::json + )::text AS top_users +FROM pg_stat_statements +LEFT JOIN pg_roles ON pg_roles.oid = pg_stat_statements.userid +WHERE toplevel = true +GROUP BY queryid, query +HAVING SUM(calls) >= $1 +ORDER BY SUM(total_exec_time) DESC +`.trim(); + +function queryClient(client: unknown): KtxPostgresQueryClient { + if ( + client && + typeof client === 'object' && + 'executeQuery' in client && + typeof (client as { executeQuery?: unknown }).executeQuery === 'function' + ) { + return client as KtxPostgresQueryClient; + } + throw new Error('Historic SQL Postgres PGSS reader requires a query client with executeQuery(sql, params?)'); +} + +async function execute(client: KtxPostgresQueryClient, sql: string, params?: unknown[]): Promise { + const result = await client.executeQuery(sql, params); + if ('error' in result && typeof result.error === 'string' && result.error.length > 0) { + throw new Error(result.error); + } + return result; +} + +function indexByHeader(headers: string[]): Map { + const out = new Map(); + headers.forEach((header, index) => out.set(header.toLowerCase(), index)); + return out; +} + +function value(row: unknown[], headerIndexes: Map, header: string): unknown { + const index = headerIndexes.get(header.toLowerCase()); + return index === undefined ? null : row[index]; +} + +function nullableString(raw: unknown): string | null { + if (raw === null || raw === undefined) { + return null; + } + const text = String(raw); + return text.length > 0 ? text : null; +} + +function requiredString(raw: unknown, field: string): string { + const text = nullableString(raw); + if (!text) { + throw new Error(`Postgres pg_stat_statements row is missing ${field}`); + } + return text; +} + +function requiredFiniteNumber(raw: unknown, field: string): number { + const number = typeof raw === 'number' ? raw : Number(raw); + if (!Number.isFinite(number)) { + throw new Error(`Postgres pg_stat_statements row has invalid ${field}: ${String(raw)}`); + } + return number; +} + +function requiredInteger(raw: unknown, field: string): number { + return Math.trunc(requiredFiniteNumber(raw, field)); +} + +function nullableNumber(raw: unknown): number | null { + if (raw === null || raw === undefined || raw === '') { + return null; + } + const number = typeof raw === 'number' ? raw : Number(raw); + return Number.isFinite(number) ? number : null; +} + +function nullableInteger(raw: unknown): number | null { + const number = nullableNumber(raw); + return number === null ? null : Math.trunc(number); +} + +function nullableIsoTimestamp(raw: unknown): string | null { + if (raw === null || raw === undefined || raw === '') { + return null; + } + if (raw instanceof Date) { + return raw.toISOString(); + } + const date = new Date(String(raw)); + return Number.isNaN(date.getTime()) ? null : date.toISOString(); +} + +function firstRow(result: QueryResultLike, context: string): { row: unknown[]; headers: Map } { + const row = result.rows[0]; + if (!row) { + throw new Error(`Postgres historic-SQL ${context} query returned no rows`); + } + return { row, headers: indexByHeader(result.headers) }; +} + +function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> { + const text = nullableString(raw); + if (!text) { + return []; + } + try { + const parsed = JSON.parse(text) as unknown; + if (!Array.isArray(parsed)) { + return []; + } + return parsed.flatMap((entry) => { + if (!entry || typeof entry !== 'object') { + return []; + } + const user = nullableString((entry as { user?: unknown }).user); + const executions = nullableInteger((entry as { executions?: unknown }).executions); + return executions === null ? [] : [{ user, executions }]; + }); + } catch { + return []; + } +} + +export class PostgresPgssReader { + private readonly legacyReader = new PostgresPgssQueryHistoryReader(); + + probe(client: unknown): Promise { + return this.legacyReader.probe(client); + } + + async *fetchAggregated( + client: unknown, + window: HistoricSqlTimeWindow, + config: HistoricSqlUnifiedPullConfig, + ): AsyncIterable { + const pgClient = queryClient(client); + const statsResult = await execute(pgClient, STATS_INFO_SQL); + const { row: statsRow, headers: statsHeaders } = firstRow(statsResult, 'stats-info'); + const firstSeen = nullableIsoTimestamp(value(statsRow, statsHeaders, 'stats_reset')) ?? window.start.toISOString(); + const result = await execute(pgClient, AGGREGATE_SQL, [config.minExecutions]); + const indexes = indexByHeader(result.headers); + for (const row of result.rows) { + yield aggregatedTemplateSchema.parse({ + templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'), + canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'), + dialect: 'postgres', + stats: { + executions: requiredInteger(value(row, indexes, 'executions'), 'executions'), + distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'), + firstSeen, + lastSeen: window.end.toISOString(), + p50RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), + p95RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), + errorRate: 0, + rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')), + }, + topUsers: parseTopUsers(value(row, indexes, 'top_users')), + }); + } + } +} diff --git a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts index d8253df9..416a6ae4 100644 --- a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts @@ -181,6 +181,68 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => { expect(sql).toContain("START_TIME >= '2026-02-03T12:00:00.000Z'::TIMESTAMP_TZ"); }); + it('fetches aggregated Snowflake query templates', async () => { + const client = queryClient([ + { + headers: [ + 'template_id', + 'canonical_sql', + 'executions', + 'distinct_users', + 'first_seen', + 'last_seen', + 'p50_ms', + 'p95_ms', + 'error_rate', + 'rows_produced', + 'top_users', + ], + rows: [ + [ + 'hash-1', + 'select status from orders', + 42, + 3, + '2026-05-01T00:00:00.000Z', + '2026-05-11T00:00:00.000Z', + 12, + 40, + 0.05, + 100, + JSON.stringify([{ user: 'ANALYST', executions: 1 }]), + ], + ], + totalRows: 1, + }, + ]); + const reader = new SnowflakeHistoricSqlQueryHistoryReader(); + + const rows = []; + for await (const row of reader.fetchAggregated( + client, + { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, + { dialect: 'snowflake', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 }, + )) { + rows.push(row); + } + + const sql = firstQuery(client); + expect(sql).toContain('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); + expect(sql).toContain('COUNT(*) AS executions'); + expect(sql).toContain('GROUP BY query_hash'); + expect(sql).toContain('HAVING COUNT(*) >= 5'); + expect(rows).toMatchObject([ + { + templateId: 'hash-1', + stats: { + executions: 42, + errorRate: 0.05, + }, + topUsers: [{ user: 'ANALYST', executions: 1 }], + }, + ]); + }); + it('throws a clear error when the query client cannot execute SQL', async () => { const reader = new SnowflakeHistoricSqlQueryHistoryReader(); diff --git a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts index b149a34b..6f3d02d2 100644 --- a/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts +++ b/packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts @@ -1,5 +1,12 @@ import { HistoricSqlGrantsMissingError } from './errors.js'; -import type { HistoricSqlQueryHistoryReader, HistoricSqlRawQueryRow, HistoricSqlTimeWindow } from './types.js'; +import { + aggregatedTemplateSchema, + type AggregatedTemplate, + type HistoricSqlQueryHistoryReader, + type HistoricSqlRawQueryRow, + type HistoricSqlTimeWindow, + type HistoricSqlUnifiedPullConfig, +} from './types.js'; interface QueryResultLike { headers: string[]; @@ -87,7 +94,7 @@ function indexByHeader(headers: string[]): Map { } function value(row: unknown[], indexes: Map, name: string): unknown { - const index = indexes.get(name); + const index = indexes.get(name.toUpperCase()); return index === undefined ? null : row[index]; } @@ -118,6 +125,18 @@ function nullableNumber(raw: unknown): number | null { return number; } +function requiredNumber(raw: unknown, field: string): number { + const number = nullableNumber(raw); + if (number === null) { + throw new Error(`Snowflake QUERY_HISTORY row has invalid ${field}: ${String(raw)}`); + } + return number; +} + +function requiredInteger(raw: unknown, field: string): number { + return Math.trunc(requiredNumber(raw, field)); +} + function nullableInteger(raw: unknown): number | null { const number = nullableNumber(raw); return number === null ? null : Math.trunc(number); @@ -173,6 +192,48 @@ function mapRow(row: unknown[], indexes: Map): HistoricSqlRawQue }; } +function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> { + const text = nullableString(raw); + if (!text) { + return []; + } + try { + const parsed = JSON.parse(text) as unknown; + if (!Array.isArray(parsed)) { + return []; + } + return parsed.flatMap((entry) => { + if (!entry || typeof entry !== 'object') { + return []; + } + const user = nullableString((entry as { user?: unknown }).user); + const executions = nullableInteger((entry as { executions?: unknown }).executions); + return executions === null ? [] : [{ user, executions }]; + }); + } catch { + return []; + } +} + +function mapAggregatedRow(row: unknown[], indexes: Map): AggregatedTemplate { + return aggregatedTemplateSchema.parse({ + templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'), + canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'), + dialect: 'snowflake', + stats: { + executions: requiredInteger(value(row, indexes, 'executions'), 'executions'), + distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'), + firstSeen: isoTimestamp(value(row, indexes, 'first_seen'), 'first_seen'), + lastSeen: isoTimestamp(value(row, indexes, 'last_seen'), 'last_seen'), + p50RuntimeMs: nullableNumber(value(row, indexes, 'p50_ms')), + p95RuntimeMs: nullableNumber(value(row, indexes, 'p95_ms')), + errorRate: requiredNumber(value(row, indexes, 'error_rate'), 'error_rate'), + rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')), + }, + topUsers: parseTopUsers(value(row, indexes, 'top_users')), + }); +} + export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader { async probe(client: unknown): Promise { let result: QueryResultLike; @@ -200,4 +261,40 @@ export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryH yield mapRow(row, indexes); } } + + async *fetchAggregated( + client: unknown, + window: HistoricSqlTimeWindow, + config: HistoricSqlUnifiedPullConfig, + ): AsyncIterable { + const sql = ` +SELECT + query_hash AS template_id, + MIN(query_text) AS canonical_sql, + COUNT(*) AS executions, + COUNT(DISTINCT user_name) AS distinct_users, + MIN(start_time) AS first_seen, + MAX(start_time) AS last_seen, + APPROX_PERCENTILE(total_elapsed_time, 0.50) AS p50_ms, + APPROX_PERCENTILE(total_elapsed_time, 0.95) AS p95_ms, + DIV0(COUNT_IF(execution_status != 'SUCCESS'), COUNT(*)) AS error_rate, + SUM(rows_produced) AS rows_produced, + ARRAY_AGG(OBJECT_CONSTRUCT('user', user_name, 'executions', 1)) WITHIN GROUP (ORDER BY start_time DESC)::string AS top_users +FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY +WHERE query_text IS NOT NULL + AND query_type IN ('SELECT', 'MERGE') + AND start_time >= ${timestampLiteral(window.start)} + AND start_time < ${timestampLiteral(window.end)} +GROUP BY query_hash +HAVING COUNT(*) >= ${config.minExecutions} +ORDER BY executions DESC`.trim(); + const result = await queryClient(client).executeQuery(sql); + if (result.error) { + throw grantsError(result.error); + } + const indexes = indexByHeader(result.headers); + for (const row of result.rows) { + yield mapAggregatedRow(row, indexes); + } + } } diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index 3362d88e..32beb6e8 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -328,6 +328,7 @@ export { export { HistoricSqlSourceAdapter } from './adapters/historic-sql/historic-sql.adapter.js'; export { BigQueryHistoricSqlQueryHistoryReader } from './adapters/historic-sql/bigquery-query-history-reader.js'; export type { BigQueryHistoricSqlQueryHistoryReaderOptions } from './adapters/historic-sql/bigquery-query-history-reader.js'; +export { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js'; export { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js'; export { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js'; export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js';