diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index 1a7ed721..a7328574 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -609,6 +609,11 @@ export { } from './raw-sources-paths.js'; export { ingestReportSnapshotSchema, parseIngestReportSnapshot } from './report-snapshot.js'; export type { IngestReportBody, IngestReportSnapshot } from './reports.js'; +export * from './artifact-gates.js'; +export * from './ingest-trace.js'; +export * from './isolated-diff/git-patch.js'; +export * from './isolated-diff/patch-integrator.js'; +export * from './isolated-diff/work-unit-executor.js'; export * from './reports.js'; export { SourceAdapterRegistry } from './source-adapter-registry.js'; export type { SqliteBundleIngestStoreOptions } from './sqlite-bundle-ingest-store.js'; @@ -652,4 +657,7 @@ export type { TriageSignals, UnresolvedCardInfo, WorkUnit, + DeterministicProjectionContext, + ProjectionResult, } from './types.js'; +export * from './wiki-body-refs.js'; diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index 9ccf1aef..a315dbca 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -255,6 +255,7 @@ const buildRunner = (deps: ReturnType = makeDeps(), overrides: resolveUploadDir: (uploadId) => `/tmp/ktx-test/ingest-uploads/${uploadId}`, resolvePullDir: (jobId) => `/tmp/ktx-test/ingest-pulls/${jobId}`, resolveTranscriptDir: (jobId) => `/tmp/ktx-test/run/wu-transcripts/${jobId}`, + resolveTracePath: (jobId) => `/tmp/ktx-test/ingest-traces/${jobId}/trace.jsonl`, }, settings: { probeRowCount: 1, memoryIngestionModel: 'test-model' }, skillsRegistry: deps.skillsRegistry as any, @@ -1505,7 +1506,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { const runner = buildRunner(deps); (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ - currentHashes: new Map([['explores/b2b/sales_pipeline.json', 'h1']]), + currentHashes: new Map([['a.yml', 'h1']]), rawDirInWorktree: 'raw-sources/looker-run/fake/s', }); (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 614f8aaa..ef804405 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -6,14 +6,26 @@ import { type KtxLogger, noopLogger } from '../core/index.js'; import { createRuntimeToolDescriptorFromAiTool, type KtxRuntimeToolSet } from '../llm/index.js'; import type { CaptureSession, MemoryAction } from '../memory/index.js'; import type { SemanticLayerService, SemanticLayerSource, SlValidationDeps } from '../sl/index.js'; -import { createTouchedSlSources, type ToolContext, type ToolSession } from '../tools/index.js'; +import { createTouchedSlSources, type ToolContext, type ToolSession, type TouchedSlSource } from '../tools/index.js'; +import type { KnowledgeWikiService } from '../wiki/index.js'; import { findDanglingWikiRefsForActions } from '../wiki/wiki-ref-validation.js'; import { actionTargetConnectionId } from './action-identity.js'; import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js'; +import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js'; import { selectRelevantCanonicalPins } from './canonical-pins.js'; +import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js'; +import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js'; +import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js'; import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js'; -import type { MemoryFlowPlannedWorkUnit } from './memory-flow/types.js'; -import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, PageTriageRunResult } from './ports.js'; +import type { CanonicalPin } from './canonical-pins.js'; +import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js'; +import type { + ContextEvidenceIndexSummary, + IngestBundleRunnerDeps, + IngestProvenanceRow, + IngestSessionWorktree, + PageTriageRunResult, +} from './ports.js'; import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js'; import { buildStageIndexFromReportBody, @@ -393,8 +405,231 @@ export class IngestBundleRunner { return workUnits.filter((wu) => wu.rawFiles.some((rawPath) => triageResult.fullRawPaths.has(rawPath))); } + private isIsolatedDiffEnabled(sourceKey: string): boolean { + return (this.deps.settings.isolatedDiffSourceKeys ?? []).includes(sourceKey); + } + + private createTrace(job: IngestBundleJob): IngestTraceWriter { + const storage = this.deps.storage as typeof this.deps.storage & { resolveTracePath?: (jobId: string) => string }; + return new FileIngestTraceWriter({ + tracePath: storage.resolveTracePath?.(job.jobId) ?? ingestTracePathForJob(this.deps.storage.homeDir, job.jobId), + jobId: job.jobId, + connectionId: job.connectionId, + sourceKey: job.sourceKey, + level: this.deps.settings.ingestTraceLevel ?? 'debug', + }); + } + + private wikiPageKeysFromPaths(paths: string[]): string[] { + return [ + ...new Set( + paths + .filter((path) => path.startsWith('wiki/global/') && path.endsWith('.md')) + .map((path) => path.slice('wiki/global/'.length, -'.md'.length)), + ), + ].sort(); + } + + private touchedSlSourcesFromPaths(paths: string[]): TouchedSlSource[] { + return paths + .filter((path) => path.startsWith('semantic-layer/') && path.endsWith('.yaml') && !path.includes('/_schema/')) + .map((path) => { + const [, connectionId, fileName] = path.split('/'); + return { connectionId: connectionId ?? '', sourceName: (fileName ?? '').replace(/\.yaml$/, '') }; + }) + .filter((source) => source.connectionId.length > 0 && source.sourceName.length > 0); + } + + private async runWorkUnitInWorktree(input: { + job: IngestBundleJob; + syncId: string; + wu: WorkUnit; + worktree: IngestSessionWorktree; + stagedDir: string; + contextReport: ContextEvidenceIndexSummary | null; + ingestToolMetadata: { runId: string; jobId: string; syncId: string; sourceKey: string }; + slConnectionIds: string[]; + wikiIndex: string; + slIndex: string; + priorProvenance: Map; + scopedWikiService: ReturnType; + scopedSemanticLayerService: ReturnType; + baseFraming: string; + skillsPrompt: string; + canonicalPins: CanonicalPin[]; + workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' }; + transcriptDir: string; + transcriptSummaries: Map; + recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => void; + stageIndex: StageIndex; + includeContextEvidenceTools: boolean; + currentTableExists(tableRef: string): Promise; + memoryFlow?: MemoryFlowEventSink; + wuSkillNames: string[]; + onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; + }): Promise { + const session: CaptureSession = { + userId: 'system', + chatId: input.wu.unitKey, + userMessage: `ingest(${input.job.sourceKey}) WU=${input.wu.unitKey}`, + connectionId: input.job.connectionId, + userScopedEnabled: false, + forceGlobalScope: true, + touchedSlSources: createTouchedSlSources(), + preHead: input.worktree.baseSha, + }; + const sessionActions: MemoryAction[] = []; + + const toolSession: ToolSession = { + connectionId: input.job.connectionId, + isWorktreeScoped: true, + preHead: input.worktree.baseSha, + touchedSlSources: session.touchedSlSources, + actions: sessionActions, + allowedRawPaths: new Set(input.wu.rawFiles), + allowedConnectionNames: new Set(input.slConnectionIds), + semanticLayerService: input.scopedSemanticLayerService, + wikiService: input.scopedWikiService, + configService: input.worktree.config, + gitService: input.worktree.git, + ingest: input.ingestToolMetadata, + }; + + const slValidationDeps: SlValidationDeps = { + semanticLayerService: input.scopedSemanticLayerService, + connections: this.deps.connections, + configService: input.worktree.config, + gitService: input.worktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + }; + + const wuToolset = this.deps.toolsetFactory.createIngestWuToolset(toolSession, { + includeContextEvidenceTools: input.includeContextEvidenceTools, + }); + const wuToolContext: ToolContext = { + sourceId: 'ingest', + messageId: `${input.job.jobId}-wu-${input.wu.unitKey}`, + userId: 'system', + connectionId: input.job.connectionId, + ingest: input.ingestToolMetadata, + session: toolSession, + }; + + const skillsLoadedPerWu: string[] = []; + const loadSkillTool: KtxRuntimeToolSet = { + load_skill: { + name: 'load_skill', + description: + 'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.', + inputSchema: z.object({ name: z.string() }), + execute: async ({ name }) => { + const skill = await this.deps.skillsRegistry.getSkill(name, 'memory_agent'); + if (!skill) { + const available = + (await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') || '(none)'; + return { markdown: `Skill "${name}" not available. Available: ${available}` }; + } + const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8'); + if (!skillsLoadedPerWu.includes(skill.name)) { + skillsLoadedPerWu.push(skill.name); + } + const structured = { + name: skill.name, + skillDirectory: skill.path, + content: this.deps.skillsRegistry.stripFrontmatter(body), + }; + return { + markdown: `# ${structured.name}\n\n${structured.content}`, + structured, + }; + }, + }, + }; + + const wuEmitUnmappedFallbackTool = { + emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool( + 'emit_unmapped_fallback', + createEmitUnmappedFallbackTool({ + stageIndex: input.stageIndex, + allowedPaths: new Set(input.wu.rawFiles), + tableRefExists: input.currentTableExists, + }), + ), + }; + + const systemPrompt = buildWuSystemPrompt({ + baseFraming: input.baseFraming, + skillsPrompt: input.skillsPrompt, + syncId: input.syncId, + sourceKey: input.job.sourceKey, + canonicalPins: input.canonicalPins, + }); + + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey: input.wu.unitKey, + skills: input.wuSkillNames, + stepBudget: input.workUnitSettings.stepBudget, + }); + return executeWorkUnit( + { + sessionWorktreeGit: input.worktree.git, + agentRunner: this.deps.agentRunner, + validateTouchedSources: (touched) => + validateWuTouchedSources({ ...slValidationDeps, slValidator: this.deps.slValidator }, touched), + validateWikiRefs: (actions) => + findDanglingWikiRefsForActions({ + wikiService: input.scopedWikiService, + scope: 'GLOBAL', + scopeId: null, + actions, + }), + resetHardTo: (targetSha) => input.worktree.git.resetHardTo(targetSha), + buildSystemPrompt: () => systemPrompt, + buildUserPrompt: (wuInner) => + buildWuUserPrompt({ + wu: wuInner, + wikiIndex: input.wikiIndex, + slIndex: input.slIndex, + priorProvenance: input.priorProvenance, + }), + buildToolSet: (wuInner) => + wrapToolsWithLogger( + buildWuToolSet({ + sourceKey: input.job.sourceKey, + stagedDir: input.stagedDir, + wu: wuInner, + loadSkillTool, + emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool, + toolsetTools: wuToolset.toRuntimeTools(wuToolContext), + }), + join(input.transcriptDir, `${wuInner.unitKey}.jsonl`), + wuInner.unitKey, + { onEntry: input.recordTranscriptEntry(join(input.transcriptDir, `${wuInner.unitKey}.jsonl`)) }, + ), + captureSession: session, + sessionActions, + modelRole: 'candidateExtraction', + stepBudget: input.workUnitSettings.stepBudget, + sourceKey: input.job.sourceKey, + connectionId: input.job.connectionId, + jobId: input.job.jobId, + toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0, + onStepFinish: input.onStepFinish, + }, + input.wu, + ); + } + protected async runInner(job: IngestBundleJob, ctx?: IngestJobContext): Promise> { const syncId = buildSyncId(new Date(), job.jobId); + const trace = this.createTrace(job); + await trace.event('info', 'run', 'ingest_started', { + trigger: job.trigger, + bundleRefKind: job.bundleRef.kind, + }); + try { const memoryFlow = ctx?.memoryFlow; const baseSha = await this.deps.lockingService.withLock('config:repo', () => this.deps.gitService.revParseHead()); if (!baseSha) { @@ -434,7 +669,7 @@ export class IngestBundleRunner { const sessionWorktree = await this.deps.lockingService.withLock('config:repo', () => this.deps.sessionWorktreeService.create(job.jobId, baseSha), ); - let cleanupOutcome: 'success' | 'crash' = 'crash'; + let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash'; try { const { currentHashes, rawDirInWorktree } = await this.stageRawFilesStage1({ @@ -497,6 +732,15 @@ export class IngestBundleRunner { syncId, sourceKey: job.sourceKey, }; + const runTrace = trace.withContext({ runId: runRow.id, syncId }); + await runTrace.event('debug', 'snapshot', 'input_snapshot', { + baseSha, + stagedDir, + rawFileCount: currentHashes.size, + rawDirInWorktree, + diffSummary, + scopeFingerprint: scopeDescriptor?.fingerprint ?? null, + }); await stage1?.updateProgress( 1.0, @@ -626,12 +870,305 @@ export class IngestBundleRunner { workUnitCount: memoryFlowPlannedWorkUnits.length, evictionCount: eviction?.deletedRawPaths.length ?? 0, }); + const isolatedDiffEnabled = !overrideReport && this.isIsolatedDiffEnabled(job.sourceKey); + const isolatedDiffSummary = { + enabled: isolatedDiffEnabled, + integrationWorktreePath: isolatedDiffEnabled ? sessionWorktree.workdir : undefined, + ingestionBaseSha: undefined as string | undefined, + projectionSha: null as string | null, + acceptedPatches: 0, + textualConflicts: 0, + semanticConflicts: 0, + }; const stage3 = ctx?.startPhase(0.6); await stage3?.updateProgress(0.0, `Processing ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`); this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`); - if (!overrideReport) { + if (!overrideReport && isolatedDiffEnabled) { + await runTrace.event('info', 'routing', 'isolated_diff_enabled', { + sourceKey: job.sourceKey, + workUnitCount: workUnits.length, + integrationWorktreePath: sessionWorktree.workdir, + }); + + let projectionTouchedSources: TouchedSlSource[] = []; + let projectionChangedWikiPageKeys: string[] = []; + if (adapter.project) { + const projection = await traceTimed( + runTrace, + 'projection', + 'deterministic_projection', + { sourceKey: job.sourceKey }, + () => + adapter.project!({ + connectionId: job.connectionId, + sourceKey: job.sourceKey, + syncId, + jobId: job.jobId, + runId: runRow.id, + stagedDir, + workdir: sessionWorktree.workdir, + parseArtifacts, + }), + ); + if (projection.errors.length > 0) { + await this.deps.runs.markFailed(runRow.id); + throw new Error(`deterministic projection failed: ${projection.errors.join('; ')}`); + } + projectionTouchedSources = projection.touchedSources; + projectionChangedWikiPageKeys = projection.changedWikiPageKeys; + const projectionPaths = [ + ...projection.touchedSources.map((source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`), + ...projection.changedWikiPageKeys.map((pageKey) => `wiki/global/${pageKey}.md`), + ]; + const projectionCommit = + projectionPaths.length > 0 + ? await sessionWorktree.git.commitFiles( + projectionPaths, + `ingest(${job.sourceKey}): deterministic projection syncId=${syncId}`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ) + : await sessionWorktree.git.commitStaged( + `ingest(${job.sourceKey}): deterministic projection syncId=${syncId}`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ); + isolatedDiffSummary.projectionSha = projectionCommit.created ? projectionCommit.commitHash : null; + await runTrace.event('debug', 'projection', 'deterministic_projection_committed', { + projectionSha: isolatedDiffSummary.projectionSha, + touchedSources: projectionTouchedSources, + changedWikiPageKeys: projectionChangedWikiPageKeys, + warnings: projection.warnings, + }); + } + + const ingestionBaseSha = await sessionWorktree.git.revParseHead(); + isolatedDiffSummary.ingestionBaseSha = ingestionBaseSha; + const patchDir = join(this.deps.storage.homeDir, 'ingest-patches', job.jobId); + const workUnitSettings = { + maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1, + stepBudget: this.deps.settings.workUnitStepBudget ?? 40, + failureMode: this.deps.settings.workUnitFailureMode ?? 'continue', + }; + const limitWorkUnit = pLimit(workUnitSettings.maxConcurrency); + const workUnitOutcomesByIndex: WorkUnitOutcome[] = []; + let completedWorkUnits = 0; + + if (workUnits.length === 0) { + await stage3?.updateProgress(1.0, '0 of 0 work units complete'); + } + + try { + await Promise.all( + workUnits.map((wu, index) => + limitWorkUnit(async () => { + const outcome = await runIsolatedWorkUnit({ + unitIndex: index, + ingestionBaseSha, + sessionWorktreeService: this.deps.sessionWorktreeService, + patchDir, + trace: runTrace, + workUnit: wu, + run: async (child) => { + const scopedWikiService = this.deps.wikiService.forWorktree(child.workdir); + const scopedSemanticLayerService = this.deps.semanticLayerService.forWorktree(child.workdir); + return this.runWorkUnitInWorktree({ + job, + syncId, + wu, + worktree: child, + stagedDir, + contextReport, + ingestToolMetadata, + slConnectionIds, + wikiIndex, + slIndex, + priorProvenance: await this.deps.provenance.findLatestArtifactsForRawPaths( + job.connectionId, + job.sourceKey, + wu.rawFiles, + ), + scopedWikiService, + scopedSemanticLayerService, + baseFraming, + skillsPrompt, + canonicalPins, + workUnitSettings, + transcriptDir, + transcriptSummaries, + recordTranscriptEntry, + stageIndex, + includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport, + currentTableExists: (tableRef) => + this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef), + memoryFlow, + wuSkillNames, + onStepFinish: ({ stepIndex, stepBudget }) => { + memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget }); + }, + }); + }, + }); + workUnitOutcomesByIndex[index] = outcome; + for (const action of outcome.actions) { + memoryFlow?.emit({ + type: 'candidate_action', + unitKey: outcome.unitKey, + target: action.target, + action: action.type, + key: action.key, + }); + } + memoryFlow?.emit({ + type: 'work_unit_finished', + unitKey: outcome.unitKey, + status: outcome.status, + ...(outcome.reason ? { reason: outcome.reason } : {}), + }); + completedWorkUnits += 1; + await stage3?.updateProgress( + completedWorkUnits / workUnits.length, + `${completedWorkUnits} of ${workUnits.length} work units complete`, + ); + }), + ), + ); + } catch (error) { + await this.deps.runs.markFailed(runRow.id); + throw error; + } + + workUnitOutcomes.push( + ...workUnitOutcomesByIndex.filter((outcome): outcome is WorkUnitOutcome => Boolean(outcome)), + ); + failedWorkUnits.push( + ...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey), + ); + stageIndex.workUnits = workUnitOutcomes.map((o) => ({ + unitKey: o.unitKey, + rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [], + status: o.status, + reason: o.reason, + actions: o.actions, + touchedSlSources: o.touchedSlSources, + slDisallowed: o.slDisallowed, + slDisallowedReason: o.slDisallowedReason, + })); + + for (const [index, outcome] of workUnitOutcomesByIndex.entries()) { + if (!outcome || outcome.status !== 'success' || !outcome.patchPath) { + continue; + } + const wu = workUnits[index]; + if (!wu) { + continue; + } + const integration = await integrateWorkUnitPatch({ + unitKey: outcome.unitKey, + patchPath: outcome.patchPath, + integrationGit: sessionWorktree.git, + trace: runTrace, + author: this.deps.storage.systemGitAuthor, + slDisallowed: wu.slDisallowed === true, + validateAppliedTree: async (touchedPaths) => { + await validateFinalIngestArtifacts({ + connectionIds: slConnectionIds, + changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths), + touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths), + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + }); + if (integration.status === 'textual_conflict') { + isolatedDiffSummary.textualConflicts += 1; + await this.deps.runs.markFailed(runRow.id); + cleanupOutcome = 'conflict'; + throw new Error(`isolated diff textual conflict in ${outcome.unitKey}: ${integration.reason}`); + } + if (integration.status === 'semantic_conflict') { + isolatedDiffSummary.semanticConflicts += 1; + await this.deps.runs.markFailed(runRow.id); + cleanupOutcome = 'conflict'; + throw new Error(`isolated diff semantic conflict in ${outcome.unitKey}: ${integration.reason}`); + } + isolatedDiffSummary.acceptedPatches += 1; + } + + await traceTimed( + runTrace, + 'final_gates', + 'final_artifact_gates', + { + changedWikiPageKeys: [ + ...new Set([ + ...projectionChangedWikiPageKeys, + ...workUnitOutcomes + .flatMap((outcome) => outcome.patchTouchedPaths ?? []) + .flatMap((path) => this.wikiPageKeysFromPaths([path])), + ]), + ], + touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)], + }, + async () => { + await validateFinalIngestArtifacts({ + connectionIds: slConnectionIds, + changedWikiPageKeys: [ + ...new Set([ + ...projectionChangedWikiPageKeys, + ...workUnitOutcomes + .flatMap((outcome) => outcome.patchTouchedPaths ?? []) + .flatMap((path) => this.wikiPageKeysFromPaths([path])), + ]), + ], + touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)], + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + ); + } else if (!overrideReport) { + await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', { sourceKey: job.sourceKey }); const workUnitSettings = { maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1, stepBudget: this.deps.settings.workUnitStepBudget ?? 40, @@ -1291,7 +1828,7 @@ export class IngestBundleRunner { }; const producedPaths = new Set(); const pushActionProvenance = (rawPath: string, action: MemoryAction): void => { - const hash = currentHashes.get(rawPath) ?? 'unknown'; + const hash = currentHashes.get(rawPath) ?? ''; provenanceRows.push({ connectionId: job.connectionId, sourceKey: job.sourceKey, @@ -1319,7 +1856,7 @@ export class IngestBundleRunner { } } for (const resolution of stageIndex.artifactResolutions ?? []) { - const hash = currentHashes.get(resolution.rawPath) ?? 'unknown'; + const hash = currentHashes.get(resolution.rawPath) ?? ''; provenanceRows.push({ connectionId: job.connectionId, sourceKey: job.sourceKey, @@ -1351,6 +1888,11 @@ export class IngestBundleRunner { actionType: 'skipped', }); } + validateProvenanceRawPaths({ + rows: provenanceRows, + currentRawPaths: new Set(currentHashes.keys()), + deletedRawPaths: new Set(eviction?.deletedRawPaths ?? []), + }); await this.deps.provenance.insertMany(provenanceRows); memoryFlow?.emit({ type: 'provenance_recorded', rowCount: provenanceRows.length }); await stage5?.updateProgress( @@ -1399,6 +1941,8 @@ export class IngestBundleRunner { diffSummary, fetch: fetchReport ?? undefined, commitSha, + tracePath: runTrace.tracePath, + isolatedDiff: isolatedDiffEnabled ? isolatedDiffSummary : undefined, workUnits: stageIndex.workUnits.map((wu) => ({ unitKey: wu.unitKey, rawFiles: wu.rawFiles, @@ -1514,6 +2058,12 @@ export class IngestBundleRunner { }); } await stage7?.updateProgress(1.0, 'Done'); + await runTrace.event('info', 'run', 'ingest_finished', { + status: 'completed', + commitSha, + failedWorkUnits, + tracePath: runTrace.tracePath, + }); cleanupOutcome = 'success'; return { @@ -1528,5 +2078,9 @@ export class IngestBundleRunner { } finally { await this.deps.sessionWorktreeService.cleanup(sessionWorktree, cleanupOutcome); } + } catch (error) { + await trace.event('error', 'run', 'ingest_failed', { tracePath: trace.tracePath }, error); + throw error; + } } } diff --git a/packages/context/src/ingest/isolated-diff/work-unit-executor.test.ts b/packages/context/src/ingest/isolated-diff/work-unit-executor.test.ts index b42fa794..805a5dc4 100644 --- a/packages/context/src/ingest/isolated-diff/work-unit-executor.test.ts +++ b/packages/context/src/ingest/isolated-diff/work-unit-executor.test.ts @@ -81,8 +81,15 @@ describe('runIsolatedWorkUnit', () => { expect(sessionWorktreeService.create).toHaveBeenCalledWith('job-1-wu-1', baseSha); expect(sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success'); expect(result.status).toBe('success'); - expect(result.patchPath).toContain('0000-wu-1.patch'); - await expect(readFile(result.patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md'); + if (result.status !== 'success') { + throw new Error('expected successful work unit'); + } + const patchPath = result.patchPath; + if (!patchPath) { + throw new Error('expected patch path'); + } + expect(patchPath).toContain('0000-wu-1.patch'); + await expect(readFile(patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md'); await expect(readFile(tracePath, 'utf-8')).resolves.toContain('work_unit_child_created'); }); }); diff --git a/packages/context/src/ingest/types.ts b/packages/context/src/ingest/types.ts index 27472523..5cd26163 100644 --- a/packages/context/src/ingest/types.ts +++ b/packages/context/src/ingest/types.ts @@ -96,6 +96,25 @@ export interface ClusterWorkUnitsContext { embedding: KtxEmbeddingPort; } +export interface DeterministicProjectionContext { + connectionId: string; + sourceKey: string; + syncId: string; + jobId: string; + runId: string; + stagedDir: string; + workdir: string; + parseArtifacts?: unknown; +} + +export interface ProjectionResult { + warnings: string[]; + errors: string[]; + touchedSources: Array<{ connectionId: string; sourceName: string }>; + changedWikiPageKeys: string[]; + result?: unknown; +} + export interface SourceAdapter { readonly source: string; readonly skillNames: string[]; @@ -109,6 +128,7 @@ export interface SourceAdapter { listTargetConnectionIds?(stagedDir: string): Promise; chunk(stagedDir: string, diffSet?: DiffSet): Promise; clusterWorkUnits?(ctx: ClusterWorkUnitsContext): Promise; + project?(ctx: DeterministicProjectionContext): Promise; describeScope?(stagedDir: string): Promise; onPullSucceeded?(ctx: { connectionId: string;