diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 14b91333..ab7c717b 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -635,6 +635,117 @@ describe('runKtxIngest', () => { expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase'); }); + it('emits structured child ingest progress during Metabase fan-out', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + const progressEvents: Array<{ percent: number; message: string; transient?: boolean }> = []; + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'json', + }, + io.io, + { + progress: (event) => progressEvents.push(event), + runLocalMetabaseIngest: async (input) => { + input.progress?.onMetabaseFanoutPlanned?.({ + metabaseConnectionId: 'prod-metabase', + children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }], + }); + input.progress?.onMetabaseChildStarted?.({ + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + jobId: 'metabase-child-1', + }); + input.memoryFlow?.update({ + plannedWorkUnits: [ + { + unitKey: 'metabase-col-6', + rawFiles: ['cards/40.json'], + peerFileCount: 0, + dependencyCount: 0, + }, + ], + }); + input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }); + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey: 'metabase-col-6', + skills: ['sl_capture'], + stepBudget: 40, + }); + input.memoryFlow?.emit({ + type: 'work_unit_step', + unitKey: 'metabase-col-6', + stepIndex: 7, + stepBudget: 40, + }); + input.memoryFlow?.emit({ + type: 'stage_progress', + stage: 'integration', + percent: 81, + message: 'Resolving text conflict for metabase-col-6', + }); + input.memoryFlow?.emit({ type: 'work_unit_finished', unitKey: 'metabase-col-6', status: 'success' }); + input.memoryFlow?.update({ + plannedWorkUnits: [ + { + unitKey: 'metabase-col-7', + rawFiles: ['cards/48.json'], + peerFileCount: 0, + dependencyCount: 0, + }, + ], + }); + input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }); + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey: 'metabase-col-7', + skills: ['sl_capture'], + stepBudget: 40, + }); + input.progress?.onMetabaseChildCompleted?.({ + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + jobId: 'metabase-child-1', + status: 'done', + }); + return { + metabaseConnectionId: 'prod-metabase', + status: 'all_succeeded', + totals: { workUnits: 1, failedWorkUnits: 0 }, + children: [], + }; + }, + }, + ), + ).resolves.toBe(0); + + expect(progressEvents).toEqual( + expect.arrayContaining([ + { percent: 45, message: 'Planned 1 task' }, + { percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' }, + { + percent: 60, + message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 step 7/40', + transient: true, + }, + { percent: 81, message: 'Resolving text conflict for metabase-col-6' }, + { percent: 81, message: 'Processing 1/1 tasks: metabase-col-7' }, + ]), + ); + expect(io.stdout()).toContain('"status": "all_succeeded"'); + expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase'); + }); + it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => { const projectDir = join(tempDir, 'metabase-cli-project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index ac146e79..a45308f1 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -295,7 +295,11 @@ function formatDiffProgress(event: Extract event.type === 'chunks_planned'); + const startIndex = latestPlanIndex >= 0 ? latestPlanIndex + 1 : 0; + return snapshot.events.slice(startIndex, eventIndex + 1); } function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { @@ -319,7 +323,8 @@ function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex if (snapshot.plannedWorkUnits.length > 0) { return snapshot.plannedWorkUnits.length; } - const planEvent = workUnitEventsThrough(snapshot, eventIndex) + const planEvent = snapshot.events + .slice(0, eventIndex + 1) .filter((event) => event.type === 'chunks_planned') .at(-1); return planEvent?.workUnitCount ?? completedWorkUnitCountThrough(snapshot, eventIndex); @@ -365,6 +370,12 @@ function plainIngestEventProgress( }; case 'stage_skipped': return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` }; + case 'stage_progress': + return { + percent: event.percent, + message: event.message, + ...(event.transient !== undefined ? { transient: event.transient } : {}), + }; case 'work_unit_started': { const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); @@ -711,6 +722,25 @@ export async function runKtxIngest( } if (args.adapter === 'metabase') { const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest; + const runOutputMode = effectiveIngestOutputMode(args.outputMode, io, env, { + requireInput: (args.inputMode ?? 'auto') === 'auto', + }); + const plainProgress = shouldWritePlainIngestProgress(runOutputMode, io, env) + ? createPlainIngestProgressRenderer(args, io) + : null; + const structuredProgress = deps.progress + ? createPlainIngestProgressObserver(args, deps.progress) + : null; + const initialMemoryFlow = + plainProgress || structuredProgress ? initialRunMemoryFlowInput(args, 'pending') : undefined; + const memoryFlow = initialMemoryFlow + ? createMemoryFlowLiveBuffer(initialMemoryFlow, { + onChange: (snapshot) => { + plainProgress?.update(snapshot); + structuredProgress?.update(snapshot); + }, + }) + : undefined; const progress = args.outputMode === 'json' && !deps.progress ? undefined @@ -721,20 +751,29 @@ export async function runKtxIngest( : io, deps.progress, ); - const result = await executeMetabaseFanout({ - project: ingestProject, - adapters: createAdapters(ingestProject, adapterOptions), - metabaseConnectionId: args.connectionId, - ...localIngestOptions, - queryExecutor, - trigger: 'manual_resync', - jobIdFactory: deps.jobIdFactory, - ...(progress ? { progress } : {}), - }); - if (args.outputMode === 'json') { - io.stdout.write(`${JSON.stringify(result, null, 2)}\n`); - } else { - writeMetabaseFanoutStatus(result, io); + plainProgress?.start(); + structuredProgress?.start(); + let result: LocalMetabaseFanoutResult; + try { + result = await executeMetabaseFanout({ + project: ingestProject, + adapters: createAdapters(ingestProject, adapterOptions), + metabaseConnectionId: args.connectionId, + ...localIngestOptions, + queryExecutor, + trigger: 'manual_resync', + jobIdFactory: deps.jobIdFactory, + ...(memoryFlow ? { memoryFlow } : {}), + ...(progress ? { progress } : {}), + }); + plainProgress?.flush(); + if (args.outputMode === 'json') { + io.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + } else { + writeMetabaseFanoutStatus(result, io); + } + } finally { + plainProgress?.flush(); } return result.status === 'all_succeeded' ? 0 : 1; } diff --git a/packages/context/src/ingest/adapters/metabase/fetch.test.ts b/packages/context/src/ingest/adapters/metabase/fetch.test.ts index 7e7e4e4a..1f93765e 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.test.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.test.ts @@ -138,6 +138,52 @@ describe('fetchMetabaseBundle', () => { expect(warn).not.toHaveBeenCalled(); }); + it('emits memory-flow progress while fetching Metabase cards', async () => { + const events: unknown[] = []; + + await fetchMetabaseBundle({ + pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 }, + stagedDir, + ctx: { + ...makeFetchContext(), + memoryFlow: { + emit: (event) => events.push(event), + update: vi.fn(), + finish: vi.fn(), + snapshot: vi.fn(), + }, + }, + clientFactory, + sourceStateReader, + }); + + expect(events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: 'stage_progress', + stage: 'source', + message: 'Fetching Metabase database 42 metadata', + }), + expect.objectContaining({ + type: 'stage_progress', + stage: 'source', + message: 'Fetching 1 Metabase card for database 42', + }), + expect.objectContaining({ + type: 'stage_progress', + stage: 'source', + message: 'Checked 1/1 Metabase cards for database 42; wrote 1', + transient: true, + }), + expect.objectContaining({ + type: 'stage_progress', + stage: 'source', + message: 'Fetched Metabase database 42: 1 cards, 0 unresolved', + }), + ]), + ); + }); + it('routes Metabase fetch warnings through the injected logger', async () => { const logger = { log: vi.fn(), diff --git a/packages/context/src/ingest/adapters/metabase/fetch.ts b/packages/context/src/ingest/adapters/metabase/fetch.ts index d4e8b59b..c67dc2a7 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch.ts @@ -83,6 +83,15 @@ function resolvePath(index: Map, collectionId: export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise { const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig); const logger = params.logger ?? noopMetabaseFetchLogger; + const emitFetchProgress = (percent: number, message: string, transient = false): void => { + params.ctx.memoryFlow?.emit({ + type: 'stage_progress', + stage: 'source', + percent, + message, + ...(transient ? { transient } : {}), + }); + }; const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId); const mapping = syncState.mappings.find( (m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled, @@ -100,6 +109,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr const client = await params.clientFactory.createClient(pullConfig, params.ctx); try { + emitFetchProgress(26, `Fetching Metabase database ${pullConfig.metabaseDatabaseId} metadata`); let mappingDatabaseName = mapping.metabaseDatabaseName; let mappingEngine = mapping.metabaseEngine; if (mappingDatabaseName === null) { @@ -133,6 +143,12 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr await mkdir(join(params.stagedDir, STAGED_FILES.databasesDir), { recursive: true }); const cardIdsToFetch = await resolveCardIdsToFetch(client, scope, pullConfig.metabaseDatabaseId, logger); + emitFetchProgress( + 28, + `Fetching ${cardIdsToFetch.length} Metabase card${cardIdsToFetch.length === 1 ? '' : 's'} for database ${ + pullConfig.metabaseDatabaseId + }`, + ); const referencedCollectionIds = new Set(); let writtenCards = 0; @@ -212,7 +228,19 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr } } } + const knownTotal = Math.max(cardIdsToFetch.length, fetched.size + queue.length); + if (fetched.size === 1 || fetched.size % 10 === 0 || queue.length === 0) { + emitFetchProgress( + 30, + `Checked ${fetched.size}/${knownTotal} Metabase cards for database ${pullConfig.metabaseDatabaseId}; wrote ${writtenCards}`, + true, + ); + } } + emitFetchProgress( + 32, + `Fetched Metabase database ${pullConfig.metabaseDatabaseId}: ${writtenCards} cards, ${unresolvedCards.length} unresolved`, + ); for (const colId of referencedCollectionIds) { const node = collectionIndex.get(colId); diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 11dc3513..21fc82de 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -20,7 +20,7 @@ import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolve import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js'; import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js'; import type { CanonicalPin } from './canonical-pins.js'; -import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js'; +import type { MemoryFlowEvent, MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js'; import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, @@ -76,6 +76,8 @@ import type { } from './types.js'; import { repairWikiSlRefs, type WikiSlRefRepairResult } from './wiki-sl-ref-repair.js'; +type MemoryFlowStageProgress = Extract; + function workUnitToMemoryFlowPlannedWorkUnit(workUnit: WorkUnit): MemoryFlowPlannedWorkUnit { return { unitKey: workUnit.unitKey, @@ -313,7 +315,7 @@ export class IngestBundleRunner { protected async resolveStagedDir( ref: IngestBundleJob['bundleRef'], - ctx: { connectionId: string; sourceKey: string; jobId: string }, + ctx: { connectionId: string; sourceKey: string; jobId: string; memoryFlow?: MemoryFlowEventSink }, ): Promise { if (ref.kind === 'upload') { return this.deps.storage.resolveUploadDir(ref.uploadId); @@ -327,7 +329,11 @@ export class IngestBundleRunner { if (!adapter.fetch) { throw new Error(`source adapter '${ctx.sourceKey}' does not support scheduled_pull (no fetch() method)`); } - await adapter.fetch(ref.config, stagedDir, { connectionId: ctx.connectionId, sourceKey: ctx.sourceKey }); + await adapter.fetch(ref.config, stagedDir, { + connectionId: ctx.connectionId, + sourceKey: ctx.sourceKey, + ...(ctx.memoryFlow ? { memoryFlow: ctx.memoryFlow } : {}), + }); return stagedDir; } @@ -978,6 +984,20 @@ export class IngestBundleRunner { }); try { const memoryFlow = ctx?.memoryFlow; + const emitStageProgress = ( + stage: MemoryFlowStageProgress['stage'], + percent: number, + message: string, + options: { transient?: boolean } = {}, + ): void => { + memoryFlow?.emit({ + type: 'stage_progress', + stage, + percent, + message, + ...(options.transient !== undefined ? { transient: options.transient } : {}), + }); + }; const baseSha = await this.deps.lockingService.withLock('config:repo', () => this.deps.gitService.revParseHead()); if (!baseSha) { throw new Error('ingest-bundle: config repo has no HEAD'); @@ -1017,6 +1037,7 @@ export class IngestBundleRunner { connectionId: job.connectionId, sourceKey: job.sourceKey, jobId: job.jobId, + ...(memoryFlow ? { memoryFlow } : {}), }), ); const fetchReport = adapter.readFetchReport ? await adapter.readFetchReport(stagedDir) : null; @@ -1482,6 +1503,10 @@ export class IngestBundleRunner { })); activePhase = 'integration'; + const integrablePatchCount = workUnitOutcomesByIndex.filter( + (outcome) => outcome?.status === 'success' && !!outcome.patchPath, + ).length; + let integratedPatchCount = 0; for (const [index, outcome] of workUnitOutcomesByIndex.entries()) { if (!outcome || outcome.status !== 'success' || !outcome.patchPath) { continue; @@ -1496,6 +1521,11 @@ export class IngestBundleRunner { allowedTargetConnectionIds: slConnectionIds, }; activeFailureDetails = integrationFailureDetails; + emitStageProgress( + 'integration', + 80, + `Integrating ${integratedPatchCount + 1}/${integrablePatchCount} patches: ${outcome.unitKey}`, + ); const integration = await integrateWorkUnitPatch({ unitKey: outcome.unitKey, patchPath: outcome.patchPath, @@ -1532,8 +1562,9 @@ export class IngestBundleRunner { ), }); }, - resolveTextualConflict: (context) => - resolveTextualConflict({ + resolveTextualConflict: async (context) => { + emitStageProgress('integration', 81, `Resolving text conflict for ${context.unitKey}`); + const result = await resolveTextualConflict({ agentRunner: this.deps.agentRunner, workdir: sessionWorktree.workdir, unitKey: context.unitKey, @@ -1543,9 +1574,19 @@ export class IngestBundleRunner { reason: context.reason, maxAttempts: 1, stepBudget: 12, - }), - repairGateFailure: (context) => - repairFinalGateFailure({ + }); + emitStageProgress( + 'integration', + 82, + result.status === 'repaired' + ? `Resolved text conflict for ${context.unitKey}` + : `Text conflict resolver failed for ${context.unitKey}`, + ); + return result; + }, + repairGateFailure: async (context) => { + emitStageProgress('integration', 82, `Repairing semantic gate for ${context.unitKey}`); + const result = await repairFinalGateFailure({ agentRunner: this.deps.agentRunner, workdir: sessionWorktree.workdir, gateError: context.reason, @@ -1554,7 +1595,16 @@ export class IngestBundleRunner { repairKind: 'patch_semantic_gate', maxAttempts: 1, stepBudget: 16, - }), + }); + emitStageProgress( + 'integration', + 83, + result.status === 'repaired' + ? `Repaired semantic gate for ${context.unitKey}` + : `Semantic gate repair failed for ${context.unitKey}`, + ); + return result; + }, }); if (integration.textualResolution) { isolatedDiffSummary.resolverAttempts += integration.textualResolution.attempts; @@ -1598,6 +1648,12 @@ export class IngestBundleRunner { } activeFailureDetails = undefined; isolatedDiffSummary.acceptedPatches += 1; + integratedPatchCount += 1; + emitStageProgress( + 'integration', + 83, + `Integrated ${integratedPatchCount}/${integrablePatchCount} patches`, + ); } } else if (!overrideReport) { @@ -2004,6 +2060,7 @@ export class IngestBundleRunner { (eviction?.deletedRawPaths.length ?? 0) > 0 || hasCandidateReconcileWork; if (hasReconcileWork || overrideReport) { + emitStageProgress('reconciliation', 84, 'Reconciling results'); await stage4?.updateProgress(0.0, 'Reconciling results'); } @@ -2052,6 +2109,12 @@ export class IngestBundleRunner { getReconciliationActions: () => reconcileActions, onStepFinish: stage4 ? ({ passNumber, stepIndex, stepBudget }) => { + emitStageProgress( + 'reconciliation', + 85, + `Reconciling results: pass ${passNumber} step ${stepIndex}/${stepBudget}`, + { transient: true }, + ); void stage4.updateProgress( stepIndex / stepBudget, `Reconciling results · pass ${passNumber} step ${stepIndex}`, @@ -2105,6 +2168,9 @@ export class IngestBundleRunner { force: !!overrideReport, onStepFinish: stage4 ? ({ stepIndex, stepBudget }) => { + emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, { + transient: true, + }); void stage4.updateProgress(stepIndex / stepBudget, `Reconciling results · step ${stepIndex}`); } : undefined, @@ -2147,6 +2213,7 @@ export class IngestBundleRunner { activePhase = 'post_processor'; if (postProcessor) { const stagePostProcessor = ctx?.startPhase(0.04); + emitStageProgress('post_processor', 87, 'Running deterministic imports'); await stagePostProcessor?.updateProgress(0.0, 'Running deterministic imports'); try { const result = await traceTimed( @@ -2173,6 +2240,7 @@ export class IngestBundleRunner { warnings: result.warnings, touchedSources: result.touchedSources, }; + emitStageProgress('post_processor', 88, 'Deterministic imports complete'); await stagePostProcessor?.updateProgress(1.0, 'Deterministic imports complete'); } catch (error) { postProcessorOutcome = { @@ -2200,6 +2268,7 @@ export class IngestBundleRunner { ]), ].sort(); activePhase = 'wiki_sl_ref_repair'; + emitStageProgress('wiki_sl_ref_repair', 88, 'Repairing wiki semantic-layer references'); wikiSlRefRepairResult = await traceTimed( runTrace, 'wiki_sl_ref_repair', @@ -2218,6 +2287,7 @@ export class IngestBundleRunner { repairs: wikiSlRefRepairResult.repairs, warnings: wikiSlRefRepairResult.warnings, }); + emitStageProgress('wiki_sl_ref_repair', 88, 'Checked wiki semantic-layer references'); const postReconciliationSha = await sessionWorktree.git.revParseHead(); const postReconciliationPaths = preReconciliationSha && postReconciliationSha && preReconciliationSha !== postReconciliationSha @@ -2261,6 +2331,7 @@ export class IngestBundleRunner { }; activePhase = 'target_policy'; activeFailureDetails = targetPolicyTraceData; + emitStageProgress('final_gates', 88, 'Checking semantic-layer target policy'); await traceTimed(runTrace, 'target_policy', 'semantic_layer_target_policy', targetPolicyTraceData, async () => { assertSemanticLayerTargetPathsAllowed({ paths: finalTargetPolicyPaths, @@ -2288,6 +2359,7 @@ export class IngestBundleRunner { }; activePhase = 'final_gates'; activeFailureDetails = finalArtifactGateTraceData; + emitStageProgress('final_gates', 89, 'Running final artifact gates'); try { await traceTimed( runTrace, @@ -2329,6 +2401,7 @@ export class IngestBundleRunner { changedWikiPageKeys: finalChangedWikiPageKeys, touchedSlSources: finalTouchedSlSources, }); + emitStageProgress('final_gates', 89, 'Repairing final artifact gates'); const gateRepair = await repairFinalGateFailure({ agentRunner: this.deps.agentRunner, workdir: sessionWorktree.workdir, @@ -2408,6 +2481,7 @@ export class IngestBundleRunner { activeFailureDetails = undefined; activePhase = 'provenance_validation'; + emitStageProgress('provenance', 90, 'Validating provenance rows'); latestReportWorkUnits = this.toReportWorkUnits(stageIndex); latestReconciliationActions = reconcileActions; latestConflictsResolved = stageIndex.conflictsResolved; @@ -2452,6 +2526,7 @@ export class IngestBundleRunner { // Stage 6 — squash commit activePhase = 'squash'; const stage6 = ctx?.startPhase(0.04); + emitStageProgress('save', 91, 'Saving changes'); await stage6?.updateProgress(0.0, 'Saving changes'); try { await sessionWorktree.git.assertWorktreeClean(); @@ -2521,6 +2596,7 @@ export class IngestBundleRunner { } const stage5 = ctx?.startPhase(0.04); + emitStageProgress('provenance', 95, 'Recording history'); await stage5?.updateProgress(0.0, 'Recording history'); activePhase = 'provenance'; @@ -2535,6 +2611,7 @@ export class IngestBundleRunner { ); const stage7 = ctx?.startPhase(0.04); + emitStageProgress('report', 97, 'Wrapping up'); await stage7?.updateProgress(0.0, 'Wrapping up'); activePhase = 'report'; diff --git a/packages/context/src/ingest/memory-flow/schema.test.ts b/packages/context/src/ingest/memory-flow/schema.test.ts index c54752f8..b8c70856 100644 --- a/packages/context/src/ingest/memory-flow/schema.test.ts +++ b/packages/context/src/ingest/memory-flow/schema.test.ts @@ -21,6 +21,7 @@ function snapshot(overrides: Partial = {}): MemoryFlowRep { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, + { type: 'stage_progress', stage: 'integration', percent: 80, message: 'Integrating 1/1 patches: orders' }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_step', unitKey: 'orders', stepIndex: 1, stepBudget: 40 }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, diff --git a/packages/context/src/ingest/memory-flow/schema.ts b/packages/context/src/ingest/memory-flow/schema.ts index 0e268f17..7f00cde3 100644 --- a/packages/context/src/ingest/memory-flow/schema.ts +++ b/packages/context/src/ingest/memory-flow/schema.ts @@ -53,6 +53,23 @@ export const memoryFlowEventSchema = z.discriminatedUnion('type', [ stage: z.enum(['source', 'chunks', 'workUnits', 'actions', 'gates', 'saved']), reason: z.string().min(1), }), + eventSchema({ + type: z.literal('stage_progress'), + stage: z.enum([ + 'source', + 'integration', + 'reconciliation', + 'post_processor', + 'wiki_sl_ref_repair', + 'final_gates', + 'save', + 'provenance', + 'report', + ]), + percent: z.number().min(0).max(100), + message: z.string().min(1), + transient: z.boolean().optional(), + }), eventSchema({ type: z.literal('work_unit_started'), unitKey: z.string().min(1), diff --git a/packages/context/src/ingest/memory-flow/types.ts b/packages/context/src/ingest/memory-flow/types.ts index 8a40ac04..df8dfff3 100644 --- a/packages/context/src/ingest/memory-flow/types.ts +++ b/packages/context/src/ingest/memory-flow/types.ts @@ -44,6 +44,22 @@ type MemoryFlowEventPayload = stage: MemoryFlowColumnId; reason: string; } + | { + type: 'stage_progress'; + stage: + | 'source' + | 'integration' + | 'reconciliation' + | 'post_processor' + | 'wiki_sl_ref_repair' + | 'final_gates' + | 'save' + | 'provenance' + | 'report'; + percent: number; + message: string; + transient?: boolean; + } | { type: 'work_unit_started'; unitKey: string; diff --git a/packages/context/src/ingest/types.ts b/packages/context/src/ingest/types.ts index 5cd26163..fb8938d3 100644 --- a/packages/context/src/ingest/types.ts +++ b/packages/context/src/ingest/types.ts @@ -47,6 +47,7 @@ export interface ChunkResult { export interface FetchContext { connectionId: string; sourceKey: string; + memoryFlow?: MemoryFlowEventSink; } type SourceFetchIssueKind =