Include historic SQL projection in memory counts

This commit is contained in:
Andrey Avtomonov 2026-05-11 22:52:47 +02:00
parent 1bd29c7eb1
commit b981cabdc6
5 changed files with 169 additions and 12 deletions

View file

@ -544,6 +544,63 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Diff: +2/~0/-0/=0\n');
});
it('includes historic-sql projection output in saved memory counts', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
const result = completedLocalBundleRun(input, 'historic-sql-projection');
return {
...result,
report: localFakeBundleReport('historic-sql-projection', {
sourceKey: 'historic-sql',
body: {
workUnits: [],
postProcessor: {
sourceKey: 'historic-sql',
status: 'success',
result: {
tableUsageMerged: 56,
staleTablesMarked: 1,
patternPagesWritten: 30,
stalePatternPagesMarked: 2,
archivedPatternPages: 3,
legacyPagesDeleted: 4,
},
errors: [],
warnings: [],
touchedSources: [],
},
},
}),
};
});
const io = makeIo();
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
runLocalIngest: runLocal,
createAdapters: vi.fn(() => [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
]),
jobIdFactory: () => 'historic-sql-projection',
},
),
).resolves.toBe(0);
expect(io.stderr()).toBe('');
expect(io.stdout()).toContain('Adapter: historic-sql\n');
expect(io.stdout()).toContain('Saved memory: 39 wiki, 57 SL\n');
});
it('returns a non-zero code when local ingest reports failed work units', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -14,6 +14,7 @@ import {
renderMemoryFlowReplay,
runLocalIngest,
runLocalMetabaseIngest,
savedMemoryCountsForReport,
} from '@ktx/context/ingest';
import { loadKtxProject } from '@ktx/context/project';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
@ -89,16 +90,8 @@ function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
return report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
}
function reportActionCounts(report: IngestReportSnapshot): { wikiCount: number; slCount: number } {
const actions = report.body.workUnits.flatMap((workUnit) => workUnit.actions);
return {
wikiCount: actions.filter((action) => action.target === 'wiki').length,
slCount: actions.filter((action) => action.target === 'sl').length,
};
}
function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void {
const counts = reportActionCounts(report);
const counts = savedMemoryCountsForReport(report);
io.stdout.write(`Report: ${report.id}\n`);
io.stdout.write(`Run: ${report.runId}\n`);
io.stdout.write(`Job: ${report.jobId}\n`);
@ -117,7 +110,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIngestIo): void {
const counts = result.children.reduce(
(acc, child) => {
const childCounts = reportActionCounts(child.report);
const childCounts = savedMemoryCountsForReport(child.report);
return {
wikiCount: acc.wikiCount + childCounts.wikiCount,
slCount: acc.slCount + childCounts.slCount,

View file

@ -1428,6 +1428,67 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
});
it('includes historic-sql post-processor output in memory-flow saved counts', async () => {
const deps = makeDeps();
deps.adapter.source = 'historic-sql';
deps.registry.get.mockReturnValue(deps.adapter);
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{
unitKey: 'historic-sql-table-public-orders',
rawFiles: ['tables/public/orders.json'],
peerFileIndex: [],
dependencyPaths: [],
},
],
});
const postProcessor = {
run: vi.fn().mockResolvedValue({
result: {
tableUsageMerged: 2,
staleTablesMarked: 1,
patternPagesWritten: 3,
stalePatternPagesMarked: 1,
archivedPatternPages: 1,
legacyPagesDeleted: 1,
},
warnings: [],
errors: [],
touchedSources: [{ connectionId: 'c1', sourceName: 'orders' }],
}),
};
const runner = buildRunner(deps, { postProcessors: { 'historic-sql': postProcessor } });
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['tables/public/orders.json', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/historic-sql/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'historic-sql',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{
jobId: 'j1',
memoryFlow,
startPhase: () => new TestJobContext('j1', null, () => Promise.resolve(), () => Promise.resolve()),
},
);
expect(memoryFlow.snapshot().events).toContainEqual(
expect.objectContaining({
type: 'saved',
wikiCount: 6,
slCount: 3,
}),
);
});
it('marks post-processor infrastructure failure as failed and preserves worktree cleanup state', async () => {
const deps = makeDeps();
deps.adapter.source = 'metricflow';

View file

@ -15,6 +15,7 @@ import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, PageTriageRun
import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js';
import {
buildStageIndexFromReportBody,
postProcessorSavedMemoryCounts,
type IngestReportPostProcessorOutcome,
type IngestReportSnapshot,
} from './reports.js';
@ -1111,11 +1112,12 @@ export class IngestBundleRunner {
}
const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha;
const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions);
const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome);
memoryFlow?.emit({
type: 'saved',
commitSha,
wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki'),
slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl'),
wikiCount: countMemoryFlowActions(memoryFlowSavedActions, 'wiki') + postProcessorMemoryCounts.wikiCount,
slCount: countMemoryFlowActions(memoryFlowSavedActions, 'sl') + postProcessorMemoryCounts.slCount,
});
await stage6?.updateProgress(1.0, commitSha ? `Saved changes (${commitSha.slice(0, 8)})` : 'No changes to save');

View file

@ -79,6 +79,50 @@ export interface IngestReportSnapshot {
createdAt: string;
}
export interface IngestSavedMemoryCounts {
wikiCount: number;
slCount: number;
}
function numericResultField(result: Record<string, unknown>, field: string): number {
const value = result[field];
return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : 0;
}
export function postProcessorSavedMemoryCounts(
postProcessor: IngestReportPostProcessorOutcome | undefined,
): IngestSavedMemoryCounts {
if (!postProcessor || postProcessor.sourceKey !== 'historic-sql') {
return { wikiCount: 0, slCount: 0 };
}
const result = postProcessor.result;
if (!result || typeof result !== 'object' || Array.isArray(result)) {
return { wikiCount: 0, slCount: 0 };
}
const record = result as Record<string, unknown>;
return {
wikiCount:
numericResultField(record, 'patternPagesWritten') +
numericResultField(record, 'stalePatternPagesMarked') +
numericResultField(record, 'archivedPatternPages') +
numericResultField(record, 'legacyPagesDeleted'),
slCount: numericResultField(record, 'tableUsageMerged') + numericResultField(record, 'staleTablesMarked'),
};
}
export function savedMemoryCountsForReport(report: IngestReportSnapshot): IngestSavedMemoryCounts {
const actions = report.body.workUnits.flatMap((workUnit) => workUnit.actions);
const directCounts = {
wikiCount: actions.filter((action) => action.target === 'wiki').length,
slCount: actions.filter((action) => action.target === 'sl').length,
};
const postProcessorCounts = postProcessorSavedMemoryCounts(report.body.postProcessor);
return {
wikiCount: directCounts.wikiCount + postProcessorCounts.wikiCount,
slCount: directCounts.slCount + postProcessorCounts.slCount,
};
}
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {
return {
jobId,