fix(cli): surface historic sql ingest progress

This commit is contained in:
Andrey Avtomonov 2026-05-12 01:04:45 +02:00
parent 61ed276b44
commit 23ecdff2cd
4 changed files with 160 additions and 12 deletions

View file

@ -918,6 +918,97 @@ describe('runKtxIngest', () => {
expect(io.stderr()).toBe('');
});
it('prints plain WorkUnit step progress during long-running local ingest', async () => {
const projectDir = join(tempDir, 'historic-sql-step-progress-project');
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: historic-sql-step-progress-project',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' historicSql:',
' enabled: true',
' dialect: postgres',
' minExecutions: 2',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
'utf-8',
);
const createdAdapters: SourceAdapter[] = [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
];
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
input.memoryFlow?.update({
plannedWorkUnits: [
{
unitKey: 'historic-sql-table-public-orders',
rawFiles: ['tables/public/orders.json'],
peerFileCount: 0,
dependencyCount: 0,
},
{
unitKey: 'historic-sql-table-public-customers',
rawFiles: ['tables/public/customers.json'],
peerFileCount: 0,
dependencyCount: 0,
},
],
});
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 });
input.memoryFlow?.emit({
type: 'work_unit_started',
unitKey: 'historic-sql-table-public-orders',
skills: ['historic_sql_table_digest'],
stepBudget: 40,
});
input.memoryFlow?.emit({
type: 'work_unit_step',
unitKey: 'historic-sql-table-public-orders',
stepIndex: 7,
stepBudget: 40,
});
input.memoryFlow?.emit({
type: 'work_unit_finished',
unitKey: 'historic-sql-table-public-orders',
status: 'success',
});
input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, input.jobId ?? 'historic-step-progress-job');
});
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
env: interactiveEnv(),
createAdapters: vi.fn(() => createdAdapters as never),
runLocalIngest: runLocal,
jobIdFactory: () => 'historic-step-progress-job',
},
),
).resolves.toBe(0);
const stdout = io.stdout();
expect(stdout).toContain('[45%] Planned 2 work units');
expect(stdout).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders');
expect(stdout).toContain('[58%] Processing 1/2 work units: historic-sql-table-public-orders step 7/40');
expect(stdout).toContain('[68%] Processed 1/2 work units');
});
it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -168,13 +168,37 @@ function formatDiffProgress(event: Extract<MemoryFlowEvent, { type: 'diff_comput
return `+${event.added}/~${event.modified}/-${event.deleted}/=${event.unchanged}`;
}
function completedWorkUnitCount(snapshot: MemoryFlowReplayInput): number {
return snapshot.events.filter((event) => event.type === 'work_unit_finished').length;
function workUnitEventsThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): MemoryFlowEvent[] {
return snapshot.events.slice(0, eventIndex + 1);
}
function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length;
}
function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
if (snapshot.plannedWorkUnits.length > 0) {
return snapshot.plannedWorkUnits.length;
}
const planEvent = workUnitEventsThrough(snapshot, eventIndex)
.filter((event) => event.type === 'chunks_planned')
.at(-1);
return planEvent?.workUnitCount ?? completedWorkUnitCountThrough(snapshot, eventIndex);
}
function workUnitOrdinalThrough(snapshot: MemoryFlowReplayInput, eventIndex: number, unitKey: string): number {
const events = workUnitEventsThrough(snapshot, eventIndex);
const startedIndex = events.findIndex((event) => event.type === 'work_unit_started' && event.unitKey === unitKey);
if (startedIndex === -1) {
return completedWorkUnitCountThrough(snapshot, eventIndex) + 1;
}
return events.slice(0, startedIndex + 1).filter((event) => event.type === 'work_unit_started').length;
}
function plainIngestEventProgress(
event: MemoryFlowEvent,
snapshot: MemoryFlowReplayInput,
eventIndex: number,
): { percent: number; message: string } | null {
switch (event.type) {
case 'source_acquired':
@ -196,11 +220,27 @@ function plainIngestEventProgress(
};
case 'stage_skipped':
return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` };
case 'work_unit_started':
return { percent: 55, message: `Processing ${event.unitKey}` };
case 'work_unit_started': {
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
const progress = total > 0 ? `${ordinal}/${total} work units: ` : '';
return { percent: 55, message: `Processing ${progress}${event.unitKey}` };
}
case 'work_unit_step': {
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0;
const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55;
const progress = total > 0 ? `${ordinal}/${total} work units: ` : '';
return {
percent,
message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`,
};
}
case 'work_unit_finished': {
const total = snapshot.plannedWorkUnits.length || completedWorkUnitCount(snapshot);
const completed = completedWorkUnitCount(snapshot);
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80;
return {
percent,
@ -225,7 +265,6 @@ function plainIngestEventProgress(
case 'report_created':
return { percent: 98, message: `Created ingest report ${event.reportPath ?? event.runId}` };
case 'scope_detected':
case 'work_unit_step':
case 'candidate_action':
return null;
}
@ -259,11 +298,12 @@ function createPlainIngestProgressRenderer(
},
update(snapshot) {
while (printedEvents < snapshot.events.length) {
const eventIndex = printedEvents;
const event = snapshot.events[printedEvents++];
if (!event) {
continue;
}
const progress = plainIngestEventProgress(event, snapshot);
const progress = plainIngestEventProgress(event, snapshot, eventIndex);
if (progress) {
write(progress.percent, progress.message);
}

View file

@ -1295,6 +1295,7 @@ describe('setup databases step', () => {
expect(config.connections.warehouse.historicSql).not.toHaveProperty('redactionPatterns');
expect(config.connections.warehouse.historicSql).not.toHaveProperty(legacyHistoricSqlServiceAccountPatternsKey);
expect(config.ingest.adapters).toContain('historic-sql');
expect(config.ingest.workUnits.maxConcurrency).toBe(6);
expect(io.stdout()).toContain('Historic SQL probe...');
expect(io.stdout()).toContain('pg_stat_statements ready');
});

View file

@ -14,6 +14,8 @@ import { runKtxScan } from './scan.js';
import { withSetupInterruptConfirmation } from './setup-interrupt.js';
import { writeProjectLocalSecretReference } from './setup-secrets.js';
const HISTORIC_SQL_WORK_UNIT_MAX_CONCURRENCY = 6;
export type KtxSetupDatabaseDriver =
| 'sqlite'
| 'postgres'
@ -843,7 +845,7 @@ async function writeConnectionConfig(input: {
? (input.connection.historicSql as Record<string, unknown>)
: null;
if (historicSql?.enabled === true) {
await ensureHistoricSqlAdapterEnabled(input.projectDir);
await ensureHistoricSqlIngestDefaults(input.projectDir);
}
}
@ -954,9 +956,19 @@ async function maybeConfigurePostgresSchemas(input: {
return true;
}
async function ensureHistoricSqlAdapterEnabled(projectDir: string): Promise<void> {
async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
if (project.config.ingest.adapters.includes('historic-sql')) {
const adapters = project.config.ingest.adapters.includes('historic-sql')
? project.config.ingest.adapters
: [...project.config.ingest.adapters, 'historic-sql'];
const maxConcurrency = Math.max(
project.config.ingest.workUnits.maxConcurrency,
HISTORIC_SQL_WORK_UNIT_MAX_CONCURRENCY,
);
if (
adapters === project.config.ingest.adapters &&
maxConcurrency === project.config.ingest.workUnits.maxConcurrency
) {
return;
}
await writeFile(
@ -965,7 +977,11 @@ async function ensureHistoricSqlAdapterEnabled(projectDir: string): Promise<void
...project.config,
ingest: {
...project.config.ingest,
adapters: [...project.config.ingest.adapters, 'historic-sql'],
adapters,
workUnits: {
...project.config.ingest.workUnits,
maxConcurrency,
},
},
}),
'utf-8',