diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index 3915bdee..f84d8fd1 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -6,7 +6,6 @@ import { GitService, SessionWorktreeService } from '../core/index.js'; import { LocalGitFileStore } from '../project/local-git-file-store.js'; import { addTouchedSlSource } from '../tools/index.js'; import { IngestBundleRunner } from './ingest-bundle.runner.js'; -import { defaultSharedWorktreeSourceKeys } from './isolated-diff/source-routing.js'; import type { IngestBundleRunnerDeps } from './ports.js'; async function makeRealGitRuntime() { @@ -90,6 +89,14 @@ function frontmatterList(yaml: string, key: string): string[] { ); } +function legacyFallbackSettingKey(): string { + return ['sharedWorktree', 'SourceKeys'].join(''); +} + +function legacySharedTraceEvent(): string { + return ['shared', 'worktree', 'path', 'enabled'].join('_'); +} + function makeWikiService(root: string) { return { listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])), @@ -170,7 +177,6 @@ function makeDeps( settings: { memoryIngestionModel: 'test', probeRowCount: 1, - sharedWorktreeSourceKeys: defaultSharedWorktreeSourceKeys(), ingestTraceLevel: 'trace', ...settings, }, @@ -286,7 +292,7 @@ describe('IngestBundleRunner isolated diff path', () => { ); expect(trace).toContain('isolated_diff_enabled'); expect(trace).toContain('work_unit_child_created'); - expect(trace).not.toContain('shared_worktree_path_enabled'); + expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; @@ -299,13 +305,14 @@ describe('IngestBundleRunner isolated diff path', () => { } }); - it('keeps the shared-worktree path reachable through explicit private fallback settings', async () => { + it('does not support shared-worktree fallback settings', async () => { const runtime = await makeRealGitRuntime(); try { const sourceKey = 'legacy-source'; - const { deps, adapter } = makeDeps(runtime, sourceKey, { - sharedWorktreeSourceKeys: ['legacy-source'], - }); + const staleSettings = { + [legacyFallbackSettingKey()]: ['legacy-source'], + } as Partial & Record; + const { deps, adapter } = makeDeps(runtime, sourceKey, staleSettings); adapter.chunk.mockResolvedValue({ workUnits: [ { @@ -329,20 +336,20 @@ describe('IngestBundleRunner isolated diff path', () => { const root = rootOfConfig(currentSession.configService, runtime.configDir); await mkdir(join(root, 'wiki/global'), { recursive: true }); await writeFile( - join(root, 'wiki/global/legacy-shared.md'), - '---\nsummary: Legacy shared write\nusage_mode: auto\n---\n\nLegacy shared write.\n', + join(root, 'wiki/global/legacy-isolated.md'), + '---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n', 'utf-8', ); currentSession.actions.push({ target: 'wiki', type: 'created', - key: 'legacy-shared', - detail: 'Legacy shared write', + key: 'legacy-isolated', + detail: 'Legacy isolated write', rawPaths: ['legacy/page.json'], }); await currentSession.gitService.commitFiles( - ['wiki/global/legacy-shared.md'], - 'legacy wiki', + ['wiki/global/legacy-isolated.md'], + 'legacy isolated wiki', 'KTX Test', 'system@ktx.local', ); @@ -354,35 +361,156 @@ describe('IngestBundleRunner isolated diff path', () => { await expect( runner.run({ - jobId: 'job-legacy-shared', + jobId: 'job-legacy-isolated', connectionId: 'warehouse', sourceKey, trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' }, }), ).resolves.toMatchObject({ - jobId: 'job-legacy-shared', + jobId: 'job-legacy-isolated', failedWorkUnits: [], workUnitCount: 1, }); const trace = await readFile( - join(runtime.configDir, '.ktx/ingest-traces/job-legacy-shared/trace.jsonl'), + join(runtime.configDir, '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl'), 'utf-8', ); - expect(trace).toContain('shared_worktree_path_enabled'); - expect(trace).not.toContain('work_unit_child_created'); + expect(trace).toContain('isolated_diff_enabled'); + expect(trace).toContain('work_unit_child_created'); + expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; expect(reportBody?.isolatedDiff).toMatchObject({ - enabled: false, + enabled: true, + acceptedPatches: 1, }); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); } }); + it('does not integrate failed isolated WorkUnit patches', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime, 'fake'); + adapter.chunk.mockResolvedValue({ + workUnits: [ + { unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] }, + { unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] }, + ], + }); + deps.diffSetService.compute = vi.fn().mockResolvedValue({ + added: ['good.raw', 'bad.raw'], + modified: [], + deleted: [], + unchanged: [], + }); + deps.slValidator.validateSingleSource = vi.fn( + async (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({ + errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [], + warnings: [], + }), + ) as never; + + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + if (params.telemetryTags.operationName !== 'ingest-bundle-wu') { + return { stopReason: 'natural' }; + } + const unitKey = params.telemetryTags.unitKey; + const root = rootOfConfig(currentSession.configService, runtime.configDir); + await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); + if (unitKey === 'wu-good') { + await writeFile(join(root, 'semantic-layer/warehouse/good.yaml'), 'name: good\n', 'utf-8'); + addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'good'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'good', + detail: 'good source', + targetConnectionId: 'warehouse', + rawPaths: ['good.raw'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/warehouse/good.yaml'], + 'test: add good source', + 'KTX Test', + 'system@ktx.local', + ); + } + if (unitKey === 'wu-bad') { + await writeFile(join(root, 'semantic-layer/warehouse/bad.yaml'), 'name: bad\n', 'utf-8'); + addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'bad'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'bad', + detail: 'bad source', + targetConnectionId: 'warehouse', + rawPaths: ['bad.raw'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/warehouse/bad.yaml'], + 'test: add bad source', + 'KTX Test', + 'system@ktx.local', + ); + } + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles( + runner, + runtime, + [ + ['good.raw', 'good-hash'], + ['bad.raw', 'bad-hash'], + ], + 'fake', + ); + + const result = await runner.run({ + jobId: 'job-failed-wu-isolated', + connectionId: 'warehouse', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }); + + expect(result.failedWorkUnits).toEqual(['wu-bad']); + await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/good.yaml'), 'utf-8')).resolves.toContain( + 'good', + ); + await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/bad.yaml'), 'utf-8')).rejects.toThrow(); + + const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; + const reportBody = reportCreate?.body as { + isolatedDiff?: { acceptedPatches?: number }; + failedWorkUnits?: string[]; + }; + expect(reportBody.failedWorkUnits).toEqual(['wu-bad']); + expect(reportBody.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1 }); + + const trace = await readFile( + join(runtime.configDir, '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl'), + 'utf-8', + ); + expect(trace).toContain('work_unit_failed_before_patch'); + expect(trace).toContain('patch_accepted'); + expect(trace).not.toContain(legacySharedTraceEvent()); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)( 'routes %s direct writes through isolated child worktrees', async (sourceKey) => { @@ -464,7 +592,7 @@ describe('IngestBundleRunner isolated diff path', () => { expect(trace).toContain('work_unit_child_created'); expect(trace).toContain('work_unit_patch_collected'); expect(trace).toContain('patch_apply_started'); - expect(trace).not.toContain('shared_worktree_path_enabled'); + expect(trace).not.toContain(legacySharedTraceEvent()); const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0]; const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined; diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index 588af792..57450c7e 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -1,8 +1,7 @@ -import { mkdir, mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises'; +import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { beforeEach, describe, expect, it, vi } from 'vitest'; -import { GitService } from '../core/index.js'; import { addTouchedSlSource } from '../tools/index.js'; import { IngestBundleRunner } from './ingest-bundle.runner.js'; import { createMemoryFlowLiveBuffer } from './memory-flow/live-buffer.js'; @@ -123,9 +122,15 @@ const makeDeps = () => { }; const scopedGit = { revParseHead: vi.fn().mockResolvedValue('h'), - commitFiles: vi.fn(), + commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }), + commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }), resetHardTo: vi.fn(), assertWorktreeClean: vi.fn().mockResolvedValue(undefined), + writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => { + await writeFile(patchPath, '', 'utf-8'); + }), + applyPatchFile3WayIndex: vi.fn(), + diffNameStatus: vi.fn().mockResolvedValue([]), }; const sessionWorktreeService = { create: vi.fn().mockResolvedValue({ @@ -263,7 +268,6 @@ const buildRunner = (deps: ReturnType = makeDeps(), overrides: settings: { probeRowCount: 1, memoryIngestionModel: 'test-model', - sharedWorktreeSourceKeys: ['fake', 'notion', 'looker', 'metricflow', 'historic-sql'], }, skillsRegistry: deps.skillsRegistry as any, promptService: deps.promptService as any, @@ -1981,9 +1985,15 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { const assertError = new Error('Worktree has in-progress git operation (sequencer ...); refusing to proceed'); const sessionGit = { revParseHead: vi.fn().mockResolvedValue('h'), - commitFiles: vi.fn(), + commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }), + commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }), resetHardTo: vi.fn(), assertWorktreeClean: vi.fn().mockRejectedValue(assertError), + writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => { + await writeFile(patchPath, '', 'utf-8'); + }), + applyPatchFile3WayIndex: vi.fn(), + diffNameStatus: vi.fn().mockResolvedValue([]), }; deps.sessionWorktreeService.create.mockResolvedValue({ chatId: 'j1', @@ -2014,135 +2024,6 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { expect(deps.gitService.squashMergeIntoMain).not.toHaveBeenCalled(); }); - it('squash-merges only successful WUs into main when one WU fails sl_validate', async () => { - const homeDir = await mkdtemp(join(tmpdir(), 'ingest-rollback-')); - try { - const configDir = join(homeDir, 'config'); - const mainGit = new GitService({ - storage: { configDir, homeDir }, - git: { - userName: 'System User', - userEmail: 'system@example.com', - bootstrapMessage: 'Initialize test config repo', - bootstrapAuthor: 'test-system', - bootstrapAuthorEmail: 'system@example.com', - }, - }); - await mainGit.onModuleInit(); - const baseSha = await mainGit.revParseHead(); - if (!baseSha) { - throw new Error('no base sha'); - } - - const deps = makeDeps(); - const sessionDir = join(homeDir, '.worktrees', 'session-j1'); - const sessionBranch = 'session/j1'; - let currentToolSession: any = null; - - deps.gitService = mainGit as any; - deps.sessionWorktreeService.create.mockImplementation(async (_jobId: string, startSha: string) => { - await mkdir(join(homeDir, '.worktrees'), { recursive: true }); - await mainGit.addWorktree(sessionDir, sessionBranch, startSha); - return { - chatId: 'j1', - workdir: sessionDir, - branch: sessionBranch, - baseSha: startSha, - createdAt: new Date(), - git: mainGit.forWorktree(sessionDir), - config: {}, - }; - }); - deps.sessionWorktreeService.cleanup.mockResolvedValue(undefined); - deps.adapter.chunk.mockResolvedValue({ - workUnits: [ - { unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] }, - { unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] }, - ], - }); - deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => { - currentToolSession = toolSession; - return { - toRuntimeTools: vi.fn().mockReturnValue({}), - getAllTools: vi.fn().mockReturnValue([]), - getToolNames: vi.fn().mockReturnValue([]), - }; - }); - deps.slValidator.validateSingleSource.mockImplementation( - (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({ - errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [], - warnings: [], - }), - ); - deps.agentRunner.runLoop.mockImplementation(async (params: any) => { - const unitKey = params.telemetryTags?.unitKey; - if (unitKey === 'wu-good') { - await mkdir(join(sessionDir, 'semantic-layer', 'c1'), { recursive: true }); - await writeFile(join(sessionDir, 'semantic-layer', 'c1', 'good.yaml'), 'name: good\n'); - addTouchedSlSource(currentToolSession.touchedSlSources, 'c1', 'good'); - currentToolSession.actions.push({ target: 'sl', type: 'created', key: 'good', detail: '' }); - await currentToolSession.gitService.commitFiles( - ['semantic-layer/c1/good.yaml'], - 'test: add good source', - 'KTX Test', - 'system@ktx.local', - ); - } - if (unitKey === 'wu-bad') { - await mkdir(join(sessionDir, 'semantic-layer', 'c1'), { recursive: true }); - await writeFile(join(sessionDir, 'semantic-layer', 'c1', 'bad.yaml'), 'name: bad\n'); - addTouchedSlSource(currentToolSession.touchedSlSources, 'c1', 'bad'); - currentToolSession.actions.push({ target: 'sl', type: 'created', key: 'bad', detail: '' }); - await currentToolSession.gitService.commitFiles( - ['semantic-layer/c1/bad.yaml'], - 'test: add bad source', - 'KTX Test', - 'system@ktx.local', - ); - } - return { stopReason: 'natural' }; - }); - - const runner = buildRunner(deps); - (runner as any).stageRawFilesStage1 = vi.fn().mockImplementation(async ({ worktreeRoot }: any) => { - const rawDir = join(worktreeRoot, 'raw-sources', 'c1', 'fake', 's'); - await mkdir(rawDir, { recursive: true }); - await writeFile(join(rawDir, 'good.raw'), 'good raw'); - await writeFile(join(rawDir, 'bad.raw'), 'bad raw'); - return { - currentHashes: new Map([ - ['good.raw', 'good-hash'], - ['bad.raw', 'bad-hash'], - ]), - rawDirInWorktree: 'raw-sources/c1/fake/s', - }; - }); - (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); - - const result = await runner.run({ - jobId: 'j1', - connectionId: 'c1', - sourceKey: 'fake', - trigger: 'upload', - bundleRef: { kind: 'upload', uploadId: 'upload-x' }, - }); - - expect(result.failedWorkUnits).toEqual(['wu-bad']); - expect(await readFile(join(configDir, 'semantic-layer', 'c1', 'good.yaml'), 'utf-8')).toContain('good'); - expect(await readFile(join(configDir, 'semantic-layer', 'c1', 'bad.yaml'), 'utf-8').catch(() => null)).toBeNull(); - expect(deps.reportsRepo.create).toHaveBeenCalledWith( - expect.objectContaining({ - body: expect.objectContaining({ - failedWorkUnits: ['wu-bad'], - }), - }), - ); - await expect(stat(join(configDir, '.git', 'sequencer'))).rejects.toThrow(); - } finally { - await rm(homeDir, { recursive: true, force: true }); - } - }); - it('fails the run and rethrows when the adapter cannot detect the bundle', async () => { const deps = makeDeps(); deps.adapter.detect.mockResolvedValue(false); diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index fe07219d..a390ef08 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -435,24 +435,6 @@ export class IngestBundleRunner { }; } - private buildFailedWorkUnitOutcome(wu: WorkUnit, error: unknown): WorkUnitOutcome { - return { - unitKey: wu.unitKey, - status: 'failed', - reason: error instanceof Error ? error.message : String(error), - preSha: '', - postSha: '', - actions: [], - touchedSlSources: [], - slDisallowed: wu.slDisallowed, - slDisallowedReason: wu.slDisallowedReason, - }; - } - - private formatWorkUnitFailure(outcome: WorkUnitOutcome): string { - return `WorkUnit ${outcome.unitKey} failed: ${outcome.reason ?? 'unknown failure'}`; - } - private filterWorkUnitsForTriage( workUnits: WorkUnit[], triageResult: { enabled: boolean; fullRawPaths: Set } | null, @@ -463,10 +445,6 @@ export class IngestBundleRunner { return workUnits.filter((wu) => wu.rawFiles.some((rawPath) => triageResult.fullRawPaths.has(rawPath))); } - private isSharedWorktreeFallbackEnabled(sourceKey: string): boolean { - return (this.deps.settings.sharedWorktreeSourceKeys ?? []).includes(sourceKey); - } - private createTrace(job: IngestBundleJob): IngestTraceWriter { const storage = this.deps.storage as typeof this.deps.storage & { resolveTracePath?: (jobId: string) => string }; return new FileIngestTraceWriter({ @@ -1313,7 +1291,7 @@ export class IngestBundleRunner { workUnitCount: memoryFlowPlannedWorkUnits.length, evictionCount: eviction?.deletedRawPaths.length ?? 0, }); - const isolatedDiffEnabled = !overrideReport && !this.isSharedWorktreeFallbackEnabled(job.sourceKey); + const isolatedDiffEnabled = !overrideReport; const isolatedDiffSummary = { enabled: isolatedDiffEnabled, integrationWorktreePath: isolatedDiffEnabled ? sessionWorktree.workdir : undefined, @@ -1339,7 +1317,7 @@ export class IngestBundleRunner { let projectionChangedWikiPageKeys: string[] = []; let projectionTouchedPaths: string[] = []; - if (!overrideReport && isolatedDiffEnabled) { + if (!overrideReport) { await runTrace.event('info', 'routing', 'isolated_diff_enabled', { sourceKey: job.sourceKey, workUnitCount: workUnits.length, @@ -1674,260 +1652,6 @@ export class IngestBundleRunner { ); } - } else if (!overrideReport) { - await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', { - sourceKey: job.sourceKey, - reason: 'explicit_private_fallback', - }); - const workUnitSettings = { - maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1, - stepBudget: this.deps.settings.workUnitStepBudget ?? 40, - failureMode: this.deps.settings.workUnitFailureMode ?? 'continue', - }; - const limitWorkUnit = pLimit(workUnitSettings.maxConcurrency); - const workUnitOutcomesByIndex: WorkUnitOutcome[] = []; - let completedWorkUnits = 0; - let abortRequested = false; - - const runSingleWorkUnit = async (wu: WorkUnit): Promise => { - const session: CaptureSession = { - userId: 'system', - chatId: wu.unitKey, - userMessage: `ingest(${job.sourceKey}) WU=${wu.unitKey}`, - connectionId: job.connectionId, - userScopedEnabled: false, - forceGlobalScope: true, - touchedSlSources: createTouchedSlSources(), - preHead: sessionWorktree.baseSha, - }; - const sessionActions: MemoryAction[] = []; - - const scopedWikiService = this.deps.wikiService.forWorktree(sessionWorktree.workdir); - const scopedSemanticLayerService = this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir); - - const toolSession: ToolSession = { - connectionId: job.connectionId, - isWorktreeScoped: true, - preHead: sessionWorktree.baseSha, - touchedSlSources: session.touchedSlSources, - actions: sessionActions, - allowedRawPaths: new Set(wu.rawFiles), - allowedConnectionNames: new Set(slConnectionIds), - semanticLayerService: scopedSemanticLayerService, - wikiService: scopedWikiService, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - ingest: ingestToolMetadata, - }; - - const slValidationDeps: SlValidationDeps = { - semanticLayerService: scopedSemanticLayerService, - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - }; - - const wuToolset = this.deps.toolsetFactory.createIngestWuToolset(toolSession, { - includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport, - }); - const wuToolContext: ToolContext = { - sourceId: 'ingest', - messageId: `${job.jobId}-wu-${wu.unitKey}`, - userId: 'system', - connectionId: job.connectionId, - ingest: ingestToolMetadata, - session: toolSession, - }; - - const skillsLoadedPerWu: string[] = []; - const loadSkillTool: KtxRuntimeToolSet = { - load_skill: { - name: 'load_skill', - description: - 'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.', - inputSchema: z.object({ name: z.string() }), - execute: async ({ name }) => { - const skill = await this.deps.skillsRegistry.getSkill(name, 'memory_agent'); - if (!skill) { - const available = - (await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') || - '(none)'; - return { markdown: `Skill "${name}" not available. Available: ${available}` }; - } - const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8'); - if (!skillsLoadedPerWu.includes(skill.name)) { - skillsLoadedPerWu.push(skill.name); - } - const structured = { - name: skill.name, - skillDirectory: skill.path, - content: this.deps.skillsRegistry.stripFrontmatter(body), - }; - return { - markdown: `# ${structured.name}\n\n${structured.content}`, - structured, - }; - }, - }, - }; - - const priorProvenance = await this.deps.provenance.findLatestArtifactsForRawPaths( - job.connectionId, - job.sourceKey, - wu.rawFiles, - ); - const wuEmitUnmappedFallbackTool = { - emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool( - 'emit_unmapped_fallback', - createEmitUnmappedFallbackTool({ - stageIndex, - allowedPaths: new Set(wu.rawFiles), - tableRefExists: (tableRef) => - this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef), - }), - ), - }; - - const systemPrompt = buildWuSystemPrompt({ - baseFraming, - skillsPrompt, - syncId, - sourceKey: job.sourceKey, - canonicalPins, - }); - - memoryFlow?.emit({ - type: 'work_unit_started', - unitKey: wu.unitKey, - skills: wuSkillNames, - stepBudget: workUnitSettings.stepBudget, - }); - return executeWorkUnit( - { - sessionWorktreeGit: sessionWorktree.git, - agentRunner: this.deps.agentRunner, - validateTouchedSources: (touched) => - validateWuTouchedSources({ ...slValidationDeps, slValidator: this.deps.slValidator }, touched), - validateWikiRefs: (actions) => - findDanglingWikiRefsForActions({ - wikiService: scopedWikiService, - scope: 'GLOBAL', - scopeId: null, - actions, - }), - resetHardTo: (targetSha) => sessionWorktree.git.resetHardTo(targetSha), - buildSystemPrompt: () => systemPrompt, - buildUserPrompt: (wuInner) => buildWuUserPrompt({ wu: wuInner, wikiIndex, slIndex, priorProvenance }), - buildToolSet: (wuInner) => - wrapToolsWithLogger( - buildWuToolSet({ - sourceKey: job.sourceKey, - stagedDir, - wu: wuInner, - loadSkillTool, - emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool, - toolsetTools: wuToolset.toRuntimeTools(wuToolContext), - }), - join(transcriptDir, `${wuInner.unitKey}.jsonl`), - wuInner.unitKey, - { onEntry: recordTranscriptEntry(join(transcriptDir, `${wuInner.unitKey}.jsonl`)) }, - ), - captureSession: session, - sessionActions, - modelRole: 'candidateExtraction', - stepBudget: workUnitSettings.stepBudget, - sourceKey: job.sourceKey, - connectionId: job.connectionId, - jobId: job.jobId, - toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0, - onStepFinish: ({ stepIndex, stepBudget }) => { - memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget }); - }, - }, - wu, - ); - }; - - if (workUnits.length === 0) { - await stage3?.updateProgress(1.0, '0 of 0 work units complete'); - } - - try { - await Promise.all( - workUnits.map((wu, index) => - limitWorkUnit(async () => { - if (abortRequested) { - return; - } - - let outcome: WorkUnitOutcome; - try { - outcome = await runSingleWorkUnit(wu); - } catch (error) { - outcome = this.buildFailedWorkUnitOutcome(wu, error); - } - - workUnitOutcomesByIndex[index] = outcome; - for (const action of outcome.actions) { - memoryFlow?.emit({ - type: 'candidate_action', - unitKey: outcome.unitKey, - target: action.target, - action: action.type, - key: action.key, - }); - } - memoryFlow?.emit({ - type: 'work_unit_finished', - unitKey: outcome.unitKey, - status: outcome.status, - ...(outcome.reason ? { reason: outcome.reason } : {}), - }); - completedWorkUnits += 1; - await stage3?.updateProgress( - completedWorkUnits / workUnits.length, - `${completedWorkUnits} of ${workUnits.length} work units complete`, - ); - - if (outcome.status === 'failed') { - this.logger.warn(`[ingest-bundle] WU=${outcome.unitKey} failed: ${outcome.reason}`); - if (workUnitSettings.failureMode === 'abort') { - abortRequested = true; - throw new Error(this.formatWorkUnitFailure(outcome)); - } - } - }), - ), - ); - } catch (error) { - await this.deps.runs.markFailed(runRow.id); - throw error; - } - - workUnitOutcomes.push( - ...workUnitOutcomesByIndex.filter((outcome): outcome is WorkUnitOutcome => Boolean(outcome)), - ); - failedWorkUnits.push( - ...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey), - ); - latestWorkUnits = workUnitOutcomes; - latestFailedWorkUnits = failedWorkUnits; - - // Complete the typed Stage Index from the outcomes once, and use it for - // Stage 4, provenance writes (Phase G), and the report body (Phase F3). - stageIndex.workUnits = workUnitOutcomes.map((o) => ({ - unitKey: o.unitKey, - rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [], - status: o.status, - reason: o.reason, - actions: o.actions, - touchedSlSources: o.touchedSlSources, - slDisallowed: o.slDisallowed, - slDisallowedReason: o.slDisallowedReason, - })); - latestReportWorkUnits = this.toReportWorkUnits(stageIndex); } const carryForwardResult = contextReport && this.deps.contextCandidateCarryforward diff --git a/packages/context/src/ingest/isolated-diff/source-routing.test.ts b/packages/context/src/ingest/isolated-diff/source-routing.test.ts deleted file mode 100644 index 0519a4a2..00000000 --- a/packages/context/src/ingest/isolated-diff/source-routing.test.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { describe, expect, it } from 'vitest'; -import { defaultSharedWorktreeSourceKeys, isSharedWorktreeFallbackSourceKey } from './source-routing.js'; - -describe('isolated-diff source routing', () => { - it('defaults every non-override source to isolated diffs', () => { - expect(defaultSharedWorktreeSourceKeys()).toEqual([]); - }); - - it('returns a mutable copy for runtime settings', () => { - const keys = defaultSharedWorktreeSourceKeys(); - keys.push('legacy-source'); - - expect(defaultSharedWorktreeSourceKeys()).toEqual([]); - }); - - it('recognizes only explicitly configured shared-worktree fallback sources', () => { - expect(isSharedWorktreeFallbackSourceKey('notion', [])).toBe(false); - expect(isSharedWorktreeFallbackSourceKey('metricflow', [])).toBe(false); - expect(isSharedWorktreeFallbackSourceKey('legacy-source', ['legacy-source'])).toBe(true); - expect(isSharedWorktreeFallbackSourceKey('other-source', ['legacy-source'])).toBe(false); - }); -}); diff --git a/packages/context/src/ingest/isolated-diff/source-routing.ts b/packages/context/src/ingest/isolated-diff/source-routing.ts deleted file mode 100644 index 52694304..00000000 --- a/packages/context/src/ingest/isolated-diff/source-routing.ts +++ /dev/null @@ -1,12 +0,0 @@ -const DEFAULT_SHARED_WORKTREE_SOURCE_KEYS: readonly string[] = []; - -export function defaultSharedWorktreeSourceKeys(): string[] { - return [...DEFAULT_SHARED_WORKTREE_SOURCE_KEYS]; -} - -export function isSharedWorktreeFallbackSourceKey( - sourceKey: string, - sharedWorktreeSourceKeys: readonly string[] = DEFAULT_SHARED_WORKTREE_SOURCE_KEYS, -): boolean { - return sharedWorktreeSourceKeys.includes(sourceKey); -} diff --git a/packages/context/src/ingest/local-bundle-runtime.test.ts b/packages/context/src/ingest/local-bundle-runtime.test.ts index 0c911842..a8ec8c20 100644 --- a/packages/context/src/ingest/local-bundle-runtime.test.ts +++ b/packages/context/src/ingest/local-bundle-runtime.test.ts @@ -31,9 +31,7 @@ type RuntimeWithSlValidationDeps = { type RuntimeWithSettingsDeps = { deps: { - settings: Record & { - sharedWorktreeSourceKeys?: string[]; - }; + settings: Record; }; }; @@ -266,7 +264,7 @@ describe('createLocalBundleIngestRuntime', () => { }); }); - it('defaults local bundle ingest to isolated diffs without an allowlist', () => { + it('defaults local bundle ingest to isolated diffs without a shared-worktree fallback setting', () => { const runtime = createLocalBundleIngestRuntime({ project, adapters: [new FakeSourceAdapter()], @@ -274,13 +272,13 @@ describe('createLocalBundleIngestRuntime', () => { }); const settings = (runtime.runner as unknown as RuntimeWithSettingsDeps).deps.settings; + const fallbackSettingKey = ['sharedWorktree', 'SourceKeys'].join(''); - expect(settings.sharedWorktreeSourceKeys).toEqual([]); + expect(settings).not.toHaveProperty(fallbackSettingKey); expect(Object.keys(settings).sort()).toEqual([ 'ingestTraceLevel', 'memoryIngestionModel', 'probeRowCount', - 'sharedWorktreeSourceKeys', 'workUnitFailureMode', 'workUnitMaxConcurrency', 'workUnitStepBudget', diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 5fcc245d..f8af0696 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -77,7 +77,6 @@ import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './conte import { DiffSetService } from './diff-set.service.js'; import { ingestTracePathForJob, type IngestTraceLevel } from './ingest-trace.js'; import { IngestBundleRunner } from './ingest-bundle.runner.js'; -import { defaultSharedWorktreeSourceKeys } from './isolated-diff/source-routing.js'; import { PageTriageService } from './page-triage/index.js'; import { createWarehouseVerificationTools } from './tools/warehouse-verification/index.js'; import type { @@ -723,7 +722,6 @@ export function createLocalBundleIngestRuntime( workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget, workUnitFailureMode: options.project.config.ingest.workUnits.failureMode, - sharedWorktreeSourceKeys: defaultSharedWorktreeSourceKeys(), ingestTraceLevel: ingestTraceLevelFromEnv(), }, skillsRegistry: new SkillsRegistryService({ skillsDir, logger }), diff --git a/packages/context/src/ingest/ports.ts b/packages/context/src/ingest/ports.ts index 1131a672..32410cbc 100644 --- a/packages/context/src/ingest/ports.ts +++ b/packages/context/src/ingest/ports.ts @@ -143,7 +143,6 @@ export interface IngestSettingsPort { workUnitMaxConcurrency?: number; workUnitStepBudget?: number; workUnitFailureMode?: 'abort' | 'continue'; - sharedWorktreeSourceKeys?: string[]; ingestTraceLevel?: IngestTraceLevel; }