mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
test: assert unified postgres historic sql smoke
This commit is contained in:
parent
598149b6a4
commit
b03eb27aae
1 changed files with 100 additions and 36 deletions
|
|
@ -8,6 +8,7 @@ COMPOSE_FILE="$EXAMPLE_DIR/docker-compose.yml"
|
|||
PROJECT_PARENT="${KTX_POSTGRES_HISTORIC_PROJECT_PARENT:-$(mktemp -d)}"
|
||||
PROJECT_DIR="$PROJECT_PARENT/postgres-historic-ktx"
|
||||
KTX_BIN="$KTX_ROOT/packages/cli/dist/bin.js"
|
||||
MAX_STAGE_SECONDS="${KTX_POSTGRES_HISTORIC_MAX_STAGE_SECONDS:-60}"
|
||||
export KTX_RUNTIME_ROOT="$PROJECT_PARENT/managed-runtime"
|
||||
unset KTX_DAEMON_URL
|
||||
unset KTX_SQL_ANALYSIS_URL
|
||||
|
|
@ -26,35 +27,99 @@ latest_manifest() {
|
|||
find "$PROJECT_DIR/raw-sources/warehouse/historic-sql" -name manifest.json | sort | tail -n 1
|
||||
}
|
||||
|
||||
assert_manifest() {
|
||||
assert_unified_snapshot() {
|
||||
local manifest_path="$1"
|
||||
local expected_first_run="$2"
|
||||
node - "$manifest_path" "$expected_first_run" <<'NODE'
|
||||
const { readFileSync } = require('node:fs');
|
||||
node - "$manifest_path" <<'NODE'
|
||||
const { dirname, join } = require('node:path');
|
||||
const { readFileSync, readdirSync } = require('node:fs');
|
||||
|
||||
const manifestPath = process.argv[2];
|
||||
const expectedFirstRun = process.argv[3] === 'true';
|
||||
const manifest = JSON.parse(readFileSync(manifestPath, 'utf8'));
|
||||
if (manifest.dialect !== 'postgres') throw new Error(`Expected dialect postgres, got ${manifest.dialect}`);
|
||||
if (manifest.degraded !== true) throw new Error('Expected degraded:true for Postgres PGSS v1');
|
||||
if (manifest.baselineFirstRun !== expectedFirstRun) {
|
||||
throw new Error(`Expected baselineFirstRun:${expectedFirstRun}, got ${manifest.baselineFirstRun}`);
|
||||
function assert(condition, message) {
|
||||
if (!condition) throw new Error(message);
|
||||
}
|
||||
if (!manifest.pgServerVersion) throw new Error('Expected pgServerVersion');
|
||||
if (!manifest.statsResetAt) throw new Error('Expected statsResetAt');
|
||||
if (!Array.isArray(manifest.templates) || manifest.templates.length === 0) {
|
||||
throw new Error('Expected at least one staged historic-SQL template');
|
||||
|
||||
assert(manifest.source === 'historic-sql', `Expected source historic-sql, got ${manifest.source}`);
|
||||
assert(manifest.dialect === 'postgres', `Expected dialect postgres, got ${manifest.dialect}`);
|
||||
assert(Number.isInteger(manifest.snapshotRowCount) && manifest.snapshotRowCount > 0, 'Expected snapshotRowCount > 0');
|
||||
assert(Number.isInteger(manifest.touchedTableCount) && manifest.touchedTableCount > 0, 'Expected touchedTableCount > 0');
|
||||
assert(Number.isInteger(manifest.parseFailures), 'Expected numeric parseFailures');
|
||||
assert(Array.isArray(manifest.warnings), 'Expected warnings array');
|
||||
assert(Array.isArray(manifest.probeWarnings), 'Expected probeWarnings array');
|
||||
for (const legacyKey of ['degraded', 'baselineFirstRun', 'pgServerVersion', 'statsResetAt', 'templates']) {
|
||||
assert(!(legacyKey in manifest), `Legacy manifest key is still present: ${legacyKey}`);
|
||||
}
|
||||
|
||||
const root = dirname(manifestPath);
|
||||
const tableDir = join(root, 'tables');
|
||||
const tableFiles = readdirSync(tableDir).filter((file) => file.endsWith('.json')).sort();
|
||||
assert(tableFiles.length === manifest.touchedTableCount, `Expected ${manifest.touchedTableCount} table files, got ${tableFiles.length}`);
|
||||
|
||||
const firstTable = JSON.parse(readFileSync(join(tableDir, tableFiles[0]), 'utf8'));
|
||||
assert(typeof firstTable.table === 'string' && firstTable.table.length > 0, 'Expected staged table name');
|
||||
assert(firstTable.stats && typeof firstTable.stats.executionsBucket === 'string', 'Expected bucketed table stats');
|
||||
assert(firstTable.columnsByClause && typeof firstTable.columnsByClause === 'object', 'Expected columnsByClause object');
|
||||
assert(Array.isArray(firstTable.observedJoins), 'Expected observedJoins array');
|
||||
assert(Array.isArray(firstTable.topTemplates) && firstTable.topTemplates.length > 0, 'Expected topTemplates');
|
||||
|
||||
const patterns = JSON.parse(readFileSync(join(root, 'patterns-input.json'), 'utf8'));
|
||||
assert(Array.isArray(patterns.templates) && patterns.templates.length > 0, 'Expected patterns-input templates');
|
||||
assert(
|
||||
patterns.templates.every((template) => Array.isArray(template.tablesTouched) && template.tablesTouched.length > 0),
|
||||
'Expected every pattern template to have touched tables',
|
||||
);
|
||||
NODE
|
||||
}
|
||||
|
||||
assert_stage_record() {
|
||||
local record_path="$1"
|
||||
local label="$2"
|
||||
local expected_work_units="$3"
|
||||
node - "$record_path" "$label" "$expected_work_units" "$MAX_STAGE_SECONDS" <<'NODE'
|
||||
const { readFileSync } = require('node:fs');
|
||||
|
||||
const record = JSON.parse(readFileSync(process.argv[2], 'utf8'));
|
||||
const label = process.argv[3];
|
||||
const expectedWorkUnits = process.argv[4];
|
||||
const maxSeconds = Number(process.argv[5]);
|
||||
function assert(condition, message) {
|
||||
if (!condition) throw new Error(message);
|
||||
}
|
||||
|
||||
assert(record.status === 'done', `${label}: expected status done, got ${record.status}`);
|
||||
assert(record.adapter === 'historic-sql', `${label}: expected historic-sql adapter`);
|
||||
assert(record.connectionId === 'warehouse', `${label}: expected warehouse connection`);
|
||||
assert(record.rawFileCount >= 3, `${label}: expected manifest, patterns input, and at least one table file`);
|
||||
assert(Array.isArray(record.errors) && record.errors.length === 0, `${label}: expected no errors`);
|
||||
|
||||
if (expectedWorkUnits === 'zero') {
|
||||
assert(record.workUnitCount === 0, `${label}: expected zero WorkUnits, got ${record.workUnitCount}`);
|
||||
assert(Array.isArray(record.workUnits) && record.workUnits.length === 0, `${label}: expected empty workUnits`);
|
||||
} else if (expectedWorkUnits === 'nonzero') {
|
||||
assert(record.workUnitCount > 0, `${label}: expected nonzero WorkUnits`);
|
||||
assert(record.workUnits.some((unit) => unit.unitKey === 'historic-sql-patterns'), `${label}: expected patterns WorkUnit`);
|
||||
assert(record.workUnits.some((unit) => unit.unitKey.startsWith('historic-sql-table-')), `${label}: expected table WorkUnit`);
|
||||
} else {
|
||||
throw new Error(`${label}: unknown expected work unit mode ${expectedWorkUnits}`);
|
||||
}
|
||||
|
||||
const elapsedMs = Date.parse(record.completedAt) - Date.parse(record.startedAt);
|
||||
assert(Number.isFinite(elapsedMs) && elapsedMs >= 0, `${label}: invalid elapsed time`);
|
||||
assert(elapsedMs <= maxSeconds * 1000, `${label}: stage-only ingest took ${elapsedMs}ms, over ${maxSeconds}s`);
|
||||
NODE
|
||||
}
|
||||
|
||||
run_historic_stage_only() {
|
||||
local job_id="$1"
|
||||
node - "$KTX_ROOT" "$PROJECT_DIR" "$job_id" <<'NODE'
|
||||
local record_path="$2"
|
||||
node - "$KTX_ROOT" "$PROJECT_DIR" "$job_id" "$record_path" <<'NODE'
|
||||
const { writeFile } = await import('node:fs/promises');
|
||||
const { join } = await import('node:path');
|
||||
|
||||
const ktxRoot = process.argv[2];
|
||||
const projectDir = process.argv[3];
|
||||
const jobId = process.argv[4];
|
||||
const recordPath = process.argv[5];
|
||||
const { loadKtxProject } = await import(join(ktxRoot, 'packages/context/dist/project/index.js'));
|
||||
const { runLocalStageOnlyIngest } = await import(join(ktxRoot, 'packages/context/dist/ingest/index.js'));
|
||||
const { createKtxCliLocalIngestAdapters } = await import(join(ktxRoot, 'packages/cli/dist/local-adapters.js'));
|
||||
|
|
@ -81,15 +146,8 @@ const record = await runLocalStageOnlyIngest({
|
|||
trigger: 'manual_resync',
|
||||
jobId,
|
||||
});
|
||||
await adapter.onPullSucceeded?.({
|
||||
connectionId: 'warehouse',
|
||||
sourceKey: 'historic-sql',
|
||||
syncId: record.syncId,
|
||||
trigger: 'manual_resync',
|
||||
completedAt: new Date(record.completedAt),
|
||||
stagedDir: join(project.projectDir, '.ktx/cache/local-ingest', jobId, 'staged'),
|
||||
});
|
||||
console.log(record.syncId);
|
||||
await writeFile(recordPath, `${JSON.stringify(record, null, 2)}\n`, 'utf8');
|
||||
console.log(`${record.syncId} workUnits=${record.workUnitCount}`);
|
||||
NODE
|
||||
}
|
||||
|
||||
|
|
@ -112,25 +170,31 @@ node "$KTX_BIN" --project-dir "$PROJECT_DIR" setup \
|
|||
--database-url env:WAREHOUSE_DATABASE_URL \
|
||||
--database-schema public \
|
||||
--enable-historic-sql \
|
||||
--historic-sql-min-calls 2 \
|
||||
--historic-sql-min-executions 2 \
|
||||
--yes \
|
||||
--no-input
|
||||
|
||||
run_historic_stage_only "historic-first-$$"
|
||||
node "$KTX_BIN" runtime install --yes
|
||||
node "$KTX_BIN" runtime start
|
||||
|
||||
FIRST_RECORD="$PROJECT_PARENT/first-record.json"
|
||||
run_historic_stage_only "historic-first-$$" "$FIRST_RECORD"
|
||||
FIRST_MANIFEST="$(latest_manifest)"
|
||||
assert_manifest "$FIRST_MANIFEST" true
|
||||
assert_unified_snapshot "$FIRST_MANIFEST"
|
||||
assert_stage_record "$FIRST_RECORD" first nonzero
|
||||
|
||||
UNCHANGED_RECORD="$PROJECT_PARENT/unchanged-record.json"
|
||||
run_historic_stage_only "historic-unchanged-$$" "$UNCHANGED_RECORD"
|
||||
UNCHANGED_MANIFEST="$(latest_manifest)"
|
||||
assert_unified_snapshot "$UNCHANGED_MANIFEST"
|
||||
assert_stage_record "$UNCHANGED_RECORD" unchanged zero
|
||||
|
||||
"$EXAMPLE_DIR/scripts/generate-workload.sh" extra
|
||||
run_historic_stage_only "historic-second-$$"
|
||||
SECOND_MANIFEST="$(latest_manifest)"
|
||||
assert_manifest "$SECOND_MANIFEST" false
|
||||
|
||||
docker compose -f "$COMPOSE_FILE" exec -T postgres \
|
||||
psql -U postgres -d analytics -v ON_ERROR_STOP=1 -c "SELECT pg_stat_statements_reset();" >/dev/null
|
||||
"$EXAMPLE_DIR/scripts/generate-workload.sh" extra
|
||||
run_historic_stage_only "historic-reset-$$"
|
||||
RESET_MANIFEST="$(latest_manifest)"
|
||||
assert_manifest "$RESET_MANIFEST" true
|
||||
CHANGED_RECORD="$PROJECT_PARENT/changed-record.json"
|
||||
run_historic_stage_only "historic-changed-$$" "$CHANGED_RECORD"
|
||||
CHANGED_MANIFEST="$(latest_manifest)"
|
||||
assert_unified_snapshot "$CHANGED_MANIFEST"
|
||||
assert_stage_record "$CHANGED_RECORD" changed nonzero
|
||||
|
||||
echo "Postgres historic SQL smoke passed"
|
||||
echo "Project dir: $PROJECT_DIR"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue