feat: add historic sql aggregate readers

This commit is contained in:
Andrey Avtomonov 2026-05-11 18:07:13 +02:00
parent 93752d5719
commit 25952e44bc
7 changed files with 577 additions and 3 deletions

View file

@ -179,6 +179,68 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
expect(sql).toContain("creation_time >= TIMESTAMP('2026-02-03T12:00:00.000Z')");
});
it('fetches aggregated BigQuery query templates', async () => {
  // One canned result page shaped like the aggregate query's projection.
  const aggregateHeaders = [
    'template_id',
    'canonical_sql',
    'executions',
    'distinct_users',
    'first_seen',
    'last_seen',
    'p50_ms',
    'p95_ms',
    'error_rate',
    'rows_produced',
    'top_users',
  ];
  const aggregateRow = [
    'hash-1',
    'select status from orders',
    42,
    3,
    '2026-05-01T00:00:00.000Z',
    '2026-05-11T00:00:00.000Z',
    12,
    40,
    0.05,
    null,
    JSON.stringify([{ user: 'analyst@example.test', executions: 1 }]),
  ];
  const client = queryClient([{ headers: aggregateHeaders, rows: [aggregateRow], totalRows: 1 }]);
  const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'demo', region: 'us' });
  const pullWindow = {
    start: new Date('2026-02-10T00:00:00.000Z'),
    end: new Date('2026-05-11T00:00:00.000Z'),
  };

  // Drain the async iterator so the query is actually issued.
  const templates = [];
  for await (const template of reader.fetchAggregated(client, pullWindow, {
    dialect: 'bigquery',
    minExecutions: 5,
    windowDays: 90,
    concurrency: 12,
    filters: {},
    redactionPatterns: [],
    staleArchiveAfterDays: 90,
  })) {
    templates.push(template);
  }

  // The generated SQL must aggregate per query hash and honour minExecutions.
  const sql = firstQuery(client);
  const expectedFragments = [
    'COUNT(*) AS executions',
    'COUNT(DISTINCT user_email) AS distinct_users',
    'GROUP BY query_hash',
    'HAVING COUNT(*) >= 5',
  ];
  for (const fragment of expectedFragments) {
    expect(sql).toContain(fragment);
  }

  expect(templates).toMatchObject([
    {
      templateId: 'hash-1',
      stats: { executions: 42, errorRate: 0.05 },
      topUsers: [{ user: 'analyst@example.test', executions: 1 }],
    },
  ]);
});
it('throws a clear error when the query client cannot execute SQL', async () => {
const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' });

View file

@ -1,5 +1,12 @@
import { HistoricSqlGrantsMissingError } from './errors.js';
import type { HistoricSqlQueryHistoryReader, HistoricSqlRawQueryRow, HistoricSqlTimeWindow } from './types.js';
import {
aggregatedTemplateSchema,
type AggregatedTemplate,
type HistoricSqlQueryHistoryReader,
type HistoricSqlRawQueryRow,
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
} from './types.js';
interface QueryResultLike {
headers: string[];
@ -110,6 +117,23 @@ function nullableNumber(raw: unknown): number | null {
return Math.max(0, number);
}
/**
 * Parses a mandatory numeric column.
 *
 * @param raw - Cell value as returned by the query client.
 * @param field - Column name, used only in the error message.
 * @returns The number produced by `nullableNumber`.
 * @throws Error when `nullableNumber` yields `null` for the cell.
 */
function requiredNumber(raw: unknown, field: string): number {
  const parsed = nullableNumber(raw);
  if (parsed !== null) {
    return parsed;
  }
  throw new Error(`BigQuery JOBS_BY_PROJECT row has invalid ${field}: ${String(raw)}`);
}
/**
 * Parses a mandatory numeric column and drops any fractional part.
 *
 * @throws Error (from `requiredNumber`) when the cell is not a usable number.
 */
function requiredInteger(raw: unknown, field: string): number {
  const parsed = requiredNumber(raw, field);
  return Math.trunc(parsed);
}
/**
 * Parses an optional numeric column and drops any fractional part.
 *
 * @returns `null` when `nullableNumber` yields `null`, otherwise the truncated number.
 */
function nullableInteger(raw: unknown): number | null {
  const parsed = nullableNumber(raw);
  if (parsed === null) {
    return null;
  }
  return Math.trunc(parsed);
}
function isoTimestamp(raw: unknown, field: string): string {
if (raw instanceof Date) {
return raw.toISOString();
@ -158,6 +182,48 @@ function mapRow(row: unknown[], indexes: Map<string, number>): HistoricSqlRawQue
};
}
/**
 * Decodes the JSON-encoded `top_users` column into `{ user, executions }` entries.
 *
 * Parsing is best-effort: an empty cell, malformed JSON, or a non-array payload
 * all produce an empty list. Entries that are not objects, or whose
 * `executions` cannot be read as a number, are dropped.
 */
function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> {
  const serialized = nullableString(raw);
  if (!serialized) {
    return [];
  }
  try {
    const decoded: unknown = JSON.parse(serialized);
    if (!Array.isArray(decoded)) {
      return [];
    }
    const topUsers: Array<{ user: string | null; executions: number }> = [];
    for (const candidate of decoded) {
      if (!candidate || typeof candidate !== 'object') {
        continue;
      }
      const fields = candidate as { user?: unknown; executions?: unknown };
      const user = nullableString(fields.user);
      const executions = nullableInteger(fields.executions);
      if (executions !== null) {
        topUsers.push({ user, executions });
      }
    }
    return topUsers;
  } catch {
    // Deliberately lenient: a bad payload must not fail the whole row.
    return [];
  }
}
/**
 * Converts one aggregate result row into an `AggregatedTemplate`.
 *
 * Columns are read by header name and the assembled object is validated with
 * `aggregatedTemplateSchema.parse`, so both the column helpers and the schema
 * can throw on a malformed row.
 *
 * @param row - Positional cell values for one result row.
 * @param indexes - Header-name to column-position lookup.
 */
function mapAggregatedRow(row: unknown[], indexes: Map<string, number>): AggregatedTemplate {
  const cell = (column: string): unknown => value(row, indexes, column);

  // Columns are read in output order so the first invalid column is the one reported.
  const templateId = requiredString(cell('template_id'), 'template_id');
  const canonicalSql = requiredString(cell('canonical_sql'), 'canonical_sql');
  const stats = {
    executions: requiredInteger(cell('executions'), 'executions'),
    distinctUsers: requiredInteger(cell('distinct_users'), 'distinct_users'),
    firstSeen: isoTimestamp(cell('first_seen'), 'first_seen'),
    lastSeen: isoTimestamp(cell('last_seen'), 'last_seen'),
    p50RuntimeMs: nullableNumber(cell('p50_ms')),
    p95RuntimeMs: nullableNumber(cell('p95_ms')),
    errorRate: requiredNumber(cell('error_rate'), 'error_rate'),
    rowsProduced: nullableInteger(cell('rows_produced')),
  };
  const topUsers = parseTopUsers(cell('top_users'));

  return aggregatedTemplateSchema.parse({
    templateId,
    canonicalSql,
    dialect: 'bigquery',
    stats,
    topUsers,
  });
}
export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader {
private readonly viewPath: string;
@ -216,4 +282,41 @@ ORDER BY creation_time ASC, job_id ASC`.trim();
yield mapRow(row, indexes);
}
}
/**
 * Runs one aggregate query over the configured jobs view and yields one
 * `AggregatedTemplate` per `query_hash`.
 *
 * Only `SELECT`/`MERGE` query jobs created inside `[window.start, window.end)`
 * are counted, and groups with fewer than `config.minExecutions` executions
 * are dropped by the `HAVING` clause. `rows_produced` is always NULL in this
 * query. `top_users` holds at most the five most recent executions, each
 * emitted with `executions: 1`.
 *
 * NOTE(review): `top_users` is therefore a recency sample, not a per-user
 * count — confirm downstream consumers expect that.
 *
 * @throws The error built by `grantsError` when the client reports `error`.
 */
async *fetchAggregated(
  client: unknown,
  window: HistoricSqlTimeWindow,
  config: HistoricSqlUnifiedPullConfig,
): AsyncIterable<AggregatedTemplate> {
  const windowStart = timestampExpression(window.start);
  const windowEnd = timestampExpression(window.end);
  const statement = `
SELECT
query_hash AS template_id,
MIN(query) AS canonical_sql,
COUNT(*) AS executions,
COUNT(DISTINCT user_email) AS distinct_users,
MIN(creation_time) AS first_seen,
MAX(creation_time) AS last_seen,
APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(50)] AS p50_ms,
APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(95)] AS p95_ms,
SAFE_DIVIDE(COUNTIF(error_result IS NOT NULL), COUNT(*)) AS error_rate,
CAST(NULL AS INT64) AS rows_produced,
TO_JSON_STRING(ARRAY_AGG(STRUCT(user_email AS user, 1 AS executions) ORDER BY creation_time DESC LIMIT 5)) AS top_users
FROM ${this.viewPath}
WHERE job_type = 'QUERY'
AND statement_type IN ('SELECT', 'MERGE')
AND creation_time >= ${windowStart}
AND creation_time < ${windowEnd}
AND query IS NOT NULL
GROUP BY query_hash
HAVING COUNT(*) >= ${config.minExecutions}
ORDER BY executions DESC`.trim();

  const response = await queryClient(client).executeQuery(statement);
  if (response.error) {
    throw grantsError(response.error);
  }

  const columnIndexes = indexByHeader(response.headers);
  for (const aggregateRow of response.rows) {
    yield mapAggregatedRow(aggregateRow, columnIndexes);
  }
}
}

View file

@ -0,0 +1,58 @@
import { describe, expect, it, vi } from 'vitest';
import { PostgresPgssReader } from './postgres-pgss-reader.js';
describe('PostgresPgssReader aggregate path', () => {
  it('aggregates pg_stat_statements rows by queryid and query', async () => {
    // Canned responses: one for the stats-info lookup, one for the aggregate query.
    const statsInfoResult = {
      headers: ['stats_reset', 'dealloc'],
      rows: [['2026-05-01T00:00:00.000Z', 1]],
    };
    const aggregateResult = {
      headers: ['template_id', 'canonical_sql', 'executions', 'distinct_users', 'mean_ms', 'rows_produced', 'top_users'],
      rows: [
        [
          '123',
          'select status from public.orders',
          '42',
          '3',
          '11.5',
          '100',
          JSON.stringify([{ user: 'analyst', executions: 40 }]),
        ],
      ],
    };
    const executeQuery = vi.fn(async (sql: string, params?: unknown[]) => {
      if (sql.includes('pg_stat_statements_info')) {
        return statsInfoResult;
      }
      // Anything else must be the aggregate query, with minExecutions bound as $1.
      expect(sql).toContain('GROUP BY queryid, query');
      expect(sql).toContain('HAVING SUM(calls) >= $1');
      expect(params).toEqual([5]);
      return aggregateResult;
    });

    const reader = new PostgresPgssReader();
    const pullWindow = {
      start: new Date('2026-02-10T00:00:00.000Z'),
      end: new Date('2026-05-11T00:00:00.000Z'),
    };
    const templates = [];
    for await (const template of reader.fetchAggregated({ executeQuery }, pullWindow, {
      dialect: 'postgres',
      minExecutions: 5,
      windowDays: 90,
      concurrency: 12,
      filters: {},
      redactionPatterns: [],
      staleArchiveAfterDays: 90,
    })) {
      templates.push(template);
    }

    // firstSeen comes from stats_reset, lastSeen from the window end, and both
    // runtime percentiles from the single mean_ms column.
    const expectedStats = {
      executions: 42,
      distinctUsers: 3,
      firstSeen: '2026-05-01T00:00:00.000Z',
      lastSeen: '2026-05-11T00:00:00.000Z',
      p50RuntimeMs: 11.5,
      p95RuntimeMs: 11.5,
      errorRate: 0,
      rowsProduced: 100,
    };
    expect(templates).toEqual([
      {
        templateId: '123',
        canonicalSql: 'select status from public.orders',
        dialect: 'postgres',
        stats: expectedStats,
        topUsers: [{ user: 'analyst', executions: 40 }],
      },
    ]);
  });
});

View file

@ -0,0 +1,191 @@
import { PostgresPgssQueryHistoryReader } from './postgres-pgss-query-history-reader.js';
import {
aggregatedTemplateSchema,
type AggregatedTemplate,
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
type KtxPostgresQueryClient,
type PostgresPgssProbeResult,
} from './types.js';
/** Minimal result shape this reader consumes from the Postgres query client. */
interface QueryResultLike {
/** Column names; cells in each row are located by their position in this list. */
headers: string[];
/** Result rows, one positional array of cells per row. */
rows: unknown[][];
/** Optional row count from the client; not read anywhere in this file. */
totalRows?: number;
/** Optional error text; a non-empty string makes `execute` throw. */
error?: string;
}
/**
 * Reads the statistics reset timestamp and dealloc counter. Only `stats_reset`
 * is consumed by `fetchAggregated`, as the `firstSeen` value.
 */
const STATS_INFO_SQL = 'SELECT stats_reset, dealloc FROM pg_stat_statements_info';
/**
 * Rolls `pg_stat_statements` up to one row per (queryid, query) for top-level
 * statements only. `$1` is the minimum summed call count a group must reach;
 * `fetchAggregated` binds it to `config.minExecutions`. `top_users` is a JSON
 * text array of `{ user, executions }` objects (role name and call count),
 * ordered by calls descending, or `[]` when no row has a userid.
 */
const AGGREGATE_SQL = `
SELECT queryid::text AS template_id,
query AS canonical_sql,
SUM(calls)::bigint AS executions,
COUNT(DISTINCT userid) AS distinct_users,
SUM(total_exec_time) / NULLIF(SUM(calls), 0) AS mean_ms,
SUM(rows)::bigint AS rows_produced,
COALESCE(
json_agg(json_build_object('user', rolname, 'executions', calls) ORDER BY calls DESC)
FILTER (WHERE userid IS NOT NULL),
'[]'::json
)::text AS top_users
FROM pg_stat_statements
LEFT JOIN pg_roles ON pg_roles.oid = pg_stat_statements.userid
WHERE toplevel = true
GROUP BY queryid, query
HAVING SUM(calls) >= $1
ORDER BY SUM(total_exec_time) DESC
`.trim();
/**
 * Narrows an untyped client to `KtxPostgresQueryClient`.
 *
 * @param client - Any value; accepted only if it is a non-null object with a
 *   function-valued `executeQuery` property.
 * @returns The same object, typed as a query client.
 * @throws Error when the value does not expose `executeQuery`.
 */
function queryClient(client: unknown): KtxPostgresQueryClient {
  const canExecute =
    typeof client === 'object' &&
    client !== null &&
    'executeQuery' in client &&
    typeof (client as { executeQuery?: unknown }).executeQuery === 'function';
  if (!canExecute) {
    throw new Error('Historic SQL Postgres PGSS reader requires a query client with executeQuery(sql, params?)');
  }
  return client as KtxPostgresQueryClient;
}
/**
 * Runs a statement through the client and turns a reported error into an exception.
 *
 * @param client - Query client to run the statement on.
 * @param sql - Statement text.
 * @param params - Optional positional parameters, passed through unchanged.
 * @returns The client's result when it carries no non-empty string `error`.
 * @throws Error carrying the client's error text otherwise.
 */
async function execute(client: KtxPostgresQueryClient, sql: string, params?: unknown[]): Promise<QueryResultLike> {
  const outcome = await client.executeQuery(sql, params);
  if (!('error' in outcome) || typeof outcome.error !== 'string' || outcome.error.length === 0) {
    return outcome;
  }
  throw new Error(outcome.error);
}
/**
 * Builds a lookup from lower-cased header name to column position.
 * When two headers differ only by case, the later position wins.
 */
function indexByHeader(headers: string[]): Map<string, number> {
  return new Map<string, number>(
    headers.map((header, position): [string, number] => [header.toLowerCase(), position]),
  );
}
/**
 * Reads the cell for a named column, matching the header case-insensitively.
 *
 * @returns The cell at the column's position, or `null` when the header is unknown.
 */
function value(row: unknown[], headerIndexes: Map<string, number>, header: string): unknown {
  const position = headerIndexes.get(header.toLowerCase());
  if (position === undefined) {
    return null;
  }
  return row[position];
}
/**
 * Stringifies a cell, mapping `null`, `undefined`, and values that stringify
 * to the empty string onto `null`.
 */
function nullableString(raw: unknown): string | null {
  if (raw === undefined || raw === null) {
    return null;
  }
  const text = String(raw);
  if (text.length === 0) {
    return null;
  }
  return text;
}
/**
 * Reads a mandatory text column.
 *
 * @param raw - Cell value as returned by the query client.
 * @param field - Column name, used only in the error message.
 * @throws Error when `nullableString` yields `null` for the cell.
 */
function requiredString(raw: unknown, field: string): string {
  const text = nullableString(raw);
  if (text) {
    return text;
  }
  throw new Error(`Postgres pg_stat_statements row is missing ${field}`);
}
/**
 * Parses a mandatory numeric column.
 *
 * `Number()` coerces `null`, `''`, whitespace-only strings, and `false` to 0,
 * so those are rejected up front. Otherwise a missing required column would
 * be reported as a legitimate zero.
 *
 * @param raw - Cell value as returned by the query client (number, numeric string, ...).
 * @param field - Column name, used only in the error message.
 * @returns The finite numeric value.
 * @throws Error when the cell is absent, blank, boolean, or not a finite number.
 */
function requiredFiniteNumber(raw: unknown, field: string): number {
  const absent =
    raw === null ||
    raw === undefined ||
    typeof raw === 'boolean' ||
    (typeof raw === 'string' && raw.trim().length === 0);
  const number = absent ? Number.NaN : typeof raw === 'number' ? raw : Number(raw);
  if (!Number.isFinite(number)) {
    throw new Error(`Postgres pg_stat_statements row has invalid ${field}: ${String(raw)}`);
  }
  return number;
}
/**
 * Parses a mandatory numeric column and drops any fractional part.
 *
 * @throws Error (from `requiredFiniteNumber`) when the cell is not a finite number.
 */
function requiredInteger(raw: unknown, field: string): number {
  const parsed = requiredFiniteNumber(raw, field);
  return Math.trunc(parsed);
}
/**
 * Parses an optional numeric column.
 *
 * @returns `null` for `null`, `undefined`, the empty string, and anything that
 *   does not convert to a finite number; otherwise the number.
 */
function nullableNumber(raw: unknown): number | null {
  const absent = raw === null || raw === undefined || raw === '';
  if (absent) {
    return null;
  }
  const parsed = typeof raw === 'number' ? raw : Number(raw);
  if (Number.isFinite(parsed)) {
    return parsed;
  }
  return null;
}
/**
 * Parses an optional numeric column and drops any fractional part.
 *
 * @returns `null` when `nullableNumber` yields `null`, otherwise the truncated number.
 */
function nullableInteger(raw: unknown): number | null {
  const parsed = nullableNumber(raw);
  if (parsed === null) {
    return null;
  }
  return Math.trunc(parsed);
}
/**
 * Normalises an optional timestamp cell to an ISO-8601 string.
 *
 * Accepts either a `Date` or anything whose string form `new Date()` can parse.
 * Every unusable input yields `null`, including an invalid `Date` instance:
 * calling `toISOString()` on one throws `RangeError`, so validity is checked
 * before formatting on both paths.
 *
 * @returns The ISO string, or `null` for null/undefined/empty/unparseable input.
 */
function nullableIsoTimestamp(raw: unknown): string | null {
  if (raw === null || raw === undefined || raw === '') {
    return null;
  }
  const date = raw instanceof Date ? raw : new Date(String(raw));
  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}
/**
 * Returns the first row of a result together with its header lookup.
 *
 * @param result - Query result to read from.
 * @param context - Short label for the query, used only in the error message.
 * @throws Error when the result contains no rows.
 */
function firstRow(result: QueryResultLike, context: string): { row: unknown[]; headers: Map<string, number> } {
  const [row] = result.rows;
  if (!row) {
    throw new Error(`Postgres historic-SQL ${context} query returned no rows`);
  }
  const headers = indexByHeader(result.headers);
  return { row, headers };
}
/**
 * Decodes the JSON text in the `top_users` column into `{ user, executions }` entries.
 *
 * Parsing is best-effort: an empty cell, malformed JSON, or a non-array payload
 * all produce an empty list. Entries that are not objects, or whose
 * `executions` is not a finite number, are dropped.
 */
function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> {
  type TopUser = { user: string | null; executions: number };

  const serialized = nullableString(raw);
  if (!serialized) {
    return [];
  }
  try {
    const decoded: unknown = JSON.parse(serialized);
    if (!Array.isArray(decoded)) {
      return [];
    }
    const topUsers: TopUser[] = [];
    for (const candidate of decoded) {
      const isObject = Boolean(candidate) && typeof candidate === 'object';
      if (!isObject) {
        continue;
      }
      const fields = candidate as { user?: unknown; executions?: unknown };
      const user = nullableString(fields.user);
      const executions = nullableInteger(fields.executions);
      if (executions === null) {
        continue;
      }
      topUsers.push({ user, executions });
    }
    return topUsers;
  } catch {
    // Deliberately lenient: a bad payload must not fail the whole row.
    return [];
  }
}
/**
 * Reads aggregated query templates from `pg_stat_statements`.
 *
 * Probing is delegated to `PostgresPgssQueryHistoryReader`; aggregation is
 * done in SQL by `AGGREGATE_SQL`.
 */
export class PostgresPgssReader {
  private readonly legacyReader = new PostgresPgssQueryHistoryReader();

  /** Forwards the probe to the legacy reader unchanged. */
  probe(client: unknown): Promise<PostgresPgssProbeResult> {
    return this.legacyReader.probe(client);
  }

  /**
   * Yields one `AggregatedTemplate` per row returned by `AGGREGATE_SQL`.
   *
   * The view carries no per-execution timestamps, percentiles, or failures, so
   * several stats are stand-ins: `firstSeen` is the `stats_reset` timestamp
   * (falling back to `window.start`), `lastSeen` is `window.end`, both runtime
   * percentiles are filled from `mean_ms`, and `errorRate` is always 0.
   *
   * @param client - Object exposing `executeQuery(sql, params?)`.
   * @param window - Pull window; only its bounds are used, as timestamps.
   * @param config - Pull configuration; `minExecutions` is bound as `$1`.
   * @throws Error when the client is unusable, a query reports an error, the
   *   stats-info query returns no rows, or a row fails validation.
   */
  async *fetchAggregated(
    client: unknown,
    window: HistoricSqlTimeWindow,
    config: HistoricSqlUnifiedPullConfig,
  ): AsyncIterable<AggregatedTemplate> {
    const pgClient = queryClient(client);

    const statsInfo = firstRow(await execute(pgClient, STATS_INFO_SQL), 'stats-info');
    const statsResetAt = nullableIsoTimestamp(value(statsInfo.row, statsInfo.headers, 'stats_reset'));
    const firstSeen = statsResetAt ?? window.start.toISOString();

    const aggregate = await execute(pgClient, AGGREGATE_SQL, [config.minExecutions]);
    const columns = indexByHeader(aggregate.headers);

    for (const row of aggregate.rows) {
      const cell = (name: string): unknown => value(row, columns, name);

      // Columns are read in output order so the first invalid column is the one reported.
      const templateId = requiredString(cell('template_id'), 'template_id');
      const canonicalSql = requiredString(cell('canonical_sql'), 'canonical_sql');
      const executions = requiredInteger(cell('executions'), 'executions');
      const distinctUsers = requiredInteger(cell('distinct_users'), 'distinct_users');
      const lastSeen = window.end.toISOString();
      const meanRuntimeMs = nullableNumber(cell('mean_ms'));
      const rowsProduced = nullableInteger(cell('rows_produced'));
      const topUsers = parseTopUsers(cell('top_users'));

      yield aggregatedTemplateSchema.parse({
        templateId,
        canonicalSql,
        dialect: 'postgres',
        stats: {
          executions,
          distinctUsers,
          firstSeen,
          lastSeen,
          p50RuntimeMs: meanRuntimeMs,
          p95RuntimeMs: meanRuntimeMs,
          errorRate: 0,
          rowsProduced,
        },
        topUsers,
      });
    }
  }
}

View file

@ -181,6 +181,68 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
expect(sql).toContain("START_TIME >= '2026-02-03T12:00:00.000Z'::TIMESTAMP_TZ");
});
it('fetches aggregated Snowflake query templates', async () => {
  // One canned result page shaped like the aggregate query's projection.
  const aggregateHeaders = [
    'template_id',
    'canonical_sql',
    'executions',
    'distinct_users',
    'first_seen',
    'last_seen',
    'p50_ms',
    'p95_ms',
    'error_rate',
    'rows_produced',
    'top_users',
  ];
  const aggregateRow = [
    'hash-1',
    'select status from orders',
    42,
    3,
    '2026-05-01T00:00:00.000Z',
    '2026-05-11T00:00:00.000Z',
    12,
    40,
    0.05,
    100,
    JSON.stringify([{ user: 'ANALYST', executions: 1 }]),
  ];
  const client = queryClient([{ headers: aggregateHeaders, rows: [aggregateRow], totalRows: 1 }]);
  const reader = new SnowflakeHistoricSqlQueryHistoryReader();
  const pullWindow = {
    start: new Date('2026-02-10T00:00:00.000Z'),
    end: new Date('2026-05-11T00:00:00.000Z'),
  };

  // Drain the async iterator so the query is actually issued.
  const templates = [];
  for await (const template of reader.fetchAggregated(client, pullWindow, {
    dialect: 'snowflake',
    minExecutions: 5,
    windowDays: 90,
    concurrency: 12,
    filters: {},
    redactionPatterns: [],
    staleArchiveAfterDays: 90,
  })) {
    templates.push(template);
  }

  // The generated SQL must read ACCOUNT_USAGE, aggregate per hash, and honour minExecutions.
  const sql = firstQuery(client);
  const expectedFragments = [
    'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
    'COUNT(*) AS executions',
    'GROUP BY query_hash',
    'HAVING COUNT(*) >= 5',
  ];
  for (const fragment of expectedFragments) {
    expect(sql).toContain(fragment);
  }

  expect(templates).toMatchObject([
    {
      templateId: 'hash-1',
      stats: { executions: 42, errorRate: 0.05 },
      topUsers: [{ user: 'ANALYST', executions: 1 }],
    },
  ]);
});
it('throws a clear error when the query client cannot execute SQL', async () => {
const reader = new SnowflakeHistoricSqlQueryHistoryReader();

View file

@ -1,5 +1,12 @@
import { HistoricSqlGrantsMissingError } from './errors.js';
import type { HistoricSqlQueryHistoryReader, HistoricSqlRawQueryRow, HistoricSqlTimeWindow } from './types.js';
import {
aggregatedTemplateSchema,
type AggregatedTemplate,
type HistoricSqlQueryHistoryReader,
type HistoricSqlRawQueryRow,
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
} from './types.js';
interface QueryResultLike {
headers: string[];
@ -87,7 +94,7 @@ function indexByHeader(headers: string[]): Map<string, number> {
}
/**
 * Reads the cell for a named column.
 *
 * The name is upper-cased before the lookup, so callers may pass lower-case
 * names such as `'template_id'`.
 * NOTE(review): this presumes `indexes` is keyed by upper-cased header names —
 * confirm against `indexByHeader`.
 *
 * @param row - Positional cell values for one result row.
 * @param indexes - Header-name to column-position lookup.
 * @param name - Column name, in any case.
 * @returns The cell at the column's position, or `null` when the header is unknown.
 */
function value(row: unknown[], indexes: Map<string, number>, name: string): unknown {
  // Single lookup only: the superseded case-sensitive `indexes.get(name)` line
  // declared `index` a second time in this scope.
  const index = indexes.get(name.toUpperCase());
  return index === undefined ? null : row[index];
}
@ -118,6 +125,18 @@ function nullableNumber(raw: unknown): number | null {
return number;
}
/**
 * Parses a mandatory numeric column.
 *
 * @param raw - Cell value as returned by the query client.
 * @param field - Column name, used only in the error message.
 * @returns The number produced by `nullableNumber`.
 * @throws Error when `nullableNumber` yields `null` for the cell.
 */
function requiredNumber(raw: unknown, field: string): number {
  const parsed = nullableNumber(raw);
  if (parsed !== null) {
    return parsed;
  }
  throw new Error(`Snowflake QUERY_HISTORY row has invalid ${field}: ${String(raw)}`);
}
/**
 * Parses a mandatory numeric column and drops any fractional part.
 *
 * @throws Error (from `requiredNumber`) when the cell is not a usable number.
 */
function requiredInteger(raw: unknown, field: string): number {
  const parsed = requiredNumber(raw, field);
  return Math.trunc(parsed);
}
function nullableInteger(raw: unknown): number | null {
const number = nullableNumber(raw);
return number === null ? null : Math.trunc(number);
@ -173,6 +192,48 @@ function mapRow(row: unknown[], indexes: Map<string, number>): HistoricSqlRawQue
};
}
/**
 * Decodes the stringified `top_users` array into `{ user, executions }` entries.
 *
 * Parsing is best-effort: an empty cell, malformed JSON, or a non-array payload
 * all produce an empty list. Entries that are not objects, or whose
 * `executions` cannot be read as a number, are dropped.
 */
function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> {
  const serialized = nullableString(raw);
  if (!serialized) {
    return [];
  }
  try {
    const decoded: unknown = JSON.parse(serialized);
    if (!Array.isArray(decoded)) {
      return [];
    }
    const topUsers: Array<{ user: string | null; executions: number }> = [];
    decoded.forEach((candidate: unknown) => {
      if (!candidate || typeof candidate !== 'object') {
        return;
      }
      const fields = candidate as { user?: unknown; executions?: unknown };
      const user = nullableString(fields.user);
      const executions = nullableInteger(fields.executions);
      if (executions !== null) {
        topUsers.push({ user, executions });
      }
    });
    return topUsers;
  } catch {
    // Deliberately lenient: a bad payload must not fail the whole row.
    return [];
  }
}
/**
 * Converts one aggregate result row into an `AggregatedTemplate`.
 *
 * Columns are read by header name and the assembled object is validated with
 * `aggregatedTemplateSchema.parse`, so both the column helpers and the schema
 * can throw on a malformed row.
 *
 * @param row - Positional cell values for one result row.
 * @param indexes - Header-name to column-position lookup.
 */
function mapAggregatedRow(row: unknown[], indexes: Map<string, number>): AggregatedTemplate {
  function read(column: string): unknown {
    return value(row, indexes, column);
  }

  // Columns are read in output order so the first invalid column is the one reported.
  const templateId = requiredString(read('template_id'), 'template_id');
  const canonicalSql = requiredString(read('canonical_sql'), 'canonical_sql');
  const executions = requiredInteger(read('executions'), 'executions');
  const distinctUsers = requiredInteger(read('distinct_users'), 'distinct_users');
  const firstSeen = isoTimestamp(read('first_seen'), 'first_seen');
  const lastSeen = isoTimestamp(read('last_seen'), 'last_seen');
  const p50RuntimeMs = nullableNumber(read('p50_ms'));
  const p95RuntimeMs = nullableNumber(read('p95_ms'));
  const errorRate = requiredNumber(read('error_rate'), 'error_rate');
  const rowsProduced = nullableInteger(read('rows_produced'));
  const topUsers = parseTopUsers(read('top_users'));

  return aggregatedTemplateSchema.parse({
    templateId,
    canonicalSql,
    dialect: 'snowflake',
    stats: {
      executions,
      distinctUsers,
      firstSeen,
      lastSeen,
      p50RuntimeMs,
      p95RuntimeMs,
      errorRate,
      rowsProduced,
    },
    topUsers,
  });
}
export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader {
async probe(client: unknown): Promise<void> {
let result: QueryResultLike;
@ -200,4 +261,40 @@ export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryH
yield mapRow(row, indexes);
}
}
/**
 * Runs one aggregate query over `SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY` and
 * yields one `AggregatedTemplate` per `query_hash`.
 *
 * Only `SELECT`/`MERGE` queries started inside `[window.start, window.end)`
 * are counted, and groups with fewer than `config.minExecutions` executions
 * are dropped by the `HAVING` clause.
 *
 * `top_users` is sliced to the five most recent executions (each emitted with
 * `executions: 1`), matching the `LIMIT 5` used by the BigQuery reader.
 * Without the slice the array carries one object per execution in the window,
 * so the result size grows with query volume.
 *
 * @param client - Object exposing `executeQuery(sql)`.
 * @param window - Half-open time window to aggregate over.
 * @param config - Pull configuration; only `minExecutions` is read here.
 * @throws The error built by `grantsError` when the client reports `error`.
 */
async *fetchAggregated(
  client: unknown,
  window: HistoricSqlTimeWindow,
  config: HistoricSqlUnifiedPullConfig,
): AsyncIterable<AggregatedTemplate> {
  const sql = `
SELECT
query_hash AS template_id,
MIN(query_text) AS canonical_sql,
COUNT(*) AS executions,
COUNT(DISTINCT user_name) AS distinct_users,
MIN(start_time) AS first_seen,
MAX(start_time) AS last_seen,
APPROX_PERCENTILE(total_elapsed_time, 0.50) AS p50_ms,
APPROX_PERCENTILE(total_elapsed_time, 0.95) AS p95_ms,
DIV0(COUNT_IF(execution_status != 'SUCCESS'), COUNT(*)) AS error_rate,
SUM(rows_produced) AS rows_produced,
ARRAY_SLICE(ARRAY_AGG(OBJECT_CONSTRUCT('user', user_name, 'executions', 1)) WITHIN GROUP (ORDER BY start_time DESC), 0, 5)::string AS top_users
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text IS NOT NULL
AND query_type IN ('SELECT', 'MERGE')
AND start_time >= ${timestampLiteral(window.start)}
AND start_time < ${timestampLiteral(window.end)}
GROUP BY query_hash
HAVING COUNT(*) >= ${config.minExecutions}
ORDER BY executions DESC`.trim();
  const result = await queryClient(client).executeQuery(sql);
  if (result.error) {
    throw grantsError(result.error);
  }
  const indexes = indexByHeader(result.headers);
  for (const row of result.rows) {
    yield mapAggregatedRow(row, indexes);
  }
}
}

View file

@ -328,6 +328,7 @@ export {
export { HistoricSqlSourceAdapter } from './adapters/historic-sql/historic-sql.adapter.js';
export { BigQueryHistoricSqlQueryHistoryReader } from './adapters/historic-sql/bigquery-query-history-reader.js';
export type { BigQueryHistoricSqlQueryHistoryReaderOptions } from './adapters/historic-sql/bigquery-query-history-reader.js';
export { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js';
export { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js';
export { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js';
export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js';