mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
feat: cut over historic sql adapter
This commit is contained in:
parent
da263c0957
commit
b3ebba9f88
7 changed files with 104 additions and 407 deletions
|
|
@ -78,6 +78,8 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => {
|
|||
notes: expect.stringContaining('historic_sql_patterns'),
|
||||
}),
|
||||
]);
|
||||
expect(result.workUnits[0]?.notes).toContain('emit_historic_sql_evidence');
|
||||
expect(result.workUnits[1]?.notes).toContain('emit_historic_sql_evidence');
|
||||
expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']);
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe
|
|||
dependencyPaths: ['manifest.json'],
|
||||
peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(),
|
||||
notes:
|
||||
'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.',
|
||||
'Use historic_sql_table_digest. Read this table usage JSON and emit exactly one table_usage object with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.',
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe
|
|||
dependencyPaths: ['manifest.json'],
|
||||
peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(),
|
||||
notes:
|
||||
'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.',
|
||||
'Use historic_sql_patterns. Read patterns-input.json and emit pattern objects with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.',
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,51 +1,29 @@
|
|||
import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
|
||||
import { mkdtemp } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
|
||||
import { HistoricSqlSourceAdapter } from './historic-sql.adapter.js';
|
||||
import { pgssBaselinePath } from './stage-pgss.js';
|
||||
import type { HistoricSqlQueryHistoryReader, PostgresPgssReader } from './types.js';
|
||||
import type { HistoricSqlReader } from './types.js';
|
||||
|
||||
async function tempDir(): Promise<string> {
|
||||
return mkdtemp(join(tmpdir(), 'historic-sql-adapter-'));
|
||||
}
|
||||
|
||||
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
|
||||
const target = join(root, relPath);
|
||||
await mkdir(join(target, '..'), { recursive: true });
|
||||
await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
|
||||
}
|
||||
|
||||
const sqlAnalysis: SqlAnalysisPort = {
|
||||
async analyzeForFingerprint() {
|
||||
return {
|
||||
fingerprint: 'fp_1',
|
||||
normalizedSql: 'SELECT count(*) FROM analytics.orders WHERE status = ?',
|
||||
tablesTouched: ['analytics.orders'],
|
||||
literalSlots: [{ position: 1, type: 'string', exampleValue: 'paid' }],
|
||||
};
|
||||
throw new Error('legacy analyzeForFingerprint must not be used');
|
||||
},
|
||||
async analyzeBatch() {
|
||||
return new Map();
|
||||
},
|
||||
};
|
||||
|
||||
const reader: HistoricSqlQueryHistoryReader = {
|
||||
async probe() {},
|
||||
async *fetch() {
|
||||
yield {
|
||||
id: 'q1',
|
||||
sql: "SELECT count(*) FROM analytics.orders WHERE status = 'paid'",
|
||||
user: 'analyst',
|
||||
startedAt: '2026-05-04T11:00:00.000Z',
|
||||
endedAt: null,
|
||||
runtimeMs: 10,
|
||||
rowsProduced: 1,
|
||||
success: true,
|
||||
errorMessage: null,
|
||||
};
|
||||
const reader: HistoricSqlReader = {
|
||||
async probe() {
|
||||
return { warnings: [] };
|
||||
},
|
||||
async *fetchAggregated() {},
|
||||
};
|
||||
|
||||
describe('HistoricSqlSourceAdapter', () => {
|
||||
|
|
@ -53,255 +31,71 @@ describe('HistoricSqlSourceAdapter', () => {
|
|||
const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} });
|
||||
|
||||
expect(adapter.source).toBe('historic-sql');
|
||||
expect(adapter.skillNames).toEqual(['historic_sql_ingest']);
|
||||
expect(adapter.reconcileSkillNames).toEqual(['historic_sql_curator']);
|
||||
expect(adapter.evidenceIndexing).toBe('documents');
|
||||
expect(adapter.triageSupported).toBe(true);
|
||||
expect(adapter.skillNames).toEqual(['historic_sql_table_digest', 'historic_sql_patterns']);
|
||||
expect(adapter.reconcileSkillNames).toEqual([]);
|
||||
expect(adapter.evidenceIndexing).toBeUndefined();
|
||||
expect(adapter.triageSupported).toBe(false);
|
||||
});
|
||||
|
||||
it('fetches staged templates through injected reader and SqlAnalysisPort', async () => {
|
||||
it('fetches a unified aggregate snapshot and emits unified WorkUnits', async () => {
|
||||
const stagedDir = await tempDir();
|
||||
const adapter = new HistoricSqlSourceAdapter({
|
||||
sqlAnalysis,
|
||||
reader,
|
||||
queryClient: {},
|
||||
now: () => new Date('2026-05-04T12:00:00.000Z'),
|
||||
});
|
||||
|
||||
await adapter.fetch(
|
||||
{
|
||||
dialect: 'snowflake',
|
||||
windowDays: 90,
|
||||
lastSuccessfulCursor: null,
|
||||
serviceAccountUserPatterns: [],
|
||||
redactionPatterns: [],
|
||||
maxTemplatesPerRun: 5000,
|
||||
},
|
||||
stagedDir,
|
||||
{ connectionId: 'conn_1', sourceKey: 'historic-sql' },
|
||||
);
|
||||
|
||||
await expect(adapter.detect(stagedDir)).resolves.toBe(true);
|
||||
});
|
||||
|
||||
it('reads triage signals from usage.json and metadata properties', async () => {
|
||||
const stagedDir = await tempDir();
|
||||
await writeJson(stagedDir, 'manifest.json', {
|
||||
source: 'historic-sql',
|
||||
connectionId: 'conn_1',
|
||||
dialect: 'snowflake',
|
||||
fetchedAt: '2026-05-04T12:00:00.000Z',
|
||||
windowStart: '2026-02-03T12:00:00.000Z',
|
||||
windowEnd: '2026-05-04T12:00:00.000Z',
|
||||
nextSuccessfulCursor: '2026-05-04T11:55:00.000Z',
|
||||
templateCount: 1,
|
||||
capped: false,
|
||||
warnings: [],
|
||||
templates: [{ id: 'fp_1', fingerprint: 'fp_1', subClusterId: null, path: 'templates/fp_1/page.md' }],
|
||||
});
|
||||
await writeJson(stagedDir, 'templates/fp_1/metadata.json', {
|
||||
id: 'fp_1',
|
||||
title: 'snowflake · analytics.orders [fp_1]',
|
||||
path: 'templates/fp_1/page.md',
|
||||
objectType: 'historic_sql_template',
|
||||
lastEditedAt: null,
|
||||
properties: {
|
||||
fingerprint: 'fp_1',
|
||||
sub_cluster_id: null,
|
||||
dialect: 'snowflake',
|
||||
tables_touched: ['analytics.orders'],
|
||||
literal_slots: [{ position: 1, type: 'string', classification: 'constant' }],
|
||||
triage_signals: {
|
||||
executions_bucket: 'high',
|
||||
distinct_users_bucket: 'team',
|
||||
error_rate_bucket: 'ok',
|
||||
recency_bucket: 'active',
|
||||
service_account_only: 'false',
|
||||
slot_summary: '1 constant, 0 runtime',
|
||||
},
|
||||
},
|
||||
});
|
||||
await writeFile(join(stagedDir, 'templates/fp_1/page.md'), '# fp_1\n', 'utf-8');
|
||||
await writeJson(stagedDir, 'templates/fp_1/usage.json', {
|
||||
stats: {
|
||||
executions: 20,
|
||||
distinct_users: 3,
|
||||
first_seen: '2026-05-01T00:00:00.000Z',
|
||||
last_seen: '2026-05-04T11:55:00.000Z',
|
||||
p50_runtime_ms: 100,
|
||||
p95_runtime_ms: 200,
|
||||
error_rate: 0,
|
||||
},
|
||||
literal_slots: [{ position: 1, distinct_values: 1, top_values: [['paid', 20]] }],
|
||||
samples: [],
|
||||
});
|
||||
|
||||
const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} });
|
||||
|
||||
await expect(adapter.getTriageSignals(stagedDir, 'fp_1')).resolves.toEqual({
|
||||
objectType: 'historic_sql_template',
|
||||
lastEditedAt: '2026-05-04T11:55:00.000Z',
|
||||
propertyHints: {
|
||||
executions_bucket: 'high',
|
||||
distinct_users_bucket: 'team',
|
||||
error_rate_bucket: 'ok',
|
||||
recency_bucket: 'active',
|
||||
service_account_only: 'false',
|
||||
slot_summary: '1 constant, 0 runtime',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('dispatches postgres fetches through PGSS staging and writes the baseline only after pull success', async () => {
|
||||
const stagedDir = await tempDir();
|
||||
const baselineRootDir = await tempDir();
|
||||
const baselinePath = pgssBaselinePath(baselineRootDir, 'conn_pg');
|
||||
const unusedPerExecutionReader: HistoricSqlQueryHistoryReader = {
|
||||
const aggregateReader: HistoricSqlReader = {
|
||||
async probe() {
|
||||
throw new Error('per-execution reader must not be used for postgres');
|
||||
return { warnings: [] };
|
||||
},
|
||||
async *fetch() {
|
||||
throw new Error('per-execution reader must not be used for postgres');
|
||||
},
|
||||
};
|
||||
const postgresReader: PostgresPgssReader = {
|
||||
async probe() {
|
||||
return { pgServerVersion: 'PostgreSQL 16.4', warnings: [] };
|
||||
},
|
||||
async readSnapshot() {
|
||||
return {
|
||||
statsResetAt: '2026-05-08T08:00:00.000Z',
|
||||
deallocCount: 0,
|
||||
rows: [
|
||||
{
|
||||
queryid: '901',
|
||||
userid: '11',
|
||||
username: 'analyst',
|
||||
dbid: '5',
|
||||
database: 'warehouse',
|
||||
query: 'SELECT count(*) FROM analytics.orders WHERE status = $1',
|
||||
calls: 9,
|
||||
totalExecTime: 90,
|
||||
meanExecTime: 10,
|
||||
totalRows: 18,
|
||||
},
|
||||
],
|
||||
async *fetchAggregated() {
|
||||
yield {
|
||||
templateId: 'pg:1',
|
||||
canonicalSql: 'select status, count(*) from public.orders group by status',
|
||||
dialect: 'postgres',
|
||||
stats: {
|
||||
executions: 25,
|
||||
distinctUsers: 3,
|
||||
firstSeen: '2026-05-01T00:00:00.000Z',
|
||||
lastSeen: '2026-05-11T00:00:00.000Z',
|
||||
p50RuntimeMs: 10,
|
||||
p95RuntimeMs: 20,
|
||||
errorRate: 0,
|
||||
rowsProduced: 10,
|
||||
},
|
||||
topUsers: [{ user: 'analyst', executions: 25 }],
|
||||
};
|
||||
},
|
||||
};
|
||||
const batchSqlAnalysis: SqlAnalysisPort = {
|
||||
async analyzeForFingerprint() {
|
||||
throw new Error('legacy analyzeForFingerprint must not be used');
|
||||
},
|
||||
async analyzeBatch() {
|
||||
return new Map([
|
||||
[
|
||||
'pg:1',
|
||||
{
|
||||
tablesTouched: ['public.orders'],
|
||||
columnsByClause: { select: ['status'], groupBy: ['status'] },
|
||||
},
|
||||
],
|
||||
]);
|
||||
},
|
||||
};
|
||||
const adapter = new HistoricSqlSourceAdapter({
|
||||
sqlAnalysis,
|
||||
reader: unusedPerExecutionReader,
|
||||
sqlAnalysis: batchSqlAnalysis,
|
||||
reader: aggregateReader,
|
||||
queryClient: {},
|
||||
postgresReader,
|
||||
postgresQueryClient: {
|
||||
async executeQuery() {
|
||||
return { headers: [], rows: [] };
|
||||
},
|
||||
},
|
||||
postgresBaselineRootDir: baselineRootDir,
|
||||
now: () => new Date('2026-05-08T12:00:00.000Z'),
|
||||
now: () => new Date('2026-05-11T00:00:00.000Z'),
|
||||
});
|
||||
|
||||
await adapter.fetch(
|
||||
{
|
||||
dialect: 'postgres',
|
||||
windowDays: 90,
|
||||
lastSuccessfulCursor: null,
|
||||
serviceAccountUserPatterns: [],
|
||||
redactionPatterns: [],
|
||||
maxTemplatesPerRun: 5000,
|
||||
minCalls: 5,
|
||||
},
|
||||
stagedDir,
|
||||
{ connectionId: 'conn_pg', sourceKey: 'historic-sql' },
|
||||
);
|
||||
|
||||
const manifest = JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')) as {
|
||||
dialect: string;
|
||||
baselineFirstRun: boolean;
|
||||
templates: Array<{ id: string }>;
|
||||
};
|
||||
expect(manifest.dialect).toBe('postgres');
|
||||
expect(manifest.baselineFirstRun).toBe(true);
|
||||
expect(manifest.templates).toEqual([
|
||||
{ id: 'db5_q901', fingerprint: 'fp_1', subClusterId: null, path: 'templates/db5_q901/page.md' },
|
||||
]);
|
||||
await expect(readFile(baselinePath, 'utf-8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||
|
||||
await adapter.onPullSucceeded({
|
||||
connectionId: 'conn_pg',
|
||||
await adapter.fetch({ dialect: 'postgres', minExecutions: 5 }, stagedDir, {
|
||||
connectionId: 'warehouse',
|
||||
sourceKey: 'historic-sql',
|
||||
syncId: 'sync_pg',
|
||||
trigger: 'scheduled_pull',
|
||||
completedAt: new Date('2026-05-08T12:01:00.000Z'),
|
||||
stagedDir,
|
||||
});
|
||||
|
||||
const baseline = JSON.parse(await readFile(baselinePath, 'utf-8')) as {
|
||||
fetchedAt: string;
|
||||
templates: Record<string, { perUser: Record<string, { calls: number }> }>;
|
||||
};
|
||||
expect(baseline.fetchedAt).toBe('2026-05-08T12:00:00.000Z');
|
||||
expect(baseline.templates.db5_q901.perUser['11'].calls).toBe(9);
|
||||
});
|
||||
|
||||
it('fails postgres fetches clearly when no PGSS reader is configured', async () => {
|
||||
const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {} });
|
||||
|
||||
await expect(
|
||||
adapter.fetch(
|
||||
{
|
||||
dialect: 'postgres',
|
||||
windowDays: 90,
|
||||
lastSuccessfulCursor: null,
|
||||
serviceAccountUserPatterns: [],
|
||||
redactionPatterns: [],
|
||||
maxTemplatesPerRun: 5000,
|
||||
minCalls: 5,
|
||||
},
|
||||
await tempDir(),
|
||||
{ connectionId: 'conn_pg', sourceKey: 'historic-sql' },
|
||||
),
|
||||
).rejects.toThrow('Historic SQL Postgres fetch requires deps.postgresReader');
|
||||
});
|
||||
|
||||
it('forwards manifest cursor through onPullSucceeded without changing the SourceAdapter signature', async () => {
|
||||
const stagedDir = await tempDir();
|
||||
await writeJson(stagedDir, 'manifest.json', {
|
||||
source: 'historic-sql',
|
||||
connectionId: 'conn_1',
|
||||
dialect: 'snowflake',
|
||||
fetchedAt: '2026-05-04T12:00:00.000Z',
|
||||
windowStart: '2026-02-03T12:00:00.000Z',
|
||||
windowEnd: '2026-05-04T12:00:00.000Z',
|
||||
nextSuccessfulCursor: '2026-05-04T11:55:00.000Z',
|
||||
templateCount: 0,
|
||||
capped: false,
|
||||
warnings: [],
|
||||
templates: [],
|
||||
});
|
||||
const onPullSucceeded = vi.fn(async () => {});
|
||||
const adapter = new HistoricSqlSourceAdapter({ sqlAnalysis, reader, queryClient: {}, onPullSucceeded });
|
||||
const completedAt = new Date('2026-05-04T12:01:00.000Z');
|
||||
|
||||
await adapter.onPullSucceeded({
|
||||
connectionId: 'conn_1',
|
||||
sourceKey: 'historic-sql',
|
||||
syncId: 'sync_1',
|
||||
trigger: 'scheduled_pull',
|
||||
completedAt,
|
||||
stagedDir,
|
||||
});
|
||||
|
||||
expect(onPullSucceeded).toHaveBeenCalledWith({
|
||||
connectionId: 'conn_1',
|
||||
sourceKey: 'historic-sql',
|
||||
syncId: 'sync_1',
|
||||
trigger: 'scheduled_pull',
|
||||
completedAt,
|
||||
stagedDir,
|
||||
nextSuccessfulCursor: '2026-05-04T11:55:00.000Z',
|
||||
await expect(adapter.detect(stagedDir)).resolves.toBe(true);
|
||||
await expect(adapter.chunk(stagedDir)).resolves.toMatchObject({
|
||||
workUnits: [
|
||||
{ unitKey: 'historic-sql-table-public-orders' },
|
||||
{ unitKey: 'historic-sql-patterns' },
|
||||
],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,39 +1,16 @@
|
|||
import { readFile } from 'node:fs/promises';
|
||||
import { rm } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import type {
|
||||
ChunkResult,
|
||||
DiffSet,
|
||||
FetchContext,
|
||||
IngestTrigger,
|
||||
ScopeDescriptor,
|
||||
SourceAdapter,
|
||||
TriageSignals,
|
||||
} from '../../types.js';
|
||||
import { chunkHistoricSqlStagedDir, describeHistoricSqlScope } from './chunk.js';
|
||||
import type { ChunkResult, DiffSet, FetchContext, ScopeDescriptor, SourceAdapter } from '../../types.js';
|
||||
import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js';
|
||||
import { detectHistoricSqlStagedDir } from './detect.js';
|
||||
import { stageHistoricSqlTemplates } from './stage.js';
|
||||
import {
|
||||
pgssBaselinePath,
|
||||
stagePgStatStatementsTemplates,
|
||||
writePgssBaselineAtomic,
|
||||
type StagePgStatStatementsTemplatesResult,
|
||||
} from './stage-pgss.js';
|
||||
import {
|
||||
historicSqlManifestSchema,
|
||||
historicSqlMetadataSchema,
|
||||
historicSqlPullConfigSchema,
|
||||
historicSqlUsageSchema,
|
||||
type HistoricSqlSourceAdapterDeps,
|
||||
} from './types.js';
|
||||
import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js';
|
||||
import { type HistoricSqlSourceAdapterDeps } from './types.js';
|
||||
|
||||
export class HistoricSqlSourceAdapter implements SourceAdapter {
|
||||
readonly source = 'historic-sql';
|
||||
readonly skillNames = ['historic_sql_ingest'];
|
||||
readonly reconcileSkillNames = ['historic_sql_curator'];
|
||||
readonly evidenceIndexing = 'documents' as const;
|
||||
readonly triageSupported = true;
|
||||
|
||||
private readonly pendingPgssBaselines = new Map<string, StagePgStatStatementsTemplatesResult>();
|
||||
readonly skillNames = ['historic_sql_table_digest', 'historic_sql_patterns'];
|
||||
readonly reconcileSkillNames: string[] = [];
|
||||
readonly triageSupported = false;
|
||||
|
||||
constructor(private readonly deps: HistoricSqlSourceAdapterDeps) {}
|
||||
|
||||
|
|
@ -42,94 +19,27 @@ export class HistoricSqlSourceAdapter implements SourceAdapter {
|
|||
}
|
||||
|
||||
async fetch(pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
|
||||
const config = historicSqlPullConfigSchema.parse(pullConfig);
|
||||
if (config.dialect === 'postgres') {
|
||||
if (!this.deps.postgresReader) {
|
||||
throw new Error('Historic SQL Postgres fetch requires deps.postgresReader');
|
||||
}
|
||||
const postgresQueryClient = this.deps.postgresQueryClient ?? this.deps.queryClient;
|
||||
if (
|
||||
!postgresQueryClient ||
|
||||
typeof postgresQueryClient !== 'object' ||
|
||||
!('executeQuery' in postgresQueryClient) ||
|
||||
typeof (postgresQueryClient as { executeQuery?: unknown }).executeQuery !== 'function'
|
||||
) {
|
||||
throw new Error('Historic SQL Postgres fetch requires deps.postgresQueryClient with executeQuery(sql, params?)');
|
||||
}
|
||||
const result = await stagePgStatStatementsTemplates({
|
||||
stagedDir,
|
||||
connectionId: ctx.connectionId,
|
||||
queryClient: postgresQueryClient as NonNullable<HistoricSqlSourceAdapterDeps['postgresQueryClient']>,
|
||||
reader: this.deps.postgresReader,
|
||||
sqlAnalysis: this.deps.sqlAnalysis,
|
||||
pullConfig: config,
|
||||
baselinePath: pgssBaselinePath(this.deps.postgresBaselineRootDir, ctx.connectionId),
|
||||
now: this.deps.now?.(),
|
||||
});
|
||||
this.pendingPgssBaselines.set(stagedDir, result);
|
||||
return;
|
||||
}
|
||||
|
||||
await stageHistoricSqlTemplates({
|
||||
await stageHistoricSqlAggregatedSnapshot({
|
||||
stagedDir,
|
||||
connectionId: ctx.connectionId,
|
||||
queryClient: this.deps.queryClient,
|
||||
reader: this.deps.reader,
|
||||
sqlAnalysis: this.deps.sqlAnalysis,
|
||||
pullConfig: config,
|
||||
pullConfig,
|
||||
now: this.deps.now?.(),
|
||||
});
|
||||
if (this.deps.legacyPostgresBaselineRootDir) {
|
||||
await rm(join(this.deps.legacyPostgresBaselineRootDir, ctx.connectionId, 'pgss-baseline.json'), {
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
chunk(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> {
|
||||
return chunkHistoricSqlStagedDir(stagedDir, diffSet);
|
||||
return chunkHistoricSqlUnifiedStagedDir(stagedDir, diffSet);
|
||||
}
|
||||
|
||||
describeScope(stagedDir: string): Promise<ScopeDescriptor> {
|
||||
return describeHistoricSqlScope(stagedDir);
|
||||
}
|
||||
|
||||
async getTriageSignals(stagedDir: string, externalId: string): Promise<TriageSignals> {
|
||||
const manifest = historicSqlManifestSchema.parse(
|
||||
JSON.parse(await readFile(join(stagedDir, 'manifest.json'), 'utf-8')),
|
||||
);
|
||||
const template = manifest.templates.find((entry) => entry.id === externalId);
|
||||
if (!template) {
|
||||
return {};
|
||||
}
|
||||
const templateDir = template.path.replace(/\/page\.md$/, '');
|
||||
const metadata = historicSqlMetadataSchema.parse(
|
||||
JSON.parse(await readFile(join(stagedDir, templateDir, 'metadata.json'), 'utf-8')),
|
||||
);
|
||||
const usage = historicSqlUsageSchema.parse(
|
||||
JSON.parse(await readFile(join(stagedDir, templateDir, 'usage.json'), 'utf-8')),
|
||||
);
|
||||
|
||||
return {
|
||||
objectType: metadata.objectType,
|
||||
lastEditedAt: usage.stats.last_seen,
|
||||
propertyHints: metadata.properties.triage_signals,
|
||||
};
|
||||
}
|
||||
|
||||
async onPullSucceeded(ctx: {
|
||||
connectionId: string;
|
||||
sourceKey: string;
|
||||
syncId: string;
|
||||
trigger: IngestTrigger;
|
||||
completedAt: Date;
|
||||
stagedDir: string;
|
||||
}): Promise<void> {
|
||||
const manifest = historicSqlManifestSchema.parse(
|
||||
JSON.parse(await readFile(join(ctx.stagedDir, 'manifest.json'), 'utf-8')),
|
||||
);
|
||||
if (manifest.dialect === 'postgres') {
|
||||
const pending = this.pendingPgssBaselines.get(ctx.stagedDir);
|
||||
if (pending) {
|
||||
await writePgssBaselineAtomic(pending.baselinePath, pending.baseline);
|
||||
this.pendingPgssBaselines.delete(ctx.stagedDir);
|
||||
}
|
||||
}
|
||||
await this.deps.onPullSucceeded?.({ ...ctx, nextSuccessfulCursor: manifest.nextSuccessfulCursor });
|
||||
return describeHistoricSqlUnifiedScope(stagedDir);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,10 +28,17 @@ export const historicSqlUnifiedPullConfigSchema = z.preprocess((value) => {
|
|||
if (!isRecord(value)) {
|
||||
return value;
|
||||
}
|
||||
if (value.minExecutions === undefined && typeof value.minCalls === 'number') {
|
||||
return { ...value, minExecutions: value.minCalls };
|
||||
const next: Record<string, unknown> = { ...value };
|
||||
if (next.minExecutions === undefined && typeof next.minCalls === 'number') {
|
||||
next.minExecutions = next.minCalls;
|
||||
}
|
||||
return value;
|
||||
if (!next.filters && Array.isArray(next.serviceAccountUserPatterns)) {
|
||||
next.filters = {
|
||||
serviceAccounts: { patterns: next.serviceAccountUserPatterns, mode: 'exclude' },
|
||||
dropTrivialProbes: true,
|
||||
};
|
||||
}
|
||||
return next;
|
||||
}, z.object({
|
||||
dialect: historicSqlDialectSchema,
|
||||
windowDays: z.number().int().positive().default(90),
|
||||
|
|
@ -222,21 +229,10 @@ export interface PostgresPgssAggregateRow {
|
|||
|
||||
export interface HistoricSqlSourceAdapterDeps {
|
||||
sqlAnalysis: SqlAnalysisPort;
|
||||
reader: HistoricSqlQueryHistoryReader;
|
||||
reader: HistoricSqlReader;
|
||||
queryClient: unknown;
|
||||
postgresReader?: PostgresPgssReader;
|
||||
postgresQueryClient?: KtxPostgresQueryClient;
|
||||
postgresBaselineRootDir?: string;
|
||||
legacyPostgresBaselineRootDir?: string;
|
||||
now?: () => Date;
|
||||
onPullSucceeded?: (ctx: {
|
||||
connectionId: string;
|
||||
sourceKey: string;
|
||||
syncId: string;
|
||||
trigger: import('../../types.js').IngestTrigger;
|
||||
completedAt: Date;
|
||||
stagedDir: string;
|
||||
nextSuccessfulCursor: string | null;
|
||||
}) => Promise<void>;
|
||||
}
|
||||
|
||||
const historicSqlLiteralSlotClassificationSchema = z.enum(['constant', 'runtime', 'categorical']);
|
||||
|
|
|
|||
|
|
@ -152,11 +152,14 @@ describe('local ingest adapters', () => {
|
|||
await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).resolves.toEqual({
|
||||
dialect: 'postgres',
|
||||
windowDays: 90,
|
||||
lastSuccessfulCursor: null,
|
||||
serviceAccountUserPatterns: ['^svc_'],
|
||||
minExecutions: 7,
|
||||
concurrency: 12,
|
||||
filters: {
|
||||
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
|
||||
dropTrivialProbes: true,
|
||||
},
|
||||
redactionPatterns: [],
|
||||
maxTemplatesPerRun: 123,
|
||||
minCalls: 7,
|
||||
staleArchiveAfterDays: 90,
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -6,11 +6,10 @@ import type { SqlAnalysisPort } from '../sql-analysis/index.js';
|
|||
import { DbtSourceAdapter } from './adapters/dbt/dbt.adapter.js';
|
||||
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
|
||||
import { HistoricSqlSourceAdapter } from './adapters/historic-sql/historic-sql.adapter.js';
|
||||
import { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js';
|
||||
import { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js';
|
||||
import { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js';
|
||||
import {
|
||||
HISTORIC_SQL_SOURCE_KEY,
|
||||
historicSqlPullConfigSchema,
|
||||
historicSqlUnifiedPullConfigSchema,
|
||||
type KtxPostgresQueryClient,
|
||||
} from './adapters/historic-sql/types.js';
|
||||
import {
|
||||
|
|
@ -94,15 +93,9 @@ export function createDefaultLocalIngestAdapters(
|
|||
adapters.push(
|
||||
new HistoricSqlSourceAdapter({
|
||||
sqlAnalysis: options.historicSql.sqlAnalysis,
|
||||
reader: new SnowflakeHistoricSqlQueryHistoryReader(),
|
||||
queryClient: {
|
||||
executeQuery: async () => {
|
||||
throw new Error('Local historic-SQL currently supports Postgres pg_stat_statements only');
|
||||
},
|
||||
},
|
||||
postgresReader: new PostgresPgssQueryHistoryReader(),
|
||||
postgresQueryClient: options.historicSql.postgresQueryClient,
|
||||
postgresBaselineRootDir: options.historicSql.postgresBaselineRootDir,
|
||||
reader: new PostgresPgssReader(),
|
||||
queryClient: options.historicSql.postgresQueryClient,
|
||||
legacyPostgresBaselineRootDir: options.historicSql.postgresBaselineRootDir,
|
||||
now: options.historicSql.now,
|
||||
}),
|
||||
);
|
||||
|
|
@ -180,9 +173,8 @@ export async function localPullConfigForAdapter(
|
|||
if (historicSql?.enabled !== true) {
|
||||
throw new Error(`Connection "${connectionId}" does not have historicSql.enabled: true`);
|
||||
}
|
||||
return historicSqlPullConfigSchema.parse({
|
||||
return historicSqlUnifiedPullConfigSchema.parse({
|
||||
...historicSql,
|
||||
lastSuccessfulCursor: stringField(historicSql.lastSuccessfulCursor),
|
||||
});
|
||||
}
|
||||
if (adapter.source === 'looker') {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue