mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat: execute ingest work units in child worktrees
This commit is contained in:
parent
43e6d4542d
commit
c2edec84c2
3 changed files with 178 additions and 0 deletions
|
|
@ -0,0 +1,88 @@
|
|||
import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { GitService } from '../../core/index.js';
|
||||
import { FileIngestTraceWriter } from '../ingest-trace.js';
|
||||
import { runIsolatedWorkUnit } from './work-unit-executor.js';
|
||||
|
||||
async function makeGit() {
|
||||
const homeDir = await mkdtemp(join(tmpdir(), 'ktx-isolated-wu-'));
|
||||
const configDir = join(homeDir, 'config');
|
||||
const git = new GitService({
|
||||
storage: { configDir, homeDir },
|
||||
git: {
|
||||
userName: 'System User',
|
||||
userEmail: 'system@example.com',
|
||||
bootstrapMessage: 'init',
|
||||
bootstrapAuthor: 'system',
|
||||
bootstrapAuthorEmail: 'system@example.com',
|
||||
},
|
||||
});
|
||||
await git.onModuleInit();
|
||||
await mkdir(join(configDir, 'raw-sources/c1/fake/s'), { recursive: true });
|
||||
await writeFile(join(configDir, 'raw-sources/c1/fake/s/a.json'), '{}\n');
|
||||
await git.commitFiles(['raw-sources/c1/fake/s/a.json'], 'raw snapshot', 'System User', 'system@example.com');
|
||||
return { homeDir, configDir, git, baseSha: await git.revParseHead() };
|
||||
}
|
||||
|
||||
describe('runIsolatedWorkUnit', () => {
|
||||
it('creates a child worktree at the ingestion base and persists a patch proposal', async () => {
|
||||
const { homeDir, git, baseSha } = await makeGit();
|
||||
const childDir = join(homeDir, '.worktrees/session-job-1-wu-1');
|
||||
const sessionWorktreeService = {
|
||||
create: vi.fn(async (_key: string, startSha: string) => {
|
||||
await mkdir(join(homeDir, '.worktrees'), { recursive: true });
|
||||
await git.addWorktree(childDir, 'session/job-1-wu-1', startSha);
|
||||
const childGit = git.forWorktree(childDir);
|
||||
return {
|
||||
chatId: 'job-1-wu-1',
|
||||
workdir: childDir,
|
||||
branch: 'session/job-1-wu-1',
|
||||
baseSha: startSha,
|
||||
createdAt: new Date(),
|
||||
git: childGit,
|
||||
config: {},
|
||||
};
|
||||
}),
|
||||
cleanup: vi.fn(async () => undefined),
|
||||
};
|
||||
const tracePath = join(homeDir, '.ktx/ingest-traces/job-1/trace.jsonl');
|
||||
const trace = new FileIngestTraceWriter({
|
||||
tracePath,
|
||||
jobId: 'job-1',
|
||||
connectionId: 'c1',
|
||||
sourceKey: 'fake',
|
||||
level: 'trace',
|
||||
});
|
||||
|
||||
const result = await runIsolatedWorkUnit({
|
||||
unitIndex: 0,
|
||||
ingestionBaseSha: baseSha,
|
||||
sessionWorktreeService: sessionWorktreeService as never,
|
||||
patchDir: join(homeDir, '.ktx/ingest-patches/job-1'),
|
||||
trace,
|
||||
run: async (child) => {
|
||||
await mkdir(join(child.workdir, 'wiki/global'), { recursive: true });
|
||||
await writeFile(join(child.workdir, 'wiki/global/a.md'), '---\nsummary: A\nusage_mode: auto\n---\n\nBody\n');
|
||||
await child.git.commitFiles(['wiki/global/a.md'], 'test: write wiki', 'KTX Test', 'system@ktx.local');
|
||||
return {
|
||||
unitKey: 'wu-1',
|
||||
status: 'success',
|
||||
preSha: baseSha,
|
||||
postSha: await child.git.revParseHead(),
|
||||
actions: [{ target: 'wiki', type: 'created', key: 'a', detail: 'A' }],
|
||||
touchedSlSources: [],
|
||||
};
|
||||
},
|
||||
workUnit: { unitKey: 'wu-1', rawFiles: ['a.json'], peerFileIndex: [], dependencyPaths: [] },
|
||||
});
|
||||
|
||||
expect(sessionWorktreeService.create).toHaveBeenCalledWith('job-1-wu-1', baseSha);
|
||||
expect(sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
|
||||
expect(result.status).toBe('success');
|
||||
expect(result.patchPath).toContain('0000-wu-1.patch');
|
||||
await expect(readFile(result.patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md');
|
||||
await expect(readFile(tracePath, 'utf-8')).resolves.toContain('work_unit_child_created');
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
import { mkdir, readFile } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import type { SessionOutcome } from '../../core/index.js';
|
||||
import type { IngestSessionWorktree, IngestSessionWorktreePort } from '../ports.js';
|
||||
import type { WorkUnit } from '../types.js';
|
||||
import type { IngestTraceWriter } from '../ingest-trace.js';
|
||||
import type { WorkUnitOutcome } from '../stages/stage-3-work-units.js';
|
||||
import { assertPatchAllowedForWorkUnit } from './git-patch.js';
|
||||
|
||||
export interface RunIsolatedWorkUnitInput {
|
||||
unitIndex: number;
|
||||
ingestionBaseSha: string;
|
||||
sessionWorktreeService: IngestSessionWorktreePort;
|
||||
patchDir: string;
|
||||
trace: IngestTraceWriter;
|
||||
workUnit: WorkUnit;
|
||||
run(child: IngestSessionWorktree): Promise<WorkUnitOutcome>;
|
||||
}
|
||||
|
||||
function patchFileName(unitIndex: number, unitKey: string): string {
|
||||
const safeKey = unitKey.replace(/[^a-zA-Z0-9_.-]+/g, '-');
|
||||
return `${String(unitIndex).padStart(4, '0')}-${safeKey}.patch`;
|
||||
}
|
||||
|
||||
export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Promise<WorkUnitOutcome> {
|
||||
const sessionKey = `${input.trace.context.jobId}-${input.workUnit.unitKey}`;
|
||||
let cleanupOutcome: SessionOutcome = 'crash';
|
||||
const child = await input.sessionWorktreeService.create(sessionKey, input.ingestionBaseSha);
|
||||
await input.trace.event('debug', 'work_unit', 'work_unit_child_created', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
unitIndex: input.unitIndex,
|
||||
worktreePath: child.workdir,
|
||||
baseSha: input.ingestionBaseSha,
|
||||
});
|
||||
|
||||
try {
|
||||
const outcome = await input.run(child);
|
||||
if (outcome.status !== 'success') {
|
||||
cleanupOutcome = 'crash';
|
||||
await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
reason: outcome.reason ?? 'unknown failure',
|
||||
});
|
||||
return { ...outcome, childWorktreePath: child.workdir };
|
||||
}
|
||||
|
||||
await mkdir(input.patchDir, { recursive: true });
|
||||
const patchPath = join(input.patchDir, patchFileName(input.unitIndex, input.workUnit.unitKey));
|
||||
await child.git.writeBinaryNoRenamePatch(input.ingestionBaseSha, 'HEAD', patchPath);
|
||||
const patch = await readFile(patchPath, 'utf-8');
|
||||
const touched = assertPatchAllowedForWorkUnit({
|
||||
unitKey: input.workUnit.unitKey,
|
||||
patch,
|
||||
slDisallowed: input.workUnit.slDisallowed === true,
|
||||
});
|
||||
cleanupOutcome = 'success';
|
||||
await input.trace.event('debug', 'work_unit', 'work_unit_patch_collected', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
patchPath,
|
||||
touchedPaths: touched.map((entry) => entry.path),
|
||||
patchBytes: Buffer.byteLength(patch),
|
||||
});
|
||||
return {
|
||||
...outcome,
|
||||
patchPath,
|
||||
patchTouchedPaths: touched.map((entry) => entry.path),
|
||||
childWorktreePath: child.workdir,
|
||||
};
|
||||
} catch (error) {
|
||||
cleanupOutcome = 'crash';
|
||||
await input.trace.event(
|
||||
'error',
|
||||
'work_unit',
|
||||
'work_unit_child_failed',
|
||||
{ unitKey: input.workUnit.unitKey, worktreePath: child.workdir },
|
||||
error,
|
||||
);
|
||||
throw error;
|
||||
} finally {
|
||||
await input.sessionWorktreeService.cleanup(child, cleanupOutcome);
|
||||
await input.trace.event('trace', 'work_unit', 'work_unit_child_cleanup', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
outcome: cleanupOutcome,
|
||||
worktreePath: child.workdir,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -41,6 +41,9 @@ export interface WorkUnitOutcome {
|
|||
touchedSlSources: TouchedSlSource[];
|
||||
slDisallowed?: boolean;
|
||||
slDisallowedReason?: 'lookml_connection_mismatch';
|
||||
patchPath?: string;
|
||||
patchTouchedPaths?: string[];
|
||||
childWorktreePath?: string;
|
||||
}
|
||||
|
||||
export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit): Promise<WorkUnitOutcome> {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue