mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
fix(ingest): trace policy conflicts and cleanup child worktrees
This commit is contained in:
parent
86837dd3ed
commit
46455e74d1
5 changed files with 117 additions and 16 deletions
|
|
@ -646,7 +646,10 @@ describe('IngestBundleRunner isolated diff path', () => {
|
|||
|
||||
await expect(
|
||||
runner.run({ jobId: 'job-sl-disallowed', connectionId: 'warehouse', sourceKey: 'metabase', trigger: 'upload', bundleRef: { kind: 'upload', uploadId: 'upload' } }),
|
||||
).rejects.toThrow(/slDisallowed WorkUnit lookml-mismatch touched semantic-layer\/warehouse\/orders.yaml/);
|
||||
).rejects.toThrow(/isolated diff textual conflict/);
|
||||
const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-sl-disallowed/trace.jsonl'), 'utf-8');
|
||||
expect(trace).toContain('patch_policy_rejected');
|
||||
expect(trace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/warehouse/orders.yaml');
|
||||
} finally {
|
||||
await rm(runtime.homeDir, { recursive: true, force: true });
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,4 +89,43 @@ describe('integrateWorkUnitPatch', () => {
|
|||
expect(result.status).toBe('semantic_conflict');
|
||||
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n');
|
||||
});
|
||||
|
||||
it('classifies slDisallowed patch policy failures as traced textual conflicts', async () => {
|
||||
const { homeDir, configDir, git, baseSha } = await makeRepo();
|
||||
await mkdir(join(configDir, 'semantic-layer/c1'), { recursive: true });
|
||||
await git.commitFiles(['semantic-layer/c1'], 'empty sl dir', 'System User', 'system@example.com');
|
||||
const childDir = join(homeDir, 'child-policy');
|
||||
await git.addWorktree(childDir, 'child-policy', baseSha);
|
||||
const childGit = git.forWorktree(childDir);
|
||||
await mkdir(join(childDir, 'semantic-layer/c1'), { recursive: true });
|
||||
await writeFile(join(childDir, 'semantic-layer/c1/orders.yaml'), 'name: orders\ncolumns: []\njoins: []\nmeasures: []\n');
|
||||
await childGit.commitFiles(['semantic-layer/c1/orders.yaml'], 'forbidden sl', 'System User', 'system@example.com');
|
||||
const patchPath = join(homeDir, 'patches/forbidden.patch');
|
||||
await childGit.writeBinaryNoRenamePatch(baseSha, 'HEAD', patchPath);
|
||||
const trace = new FileIngestTraceWriter({
|
||||
tracePath: join(homeDir, '.ktx/ingest-traces/job-policy/trace.jsonl'),
|
||||
jobId: 'job-policy',
|
||||
connectionId: 'c1',
|
||||
sourceKey: 'fake',
|
||||
level: 'trace',
|
||||
});
|
||||
|
||||
const result = await integrateWorkUnitPatch({
|
||||
unitKey: 'lookml-mismatch',
|
||||
patchPath,
|
||||
integrationGit: git,
|
||||
trace,
|
||||
author: { name: 'KTX Test', email: 'system@ktx.local' },
|
||||
validateAppliedTree: vi.fn().mockResolvedValue(undefined),
|
||||
slDisallowed: true,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
status: 'textual_conflict',
|
||||
touchedPaths: ['semantic-layer/c1/orders.yaml'],
|
||||
});
|
||||
const rawTrace = await readFile(trace.tracePath, 'utf-8');
|
||||
expect(rawTrace).toContain('patch_policy_rejected');
|
||||
expect(rawTrace).toContain('slDisallowed WorkUnit lookml-mismatch touched semantic-layer/c1/orders.yaml');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import { readFile } from 'node:fs/promises';
|
|||
import type { GitService } from '../../core/index.js';
|
||||
import type { IngestTraceWriter } from '../ingest-trace.js';
|
||||
import { traceTimed } from '../ingest-trace.js';
|
||||
import { assertPatchAllowedForWorkUnit } from './git-patch.js';
|
||||
import { assertPatchAllowedForWorkUnit, parsePatchTouchedPaths } from './git-patch.js';
|
||||
|
||||
export type PatchIntegrationResult =
|
||||
| { status: 'accepted'; commitSha: string; touchedPaths: string[] }
|
||||
|
|
@ -26,12 +26,26 @@ function errorMessage(error: unknown): string {
|
|||
export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput): Promise<PatchIntegrationResult> {
|
||||
const preApplyHead = await input.integrationGit.revParseHead();
|
||||
const patch = await readFile(input.patchPath, 'utf-8');
|
||||
const touched = assertPatchAllowedForWorkUnit({
|
||||
unitKey: input.unitKey,
|
||||
patch,
|
||||
slDisallowed: input.slDisallowed,
|
||||
});
|
||||
const touchedPaths = touched.map((entry) => entry.path);
|
||||
const touchedPaths = parsePatchTouchedPaths(patch).map((entry) => entry.path);
|
||||
try {
|
||||
assertPatchAllowedForWorkUnit({
|
||||
unitKey: input.unitKey,
|
||||
patch,
|
||||
slDisallowed: input.slDisallowed,
|
||||
});
|
||||
} catch (error) {
|
||||
await input.trace.event('error', 'integration', 'patch_policy_rejected', {
|
||||
unitKey: input.unitKey,
|
||||
patchPath: input.patchPath,
|
||||
touchedPaths,
|
||||
reason: errorMessage(error),
|
||||
});
|
||||
return {
|
||||
status: 'textual_conflict',
|
||||
reason: errorMessage(error),
|
||||
touchedPaths,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
await traceTimed(
|
||||
|
|
|
|||
|
|
@ -92,4 +92,53 @@ describe('runIsolatedWorkUnit', () => {
|
|||
await expect(readFile(patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md');
|
||||
await expect(readFile(tracePath, 'utf-8')).resolves.toContain('work_unit_child_created');
|
||||
});
|
||||
|
||||
it('removes child worktrees after failed WorkUnit outcomes are traced', async () => {
|
||||
const { homeDir, git, baseSha } = await makeGit();
|
||||
const childDir = join(homeDir, '.worktrees/session-job-1-wu-fail');
|
||||
const sessionWorktreeService = {
|
||||
create: vi.fn(async (_key: string, startSha: string) => {
|
||||
await mkdir(join(homeDir, '.worktrees'), { recursive: true });
|
||||
await git.addWorktree(childDir, 'session/job-1-wu-fail', startSha);
|
||||
return {
|
||||
chatId: 'job-1-wu-fail',
|
||||
workdir: childDir,
|
||||
branch: 'session/job-1-wu-fail',
|
||||
baseSha: startSha,
|
||||
createdAt: new Date(),
|
||||
git: git.forWorktree(childDir),
|
||||
config: {},
|
||||
};
|
||||
}),
|
||||
cleanup: vi.fn(async () => undefined),
|
||||
};
|
||||
const trace = new FileIngestTraceWriter({
|
||||
tracePath: join(homeDir, '.ktx/ingest-traces/job-1/trace.jsonl'),
|
||||
jobId: 'job-1',
|
||||
connectionId: 'c1',
|
||||
sourceKey: 'fake',
|
||||
level: 'trace',
|
||||
});
|
||||
|
||||
const result = await runIsolatedWorkUnit({
|
||||
unitIndex: 0,
|
||||
ingestionBaseSha: baseSha,
|
||||
sessionWorktreeService: sessionWorktreeService as never,
|
||||
patchDir: join(homeDir, '.ktx/ingest-patches/job-1'),
|
||||
trace,
|
||||
run: async () => ({
|
||||
unitKey: 'wu-fail',
|
||||
status: 'failed',
|
||||
reason: 'agent loop errored',
|
||||
preSha: baseSha,
|
||||
postSha: baseSha,
|
||||
actions: [],
|
||||
touchedSlSources: [],
|
||||
}),
|
||||
workUnit: { unitKey: 'wu-fail', rawFiles: ['a.json'], peerFileIndex: [], dependencyPaths: [] },
|
||||
});
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import type { IngestSessionWorktree, IngestSessionWorktreePort } from '../ports.
|
|||
import type { WorkUnit } from '../types.js';
|
||||
import type { IngestTraceWriter } from '../ingest-trace.js';
|
||||
import type { WorkUnitOutcome } from '../stages/stage-3-work-units.js';
|
||||
import { assertPatchAllowedForWorkUnit } from './git-patch.js';
|
||||
import { parsePatchTouchedPaths } from './git-patch.js';
|
||||
|
||||
export interface RunIsolatedWorkUnitInput {
|
||||
unitIndex: number;
|
||||
|
|
@ -36,7 +36,7 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
|
|||
try {
|
||||
const outcome = await input.run(child);
|
||||
if (outcome.status !== 'success') {
|
||||
cleanupOutcome = 'crash';
|
||||
cleanupOutcome = 'success';
|
||||
await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
reason: outcome.reason ?? 'unknown failure',
|
||||
|
|
@ -48,11 +48,7 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
|
|||
const patchPath = join(input.patchDir, patchFileName(input.unitIndex, input.workUnit.unitKey));
|
||||
await child.git.writeBinaryNoRenamePatch(input.ingestionBaseSha, 'HEAD', patchPath);
|
||||
const patch = await readFile(patchPath, 'utf-8');
|
||||
const touched = assertPatchAllowedForWorkUnit({
|
||||
unitKey: input.workUnit.unitKey,
|
||||
patch,
|
||||
slDisallowed: input.workUnit.slDisallowed === true,
|
||||
});
|
||||
const touched = parsePatchTouchedPaths(patch);
|
||||
cleanupOutcome = 'success';
|
||||
await input.trace.event('debug', 'work_unit', 'work_unit_patch_collected', {
|
||||
unitKey: input.workUnit.unitKey,
|
||||
|
|
@ -67,7 +63,6 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
|
|||
childWorktreePath: child.workdir,
|
||||
};
|
||||
} catch (error) {
|
||||
cleanupOutcome = 'crash';
|
||||
await input.trace.event(
|
||||
'error',
|
||||
'work_unit',
|
||||
|
|
@ -75,6 +70,7 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
|
|||
{ unitKey: input.workUnit.unitKey, worktreePath: child.workdir },
|
||||
error,
|
||||
);
|
||||
cleanupOutcome = 'success';
|
||||
throw error;
|
||||
} finally {
|
||||
await input.sessionWorktreeService.cleanup(child, cleanupOutcome);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue