import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { GitService } from '../../core/index.js';
import { FileIngestTraceWriter } from '../ingest-trace.js';
import { runIsolatedWorkUnit } from './work-unit-executor.js';

/** Git identity used for the fixture repository's configuration and its seed commit. */
const FIXTURE_AUTHOR = { name: 'System User', email: 'system@example.com' } as const;

/**
 * Builds a throwaway `GitService` whose storage lives under a fresh temp directory,
 * then commits one raw-source file so the repository has a known base commit.
 *
 * @returns the temp home directory, the config directory, the service, and the
 *          SHA of HEAD after the seed commit.
 */
async function makeGit() {
  const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-wu-'));
  const configDir = join(homeDir, 'config');

  const git = new GitService({
    storage: { configDir, homeDir },
    git: {
      userName: FIXTURE_AUTHOR.name,
      userEmail: FIXTURE_AUTHOR.email,
      bootstrapMessage: 'init',
      bootstrapAuthor: 'system',
      bootstrapAuthorEmail: FIXTURE_AUTHOR.email,
    },
  });
  await git.onModuleInit();

  // Seed a single raw snapshot file and commit it.
  const rawSourceDir = join(configDir, 'raw-sources/c1/fake/s');
  await mkdir(rawSourceDir, { recursive: true });
  await writeFile(join(rawSourceDir, 'a.json'), '{}\n');
  await git.commitFiles(
    ['raw-sources/c1/fake/s/a.json'],
    'raw snapshot',
    FIXTURE_AUTHOR.name,
    FIXTURE_AUTHOR.email,
  );

  const baseSha = await git.revParseHead();
  return { homeDir, configDir, git, baseSha };
}

describe('runIsolatedWorkUnit', () => {
  it('creates a child worktree at the ingestion base and persists a patch proposal', async () => {
    const { homeDir, git, baseSha } = await makeGit();

    // Stand-in worktree service: `create` adds a real git worktree, `cleanup` is a no-op spy.
    const worktreeRoot = join(homeDir, '.worktrees');
    const childDir = join(worktreeRoot, 'session-job-1-wu-1');
    const createChild = vi.fn(async (_sessionKey: string, startSha: string) => {
      await mkdir(worktreeRoot, { recursive: true });
      await git.addWorktree(childDir, 'session/job-1-wu-1', startSha);
      const childGit = git.forWorktree(childDir);
      return {
        chatId: 'job-1-wu-1',
        workdir: childDir,
        branch: 'session/job-1-wu-1',
        baseSha: startSha,
        createdAt: new Date(),
        git: childGit,
        config: {},
      };
    });
    const cleanupChild = vi.fn(async () => undefined);
    const sessionWorktreeService = { create: createChild, cleanup: cleanupChild };

    const tracePath = join(homeDir, '.ktx/ingest-traces/job-1/trace.jsonl');
    const trace = new FileIngestTraceWriter({
      tracePath,
      jobId: 'job-1',
      connectionId: 'c1',
      sourceKey: 'fake',
      level: 'trace',
    });

    const result = await runIsolatedWorkUnit({
      unitIndex: 0,
      ingestionBaseSha: baseSha,
      sessionWorktreeService: sessionWorktreeService as never,
      patchDir: join(homeDir, '.ktx/ingest-patches/job-1'),
      trace,
      workUnit: { unitKey: 'wu-1', rawFiles: ['a.json'], peerFileIndex: [], dependencyPaths: [] },
      // The unit's work: write one wiki page inside the child worktree and commit it there.
      run: async (child) => {
        const wikiDir = join(child.workdir, 'wiki/global');
        await mkdir(wikiDir, { recursive: true });
        await writeFile(join(wikiDir, 'a.md'), '---\nsummary: A\nusage_mode: auto\n---\n\nBody\n');
        await child.git.commitFiles(['wiki/global/a.md'], 'test: write wiki', 'KTX Test', 'system@ktx.local');
        const postSha = await child.git.revParseHead();
        return {
          unitKey: 'wu-1',
          status: 'success',
          preSha: baseSha,
          postSha,
          actions: [{ target: 'wiki', type: 'created', key: 'a', detail: 'A' }],
          touchedSlSources: [],
        };
      },
    });

    // The child is created at the base SHA under "<jobId>-<unitKey>" and released as a success.
    expect(createChild).toHaveBeenCalledWith('job-1-wu-1', baseSha);
    expect(cleanupChild).toHaveBeenCalledWith(expect.any(Object), 'success');

    // The outcome points at a patch file that mentions the committed wiki page.
    expect(result.status).toBe('success');
    expect(result.patchPath).toContain('0000-wu-1.patch');
    await expect(readFile(result.patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md');

    // The trace file recorded the child-creation event.
    await expect(readFile(tracePath, 'utf-8')).resolves.toContain('work_unit_child_created');
  });
});
import { mkdir, readFile } from 'node:fs/promises';
import { join } from 'node:path';
import type { SessionOutcome } from '../../core/index.js';
import type { IngestSessionWorktree, IngestSessionWorktreePort } from '../ports.js';
import type { WorkUnit } from '../types.js';
import type { IngestTraceWriter } from '../ingest-trace.js';
import type { WorkUnitOutcome } from '../stages/stage-3-work-units.js';
import { assertPatchAllowedForWorkUnit } from './git-patch.js';

/** Everything `runIsolatedWorkUnit` needs to execute one work unit in its own child worktree. */
export interface RunIsolatedWorkUnitInput {
  /** Position of the unit in the job; becomes the zero-padded prefix of the patch file name. */
  unitIndex: number;
  /** Commit the child worktree is created at and the patch is diffed against. */
  ingestionBaseSha: string;
  /** Creates and cleans up the child worktree. */
  sessionWorktreeService: IngestSessionWorktreePort;
  /** Directory the patch file is written into; created (recursively) if missing. */
  patchDir: string;
  /** Trace sink; its `context.jobId` is also used to build the session key. */
  trace: IngestTraceWriter;
  /** The unit being executed. */
  workUnit: WorkUnit;
  /** Performs the unit's actual work inside the child worktree and reports its outcome. */
  run(child: IngestSessionWorktree): Promise<WorkUnitOutcome>;
}

/**
 * Builds the patch file name for a unit: the index left-padded to four digits,
 * a dash, the unit key with every run of characters outside `[a-zA-Z0-9_.-]`
 * collapsed to a single `-`, and the `.patch` extension.
 */
function patchFileName(unitIndex: number, unitKey: string): string {
  const safeKey = unitKey.replace(/[^a-zA-Z0-9_.-]+/g, '-');
  return `${String(unitIndex).padStart(4, '0')}-${safeKey}.patch`;
}

/**
 * Cleans up the child worktree and records the cleanup in the trace.
 *
 * When the work unit itself already failed (`primaryFailed`), nothing thrown
 * here is allowed to escape: an exception leaving a `finally` block would
 * replace the original error. In that case a cleanup failure is traced at
 * `error` level instead. When the unit did not fail, cleanup and trace errors
 * propagate to the caller.
 */
async function cleanupChildWorktree(
  input: RunIsolatedWorkUnitInput,
  child: IngestSessionWorktree,
  outcome: SessionOutcome,
  primaryFailed: boolean,
): Promise<void> {
  const details = { unitKey: input.workUnit.unitKey, outcome, worktreePath: child.workdir };

  let cleanupFailed = false;
  let cleanupError: unknown;
  try {
    await input.sessionWorktreeService.cleanup(child, outcome);
  } catch (error) {
    cleanupFailed = true;
    cleanupError = error;
  }
  if (cleanupFailed && !primaryFailed) {
    throw cleanupError;
  }

  try {
    if (cleanupFailed) {
      await input.trace.event('error', 'work_unit', 'work_unit_child_cleanup', details, cleanupError);
    } else {
      await input.trace.event('trace', 'work_unit', 'work_unit_child_cleanup', details);
    }
  } catch (traceError) {
    // Tracing is diagnostic only; it must not displace the work unit's own error.
    if (!primaryFailed) {
      throw traceError;
    }
  }
}

/**
 * Runs one work unit inside a dedicated child worktree and collects its changes as a patch file.
 *
 * Steps:
 * 1. Create the child worktree at `ingestionBaseSha` under the session key `<jobId>-<unitKey>`.
 * 2. Invoke `input.run(child)`.
 * 3. If the outcome is not `'success'`, trace it and return the outcome with `childWorktreePath` added.
 * 4. Otherwise write the diff from `ingestionBaseSha` to the child's `HEAD` into `patchDir`,
 *    pass it through `assertPatchAllowedForWorkUnit` (presumably throws on a disallowed
 *    path — confirm in `git-patch.ts`), and return the outcome extended with `patchPath`,
 *    `patchTouchedPaths` and `childWorktreePath`.
 * 5. Always clean up the child: with `'success'` only once the patch has been validated,
 *    with `'crash'` in every other case.
 *
 * @throws whatever `create`, `run`, the patch steps or tracing throw. An error raised by
 *         the work itself is never replaced by a later cleanup or trace failure.
 */
export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Promise<WorkUnitOutcome> {
  const { unitKey } = input.workUnit;
  const sessionKey = `${input.trace.context.jobId}-${unitKey}`;
  const child = await input.sessionWorktreeService.create(sessionKey, input.ingestionBaseSha);

  let cleanupOutcome: SessionOutcome = 'crash';
  let primaryFailed = false;

  try {
    // Inside the try so that a failing trace write still releases the worktree just created.
    await input.trace.event('debug', 'work_unit', 'work_unit_child_created', {
      unitKey,
      unitIndex: input.unitIndex,
      worktreePath: child.workdir,
      baseSha: input.ingestionBaseSha,
    });

    const outcome = await input.run(child);
    if (outcome.status !== 'success') {
      await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', {
        unitKey,
        reason: outcome.reason ?? 'unknown failure',
      });
      return { ...outcome, childWorktreePath: child.workdir };
    }

    await mkdir(input.patchDir, { recursive: true });
    const patchPath = join(input.patchDir, patchFileName(input.unitIndex, unitKey));
    await child.git.writeBinaryNoRenamePatch(input.ingestionBaseSha, 'HEAD', patchPath);
    const patch = await readFile(patchPath, 'utf-8');
    const touched = assertPatchAllowedForWorkUnit({
      unitKey,
      patch,
      slDisallowed: input.workUnit.slDisallowed === true,
    });
    const touchedPaths = touched.map((entry) => entry.path);

    cleanupOutcome = 'success';
    await input.trace.event('debug', 'work_unit', 'work_unit_patch_collected', {
      unitKey,
      patchPath,
      touchedPaths,
      patchBytes: Buffer.byteLength(patch),
    });
    return {
      ...outcome,
      patchPath,
      patchTouchedPaths: touchedPaths,
      childWorktreePath: child.workdir,
    };
  } catch (error) {
    primaryFailed = true;
    cleanupOutcome = 'crash';
    try {
      await input.trace.event(
        'error',
        'work_unit',
        'work_unit_child_failed',
        { unitKey, worktreePath: child.workdir },
        error,
      );
    } catch {
      // Tracing is diagnostic only; the original error below is the one callers must see.
    }
    throw error;
  } finally {
    await cleanupChildWorktree(input, child, cleanupOutcome, primaryFailed);
  }
}
WorkUnitExecutionDeps, wu: WorkUnit): Promise {