From b981cabdc63306a630753aa242ea220f1cba15ea Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 11 May 2026 22:52:47 +0200 Subject: [PATCH] Include historic SQL projection in memory counts --- packages/cli/src/ingest.test.ts | 57 +++++++++++++++++ packages/cli/src/ingest.ts | 13 +--- .../src/ingest/ingest-bundle.runner.test.ts | 61 +++++++++++++++++++ .../src/ingest/ingest-bundle.runner.ts | 6 +- packages/context/src/ingest/reports.ts | 44 +++++++++++++ 5 files changed, 169 insertions(+), 12 deletions(-) diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 4e0d35e4..5f19c076 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -544,6 +544,63 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Diff: +2/~0/-0/=0\n'); }); + it('includes historic-sql projection output in saved memory counts', async () => { + const projectDir = join(tempDir, 'project'); + await writeWarehouseConfig(projectDir); + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => { + const result = completedLocalBundleRun(input, 'historic-sql-projection'); + return { + ...result, + report: localFakeBundleReport('historic-sql-projection', { + sourceKey: 'historic-sql', + body: { + workUnits: [], + postProcessor: { + sourceKey: 'historic-sql', + status: 'success', + result: { + tableUsageMerged: 56, + staleTablesMarked: 1, + patternPagesWritten: 30, + stalePatternPagesMarked: 2, + archivedPatternPages: 3, + legacyPagesDeleted: 4, + }, + errors: [], + warnings: [], + touchedSources: [], + }, + }, + }), + }; + }); + + const io = makeIo(); + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'historic-sql', + outputMode: 'plain', + }, + io.io, + { + runLocalIngest: runLocal, + createAdapters: vi.fn(() => [ + { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, + ]), + jobIdFactory: () => 
'historic-sql-projection', + }, + ), + ).resolves.toBe(0); + + expect(io.stderr()).toBe(''); + expect(io.stdout()).toContain('Adapter: historic-sql\n'); + expect(io.stdout()).toContain('Saved memory: 39 wiki, 57 SL\n'); + }); + it('returns a non-zero code when local ingest reports failed work units', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 5ce967a4..a580b3d5 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -14,6 +14,7 @@ import { renderMemoryFlowReplay, runLocalIngest, runLocalMetabaseIngest, + savedMemoryCountsForReport, } from '@ktx/context/ingest'; import { loadKtxProject } from '@ktx/context/project'; import { readIngestReportSnapshotFile } from './ingest-report-file.js'; @@ -89,16 +90,8 @@ function reportStatus(report: IngestReportSnapshot): 'done' | 'error' { return report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; } -function reportActionCounts(report: IngestReportSnapshot): { wikiCount: number; slCount: number } { - const actions = report.body.workUnits.flatMap((workUnit) => workUnit.actions); - return { - wikiCount: actions.filter((action) => action.target === 'wiki').length, - slCount: actions.filter((action) => action.target === 'sl').length, - }; -} - function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void { - const counts = reportActionCounts(report); + const counts = savedMemoryCountsForReport(report); io.stdout.write(`Report: ${report.id}\n`); io.stdout.write(`Run: ${report.runId}\n`); io.stdout.write(`Job: ${report.jobId}\n`); @@ -117,7 +110,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIngestIo): void { const counts = result.children.reduce( (acc, child) => { - const childCounts = reportActionCounts(child.report); + const childCounts = 
savedMemoryCountsForReport(child.report); return { wikiCount: acc.wikiCount + childCounts.wikiCount, slCount: acc.slCount + childCounts.slCount, diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index 8b323a1d..ead6704d 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -1428,6 +1428,67 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success'); }); + it('includes historic-sql post-processor output in memory-flow saved counts', async () => { + const deps = makeDeps(); + deps.adapter.source = 'historic-sql'; + deps.registry.get.mockReturnValue(deps.adapter); + deps.adapter.chunk.mockResolvedValue({ + workUnits: [ + { + unitKey: 'historic-sql-table-public-orders', + rawFiles: ['tables/public/orders.json'], + peerFileIndex: [], + dependencyPaths: [], + }, + ], + }); + const postProcessor = { + run: vi.fn().mockResolvedValue({ + result: { + tableUsageMerged: 2, + staleTablesMarked: 1, + patternPagesWritten: 3, + stalePatternPagesMarked: 1, + archivedPatternPages: 1, + legacyPagesDeleted: 1, + }, + warnings: [], + errors: [], + touchedSources: [{ connectionId: 'c1', sourceName: 'orders' }], + }), + }; + const runner = buildRunner(deps, { postProcessors: { 'historic-sql': postProcessor } }); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['tables/public/orders.json', 'h1']]), + rawDirInWorktree: 'raw-sources/c1/historic-sql/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput()); + + await runner.run( + { + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'historic-sql', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }, + { + jobId: 
'j1', + memoryFlow, + startPhase: () => new TestJobContext('j1', null, () => Promise.resolve(), () => Promise.resolve()), + }, + ); + + expect(memoryFlow.snapshot().events).toContainEqual( + expect.objectContaining({ + type: 'saved', + wikiCount: 6, + slCount: 3, + }), + ); + }); + it('marks post-processor infrastructure failure as failed and preserves worktree cleanup state', async () => { const deps = makeDeps(); deps.adapter.source = 'metricflow'; diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 0515842a..a226bdd0 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -15,6 +15,7 @@ import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, PageTriageRun import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js'; import { buildStageIndexFromReportBody, + postProcessorSavedMemoryCounts, type IngestReportPostProcessorOutcome, type IngestReportSnapshot, } from './reports.js'; @@ -1111,11 +1112,12 @@ export class IngestBundleRunner { } const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha; const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions); + const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome); memoryFlow?.emit({ type: 'saved', commitSha, - wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki'), - slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl'), + wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki') + postProcessorMemoryCounts.wikiCount, + slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl') + postProcessorMemoryCounts.slCount, }); await stage6?.updateProgress(1.0, commitSha ? 
`Saved changes (${commitSha.slice(0, 8)})` : 'No changes to save'); diff --git a/packages/context/src/ingest/reports.ts b/packages/context/src/ingest/reports.ts index 7cf4418a..6f60f149 100644 --- a/packages/context/src/ingest/reports.ts +++ b/packages/context/src/ingest/reports.ts @@ -79,6 +79,53 @@ export interface IngestReportSnapshot { createdAt: string; } +export interface IngestSavedMemoryCounts { + wikiCount: number; + slCount: number; +} + +/** Reads `field` from `result`; returns it only when it is a finite number greater than zero, otherwise 0. */ +function numericResultField(result: Record<string, unknown>, field: string): number { + const value = result[field]; + return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : 0; +} + +/** Derives wiki/SL counts from a post-processor outcome; anything but a `historic-sql` outcome with a non-array object `result` yields zero counts. */ +export function postProcessorSavedMemoryCounts( + postProcessor: IngestReportPostProcessorOutcome | undefined, +): IngestSavedMemoryCounts { + if (!postProcessor || postProcessor.sourceKey !== 'historic-sql') { + return { wikiCount: 0, slCount: 0 }; + } + const result = postProcessor.result; + if (!result || typeof result !== 'object' || Array.isArray(result)) { + return { wikiCount: 0, slCount: 0 }; + } + const record = result as Record<string, unknown>; + return { + wikiCount: + numericResultField(record, 'patternPagesWritten') + + numericResultField(record, 'stalePatternPagesMarked') + + numericResultField(record, 'archivedPatternPages') + + numericResultField(record, 'legacyPagesDeleted'), + slCount: numericResultField(record, 'tableUsageMerged') + numericResultField(record, 'staleTablesMarked'), + }; +} + +/** Totals a report's saved-memory counts: work-unit actions grouped by target ('wiki' / 'sl') plus the post-processor counts. */ +export function savedMemoryCountsForReport(report: IngestReportSnapshot): IngestSavedMemoryCounts { + const actions = report.body.workUnits.flatMap((workUnit) => workUnit.actions); + const directCounts = { + wikiCount: actions.filter((action) => action.target === 'wiki').length, + slCount: actions.filter((action) => action.target === 'sl').length, + }; + const postProcessorCounts = postProcessorSavedMemoryCounts(report.body.postProcessor); + return { + wikiCount: directCounts.wikiCount + postProcessorCounts.wikiCount, + slCount: directCounts.slCount + postProcessorCounts.slCount, + }; +} + export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex { return { jobId,