refactor: remove legacy historic sql reader fetch surface

This commit is contained in:
Andrey Avtomonov 2026-05-11 19:13:16 +02:00
parent c91331b57a
commit 109dd8d836
7 changed files with 48 additions and 412 deletions

View file

@ -63,122 +63,6 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
await expect(reader.probe(client)).rejects.toBeInstanceOf(HistoricSqlGrantsMissingError);
});
// Legacy `fetch` contract for the BigQuery reader: a single query is issued, its lower bound
// comes from the cursor argument, and every result row is mapped to the raw-row shape
// (which, for BigQuery, carries no `rowsProduced` key).
it('fetches BigQuery jobs with cursor and maps them into RawQueryRow shape without rowsProduced', async () => {
const client = queryClient([
{
headers: [
'job_id',
'query',
'user_email',
'creation_time',
'end_time',
'runtime_ms',
'total_slot_ms',
'total_bytes_processed',
'state',
'error_reason',
'error_message',
'statement_type',
],
rows: [
// Successful job: both timestamps are supplied as ISO strings.
[
'bquxjob_1',
"SELECT COUNT(*) FROM `project-1.analytics.orders` WHERE status = 'paid'",
'analyst-a@example.test',
'2026-05-04T10:00:00.000Z',
'2026-05-04T10:00:01.250Z',
1250,
3106,
161164718,
'DONE',
null,
null,
'SELECT',
],
// Failed job: creation_time is a Date, end_time/runtime_ms are null, and the state is
// 'DONE' while the error fields are set — the expected row below has success: false.
[
'bquxjob_2',
'SELECT * FROM `project-1.analytics.missing_table`',
'analyst-b@example.test',
new Date('2026-05-04T10:05:00.000Z'),
null,
null,
0,
0,
'DONE',
'notFound',
'Not found: Table project-1.analytics.missing_table',
'SELECT',
],
],
totalRows: 2,
},
]);
const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' });
const rows = [];
// Third argument is the cursor; it is later than window.start on purpose.
for await (const row of reader.fetch(
client,
{
start: new Date('2026-05-01T00:00:00.000Z'),
end: new Date('2026-05-04T12:00:00.000Z'),
},
'2026-05-03T00:00:00.000Z',
)) {
rows.push(row);
}
expect(client.executeQuery).toHaveBeenCalledTimes(1);
const sql = firstQuery(client);
// Region 'US' is expected to appear lower-cased in the view path.
expect(sql).toContain('FROM `project-1.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`');
// Lower bound must be the cursor (05-03), not window.start (05-01); upper bound is window.end.
expect(sql).toContain("creation_time >= TIMESTAMP('2026-05-03T00:00:00.000Z')");
expect(sql).toContain("creation_time < TIMESTAMP('2026-05-04T12:00:00.000Z')");
expect(sql).toContain("job_type = 'QUERY'");
expect(sql).toContain("(statement_type IS NULL OR statement_type != 'SCRIPT')");
expect(sql).toContain('ORDER BY creation_time ASC, job_id ASC');
expect(sql).toContain('total_slot_ms');
expect(sql).toContain('total_bytes_processed');
// The query must not reference any total_rows column (case-insensitive).
expect(sql).not.toMatch(/total_rows/i);
// Slot/bytes/state/statement_type cells do not surface as keys on the mapped rows.
expect(rows).toEqual([
{
id: 'bquxjob_1',
sql: "SELECT COUNT(*) FROM `project-1.analytics.orders` WHERE status = 'paid'",
user: 'analyst-a@example.test',
startedAt: '2026-05-04T10:00:00.000Z',
endedAt: '2026-05-04T10:00:01.250Z',
runtimeMs: 1250,
success: true,
errorMessage: null,
},
{
id: 'bquxjob_2',
sql: 'SELECT * FROM `project-1.analytics.missing_table`',
user: 'analyst-b@example.test',
startedAt: '2026-05-04T10:05:00.000Z',
endedAt: null,
runtimeMs: null,
success: false,
errorMessage: 'notFound: Not found: Table project-1.analytics.missing_table',
},
]);
});
// With the cursor argument omitted, the generated SQL must fall back to window.start as the
// inclusive lower bound.
it('uses the window start when no cursor is available', async () => {
const client = queryClient([{ headers: ['job_id'], rows: [], totalRows: 0 }]);
const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'EU' });
// The stubbed result has no rows, so the loop body throwing means any yielded row fails the test.
for await (const _row of reader.fetch(client, {
start: new Date('2026-02-03T12:00:00.000Z'),
end: new Date('2026-05-04T12:00:00.000Z'),
})) {
throw new Error('empty result should not yield rows');
}
const sql = firstQuery(client);
// Region 'EU' is expected to appear lower-cased in the view path.
expect(sql).toContain('FROM `project-1.region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT`');
expect(sql).toContain("creation_time >= TIMESTAMP('2026-02-03T12:00:00.000Z')");
});
it('fetches aggregated BigQuery query templates', async () => {
const client = queryClient([
{
@ -245,7 +129,19 @@ describe('BigQueryHistoricSqlQueryHistoryReader', () => {
const reader = new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'project-1', region: 'US' });
await expect(async () => {
for await (const _row of reader.fetch({}, { start: new Date(), end: new Date() })) {
for await (const _row of reader.fetchAggregated(
{},
{ start: new Date(), end: new Date() },
{
dialect: 'bigquery',
minExecutions: 5,
windowDays: 90,
concurrency: 12,
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,
},
)) {
throw new Error('unreachable');
}
}).rejects.toThrow('Historic SQL BigQuery reader requires a query client with executeQuery(query)');

View file

@ -2,8 +2,6 @@ import { HistoricSqlGrantsMissingError } from './errors.js';
import {
aggregatedTemplateSchema,
type AggregatedTemplate,
type HistoricSqlQueryHistoryReader,
type HistoricSqlRawQueryRow,
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
} from './types.js';
@ -146,42 +144,6 @@ function isoTimestamp(raw: unknown, field: string): string {
return date.toISOString();
}
/**
 * Normalises an optional `end_time` cell.
 *
 * @param raw - Cell value as returned by the query client.
 * @returns `null` when the cell is `null`, `undefined` or the empty string; otherwise
 *   whatever `isoTimestamp` produces for the value (labelled with the `end_time` field).
 */
function nullableIsoTimestamp(raw: unknown): string | null {
  const isAbsent = raw === null || raw === undefined || raw === '';
  return isAbsent ? null : isoTimestamp(raw, 'end_time');
}
/**
 * Decides whether a BigQuery job counts as successful.
 *
 * @param state - Job state, or `null` when unavailable.
 * @param errorReason - Error reason, if any.
 * @param errorMessage - Error message, if any.
 * @returns `false` whenever either error field is non-empty; otherwise `true` when the
 *   state is `null` or equals `DONE` ignoring case.
 */
function executionSucceeded(state: string | null, errorReason: string | null, errorMessage: string | null): boolean {
  // Any non-empty error detail wins over the reported state.
  const hasErrorDetail = Boolean(errorReason || errorMessage);
  if (hasErrorDetail) {
    return false;
  }
  if (state === null) {
    return true;
  }
  return state.toUpperCase() === 'DONE';
}
/**
 * Builds a single error string from a BigQuery error reason and message.
 *
 * @param errorReason - Error reason, if any.
 * @param errorMessage - Error message, if any.
 * @returns `"<reason>: <message>"` when both are non-empty; otherwise the message when it
 *   is not `null`, else the reason (which may itself be `null`).
 */
function combinedErrorMessage(errorReason: string | null, errorMessage: string | null): string | null {
  // Guard first: with either part missing/empty there is nothing to join.
  if (!errorReason || !errorMessage) {
    return errorMessage ?? errorReason;
  }
  return `${errorReason}: ${errorMessage}`;
}
/**
 * Converts one positional BigQuery result row into a `HistoricSqlRawQueryRow`.
 *
 * @param row - Cell values for a single job.
 * @param indexes - Column-name to position lookup passed through to `value`.
 * @returns The mapped row. `success` is derived from the `state`, `error_reason` and
 *   `error_message` cells; `errorMessage` from the two error cells.
 */
function mapRow(row: unknown[], indexes: Map<string, number>): HistoricSqlRawQueryRow {
  // The error cells feed two output fields, so they are read once up front.
  const reason = nullableString(value(row, indexes, 'error_reason'));
  const message = nullableString(value(row, indexes, 'error_message'));

  const id = requiredString(value(row, indexes, 'job_id'), 'job_id');
  const sql = requiredString(value(row, indexes, 'query'), 'query');
  const user = nullableString(value(row, indexes, 'user_email'));
  const startedAt = isoTimestamp(value(row, indexes, 'creation_time'), 'creation_time');
  const endedAt = nullableIsoTimestamp(value(row, indexes, 'end_time'));
  const runtimeMs = nullableNumber(value(row, indexes, 'runtime_ms'));
  const state = nullableString(value(row, indexes, 'state'));

  return {
    id,
    sql,
    user,
    startedAt,
    endedAt,
    runtimeMs,
    success: executionSucceeded(state, reason, message),
    errorMessage: combinedErrorMessage(reason, message),
  };
}
function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> {
const text = nullableString(raw);
if (!text) {
@ -224,7 +186,7 @@ function mapAggregatedRow(row: unknown[], indexes: Map<string, number>): Aggrega
});
}
export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader {
export class BigQueryHistoricSqlQueryHistoryReader {
private readonly viewPath: string;
constructor(options: BigQueryHistoricSqlQueryHistoryReaderOptions) {
@ -245,44 +207,6 @@ export class BigQueryHistoricSqlQueryHistoryReader implements HistoricSqlQueryHi
}
}
/**
 * Streams raw query-history rows from the reader's jobs view (`this.viewPath`).
 *
 * Issues one query and yields each result row after mapping it with `mapRow`.
 *
 * @param client - Opaque client; passed through `queryClient(...)` to obtain `executeQuery`.
 * @param window - Time window; `window.end` is the exclusive upper bound on `creation_time`.
 * @param cursor - Optional lower bound; when `null`/`undefined`, `window.start` is used.
 * @throws The value built by `grantsError(result.error)` when the query result carries an error.
 */
async *fetch(
client: unknown,
window: HistoricSqlTimeWindow,
cursor?: string | null,
): AsyncIterable<HistoricSqlRawQueryRow> {
// NOTE(review): the lower bound is inclusive (`>=`), so a job whose creation_time equals the
// cursor is returned again — confirm callers de-duplicate if that matters.
const start = timestampExpression(cursor ?? window.start);
const end = timestampExpression(window.end);
// Only QUERY jobs with non-null text are read, SCRIPT statements are excluded, and rows are
// ordered by (creation_time, job_id). total_slot_ms, total_bytes_processed and statement_type
// are selected but not read by mapRow.
const sql = `
SELECT
job_id,
query,
user_email,
creation_time,
end_time,
TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND) AS runtime_ms,
total_slot_ms,
total_bytes_processed,
state,
error_result.reason AS error_reason,
error_result.message AS error_message,
statement_type
FROM ${this.viewPath}
WHERE creation_time >= ${start}
AND creation_time < ${end}
AND job_type = 'QUERY'
AND query IS NOT NULL
AND (statement_type IS NULL OR statement_type != 'SCRIPT')
ORDER BY creation_time ASC, job_id ASC`.trim();
const result = await queryClient(client).executeQuery(sql);
// Errors are reported on the result object rather than thrown by executeQuery here.
if (result.error) {
throw grantsError(result.error);
}
// Rows are positional; build the header lookup once and reuse it for every row.
const indexes = indexByHeader(result.headers);
for (const row of result.rows) {
yield mapRow(row, indexes);
}
}
async *fetchAggregated(
client: unknown,
window: HistoricSqlTimeWindow,

View file

@ -2,6 +2,7 @@ import { describe, expect, it } from 'vitest';
import {
historicSqlEvidenceEnvelopeSchema,
historicSqlEvidencePath,
historicSqlPatternEvidenceSchema,
historicSqlTableUsageEvidenceSchema,
} from './evidence.js';
@ -27,20 +28,22 @@ describe('historic-sql evidence contracts', () => {
});
it('validates pattern evidence emitted by the patterns WorkUnit', () => {
const parsed = historicSqlEvidenceEnvelopeSchema.parse({
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
narrative: 'Analysts compare order status changes by customer segment.',
definitionSql: 'select status, count(*) from public.orders group by status',
tablesInvolved: ['public.orders', 'public.customers'],
slRefs: ['orders', 'customers'],
constituentTemplateIds: ['pg:1', 'pg:2'],
},
});
const parsed = historicSqlPatternEvidenceSchema.parse(
historicSqlEvidenceEnvelopeSchema.parse({
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
narrative: 'Analysts compare order status changes by customer segment.',
definitionSql: 'select status, count(*) from public.orders group by status',
tablesInvolved: ['public.orders', 'public.customers'],
slRefs: ['orders', 'customers'],
constituentTemplateIds: ['pg:1', 'pg:2'],
},
}),
);
expect(parsed.kind).toBe('pattern');
expect(parsed.pattern.slug).toBe('order-lifecycle-analysis');

View file

@ -3,6 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
import type { SourceAdapter } from '../../types.js';
import { HistoricSqlSourceAdapter } from './historic-sql.adapter.js';
import type { HistoricSqlReader } from './types.js';
@ -33,7 +34,7 @@ describe('HistoricSqlSourceAdapter', () => {
expect(adapter.source).toBe('historic-sql');
expect(adapter.skillNames).toEqual(['historic_sql_table_digest', 'historic_sql_patterns']);
expect(adapter.reconcileSkillNames).toEqual([]);
expect(adapter.evidenceIndexing).toBeUndefined();
expect((adapter as SourceAdapter).evidenceIndexing).toBeUndefined();
expect(adapter.triageSupported).toBe(false);
});

View file

@ -62,125 +62,6 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
await expect(reader.probe(client)).rejects.toBeInstanceOf(HistoricSqlGrantsMissingError);
});
// Legacy `fetch` contract for the Snowflake reader: a single query is issued, its lower bound
// comes from the cursor argument, and every result row is mapped to the raw-row shape
// (including `rowsProduced`).
it('fetches query-history rows with cursor and maps them into RawQueryRow shape', async () => {
const client = queryClient([
{
headers: [
'QUERY_ID',
'QUERY_TEXT',
'USER_NAME',
'ROLE_NAME',
'WAREHOUSE_NAME',
'DATABASE_NAME',
'SCHEMA_NAME',
'START_TIME',
'END_TIME',
'TOTAL_ELAPSED_TIME',
'ROWS_PRODUCED',
'EXECUTION_STATUS',
'ERROR_CODE',
'ERROR_MESSAGE',
],
rows: [
// Successful query: both timestamps are supplied as ISO strings.
[
'01a',
"SELECT count(*) FROM ANALYTICS.ORDERS WHERE STATUS = 'paid'",
'ANALYST_A',
'ANALYST_ROLE',
'WH_XS',
'ANALYTICS',
'PUBLIC',
'2026-05-04T10:00:00.000Z',
'2026-05-04T10:00:01.250Z',
1250,
12,
'SUCCESS',
null,
null,
],
// Failed query: START_TIME is a Date, END_TIME/elapsed/rows are null, error fields are set.
[
'01b',
'SELECT * FROM MISSING_TABLE',
'ANALYST_B',
'ANALYST_ROLE',
'WH_XS',
'ANALYTICS',
'PUBLIC',
new Date('2026-05-04T10:05:00.000Z'),
null,
null,
null,
'FAILED_WITH_ERROR',
'002003',
'SQL compilation error',
],
],
totalRows: 2,
},
]);
const reader = new SnowflakeHistoricSqlQueryHistoryReader();
const rows = [];
// Third argument is the cursor; it is later than window.start on purpose.
for await (const row of reader.fetch(
client,
{
start: new Date('2026-05-01T00:00:00.000Z'),
end: new Date('2026-05-04T12:00:00.000Z'),
},
'2026-05-03T00:00:00.000Z',
)) {
rows.push(row);
}
expect(client.executeQuery).toHaveBeenCalledTimes(1);
const sql = firstQuery(client);
expect(sql).toContain('FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY');
// Lower bound must be the cursor (05-03), not window.start (05-01); upper bound is window.end.
expect(sql).toContain("START_TIME >= '2026-05-03T00:00:00.000Z'::TIMESTAMP_TZ");
expect(sql).toContain("START_TIME < '2026-05-04T12:00:00.000Z'::TIMESTAMP_TZ");
expect(sql).toContain('ORDER BY START_TIME ASC, QUERY_ID ASC');
expect(sql).toContain('ROWS_PRODUCED');
// Role/warehouse/database/schema/status cells do not surface as keys on the mapped rows.
expect(rows).toEqual([
{
id: '01a',
sql: "SELECT count(*) FROM ANALYTICS.ORDERS WHERE STATUS = 'paid'",
user: 'ANALYST_A',
startedAt: '2026-05-04T10:00:00.000Z',
endedAt: '2026-05-04T10:00:01.250Z',
runtimeMs: 1250,
rowsProduced: 12,
success: true,
errorMessage: null,
},
{
id: '01b',
sql: 'SELECT * FROM MISSING_TABLE',
user: 'ANALYST_B',
startedAt: '2026-05-04T10:05:00.000Z',
endedAt: null,
runtimeMs: null,
rowsProduced: null,
success: false,
errorMessage: '002003: SQL compilation error',
},
]);
});
// With the cursor argument omitted, the generated SQL must fall back to window.start as the
// inclusive lower bound.
it('uses the window start when no cursor is available', async () => {
const client = queryClient([{ headers: ['QUERY_ID'], rows: [], totalRows: 0 }]);
const reader = new SnowflakeHistoricSqlQueryHistoryReader();
// The stubbed result has no rows, so the loop body throwing means any yielded row fails the test.
for await (const _row of reader.fetch(client, {
start: new Date('2026-02-03T12:00:00.000Z'),
end: new Date('2026-05-04T12:00:00.000Z'),
})) {
throw new Error('empty result should not yield rows');
}
const sql = firstQuery(client);
expect(sql).toContain("START_TIME >= '2026-02-03T12:00:00.000Z'::TIMESTAMP_TZ");
});
it('fetches aggregated Snowflake query templates', async () => {
const client = queryClient([
{
@ -247,7 +128,19 @@ describe('SnowflakeHistoricSqlQueryHistoryReader', () => {
const reader = new SnowflakeHistoricSqlQueryHistoryReader();
await expect(async () => {
for await (const _row of reader.fetch({}, { start: new Date(), end: new Date() })) {
for await (const _row of reader.fetchAggregated(
{},
{ start: new Date(), end: new Date() },
{
dialect: 'snowflake',
minExecutions: 5,
windowDays: 90,
concurrency: 12,
filters: { dropTrivialProbes: true },
redactionPatterns: [],
staleArchiveAfterDays: 90,
},
)) {
throw new Error('unreachable');
}
}).rejects.toThrow('Historic SQL Snowflake reader requires a query client with executeQuery(query)');

View file

@ -2,8 +2,6 @@ import { HistoricSqlGrantsMissingError } from './errors.js';
import {
aggregatedTemplateSchema,
type AggregatedTemplate,
type HistoricSqlQueryHistoryReader,
type HistoricSqlRawQueryRow,
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
} from './types.js';
@ -59,32 +57,6 @@ function timestampLiteral(value: Date | string): string {
return `'${date.toISOString().replace(/'/g, "''")}'::TIMESTAMP_TZ`;
}
/**
 * Builds the SELECT against SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY used by the raw-row fetch.
 *
 * @param window - Time window; `window.end` is the exclusive upper bound on START_TIME.
 * @param cursor - Optional lower bound; when `null`/`undefined`, `window.start` is used.
 * @returns The trimmed SQL text. Rows with NULL QUERY_TEXT are excluded and results are
 *   ordered by (START_TIME, QUERY_ID).
 */
function queryHistorySql(window: HistoricSqlTimeWindow, cursor?: string | null): string {
// NOTE(review): the lower bound is inclusive (`>=`), so a row whose START_TIME equals the
// cursor is selected again — confirm callers de-duplicate if that matters.
const start = timestampLiteral(cursor ?? window.start);
const end = timestampLiteral(window.end);
return `
SELECT
QUERY_ID,
QUERY_TEXT,
USER_NAME,
ROLE_NAME,
WAREHOUSE_NAME,
DATABASE_NAME,
SCHEMA_NAME,
START_TIME,
END_TIME,
TOTAL_ELAPSED_TIME,
ROWS_PRODUCED,
EXECUTION_STATUS,
ERROR_CODE,
ERROR_MESSAGE
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME >= ${start}
AND START_TIME < ${end}
AND QUERY_TEXT IS NOT NULL
ORDER BY START_TIME ASC, QUERY_ID ASC`.trim();
}
function indexByHeader(headers: string[]): Map<string, number> {
const out = new Map<string, number>();
headers.forEach((header, index) => {
@ -154,44 +126,6 @@ function isoTimestamp(raw: unknown, field: string): string {
return date.toISOString();
}
/**
 * Normalises an optional `END_TIME` cell.
 *
 * @param raw - Cell value as returned by the query client.
 * @returns `null` when the cell is `null`, `undefined` or the empty string; otherwise
 *   whatever `isoTimestamp` produces for the value (labelled with the `END_TIME` field).
 */
function nullableIsoTimestamp(raw: unknown): string | null {
  const isAbsent = raw === null || raw === undefined || raw === '';
  return isAbsent ? null : isoTimestamp(raw, 'END_TIME');
}
/**
 * Decides whether a Snowflake query counts as successful.
 *
 * @param status - Execution status, or `null` when unavailable.
 * @param errorCode - Error code, if any.
 * @param errorMessage - Error message, if any.
 * @returns `false` whenever either error field is non-empty; otherwise `true` when the
 *   status is `null` or begins with `SUCCESS` ignoring case.
 */
function executionSucceeded(status: string | null, errorCode: string | null, errorMessage: string | null): boolean {
  // Any non-empty error detail wins over the reported status.
  const hasErrorDetail = Boolean(errorCode || errorMessage);
  if (hasErrorDetail) {
    return false;
  }
  if (status === null) {
    return true;
  }
  return status.toUpperCase().startsWith('SUCCESS');
}
/**
 * Builds a single error string from a Snowflake error code and message.
 *
 * @param errorCode - Error code, if any.
 * @param errorMessage - Error message, if any.
 * @returns `"<code>: <message>"` when both are non-empty; otherwise the message when it
 *   is not `null`, else the code (which may itself be `null`).
 */
function combinedErrorMessage(errorCode: string | null, errorMessage: string | null): string | null {
  // Guard first: with either part missing/empty there is nothing to join.
  if (!errorCode || !errorMessage) {
    return errorMessage ?? errorCode;
  }
  return `${errorCode}: ${errorMessage}`;
}
/**
 * Converts one positional Snowflake result row into a `HistoricSqlRawQueryRow`.
 *
 * @param row - Cell values for a single query-history record.
 * @param indexes - Column-name to position lookup passed through to `value`.
 * @returns The mapped row. `success` is derived from the `EXECUTION_STATUS`, `ERROR_CODE`
 *   and `ERROR_MESSAGE` cells; `errorMessage` from the two error cells.
 */
function mapRow(row: unknown[], indexes: Map<string, number>): HistoricSqlRawQueryRow {
  // The error cells feed two output fields, so they are read once up front.
  const code = nullableString(value(row, indexes, 'ERROR_CODE'));
  const message = nullableString(value(row, indexes, 'ERROR_MESSAGE'));
  const rowsProduced = nullableInteger(value(row, indexes, 'ROWS_PRODUCED'));

  const id = requiredString(value(row, indexes, 'QUERY_ID'), 'QUERY_ID');
  const sql = requiredString(value(row, indexes, 'QUERY_TEXT'), 'QUERY_TEXT');
  const user = nullableString(value(row, indexes, 'USER_NAME'));
  const startedAt = isoTimestamp(value(row, indexes, 'START_TIME'), 'START_TIME');
  const endedAt = nullableIsoTimestamp(value(row, indexes, 'END_TIME'));
  const runtimeMs = nullableNumber(value(row, indexes, 'TOTAL_ELAPSED_TIME'));
  const status = nullableString(value(row, indexes, 'EXECUTION_STATUS'));

  return {
    id,
    sql,
    user,
    startedAt,
    endedAt,
    runtimeMs,
    rowsProduced,
    success: executionSucceeded(status, code, message),
    errorMessage: combinedErrorMessage(code, message),
  };
}
function parseTopUsers(raw: unknown): Array<{ user: string | null; executions: number }> {
const text = nullableString(raw);
if (!text) {
@ -234,7 +168,7 @@ function mapAggregatedRow(row: unknown[], indexes: Map<string, number>): Aggrega
});
}
export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryHistoryReader {
export class SnowflakeHistoricSqlQueryHistoryReader {
async probe(client: unknown): Promise<void> {
let result: QueryResultLike;
try {
@ -247,21 +181,6 @@ export class SnowflakeHistoricSqlQueryHistoryReader implements HistoricSqlQueryH
}
}
/**
 * Streams raw query-history rows for the given window.
 *
 * Runs the statement built by `queryHistorySql` once and yields every result row after
 * mapping it with `mapRow`.
 *
 * @param client - Opaque client; passed through `queryClient(...)` to obtain `executeQuery`.
 * @param window - Time window forwarded to `queryHistorySql`.
 * @param cursor - Optional cursor forwarded to `queryHistorySql`.
 * @throws The value built by `grantsError(response.error)` when the result carries an error.
 */
async *fetch(
  client: unknown,
  window: HistoricSqlTimeWindow,
  cursor?: string | null,
): AsyncIterable<HistoricSqlRawQueryRow> {
  // Resolve the client before building the SQL so an unusable client is reported first.
  const executor = queryClient(client);
  const statement = queryHistorySql(window, cursor);
  const response = await executor.executeQuery(statement);
  if (response.error) {
    throw grantsError(response.error);
  }

  // Rows are positional; build the header lookup once and reuse it for every row.
  const columnPositions = indexByHeader(response.headers);
  for (const cells of response.rows) {
    yield mapRow(cells, columnPositions);
  }
}
async *fetchAggregated(
client: unknown,
window: HistoricSqlTimeWindow,

View file

@ -508,7 +508,7 @@ class LocalIngestToolsetFactory implements IngestToolsetFactoryPort {
}
createIngestWuToolset(session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike {
const sourceTools =
const sourceTools: Record<string, Tool> =
session.ingest?.sourceKey === 'historic-sql'
? {
emit_historic_sql_evidence: createEmitHistoricSqlEvidenceTool({