From b03eb27aaebc5ab4bdb8a1e7b114386e29095ae5 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 19:40:05 +0200 Subject: [PATCH] test: assert unified postgres historic sql smoke --- examples/postgres-historic/scripts/smoke.sh | 136 ++++++++++++++------ 1 file changed, 100 insertions(+), 36 deletions(-) diff --git a/examples/postgres-historic/scripts/smoke.sh b/examples/postgres-historic/scripts/smoke.sh index 488535a4..d7e9b836 100755 --- a/examples/postgres-historic/scripts/smoke.sh +++ b/examples/postgres-historic/scripts/smoke.sh @@ -8,6 +8,7 @@ COMPOSE_FILE="$EXAMPLE_DIR/docker-compose.yml" PROJECT_PARENT="${KTX_POSTGRES_HISTORIC_PROJECT_PARENT:-$(mktemp -d)}" PROJECT_DIR="$PROJECT_PARENT/postgres-historic-ktx" KTX_BIN="$KTX_ROOT/packages/cli/dist/bin.js" +MAX_STAGE_SECONDS="${KTX_POSTGRES_HISTORIC_MAX_STAGE_SECONDS:-60}" export KTX_RUNTIME_ROOT="$PROJECT_PARENT/managed-runtime" unset KTX_DAEMON_URL unset KTX_SQL_ANALYSIS_URL @@ -26,35 +27,99 @@ latest_manifest() { find "$PROJECT_DIR/raw-sources/warehouse/historic-sql" -name manifest.json | sort | tail -n 1 } -assert_manifest() { +assert_unified_snapshot() { local manifest_path="$1" - local expected_first_run="$2" - node - "$manifest_path" "$expected_first_run" <<'NODE' -const { readFileSync } = require('node:fs'); + node - "$manifest_path" <<'NODE' +const { dirname, join } = require('node:path'); +const { readFileSync, readdirSync } = require('node:fs'); + const manifestPath = process.argv[2]; -const expectedFirstRun = process.argv[3] === 'true'; const manifest = JSON.parse(readFileSync(manifestPath, 'utf8')); -if (manifest.dialect !== 'postgres') throw new Error(`Expected dialect postgres, got ${manifest.dialect}`); -if (manifest.degraded !== true) throw new Error('Expected degraded:true for Postgres PGSS v1'); -if (manifest.baselineFirstRun !== expectedFirstRun) { - throw new Error(`Expected baselineFirstRun:${expectedFirstRun}, got ${manifest.baselineFirstRun}`); +function assert(condition, message) { + if (!condition) throw new Error(message); } -if (!manifest.pgServerVersion) throw new Error('Expected pgServerVersion'); -if (!manifest.statsResetAt) throw new Error('Expected statsResetAt'); -if (!Array.isArray(manifest.templates) || manifest.templates.length === 0) { - throw new Error('Expected at least one staged historic-SQL template'); + +assert(manifest.source === 'historic-sql', `Expected source historic-sql, got ${manifest.source}`); +assert(manifest.dialect === 'postgres', `Expected dialect postgres, got ${manifest.dialect}`); +assert(Number.isInteger(manifest.snapshotRowCount) && manifest.snapshotRowCount > 0, 'Expected snapshotRowCount > 0'); +assert(Number.isInteger(manifest.touchedTableCount) && manifest.touchedTableCount > 0, 'Expected touchedTableCount > 0'); +assert(Number.isInteger(manifest.parseFailures), 'Expected numeric parseFailures'); +assert(Array.isArray(manifest.warnings), 'Expected warnings array'); +assert(Array.isArray(manifest.probeWarnings), 'Expected probeWarnings array'); +for (const legacyKey of ['degraded', 'baselineFirstRun', 'pgServerVersion', 'statsResetAt', 'templates']) { + assert(!(legacyKey in manifest), `Legacy manifest key is still present: ${legacyKey}`); } + +const root = dirname(manifestPath); +const tableDir = join(root, 'tables'); +const tableFiles = readdirSync(tableDir).filter((file) => file.endsWith('.json')).sort(); +assert(tableFiles.length === manifest.touchedTableCount, `Expected ${manifest.touchedTableCount} table files, got ${tableFiles.length}`); + +const firstTable = JSON.parse(readFileSync(join(tableDir, tableFiles[0]), 'utf8')); +assert(typeof firstTable.table === 'string' && firstTable.table.length > 0, 'Expected staged table name'); +assert(firstTable.stats && typeof firstTable.stats.executionsBucket === 'string', 'Expected bucketed table stats'); +assert(firstTable.columnsByClause && typeof firstTable.columnsByClause === 'object', 'Expected columnsByClause object'); +assert(Array.isArray(firstTable.observedJoins), 'Expected observedJoins array'); +assert(Array.isArray(firstTable.topTemplates) && firstTable.topTemplates.length > 0, 'Expected topTemplates'); + +const patterns = JSON.parse(readFileSync(join(root, 'patterns-input.json'), 'utf8')); +assert(Array.isArray(patterns.templates) && patterns.templates.length > 0, 'Expected patterns-input templates'); +assert( + patterns.templates.every((template) => Array.isArray(template.tablesTouched) && template.tablesTouched.length > 0), + 'Expected every pattern template to have touched tables', +); +NODE +} + +assert_stage_record() { + local record_path="$1" + local label="$2" + local expected_work_units="$3" + node - "$record_path" "$label" "$expected_work_units" "$MAX_STAGE_SECONDS" <<'NODE' +const { readFileSync } = require('node:fs'); + +const record = JSON.parse(readFileSync(process.argv[2], 'utf8')); +const label = process.argv[3]; +const expectedWorkUnits = process.argv[4]; +const maxSeconds = Number(process.argv[5]); +function assert(condition, message) { + if (!condition) throw new Error(message); +} + +assert(record.status === 'done', `${label}: expected status done, got ${record.status}`); +assert(record.adapter === 'historic-sql', `${label}: expected historic-sql adapter`); +assert(record.connectionId === 'warehouse', `${label}: expected warehouse connection`); +assert(record.rawFileCount >= 3, `${label}: expected manifest, patterns input, and at least one table file`); +assert(Array.isArray(record.errors) && record.errors.length === 0, `${label}: expected no errors`); + +if (expectedWorkUnits === 'zero') { + assert(record.workUnitCount === 0, `${label}: expected zero WorkUnits, got ${record.workUnitCount}`); + assert(Array.isArray(record.workUnits) && record.workUnits.length === 0, `${label}: expected empty workUnits`); +} else if (expectedWorkUnits === 'nonzero') { + assert(record.workUnitCount > 0, `${label}: expected nonzero WorkUnits`); + assert(record.workUnits.some((unit) => unit.unitKey === 'historic-sql-patterns'), `${label}: expected patterns WorkUnit`); + assert(record.workUnits.some((unit) => unit.unitKey.startsWith('historic-sql-table-')), `${label}: expected table WorkUnit`); +} else { + throw new Error(`${label}: unknown expected work unit mode ${expectedWorkUnits}`); +} + +const elapsedMs = Date.parse(record.completedAt) - Date.parse(record.startedAt); +assert(Number.isFinite(elapsedMs) && elapsedMs >= 0, `${label}: invalid elapsed time`); +assert(elapsedMs <= maxSeconds * 1000, `${label}: stage-only ingest took ${elapsedMs}ms, over ${maxSeconds}s`); NODE } run_historic_stage_only() { local job_id="$1" - node - "$KTX_ROOT" "$PROJECT_DIR" "$job_id" <<'NODE' + local record_path="$2" + node - "$KTX_ROOT" "$PROJECT_DIR" "$job_id" "$record_path" <<'NODE' +const { writeFile } = await import('node:fs/promises'); const { join } = await import('node:path'); const ktxRoot = process.argv[2]; const projectDir = process.argv[3]; const jobId = process.argv[4]; +const recordPath = process.argv[5]; const { loadKtxProject } = await import(join(ktxRoot, 'packages/context/dist/project/index.js')); const { runLocalStageOnlyIngest } = await import(join(ktxRoot, 'packages/context/dist/ingest/index.js')); const { createKtxCliLocalIngestAdapters } = await import(join(ktxRoot, 'packages/cli/dist/local-adapters.js')); @@ -81,15 +146,8 @@ const record = await runLocalStageOnlyIngest({ trigger: 'manual_resync', jobId, }); -await adapter.onPullSucceeded?.({ - connectionId: 'warehouse', - sourceKey: 'historic-sql', - syncId: record.syncId, - trigger: 'manual_resync', - completedAt: new Date(record.completedAt), - stagedDir: join(project.projectDir, '.ktx/cache/local-ingest', jobId, 'staged'), -}); -console.log(record.syncId); +await writeFile(recordPath, `${JSON.stringify(record, null, 2)}\n`, 'utf8'); +console.log(`${record.syncId} workUnits=${record.workUnitCount}`); NODE } @@ -112,25 +170,31 @@ node "$KTX_BIN" --project-dir "$PROJECT_DIR" setup \ --database-url env:WAREHOUSE_DATABASE_URL \ --database-schema public \ --enable-historic-sql \ - --historic-sql-min-calls 2 \ + --historic-sql-min-executions 2 \ --yes \ --no-input -run_historic_stage_only "historic-first-$$" +node "$KTX_BIN" runtime install --yes +node "$KTX_BIN" runtime start + +FIRST_RECORD="$PROJECT_PARENT/first-record.json" +run_historic_stage_only "historic-first-$$" "$FIRST_RECORD" FIRST_MANIFEST="$(latest_manifest)" -assert_manifest "$FIRST_MANIFEST" true +assert_unified_snapshot "$FIRST_MANIFEST" +assert_stage_record "$FIRST_RECORD" first nonzero + +UNCHANGED_RECORD="$PROJECT_PARENT/unchanged-record.json" +run_historic_stage_only "historic-unchanged-$$" "$UNCHANGED_RECORD" +UNCHANGED_MANIFEST="$(latest_manifest)" +assert_unified_snapshot "$UNCHANGED_MANIFEST" +assert_stage_record "$UNCHANGED_RECORD" unchanged zero "$EXAMPLE_DIR/scripts/generate-workload.sh" extra -run_historic_stage_only "historic-second-$$" -SECOND_MANIFEST="$(latest_manifest)" -assert_manifest "$SECOND_MANIFEST" false - -docker compose -f "$COMPOSE_FILE" exec -T postgres \ - psql -U postgres -d analytics -v ON_ERROR_STOP=1 -c "SELECT pg_stat_statements_reset();" >/dev/null -"$EXAMPLE_DIR/scripts/generate-workload.sh" extra -run_historic_stage_only "historic-reset-$$" -RESET_MANIFEST="$(latest_manifest)" -assert_manifest "$RESET_MANIFEST" true +CHANGED_RECORD="$PROJECT_PARENT/changed-record.json" +run_historic_stage_only "historic-changed-$$" "$CHANGED_RECORD" +CHANGED_MANIFEST="$(latest_manifest)" +assert_unified_snapshot "$CHANGED_MANIFEST" +assert_stage_record "$CHANGED_RECORD" changed nonzero echo "Postgres historic SQL smoke passed" echo "Project dir: $PROJECT_DIR"