From df2eeaa96cbf9c040953a1f0362b8cd906c72095 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Tue, 12 May 2026 10:25:58 +0200 Subject: [PATCH] fix(cli): surface historic sql ingest progress (#18) Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> --- packages/cli/src/ingest.test.ts | 91 ++++++++++++++++++++++++ packages/cli/src/ingest.ts | 56 ++++++++++++--- packages/cli/src/setup-databases.test.ts | 1 + packages/cli/src/setup-databases.ts | 24 +++++-- 4 files changed, 160 insertions(+), 12 deletions(-) diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 9fc4dc82..3b580cc1 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -918,6 +918,97 @@ describe('runKtxIngest', () => { expect(io.stderr()).toBe(''); }); + it('prints plain WorkUnit step progress during long-running local ingest', async () => { + const projectDir = join(tempDir, 'historic-sql-step-progress-project'); + await mkdir(projectDir, { recursive: true }); + await writeFile( + join(projectDir, 'ktx.yaml'), + [ + 'project: historic-sql-step-progress-project', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + ' minExecutions: 2', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + 'utf-8', + ); + const createdAdapters: SourceAdapter[] = [ + { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, + ]; + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => { + input.memoryFlow?.update({ + plannedWorkUnits: [ + { + unitKey: 'historic-sql-table-public-orders', + rawFiles: ['tables/public/orders.json'], + peerFileCount: 0, + dependencyCount: 0, + }, + { + unitKey: 'historic-sql-table-public-customers', + rawFiles: ['tables/public/customers.json'], + peerFileCount: 0, + dependencyCount: 0, + }, + ], + }); + input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }); + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey: 'historic-sql-table-public-orders', + skills: ['historic_sql_table_digest'], + stepBudget: 40, + }); + input.memoryFlow?.emit({ + type: 'work_unit_step', + unitKey: 'historic-sql-table-public-orders', + stepIndex: 7, + stepBudget: 40, + }); + input.memoryFlow?.emit({ + type: 'work_unit_finished', + unitKey: 'historic-sql-table-public-orders', + status: 'success', + }); + input.memoryFlow?.finish('done'); + return completedLocalBundleRun(input, input.jobId ?? 'historic-step-progress-job'); + }); + const io = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'historic-sql', + outputMode: 'plain', + }, + io.io, + { + env: interactiveEnv(), + createAdapters: vi.fn(() => createdAdapters as never), + runLocalIngest: runLocal, + jobIdFactory: () => 'historic-step-progress-job', + }, + ), + ).resolves.toBe(0); + + const stdout = io.stdout(); + expect(stdout).toContain('[45%] Planned 2 work units'); + expect(stdout).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders'); + expect(stdout).toContain('[58%] Processing 1/2 work units: historic-sql-table-public-orders step 7/40'); + expect(stdout).toContain('[68%] Processed 1/2 work units'); + }); + it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index a580b3d5..39bf21bb 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -168,13 +168,37 @@ function formatDiffProgress(event: Extract event.type === 'work_unit_finished').length; +function workUnitEventsThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): MemoryFlowEvent[] { + return snapshot.events.slice(0, eventIndex + 1); +} + +function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { + return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length; +} + +function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { + if (snapshot.plannedWorkUnits.length > 0) { + return snapshot.plannedWorkUnits.length; + } + const planEvent = workUnitEventsThrough(snapshot, eventIndex) + .filter((event) => event.type === 'chunks_planned') + .at(-1); + return planEvent?.workUnitCount ?? completedWorkUnitCountThrough(snapshot, eventIndex); +} + +function workUnitOrdinalThrough(snapshot: MemoryFlowReplayInput, eventIndex: number, unitKey: string): number { + const events = workUnitEventsThrough(snapshot, eventIndex); + const startedIndex = events.findIndex((event) => event.type === 'work_unit_started' && event.unitKey === unitKey); + if (startedIndex === -1) { + return completedWorkUnitCountThrough(snapshot, eventIndex) + 1; + } + return events.slice(0, startedIndex + 1).filter((event) => event.type === 'work_unit_started').length; } function plainIngestEventProgress( event: MemoryFlowEvent, snapshot: MemoryFlowReplayInput, + eventIndex: number, ): { percent: number; message: string } | null { switch (event.type) { case 'source_acquired': @@ -196,11 +220,27 @@ function plainIngestEventProgress( }; case 'stage_skipped': return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` }; - case 'work_unit_started': - return { percent: 55, message: `Processing ${event.unitKey}` }; + case 'work_unit_started': { + const total = plannedWorkUnitCountThrough(snapshot, eventIndex); + const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); + const progress = total > 0 ? `${ordinal}/${total} work units: ` : ''; + return { percent: 55, message: `Processing ${progress}${event.unitKey}` }; + } + case 'work_unit_step': { + const total = plannedWorkUnitCountThrough(snapshot, eventIndex); + const completed = completedWorkUnitCountThrough(snapshot, eventIndex); + const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); + const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0; + const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55; + const progress = total > 0 ? `${ordinal}/${total} work units: ` : ''; + return { + percent, + message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`, + }; + } case 'work_unit_finished': { - const total = snapshot.plannedWorkUnits.length || completedWorkUnitCount(snapshot); - const completed = completedWorkUnitCount(snapshot); + const total = plannedWorkUnitCountThrough(snapshot, eventIndex); + const completed = completedWorkUnitCountThrough(snapshot, eventIndex); const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80; return { percent, @@ -225,7 +265,6 @@ function plainIngestEventProgress( case 'report_created': return { percent: 98, message: `Created ingest report ${event.reportPath ?? event.runId}` }; case 'scope_detected': - case 'work_unit_step': case 'candidate_action': return null; } @@ -259,11 +298,12 @@ function createPlainIngestProgressRenderer( }, update(snapshot) { while (printedEvents < snapshot.events.length) { + const eventIndex = printedEvents; const event = snapshot.events[printedEvents++]; if (!event) { continue; } - const progress = plainIngestEventProgress(event, snapshot); + const progress = plainIngestEventProgress(event, snapshot, eventIndex); if (progress) { write(progress.percent, progress.message); } diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 09b9d29f..a20df910 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -1295,6 +1295,7 @@ describe('setup databases step', () => { expect(config.connections.warehouse.historicSql).not.toHaveProperty('redactionPatterns'); expect(config.connections.warehouse.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey); expect(config.ingest.adapters).toContain('historic-sql'); + expect(config.ingest.workUnits.maxConcurrency).toBe(6); expect(io.stdout()).toContain('Historic SQL probe...'); expect(io.stdout()).toContain('pg_stat_statements ready'); }); diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index bd554590..3d49f75b 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -14,6 +14,8 @@ import { runKtxScan } from './scan.js'; import { withSetupInterruptConfirmation } from './setup-interrupt.js'; import { writeProjectLocalSecretReference } from './setup-secrets.js'; +const HISTORIC_SQL_WORK_UNIT_MAX_CONCURRENCY = 6; + export type KtxSetupDatabaseDriver = | 'sqlite' | 'postgres' @@ -843,7 +845,7 @@ async function writeConnectionConfig(input: { ? (input.connection.historicSql as Record) : null; if (historicSql?.enabled === true) { - await ensureHistoricSqlAdapterEnabled(input.projectDir); + await ensureHistoricSqlIngestDefaults(input.projectDir); } } @@ -954,9 +956,19 @@ async function maybeConfigurePostgresSchemas(input: { return true; } -async function ensureHistoricSqlAdapterEnabled(projectDir: string): Promise { +async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); - if (project.config.ingest.adapters.includes('historic-sql')) { + const adapters = project.config.ingest.adapters.includes('historic-sql') + ? project.config.ingest.adapters + : [...project.config.ingest.adapters, 'historic-sql']; + const maxConcurrency = Math.max( + project.config.ingest.workUnits.maxConcurrency, + HISTORIC_SQL_WORK_UNIT_MAX_CONCURRENCY, + ); + if ( + adapters === project.config.ingest.adapters && + maxConcurrency === project.config.ingest.workUnits.maxConcurrency + ) { return; } await writeFile( @@ -965,7 +977,11 @@ async function ensureHistoricSqlAdapterEnabled(projectDir: string): Promise