From b3ebba9f888913b001a3129059f5ea97f3ca215b Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 19:00:01 +0200 Subject: [PATCH] feat: cut over historic sql adapter --- .../historic-sql/chunk-unified.test.ts | 2 + .../adapters/historic-sql/chunk-unified.ts | 4 +- .../historic-sql/historic-sql.adapter.test.ts | 322 ++++-------------- .../historic-sql/historic-sql.adapter.ts | 124 +------ .../src/ingest/adapters/historic-sql/types.ts | 28 +- .../context/src/ingest/local-adapters.test.ts | 11 +- packages/context/src/ingest/local-adapters.ts | 20 +- 7 files changed, 104 insertions(+), 407 deletions(-) diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts index f20f22ab..1b5716f4 100644 --- a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts @@ -78,6 +78,8 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => { notes: expect.stringContaining('historic_sql_patterns'), }), ]); + expect(result.workUnits[0]?.notes).toContain('emit_historic_sql_evidence'); + expect(result.workUnits[1]?.notes).toContain('emit_historic_sql_evidence'); expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts index ff29f3cd..5ed17c52 100644 --- a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts @@ -42,7 +42,7 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe dependencyPaths: ['manifest.json'], peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(), notes: - 'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.', + 'Use historic_sql_table_digest. Read this table usage JSON and emit exactly one table_usage object with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.', }); } @@ -55,7 +55,7 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe dependencyPaths: ['manifest.json'], peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(), notes: - 'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.', + 'Use historic_sql_patterns. Read patterns-input.json and emit pattern objects with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.', }); } diff --git a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts index 2c038feb..a8015179 100644 --- a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.test.ts @@ -1,51 +1,29 @@ -import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises'; +import { mkdtemp } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { describe, expect, it, vi } from 'vitest'; +import { describe, expect, it } from 'vitest'; import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; import { HistoricSqlSourceAdapter } from './historic-sql.adapter.js'; -import { pgssBaselinePath } from './stage-pgss.js'; -import type { HistoricSqlQueryHistoryReader, PostgresPgssReader } from './types.js'; +import type { HistoricSqlReader } from './types.js'; async function tempDir(): Promise { return mkdtemp(join(tmpdir(), 'historic-sql-adapter-')); } -async function writeJson(root: string, relPath: string, value: unknown): Promise { - const target = join(root, relPath); - await mkdir(join(target, '..'), { recursive: true }); - await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); -} - const sqlAnalysis: SqlAnalysisPort = { async analyzeForFingerprint() { - return { - fingerprint: 'fp_1', - normalizedSql: 'SELECT count(*) FROM analytics.orders WHERE status = ?', - tablesTouched: ['analytics.orders'], - literalSlots: [{ position: 1, type: 'string', exampleValue: 'paid' }], - }; + throw new Error('legacy analyzeForFingerprint must not be used'); }, async analyzeBatch() { return new Map(); }, }; -const reader: HistoricSqlQueryHistoryReader = { - async probe() {}, - async *fetch() { - yield { - id: 'q1', - sql: "SELECT count(*) FROM analytics.orders WHERE status = 'paid'", - user: 'analyst', - startedAt: '2026-05-04T11:00:00.000Z', - endedAt: null, - runtimeMs: 10, - rowsProduced: 1, - success: true, - errorMessage: null, - }; +const reader: HistoricSqlReader = { + async probe() { + return { warnings: [] }; }, + async *fetchAggregated() {}, }; describe('HistoricSqlSourceAdapter', () => { @@ -53,255 +31,71 @@ describe('HistoricSqlSourceAdapter', () => { const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} }); expect(adapter.source).toBe('historic-sql'); - expect(adapter.skillNames).toEqual(['historic_sql_ingest']); - expect(adapter.reconcileSkillNames).toEqual(['historic_sql_curator']); - expect(adapter.evidenceIndexing).toBe('documents'); - expect(adapter.triageSupported).toBe(true); + expect(adapter.skillNames).toEqual(['historic_sql_table_digest', 'historic_sql_patterns']); + expect(adapter.reconcileSkillNames).toEqual([]); + expect(adapter.evidenceIndexing).toBeUndefined(); + expect(adapter.triageSupported).toBe(false); }); - it('fetches staged templates through injected reader and SqlAnalysisPort', async () => { + it('fetches a unified aggregate snapshot and emits unified WorkUnits', async () => { const stagedDir = await tempDir(); - const adapter = new HistoricSqlSourceAdapter({ - sqlAnalysis, - reader, - queryClient: {}, - now: () => new Date('2026-05-04T12:00:00.000Z'), - }); - - await adapter.fetch( - { - dialect: 'snowflake', - windowDays: 90, - lastSuccessfulCursor: null, - serviceAccountUserPatterns: [], - redactionPatterns: [], - maxTemplatesPerRun: 5000, - }, - stagedDir, - { connectionId: 'conn_1', sourceKey: 'historic-sql' }, - ); - - await expect(adapter.detect(stagedDir)).resolves.toBe(true); - }); - - it('reads triage signals from usage.json and metadata properties', async () => { - const stagedDir = await tempDir(); - await writeJson(stagedDir, 'manifest.json', { - source: 'historic-sql', - connectionId: 'conn_1', - dialect: 'snowflake', - fetchedAt: '2026-05-04T12:00:00.000Z', - windowStart: '2026-02-03T12:00:00.000Z', - windowEnd: '2026-05-04T12:00:00.000Z', - nextSuccessfulCursor: '2026-05-04T11:55:00.000Z', - templateCount: 1, - capped: false, - warnings: [], - templates: [{ id: 'fp_1', fingerprint: 'fp_1', subClusterId: null, path: 'templates/fp_1/page.md' }], - }); - await writeJson(stagedDir, 'templates/fp_1/metadata.json', { - id: 'fp_1', - title: 'snowflake ยท analytics.orders [fp_1]', - path: 'templates/fp_1/page.md', - objectType: 'historic_sql_template', - lastEditedAt: null, - properties: { - fingerprint: 'fp_1', - sub_cluster_id: null, - dialect: 'snowflake', - tables_touched: ['analytics.orders'], - literal_slots: [{ position: 1, type: 'string', classification: 'constant' }], - triage_signals: { - executions_bucket: 'high', - distinct_users_bucket: 'team', - error_rate_bucket: 'ok', - recency_bucket: 'active', - service_account_only: 'false', - slot_summary: '1 constant, 0 runtime', - }, - }, - }); - await writeFile(join(stagedDir, 'templates/fp_1/page.md'), '# fp_1\n', 'utf-8'); - await writeJson(stagedDir, 'templates/fp_1/usage.json', { - stats: { - executions: 20, - distinct_users: 3, - first_seen: '2026-05-01T00:00:00.000Z', - last_seen: '2026-05-04T11:55:00.000Z', - p50_runtime_ms: 100, - p95_runtime_ms: 200, - error_rate: 0, - }, - literal_slots: [{ position: 1, distinct_values: 1, top_values: [['paid', 20]] }], - samples: [], - }); - - const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} }); - - await expect(adapter.getTriageSignals(stagedDir, 'fp_1')).resolves.toEqual({ - objectType: 'historic_sql_template', - lastEditedAt: '2026-05-04T11:55:00.000Z', - propertyHints: { - executions_bucket: 'high', - distinct_users_bucket: 'team', - error_rate_bucket: 'ok', - recency_bucket: 'active', - service_account_only: 'false', - slot_summary: '1 constant, 0 runtime', - }, - }); - }); - - it('dispatches postgres fetches through PGSS staging and writes the baseline only after pull success', async () => { - const stagedDir = await tempDir(); - const baselineRootDir = await tempDir(); - const baselinePath = pgssBaselinePath(baselineRootDir, 'conn_pg'); - const unusedPerExecutionReader: HistoricSqlQueryHistoryReader = { + const aggregateReader: HistoricSqlReader = { async probe() { - throw new Error('per-execution reader must not be used for postgres'); + return { warnings: [] }; }, - async *fetch() { - throw new Error('per-execution reader must not be used for postgres'); - }, - }; - const postgresReader: PostgresPgssReader = { - async probe() { - return { pgServerVersion: 'PostgreSQL 16.4', warnings: [] }; - }, - async readSnapshot() { - return { - statsResetAt: '2026-05-08T08:00:00.000Z', - deallocCount: 0, - rows: [ - { - queryid: '901', - userid: '11', - username: 'analyst', - dbid: '5', - database: 'warehouse', - query: 'SELECT count(*) FROM analytics.orders WHERE status = $1', - calls: 9, - totalExecTime: 90, - meanExecTime: 10, - totalRows: 18, - }, - ], + async *fetchAggregated() { + yield { + templateId: 'pg:1', + canonicalSql: 'select status, count(*) from public.orders group by status', + dialect: 'postgres', + stats: { + executions: 25, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 10, + p95RuntimeMs: 20, + errorRate: 0, + rowsProduced: 10, + }, + topUsers: [{ user: 'analyst', executions: 25 }], }; }, }; + const batchSqlAnalysis: SqlAnalysisPort = { + async analyzeForFingerprint() { + throw new Error('legacy analyzeForFingerprint must not be used'); + }, + async analyzeBatch() { + return new Map([ + [ + 'pg:1', + { + tablesTouched: ['public.orders'], + columnsByClause: { select: ['status'], groupBy: ['status'] }, + }, + ], + ]); + }, + }; const adapter = new HistoricSqlSourceAdapter({ - sqlAnalysis, - reader: unusedPerExecutionReader, + sqlAnalysis: batchSqlAnalysis, + reader: aggregateReader, queryClient: {}, - postgresReader, - postgresQueryClient: { - async executeQuery() { - return { headers: [], rows: [] }; - }, - }, - postgresBaselineRootDir: baselineRootDir, - now: () => new Date('2026-05-08T12:00:00.000Z'), + now: () => new Date('2026-05-11T00:00:00.000Z'), }); - await adapter.fetch( - { - dialect: 'postgres', - windowDays: 90, - lastSuccessfulCursor: null, - serviceAccountUserPatterns: [], - redactionPatterns: [], - maxTemplatesPerRun: 5000, - minCalls: 5, - }, - stagedDir, - { connectionId: 'conn_pg', sourceKey: 'historic-sql' }, - ); - - const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')) as { - dialect: string; - baselineFirstRun: boolean; - templates: Array<{ id: string }>; - }; - expect(manifest.dialect).toBe('postgres'); - expect(manifest.baselineFirstRun).toBe(true); - expect(manifest.templates).toEqual([ - { id: 'db5_q901', fingerprint: 'fp_1', subClusterId: null, path: 'templates/db5_q901/page.md' }, - ]); - await expect(readFile(baselinePath, 'utf-8')).rejects.toMatchObject({ code: 'ENOENT' }); - - await adapter.onPullSucceeded({ - connectionId: 'conn_pg', + await adapter.fetch({ dialect: 'postgres', minExecutions: 5 }, stagedDir, { + connectionId: 'warehouse', sourceKey: 'historic-sql', - syncId: 'sync_pg', - trigger: 'scheduled_pull', - completedAt: new Date('2026-05-08T12:01:00.000Z'), - stagedDir, }); - const baseline = JSON.parse(await readFile(baselinePath, 'utf-8')) as { - fetchedAt: string; - templates: Record }>; - }; - expect(baseline.fetchedAt).toBe('2026-05-08T12:00:00.000Z'); - expect(baseline.templates.db5_q901.perUser['11'].calls).toBe(9); - }); - - it('fails postgres fetches clearly when no PGSS reader is configured', async () => { - const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} }); - - await expect( - adapter.fetch( - { - dialect: 'postgres', - windowDays: 90, - lastSuccessfulCursor: null, - serviceAccountUserPatterns: [], - redactionPatterns: [], - maxTemplatesPerRun: 5000, - minCalls: 5, - }, - await tempDir(), - { connectionId: 'conn_pg', sourceKey: 'historic-sql' }, - ), - ).rejects.toThrow('Historic SQL Postgres fetch requires deps.postgresReader'); - }); - - it('forwards manifest cursor through onPullSucceeded without changing the SourceAdapter signature', async () => { - const stagedDir = await tempDir(); - await writeJson(stagedDir, 'manifest.json', { - source: 'historic-sql', - connectionId: 'conn_1', - dialect: 'snowflake', - fetchedAt: '2026-05-04T12:00:00.000Z', - windowStart: '2026-02-03T12:00:00.000Z', - windowEnd: '2026-05-04T12:00:00.000Z', - nextSuccessfulCursor: '2026-05-04T11:55:00.000Z', - templateCount: 0, - capped: false, - warnings: [], - templates: [], - }); - const onPullSucceeded = vi.fn(async () => {}); - const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {}, onPullSucceeded }); - const completedAt = new Date('2026-05-04T12:01:00.000Z'); - - await adapter.onPullSucceeded({ - connectionId: 'conn_1', - sourceKey: 'historic-sql', - syncId: 'sync_1', - trigger: 'scheduled_pull', - completedAt, - stagedDir, - }); - - expect(onPullSucceeded).toHaveBeenCalledWith({ - connectionId: 'conn_1', - sourceKey: 'historic-sql', - syncId: 'sync_1', - trigger: 'scheduled_pull', - completedAt, - stagedDir, - nextSuccessfulCursor: '2026-05-04T11:55:00.000Z', + await expect(adapter.detect(stagedDir)).resolves.toBe(true); + await expect(adapter.chunk(stagedDir)).resolves.toMatchObject({ + workUnits: [ + { unitKey: 'historic-sql-table-public-orders' }, + { unitKey: 'historic-sql-patterns' }, + ], }); }); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts index e66b1cd1..bd565292 100644 --- a/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts +++ b/packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts @@ -1,39 +1,16 @@ -import { readFile } from 'node:fs/promises'; +import { rm } from 'node:fs/promises'; import { join } from 'node:path'; -import type { - ChunkResult, - DiffSet, - FetchContext, - IngestTrigger, - ScopeDescriptor, - SourceAdapter, - TriageSignals, -} from '../../types.js'; -import { chunkHistoricSqlStagedDir, describeHistoricSqlScope } from './chunk.js'; +import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter } from '../../types.js'; +import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js'; import { detectHistoricSqlStagedDir } from './detect.js'; -import { stageHistoricSqlTemplates } from './stage.js'; -import { - pgssBaselinePath, - stagePgStatStatementsTemplates, - writePgssBaselineAtomic, - type StagePgStatStatementsTemplatesResult, -} from './stage-pgss.js'; -import { - historicSqlManifestSchema, - historicSqlMetadataSchema, - historicSqlPullConfigSchema, - historicSqlUsageSchema, - type HistoricSqlSourceAdapterDeps, -} from './types.js'; +import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js'; +import { type HistoricSqlSourceAdapterDeps } from './types.js'; export class HistoricSqlSourceAdapter implements SourceAdapter { readonly source = 'historic-sql'; - readonly skillNames = ['historic_sql_ingest']; - readonly reconcileSkillNames = ['historic_sql_curator']; - readonly evidenceIndexing = 'documents' as const; - readonly triageSupported = true; - - private readonly pendingPgssBaselines = new Map(); + readonly skillNames = ['historic_sql_table_digest', 'historic_sql_patterns']; + readonly reconcileSkillNames: string[] = []; + readonly triageSupported = false; constructor(private readonly deps: HistoricSqlSourceAdapterDeps) {} @@ -42,94 +19,27 @@ export class HistoricSqlSourceAdapter implements SourceAdapter { } async fetch(pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise { - const config = historicSqlPullConfigSchema.parse(pullConfig); - if (config.dialect === 'postgres') { - if (!this.deps.postgresReader) { - throw new Error('Historic SQL Postgres fetch requires deps.postgresReader'); - } - const postgresQueryClient = this.deps.postgresQueryClient ?? this.deps.queryClient; - if ( - !postgresQueryClient || - typeof postgresQueryClient !== 'object' || - !('executeQuery' in postgresQueryClient) || - typeof (postgresQueryClient as { executeQuery?: unknown }).executeQuery !== 'function' - ) { - throw new Error('Historic SQL Postgres fetch requires deps.postgresQueryClient with executeQuery(sql, params?)'); - } - const result = await stagePgStatStatementsTemplates({ - stagedDir, - connectionId: ctx.connectionId, - queryClient: postgresQueryClient as NonNullable, - reader: this.deps.postgresReader, - sqlAnalysis: this.deps.sqlAnalysis, - pullConfig: config, - baselinePath: pgssBaselinePath(this.deps.postgresBaselineRootDir, ctx.connectionId), - now: this.deps.now?.(), - }); - this.pendingPgssBaselines.set(stagedDir, result); - return; - } - - await stageHistoricSqlTemplates({ + await stageHistoricSqlAggregatedSnapshot({ stagedDir, connectionId: ctx.connectionId, queryClient: this.deps.queryClient, reader: this.deps.reader, sqlAnalysis: this.deps.sqlAnalysis, - pullConfig: config, + pullConfig, now: this.deps.now?.(), }); + if (this.deps.legacyPostgresBaselineRootDir) { + await rm(join(this.deps.legacyPostgresBaselineRootDir, ctx.connectionId, 'pgss-baseline.json'), { + force: true, + }); + } } chunk(stagedDir: string, diffSet?: DiffSet): Promise { - return chunkHistoricSqlStagedDir(stagedDir, diffSet); + return chunkHistoricSqlUnifiedStagedDir(stagedDir, diffSet); } describeScope(stagedDir: string): Promise { - return describeHistoricSqlScope(stagedDir); - } - - async getTriageSignals(stagedDir: string, externalId: string): Promise { - const manifest = historicSqlManifestSchema.parse( - JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')), - ); - const template = manifest.templates.find((entry) => entry.id === externalId); - if (!template) { - return {}; - } - const templateDir = template.path.replace(/\/page\.md$/, ''); - const metadata = historicSqlMetadataSchema.parse( - JSON.parse(await readFile(join(stagedDir, templateDir, 'metadata.json'), 'utf-8')), - ); - const usage = historicSqlUsageSchema.parse( - JSON.parse(await readFile(join(stagedDir, templateDir, 'usage.json'), 'utf-8')), - ); - - return { - objectType: metadata.objectType, - lastEditedAt: usage.stats.last_seen, - propertyHints: metadata.properties.triage_signals, - }; - } - - async onPullSucceeded(ctx: { - connectionId: string; - sourceKey: string; - syncId: string; - trigger: IngestTrigger; - completedAt: Date; - stagedDir: string; - }): Promise { - const manifest = historicSqlManifestSchema.parse( - JSON.parse(await readFile(join(ctx.stagedDir, 'manifest.json'), 'utf-8')), - ); - if (manifest.dialect === 'postgres') { - const pending = this.pendingPgssBaselines.get(ctx.stagedDir); - if (pending) { - await writePgssBaselineAtomic(pending.baselinePath, pending.baseline); - this.pendingPgssBaselines.delete(ctx.stagedDir); - } - } - await this.deps.onPullSucceeded?.({ ...ctx, nextSuccessfulCursor: manifest.nextSuccessfulCursor }); + return describeHistoricSqlUnifiedScope(stagedDir); } } diff --git a/packages/context/src/ingest/adapters/historic-sql/types.ts b/packages/context/src/ingest/adapters/historic-sql/types.ts index 6ed75459..6a56ed28 100644 --- a/packages/context/src/ingest/adapters/historic-sql/types.ts +++ b/packages/context/src/ingest/adapters/historic-sql/types.ts @@ -28,10 +28,17 @@ export const historicSqlUnifiedPullConfigSchema = z.preprocess((value) => { if (!isRecord(value)) { return value; } - if (value.minExecutions === undefined && typeof value.minCalls === 'number') { - return { ...value, minExecutions: value.minCalls }; + const next: Record = { ...value }; + if (next.minExecutions === undefined && typeof next.minCalls === 'number') { + next.minExecutions = next.minCalls; } - return value; + if (!next.filters && Array.isArray(next.serviceAccountUserPatterns)) { + next.filters = { + serviceAccounts: { patterns: next.serviceAccountUserPatterns, mode: 'exclude' }, + dropTrivialProbes: true, + }; + } + return next; }, z.object({ dialect: historicSqlDialectSchema, windowDays: z.number().int().positive().default(90), @@ -222,21 +229,10 @@ export interface PostgresPgssAggregateRow { export interface HistoricSqlSourceAdapterDeps { sqlAnalysis: SqlAnalysisPort; - reader: HistoricSqlQueryHistoryReader; + reader: HistoricSqlReader; queryClient: unknown; - postgresReader?: PostgresPgssReader; - postgresQueryClient?: KtxPostgresQueryClient; - postgresBaselineRootDir?: string; + legacyPostgresBaselineRootDir?: string; now?: () => Date; - onPullSucceeded?: (ctx: { - connectionId: string; - sourceKey: string; - syncId: string; - trigger: import('../../types.js').IngestTrigger; - completedAt: Date; - stagedDir: string; - nextSuccessfulCursor: string | null; - }) => Promise; } const historicSqlLiteralSlotClassificationSchema = z.enum(['constant', 'runtime', 'categorical']); diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index ae1aeb5c..cf91ad31 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -152,11 +152,14 @@ describe('local ingest adapters', () => { await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).resolves.toEqual({ dialect: 'postgres', windowDays: 90, - lastSuccessfulCursor: null, - serviceAccountUserPatterns: ['^svc_'], + minExecutions: 7, + concurrency: 12, + filters: { + serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' }, + dropTrivialProbes: true, + }, redactionPatterns: [], - maxTemplatesPerRun: 123, - minCalls: 7, + staleArchiveAfterDays: 90, }); }); diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 51681774..e6977875 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -6,11 +6,10 @@ import type { SqlAnalysisPort } from '../sql-analysis/index.js'; import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; import { HistoricSqlSourceAdapter } from './adapters/historic-sql/historic-sql.adapter.js'; -import { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js'; -import { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js'; +import { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js'; import { HISTORIC_SQL_SOURCE_KEY, - historicSqlPullConfigSchema, + historicSqlUnifiedPullConfigSchema, type KtxPostgresQueryClient, } from './adapters/historic-sql/types.js'; import { @@ -94,15 +93,9 @@ export function createDefaultLocalIngestAdapters( adapters.push( new HistoricSqlSourceAdapter({ sqlAnalysis: options.historicSql.sqlAnalysis, - reader: new SnowflakeHistoricSqlQueryHistoryReader(), - queryClient: { - executeQuery: async () => { - throw new Error('Local historic-SQL currently supports Postgres pg_stat_statements only'); - }, - }, - postgresReader: new PostgresPgssQueryHistoryReader(), - postgresQueryClient: options.historicSql.postgresQueryClient, - postgresBaselineRootDir: options.historicSql.postgresBaselineRootDir, + reader: new PostgresPgssReader(), + queryClient: options.historicSql.postgresQueryClient, + legacyPostgresBaselineRootDir: options.historicSql.postgresBaselineRootDir, now: options.historicSql.now, }), ); @@ -180,9 +173,8 @@ export async function localPullConfigForAdapter( if (historicSql?.enabled !== true) { throw new Error(`Connection "${connectionId}" does not have historicSql.enabled: true`); } - return historicSqlPullConfigSchema.parse({ + return historicSqlUnifiedPullConfigSchema.parse({ ...historicSql, - lastSuccessfulCursor: stringField(historicSql.lastSuccessfulCursor), }); } if (adapter.source === 'looker') {