diff --git a/packages/cli/src/context/ingest/artifact-gates.ts b/packages/cli/src/context/ingest/artifact-gates.ts index 35735575..a67f8455 100644 --- a/packages/cli/src/context/ingest/artifact-gates.ts +++ b/packages/cli/src/context/ingest/artifact-gates.ts @@ -2,20 +2,16 @@ import type { SemanticLayerService } from '../../context/sl/semantic-layer.servi import type { TouchedSlSource } from '../../context/tools/touched-sl-sources.js'; import type { KnowledgeWikiService } from '../../context/wiki/knowledge-wiki.service.js'; import { findMissingWikiRefs } from '../wiki/wiki-ref-validation.js'; +import type { WuValidationResult } from './stages/validate-wu-sources.js'; import { findInvalidWikiBodyRefs } from './wiki-body-refs.js'; -interface TouchedValidationResult { - invalidSources: string[]; - validSources: string[]; -} - export interface FinalArtifactGateInput { connectionIds: string[]; changedWikiPageKeys: string[]; touchedSlSources: TouchedSlSource[]; wikiService: KnowledgeWikiService; semanticLayerService: SemanticLayerService; - validateTouchedSources(touched: TouchedSlSource[]): Promise; + validateTouchedSources(touched: TouchedSlSource[]): Promise; tableExists(connectionId: string, tableRef: string): Promise; } @@ -40,54 +36,6 @@ function slEntityNames(source: Awaited(); - const unique: TouchedSlSource[] = []; - for (const source of sources) { - const key = `${source.connectionId}:${source.sourceName}`; - if (seen.has(key)) { - continue; - } - seen.add(key); - unique.push(source); - } - return unique.sort((left, right) => { - const byConnection = left.connectionId.localeCompare(right.connectionId); - return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection; - }); -} - -async function expandTouchedSlSourcesWithDirectJoinNeighbors(input: FinalArtifactGateInput): Promise { - const expanded = [...input.touchedSlSources]; - const touchedByConnection = new Map>(); - for (const source of input.touchedSlSources) { - const bucket = touchedByConnection.get(source.connectionId) ?? new Set(); - bucket.add(source.sourceName); - touchedByConnection.set(source.connectionId, bucket); - } - - for (const connectionId of input.connectionIds) { - const touched = touchedByConnection.get(connectionId); - if (!touched || touched.size === 0) { - continue; - } - const { sources } = await input.semanticLayerService.loadAllSources(connectionId); - for (const source of sources) { - const sourceIsTouched = touched.has(source.name); - if (sourceIsTouched) { - for (const join of source.joins ?? []) { - expanded.push({ connectionId, sourceName: join.to }); - } - } - if ((source.joins ?? []).some((join) => touched.has(join.to))) { - expanded.push({ connectionId, sourceName: source.name }); - } - } - } - - return uniqueTouchedSources(expanded); -} - async function validateWikiSlRefs(input: FinalArtifactGateInput): Promise { const errors: string[] = []; const sourcesByConnection = new Map>['sources']>(); @@ -146,9 +94,13 @@ async function validateWikiRefs(input: FinalArtifactGateInput): Promise { - const touchedWithDependencies = await expandTouchedSlSourcesWithDirectJoinNeighbors(input); - const validation = await input.validateTouchedSources(touchedWithDependencies); - const errors: string[] = validation.invalidSources.map((source) => `semantic-layer validation failed for ${source}`); + // Join-neighbor expansion happens inside validateTouchedSources so work-unit + // validation and this gate check the same set — a source that passes one + // passes the other. + const validation = await input.validateTouchedSources(input.touchedSlSources); + const errors: string[] = validation.invalidSources.map( + (invalid) => `semantic-layer validation failed for ${invalid.source}: ${invalid.errors.join('; ')}`, + ); errors.push(...(await validateWikiSlRefs(input))); const danglingWikiRefs = await validateWikiRefs(input); if (danglingWikiRefs.length > 0) { diff --git a/packages/cli/src/context/ingest/constrained-repair.ts b/packages/cli/src/context/ingest/constrained-repair.ts new file mode 100644 index 00000000..104eaefc --- /dev/null +++ b/packages/cli/src/context/ingest/constrained-repair.ts @@ -0,0 +1,225 @@ +import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import { z } from 'zod'; +import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../context/llm/runtime-port.js'; +import type { IngestTraceWriter } from './ingest-trace.js'; +import { traceTimed } from './ingest-trace.js'; + +/** + * Shared loop for the two integration-time repair agents (semantic gate + * repair, textual conflict resolution). Success is decided by re-running the + * failed check — `verify` — never by whether the agent edited files: an + * ineffective edit fails, and an explicit no-change declaration that verifies + * succeeds. + */ + +export type RepairVerification = { ok: true } | { ok: false; reason: string }; + +export type ConstrainedRepairResult = + | { status: 'repaired'; attempts: number; changedPaths: string[] } + | { status: 'failed'; attempts: number; reason: string }; + +export interface ConstrainedRepairToolContext { + workdir: string; + allowedPaths: ReadonlySet; + editedPaths: Set; + declareNoChange(reason: string): void; +} + +export interface ConstrainedRepairLoopInput { + agentRunner: AgentRunnerPort; + workdir: string; + allowedPaths: string[]; + trace: IngestTraceWriter; + tracePhase: string; + traceEventName: string; + traceData: Record; + systemPrompt: string; + buildUserPrompt(input: { attempt: number; maxAttempts: number; previousFailure: string | null }): string; + buildExtraTools?(context: ConstrainedRepairToolContext): KtxRuntimeToolSet; + verify(changedPaths: string[]): Promise; + /** Failure reason when an attempt neither edits nor declares no-change. */ + noChangeFailureReason: string; + telemetryTags: Record; + maxAttempts?: number; + stepBudget?: number; + abortSignal?: AbortSignal; +} + +const readRepairFileSchema = z.object({ + path: z.string().min(1), +}); + +const writeRepairFileSchema = z.object({ + path: z.string().min(1), + content: z.string(), +}); + +function normalizeRepoPath(path: string): string { + const normalized = path.replace(/\\/g, '/').replace(/^\/+/, ''); + const parts = normalized.split('/').filter((part) => part.length > 0); + if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) { + throw new Error(`repair path must be a repository-relative path: ${path}`); + } + return parts.join('/'); +} + +function assertAllowedPath(path: string, allowedPaths: ReadonlySet): string { + const normalized = normalizeRepoPath(path); + if (!allowedPaths.has(normalized)) { + throw new Error(`repair path not allowed: ${normalized}`); + } + return normalized; +} + +async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> { + try { + return { exists: true, content: await readFile(path, 'utf-8') }; + } catch (error) { + if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') { + return { exists: false, content: '' }; + } + throw error; + } +} + +function buildRepairFileTools(context: ConstrainedRepairToolContext): KtxRuntimeToolSet { + return { + read_repair_file: { + name: 'read_repair_file', + description: 'Read one allowed file from the integration worktree.', + inputSchema: readRepairFileSchema, + execute: async ({ path }: z.infer) => { + const normalized = assertAllowedPath(path, context.allowedPaths); + const file = await readOptionalFile(join(context.workdir, normalized)); + return { + markdown: file.exists ? file.content : `(missing file: ${normalized})`, + structured: { path: normalized, exists: file.exists }, + }; + }, + }, + write_repair_file: { + name: 'write_repair_file', + description: 'Replace one allowed integration worktree file with repaired text content.', + inputSchema: writeRepairFileSchema, + execute: async ({ path, content }: z.infer) => { + const normalized = assertAllowedPath(path, context.allowedPaths); + const fullPath = join(context.workdir, normalized); + await mkdir(dirname(fullPath), { recursive: true }); + await writeFile(fullPath, content, 'utf-8'); + context.editedPaths.add(normalized); + return { + markdown: `Wrote ${normalized}`, + structured: { path: normalized, bytes: Buffer.byteLength(content) }, + }; + }, + }, + }; +} + +export function buildDeleteRepairFileTool(context: ConstrainedRepairToolContext): KtxRuntimeToolSet { + const deleteRepairFileSchema = z.object({ + path: z.string().min(1), + }); + return { + delete_repair_file: { + name: 'delete_repair_file', + description: 'Delete one allowed integration worktree file when the failed patch proves the deletion is correct.', + inputSchema: deleteRepairFileSchema, + execute: async ({ path }: z.infer) => { + const normalized = assertAllowedPath(path, context.allowedPaths); + await rm(join(context.workdir, normalized), { force: true }); + context.editedPaths.add(normalized); + return { + markdown: `Deleted ${normalized}`, + structured: { path: normalized }, + }; + }, + }, + }; +} + +export async function runConstrainedRepairLoop(input: ConstrainedRepairLoopInput): Promise { + const allowedPaths = new Set(input.allowedPaths.map(normalizeRepoPath)); + const sortedAllowedPaths = [...allowedPaths].sort(); + const maxAttempts = input.maxAttempts ?? 2; + const stepBudget = input.stepBudget ?? 16; + // Edits persist in the worktree across attempts, so the verified set and the + // reported changedPaths accumulate over the whole loop. + const editedPaths = new Set(); + let lastFailure = 'repair did not run'; + let previousFailure: string | null = null; + + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + let noChangeDeclaration: string | null = null; + const toolContext: ConstrainedRepairToolContext = { + workdir: input.workdir, + allowedPaths, + editedPaths, + declareNoChange: (reason: string) => { + noChangeDeclaration = reason; + }, + }; + const traceData = { + ...input.traceData, + attempt, + maxAttempts, + allowedPaths: sortedAllowedPaths, + }; + const result = await traceTimed(input.trace, input.tracePhase, input.traceEventName, traceData, async () => + input.agentRunner.runLoop({ + modelRole: 'repair', + systemPrompt: input.systemPrompt, + userPrompt: input.buildUserPrompt({ attempt, maxAttempts, previousFailure }), + toolSet: { + ...buildRepairFileTools(toolContext), + ...(input.buildExtraTools?.(toolContext) ?? {}), + }, + stepBudget, + telemetryTags: input.telemetryTags, + abortSignal: input.abortSignal, + }), + ); + + if (result.stopReason === 'error') { + lastFailure = result.error?.message ?? 'repair agent loop errored'; + previousFailure = lastFailure; + await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, traceData, result.error); + continue; + } + + const changedPaths = [...editedPaths].sort(); + if (changedPaths.length === 0 && noChangeDeclaration === null) { + // Nothing changed and nothing was claimed: the failed check would fail + // identically, so skip verification and retry. + lastFailure = input.noChangeFailureReason; + previousFailure = lastFailure; + await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, { + ...traceData, + reason: lastFailure, + }); + continue; + } + + const verification = await input.verify(changedPaths); + if (!verification.ok) { + lastFailure = verification.reason; + previousFailure = lastFailure; + await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, { + ...traceData, + changedPaths, + reason: lastFailure, + }); + continue; + } + + await input.trace.event('debug', input.tracePhase, `${input.traceEventName}_repaired`, { + ...traceData, + changedPaths, + ...(noChangeDeclaration !== null ? { noChangeDeclaration } : {}), + }); + return { status: 'repaired', attempts: attempt, changedPaths }; + } + + return { status: 'failed', attempts: maxAttempts, reason: lastFailure }; +} diff --git a/packages/cli/src/context/ingest/final-gate-repair.ts b/packages/cli/src/context/ingest/final-gate-repair.ts index d6d05e41..ff2d1a9a 100644 --- a/packages/cli/src/context/ingest/final-gate-repair.ts +++ b/packages/cli/src/context/ingest/final-gate-repair.ts @@ -1,15 +1,12 @@ -import { mkdir, readFile, writeFile } from 'node:fs/promises'; -import { dirname, join } from 'node:path'; import { z } from 'zod'; import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../context/llm/runtime-port.js'; +import type { ConstrainedRepairResult, RepairVerification } from './constrained-repair.js'; +import { runConstrainedRepairLoop } from './constrained-repair.js'; import type { IngestTraceWriter } from './ingest-trace.js'; -import { traceTimed } from './ingest-trace.js'; type FinalGateRepairKind = 'patch_semantic_gate' | 'final_artifact_gate'; -export type FinalGateRepairResult = - | { status: 'repaired'; attempts: number; changedPaths: string[] } - | { status: 'failed'; attempts: number; reason: string }; +export type FinalGateRepairResult = ConstrainedRepairResult; export interface RepairFinalGateFailureInput { agentRunner: AgentRunnerPort; @@ -18,48 +15,17 @@ export interface RepairFinalGateFailureInput { allowedPaths: string[]; trace: IngestTraceWriter; repairKind: FinalGateRepairKind; + /** + * Re-runs the failed gate against the current worktree. The repair counts + * as successful only when this passes — editing files is not the success + * signal. + */ + verify(changedPaths: string[]): Promise; maxAttempts?: number; stepBudget?: number; abortSignal?: AbortSignal; } -const readRepairFileSchema = z.object({ - path: z.string().min(1), -}); - -const writeRepairFileSchema = z.object({ - path: z.string().min(1), - content: z.string(), -}); - -function normalizeRepoPath(path: string): string { - const normalized = path.replace(/\\/g, '/').replace(/^\/+/, ''); - const parts = normalized.split('/').filter((part) => part.length > 0); - if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) { - throw new Error(`gate repair path must be a repository-relative path: ${path}`); - } - return parts.join('/'); -} - -function assertAllowedPath(path: string, allowedPaths: ReadonlySet): string { - const normalized = normalizeRepoPath(path); - if (!allowedPaths.has(normalized)) { - throw new Error(`gate repair path not allowed: ${normalized}`); - } - return normalized; -} - -async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> { - try { - return { exists: true, content: await readFile(path, 'utf-8') }; - } catch (error) { - if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') { - return { exists: false, content: '' }; - } - throw error; - } -} - function buildGateRepairSystemPrompt(): string { return ` You repair one ktx isolated-diff artifact gate failure inside the integration worktree. @@ -82,7 +48,11 @@ function buildGateRepairUserPrompt(input: { repairKind: FinalGateRepairKind; attempt: number; maxAttempts: number; + previousFailure: string | null; }): string { + const previousFailureBlock = input.previousFailure + ? `\nPrevious attempt did not pass the gate:\n${input.previousFailure}\n` + : ''; return `Repair isolated-diff artifact gates. Repair kind: ${input.repairKind} @@ -93,56 +63,22 @@ ${input.allowedPaths.map((path) => `- ${path}`).join('\n')} Gate error: ${input.gateError} - +${previousFailureBlock} Use read_gate_error first. Then inspect only the allowed files, write the minimal repaired content, and stop.`; } -function buildToolSet(input: { - workdir: string; - gateError: string; - allowedPaths: ReadonlySet; - editedPaths: Set; -}): KtxRuntimeToolSet { +function buildReadGateErrorTool(gateError: string): KtxRuntimeToolSet { return { read_gate_error: { name: 'read_gate_error', description: 'Read the artifact gate failure that must be repaired.', inputSchema: z.object({}), execute: async () => ({ - markdown: input.gateError, - structured: { gateError: input.gateError }, + markdown: gateError, + structured: { gateError }, }), }, - read_repair_file: { - name: 'read_repair_file', - description: 'Read one allowed file from the integration worktree.', - inputSchema: readRepairFileSchema, - execute: async ({ path }: z.infer) => { - const normalized = assertAllowedPath(path, input.allowedPaths); - const file = await readOptionalFile(join(input.workdir, normalized)); - return { - markdown: file.exists ? file.content : `(missing file: ${normalized})`, - structured: { path: normalized, exists: file.exists }, - }; - }, - }, - write_repair_file: { - name: 'write_repair_file', - description: 'Replace one allowed integration worktree file with repaired text content.', - inputSchema: writeRepairFileSchema, - execute: async ({ path, content }: z.infer) => { - const normalized = assertAllowedPath(path, input.allowedPaths); - const fullPath = join(input.workdir, normalized); - await mkdir(dirname(fullPath), { recursive: true }); - await writeFile(fullPath, content, 'utf-8'); - input.editedPaths.add(normalized); - return { - markdown: `Wrote ${normalized}`, - structured: { path: normalized, bytes: Buffer.byteLength(content) }, - }; - }, - }, }; } @@ -163,71 +99,38 @@ export function finalGateRepairPaths(input: { export async function repairFinalGateFailure( input: RepairFinalGateFailureInput, ): Promise { - const allowedPaths = new Set(input.allowedPaths.map(normalizeRepoPath)); - const maxAttempts = input.maxAttempts ?? 1; - const stepBudget = input.stepBudget ?? 16; - let lastFailure = 'gate repair did not run'; - - for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { - const editedPaths = new Set(); - const sortedAllowedPaths = [...allowedPaths].sort(); - const traceData = { + return runConstrainedRepairLoop({ + agentRunner: input.agentRunner, + workdir: input.workdir, + allowedPaths: input.allowedPaths, + trace: input.trace, + tracePhase: 'gate_repair', + traceEventName: 'gate_repair', + traceData: { repairKind: input.repairKind, - attempt, - maxAttempts, - allowedPaths: sortedAllowedPaths, gateError: input.gateError, - }; - const result = await traceTimed(input.trace, 'gate_repair', 'gate_repair', traceData, async () => - input.agentRunner.runLoop({ - modelRole: 'repair', - systemPrompt: buildGateRepairSystemPrompt(), - userPrompt: buildGateRepairUserPrompt({ - gateError: input.gateError, - allowedPaths: sortedAllowedPaths, - repairKind: input.repairKind, - attempt, - maxAttempts, - }), - toolSet: buildToolSet({ - workdir: input.workdir, - gateError: input.gateError, - allowedPaths, - editedPaths, - }), - stepBudget, - telemetryTags: { - operationName: 'ingest-isolated-diff-gate-repair', - source: input.trace.context.sourceKey, - jobId: input.trace.context.jobId, - repairKind: input.repairKind, - }, - abortSignal: input.abortSignal, + }, + systemPrompt: buildGateRepairSystemPrompt(), + buildUserPrompt: ({ attempt, maxAttempts, previousFailure }) => + buildGateRepairUserPrompt({ + gateError: input.gateError, + allowedPaths: [...input.allowedPaths].sort(), + repairKind: input.repairKind, + attempt, + maxAttempts, + previousFailure, }), - ); - - if (result.stopReason === 'error') { - lastFailure = result.error?.message ?? 'gate repair agent loop errored'; - await input.trace.event('error', 'gate_repair', 'gate_repair_failed', traceData, result.error); - continue; - } - - const changedPaths = [...editedPaths].sort(); - if (changedPaths.length === 0) { - lastFailure = 'gate repair completed without editing an allowed path'; - await input.trace.event('error', 'gate_repair', 'gate_repair_failed', { - ...traceData, - reason: lastFailure, - }); - continue; - } - - await input.trace.event('debug', 'gate_repair', 'gate_repair_repaired', { - ...traceData, - changedPaths, - }); - return { status: 'repaired', attempts: attempt, changedPaths }; - } - - return { status: 'failed', attempts: maxAttempts, reason: lastFailure }; + buildExtraTools: () => buildReadGateErrorTool(input.gateError), + verify: input.verify, + noChangeFailureReason: 'gate repair completed without editing an allowed path', + telemetryTags: { + operationName: 'ingest-isolated-diff-gate-repair', + source: input.trace.context.sourceKey, + jobId: input.trace.context.jobId, + repairKind: input.repairKind, + }, + maxAttempts: input.maxAttempts, + stepBudget: input.stepBudget ?? 16, + abortSignal: input.abortSignal, + }); } diff --git a/packages/cli/src/context/ingest/ingest-bundle.runner.ts b/packages/cli/src/context/ingest/ingest-bundle.runner.ts index 0e5086e4..45953adf 100644 --- a/packages/cli/src/context/ingest/ingest-bundle.runner.ts +++ b/packages/cli/src/context/ingest/ingest-bundle.runner.ts @@ -1817,7 +1817,8 @@ export class IngestBundleRunner { touchedPaths: context.touchedPaths, trace: runTrace, reason: context.reason, - maxAttempts: 1, + verify: context.verify, + maxAttempts: 2, stepBudget: 12, abortSignal: ctx?.abortSignal, }); @@ -1839,7 +1840,8 @@ export class IngestBundleRunner { allowedPaths: context.touchedPaths, trace: runTrace, repairKind: 'patch_semantic_gate', - maxAttempts: 1, + verify: context.verify, + maxAttempts: 2, stepBudget: 16, abortSignal: ctx?.abortSignal, }); @@ -2546,40 +2548,41 @@ export class IngestBundleRunner { activePhase = 'final_gates'; activeFailureDetails = finalArtifactGateTraceData; emitStageProgress('final_gates', 89, 'Running final artifact gates'); + const runFinalArtifactGates = async () => { + await validateFinalIngestArtifacts({ + connectionIds: repairConnectionIds, + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }; try { await traceTimed( runTrace, 'final_gates', 'final_artifact_gates', finalArtifactGateTraceData, - async () => { - await validateFinalIngestArtifacts({ - connectionIds: repairConnectionIds, - changedWikiPageKeys: finalChangedWikiPageKeys, - touchedSlSources: finalTouchedSlSources, - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - validateTouchedSources: (touched) => - validateWuTouchedSources( - { - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - slValidator: this.deps.slValidator, - }, - touched, - ), - tableExists: (connectionId, tableRef) => - this.tableRefExistsInSemanticLayer( - this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - [connectionId], - tableRef, - ), - }); - }, + runFinalArtifactGates, ); } catch (error) { const gateError = this.errorMessage(error); @@ -2595,7 +2598,15 @@ export class IngestBundleRunner { allowedPaths: repairPaths, trace: runTrace, repairKind: 'final_artifact_gate', - maxAttempts: 1, + verify: async () => { + try { + await runFinalArtifactGates(); + return { ok: true }; + } catch (verifyError) { + return { ok: false, reason: this.errorMessage(verifyError) }; + } + }, + maxAttempts: 2, stepBudget: 16, abortSignal: ctx?.abortSignal, }); @@ -2611,44 +2622,9 @@ export class IngestBundleRunner { throw new Error(`${gateError}\ngate repair failed: ${gateRepair.reason}`); } + // The repair loop re-ran the gates via `verify` before reporting + // success, so a repaired status here means the tree already passed. isolatedDiffSummary.gateRepairs += 1; - await traceTimed( - runTrace, - 'final_gates', - 'final_artifact_gates_after_gate_repair', - { - ...finalArtifactGateTraceData, - repairedPaths: gateRepair.changedPaths, - }, - async () => { - await validateFinalIngestArtifacts({ - connectionIds: repairConnectionIds, - changedWikiPageKeys: finalChangedWikiPageKeys, - touchedSlSources: finalTouchedSlSources, - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - validateTouchedSources: (touched) => - validateWuTouchedSources( - { - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - slValidator: this.deps.slValidator, - }, - touched, - ), - tableExists: (connectionId, tableRef) => - this.tableRefExistsInSemanticLayer( - this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - [connectionId], - tableRef, - ), - }); - }, - ); const repairCommit = await sessionWorktree.git.commitFiles( gateRepair.changedPaths, diff --git a/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts b/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts index 1e2f0cee..04cc099b 100644 --- a/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts +++ b/packages/cli/src/context/ingest/isolated-diff/patch-integrator.ts @@ -1,35 +1,32 @@ import { readFile } from 'node:fs/promises'; import type { GitService } from '../../../context/core/git.service.js'; +import type { RepairVerification } from '../constrained-repair.js'; import type { FinalGateRepairResult } from '../final-gate-repair.js'; import type { IngestTraceWriter } from '../ingest-trace.js'; import { traceTimed } from '../ingest-trace.js'; import { assertPatchAllowedForWorkUnit, parsePatchTouchedPaths } from './git-patch.js'; import type { TextualConflictResolutionResult } from './textual-conflict-resolver.js'; -type PatchIntegrationTextualResolution = - | { status: 'repaired'; attempts: number; changedPaths: string[] } - | { status: 'failed'; attempts: number; reason: string }; - export type PatchIntegrationResult = | { status: 'accepted'; commitSha: string; touchedPaths: string[]; - textualResolution?: PatchIntegrationTextualResolution; + textualResolution?: TextualConflictResolutionResult; gateRepair?: FinalGateRepairResult; } | { status: 'textual_conflict'; reason: string; touchedPaths: string[]; - textualResolution?: PatchIntegrationTextualResolution; + textualResolution?: TextualConflictResolutionResult; gateRepair?: FinalGateRepairResult; } | { status: 'semantic_conflict'; reason: string; touchedPaths: string[]; - textualResolution?: PatchIntegrationTextualResolution; + textualResolution?: TextualConflictResolutionResult; gateRepair?: FinalGateRepairResult; }; @@ -47,12 +44,14 @@ export interface IntegrateWorkUnitPatchInput { patchPath: string; touchedPaths: string[]; reason: string; + verify(changedPaths: string[]): Promise; }): Promise; repairGateFailure?(input: { unitKey: string; patchPath: string; touchedPaths: string[]; reason: string; + verify(changedPaths: string[]): Promise; }): Promise; } @@ -94,6 +93,19 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) }; } + // Repair and resolution success is decided by this check, not by whether + // the repair agent edited files: the gates re-run over the union of the + // patch's paths and everything the agent changed. + const verifyAppliedTree = async (changedPaths: string[]): Promise => { + const paths = [...new Set([...touchedPaths, ...changedPaths])].sort(); + try { + await input.validateAppliedTree(paths); + return { ok: true }; + } catch (error) { + return { ok: false, reason: errorMessage(error) }; + } + }; + try { await traceTimed( input.trace, @@ -130,6 +142,7 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) patchPath: input.patchPath, touchedPaths, reason, + verify: verifyAppliedTree, }); if (textualResolution.status === 'failed') { @@ -144,115 +157,20 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) }; } - try { - await traceTimed( - input.trace, - 'integration', - 'semantic_gate_after_textual_resolution', - { unitKey: input.unitKey, touchedPaths: textualResolution.changedPaths }, - async () => { - await input.validateAppliedTree(textualResolution.changedPaths); - }, - ); - } catch (semanticError) { - const reason = errorMessage(semanticError); - await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', { + if (textualResolution.changedPaths.length === 0) { + // The resolver declared the patch redundant and the gates verified the + // current tree: the integration worktree already represents this work + // unit's content (e.g. a duplicate page created by another work unit). + await input.trace.event('debug', 'integration', 'patch_subsumed_after_textual_resolution', { unitKey: input.unitKey, patchPath: input.patchPath, - touchedPaths: textualResolution.changedPaths, - reason, + touchedPaths, + attempts: textualResolution.attempts, }); - - // A textual conflict and a semantic-gate failure can co-occur: the resolver - // reconciles the text but can leave wiki sl_refs pointing at measures the - // merged source no longer defines. Recover via the same gate repair the - // clean-apply branch uses, instead of hard-failing the whole job. - if (input.repairGateFailure) { - const gateRepair = await input.repairGateFailure({ - unitKey: input.unitKey, - patchPath: input.patchPath, - touchedPaths: textualResolution.changedPaths, - reason, - }); - - if (gateRepair.status !== 'failed') { - // The resolver wrote its merge to the worktree (unstaged); the repair - // edited a subset on top. Commit the union so neither is dropped. - const resolvedAndRepairedPaths = [ - ...new Set([...textualResolution.changedPaths, ...gateRepair.changedPaths]), - ].sort(); - try { - await traceTimed( - input.trace, - 'integration', - 'semantic_gate_after_gate_repair', - { unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths }, - async () => { - await input.validateAppliedTree(gateRepair.changedPaths); - }, - ); - - const commit = await input.integrationGit.commitFiles( - resolvedAndRepairedPaths, - `ingest: resolve WorkUnit ${input.unitKey} conflict`, - input.author.name, - input.author.email, - ); - if (commit.created) { - await input.trace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', { - unitKey: input.unitKey, - commitSha: commit.commitHash, - touchedPaths: resolvedAndRepairedPaths, - attempts: textualResolution.attempts, - gateRepairAttempts: gateRepair.attempts, - }); - return { - status: 'accepted', - commitSha: commit.commitHash, - touchedPaths: resolvedAndRepairedPaths, - textualResolution, - gateRepair, - }; - } - } catch (repairValidationError) { - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } - await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', { - unitKey: input.unitKey, - patchPath: input.patchPath, - touchedPaths: gateRepair.changedPaths, - reason: errorMessage(repairValidationError), - }); - return { - status: 'semantic_conflict', - reason: errorMessage(repairValidationError), - touchedPaths: gateRepair.changedPaths, - textualResolution, - gateRepair, - }; - } - } - - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } - return { - status: 'semantic_conflict', - reason: gateRepair.status === 'failed' ? gateRepair.reason : reason, - touchedPaths: textualResolution.changedPaths, - textualResolution, - gateRepair, - }; - } - - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } return { - status: 'semantic_conflict', - reason, - touchedPaths: textualResolution.changedPaths, + status: 'accepted', + commitSha: preApplyHead ?? '', + touchedPaths: [], textualResolution, }; } @@ -264,19 +182,18 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) input.author.email, ); if (!commit.created) { - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } - const noChangeReason = 'textual resolver produced no committable changes'; - await input.trace.event('error', 'integration', 'textual_conflict_resolver_noop', { + // The resolver's writes left the tree byte-identical to the accepted + // state, and the gates verified it — the patch is represented already. + await input.trace.event('debug', 'integration', 'patch_subsumed_after_textual_resolution', { unitKey: input.unitKey, patchPath: input.patchPath, touchedPaths: textualResolution.changedPaths, + attempts: textualResolution.attempts, }); return { - status: 'textual_conflict', - reason: noChangeReason, - touchedPaths: textualResolution.changedPaths, + status: 'accepted', + commitSha: preApplyHead ?? '', + touchedPaths: [], textualResolution, }; } @@ -314,6 +231,7 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) patchPath: input.patchPath, touchedPaths, reason, + verify: verifyAppliedTree, }); if (gateRepair.status === 'failed') { @@ -328,28 +246,6 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) }; } - try { - await traceTimed( - input.trace, - 'integration', - 'semantic_gate_after_gate_repair', - { unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths }, - async () => { - await input.validateAppliedTree(gateRepair.changedPaths); - }, - ); - } catch (repairValidationError) { - if (preApplyHead) { - await input.integrationGit.resetHardTo(preApplyHead); - } - return { - status: 'semantic_conflict', - reason: errorMessage(repairValidationError), - touchedPaths: gateRepair.changedPaths, - gateRepair, - }; - } - const commit = await input.integrationGit.commitFiles( gateRepair.changedPaths, `ingest: repair WorkUnit ${input.unitKey} gates`, diff --git a/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts b/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts index 8c5bb097..8633e7e1 100644 --- a/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts +++ b/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts @@ -1,13 +1,15 @@ -import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; -import { dirname, join } from 'node:path'; +import { readFile } from 'node:fs/promises'; import { z } from 'zod'; import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js'; +import type { + ConstrainedRepairResult, + ConstrainedRepairToolContext, + RepairVerification, +} from '../constrained-repair.js'; +import { buildDeleteRepairFileTool, runConstrainedRepairLoop } from '../constrained-repair.js'; import type { IngestTraceWriter } from '../ingest-trace.js'; -import { traceTimed } from '../ingest-trace.js'; -export type TextualConflictResolutionResult = - | { status: 'repaired'; attempts: number; changedPaths: string[] } - | { status: 'failed'; attempts: number; reason: string }; +export type TextualConflictResolutionResult = ConstrainedRepairResult; export interface ResolveTextualConflictInput { agentRunner: AgentRunnerPort; @@ -17,52 +19,17 @@ export interface ResolveTextualConflictInput { touchedPaths: string[]; trace: IngestTraceWriter; reason: string; + /** + * Re-runs the artifact gates against the current worktree. A resolution — + * including an explicit no-change declaration for a redundant patch — + * counts as successful only when this passes. + */ + verify(changedPaths: string[]): Promise; maxAttempts?: number; stepBudget?: number; abortSignal?: AbortSignal; } -const readIntegrationFileSchema = z.object({ - path: z.string().min(1), -}); - -const writeIntegrationFileSchema = z.object({ - path: z.string().min(1), - content: z.string(), -}); - -const deleteIntegrationFileSchema = z.object({ - path: z.string().min(1), -}); - -function normalizeRepoPath(path: string): string { - const normalized = path.replace(/\\/g, '/').replace(/^\/+/, ''); - const parts = normalized.split('/').filter((part) => part.length > 0); - if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) { - throw new Error(`resolver path must be a repository-relative path: ${path}`); - } - return parts.join('/'); -} - -function assertAllowedPath(path: string, allowedPaths: ReadonlySet): string { - const normalized = normalizeRepoPath(path); - if (!allowedPaths.has(normalized)) { - throw new Error(`resolver path not allowed: ${normalized}`); - } - return normalized; -} - -async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> { - try { - return { exists: true, content: await readFile(path, 'utf-8') }; - } catch (error) { - if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') { - return { exists: false, content: '' }; - } - throw error; - } -} - function buildResolverSystemPrompt(): string { return ` You repair one failed ktx isolated-diff patch inside the integration worktree. @@ -71,10 +38,12 @@ You repair one failed ktx isolated-diff patch inside the integration worktree. - Preserve accepted integration content that is unrelated to the failed patch. - Incorporate the failed patch only when the patch evidence is compatible with the current file. +- If the current file already represents everything the failed patch contributes (for example a + duplicate page created by another work unit), call declare_patch_redundant instead of editing. - Edit only paths exposed by the resolver tools. - Prefer the smallest text edit that makes the composed artifact coherent. - Do not create new facts that are absent from the current file or failed patch. -- Stop after writing the repaired file content. +- Stop after writing the repaired file content or declaring the patch redundant. `; } @@ -85,7 +54,11 @@ function buildResolverUserPrompt(input: { reason: string; attempt: number; maxAttempts: number; + previousFailure: string | null; }): string { + const previousFailureBlock = input.previousFailure + ? `\nPrevious attempt did not pass the artifact gates:\n${input.previousFailure}\n` + : ''; return `Repair isolated-diff textual conflict. WorkUnit: ${input.unitKey} @@ -96,17 +69,22 @@ ${input.touchedPaths.map((path) => `- ${path}`).join('\n')} Git apply failure: ${input.reason} - -Use read_failed_patch first. Then read the touched integration files, write the -repaired content, and stop.`; +${previousFailureBlock} +Use read_failed_patch first. Then read the touched integration files and either +write the repaired content or, when the patch adds nothing the current files do +not already cover, call declare_patch_redundant. Then stop.`; } -function buildToolSet(input: { - workdir: string; +function buildResolverExtraTools(input: { patchPath: string; - allowedPaths: ReadonlySet; - editedPaths: Set; + context: ConstrainedRepairToolContext; }): KtxRuntimeToolSet { + const declareSchema = z.object({ + reason: z + .string() + .min(1) + .describe('Why the integration tree already represents everything this patch contributes.'), + }); return { read_failed_patch: { name: 'read_failed_patch', @@ -120,46 +98,18 @@ function buildToolSet(input: { }; }, }, - read_integration_file: { - name: 'read_integration_file', - description: 'Read one allowed file from the current integration worktree.', - inputSchema: readIntegrationFileSchema, - execute: async ({ path }: z.infer) => { - const normalized = assertAllowedPath(path, input.allowedPaths); - const file = await readOptionalFile(join(input.workdir, normalized)); + ...buildDeleteRepairFileTool(input.context), + declare_patch_redundant: { + name: 'declare_patch_redundant', + description: + 'Declare that the failed patch needs no integration because the current worktree already ' + + 'represents its content (for example a duplicate page created by another work unit).', + inputSchema: declareSchema, + execute: async ({ reason }: z.infer) => { + input.context.declareNoChange(reason); return { - markdown: file.exists ? file.content : `(missing file: ${normalized})`, - structured: { path: normalized, exists: file.exists }, - }; - }, - }, - write_integration_file: { - name: 'write_integration_file', - description: 'Replace one allowed integration worktree file with repaired text content.', - inputSchema: writeIntegrationFileSchema, - execute: async ({ path, content }: z.infer) => { - const normalized = assertAllowedPath(path, input.allowedPaths); - const fullPath = join(input.workdir, normalized); - await mkdir(dirname(fullPath), { recursive: true }); - await writeFile(fullPath, content, 'utf-8'); - input.editedPaths.add(normalized); - return { - markdown: `Wrote ${normalized}`, - structured: { path: normalized, bytes: Buffer.byteLength(content) }, - }; - }, - }, - delete_integration_file: { - name: 'delete_integration_file', - description: 'Delete one allowed integration worktree file when the failed patch proves the deletion is correct.', - inputSchema: deleteIntegrationFileSchema, - execute: async ({ path }: z.infer) => { - const normalized = assertAllowedPath(path, input.allowedPaths); - await rm(join(input.workdir, normalized), { force: true }); - input.editedPaths.add(normalized); - return { - markdown: `Deleted ${normalized}`, - structured: { path: normalized }, + markdown: `Declared patch redundant: ${reason}`, + structured: { reason }, }; }, }, @@ -169,72 +119,42 @@ function buildToolSet(input: { export async function resolveTextualConflict( input: ResolveTextualConflictInput, ): Promise { - const allowedPaths = new Set(input.touchedPaths.map(normalizeRepoPath)); - const maxAttempts = input.maxAttempts ?? 1; - const stepBudget = input.stepBudget ?? 12; - let lastFailure = 'resolver did not run'; - - for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { - const editedPaths = new Set(); - const traceData = { + const sortedTouchedPaths = [...input.touchedPaths].sort(); + return runConstrainedRepairLoop({ + agentRunner: input.agentRunner, + workdir: input.workdir, + allowedPaths: input.touchedPaths, + trace: input.trace, + tracePhase: 'resolver', + traceEventName: 'textual_conflict_resolver', + traceData: { unitKey: input.unitKey, patchPath: input.patchPath, - touchedPaths: [...allowedPaths].sort(), - attempt, - maxAttempts, + touchedPaths: sortedTouchedPaths, reason: input.reason, - }; - const result = await traceTimed(input.trace, 'resolver', 'textual_conflict_resolver', traceData, async () => - input.agentRunner.runLoop({ - modelRole: 'repair', - systemPrompt: buildResolverSystemPrompt(), - userPrompt: buildResolverUserPrompt({ - unitKey: input.unitKey, - patchPath: input.patchPath, - touchedPaths: [...allowedPaths].sort(), - reason: input.reason, - attempt, - maxAttempts, - }), - toolSet: buildToolSet({ - workdir: input.workdir, - patchPath: input.patchPath, - allowedPaths, - editedPaths, - }), - stepBudget, - telemetryTags: { - operationName: 'ingest-isolated-diff-textual-resolver', - source: input.trace.context.sourceKey, - jobId: input.trace.context.jobId, - unitKey: input.unitKey, - }, - abortSignal: input.abortSignal, + }, + systemPrompt: buildResolverSystemPrompt(), + buildUserPrompt: ({ attempt, maxAttempts, previousFailure }) => + buildResolverUserPrompt({ + unitKey: input.unitKey, + patchPath: input.patchPath, + touchedPaths: sortedTouchedPaths, + reason: input.reason, + attempt, + maxAttempts, + previousFailure, }), - ); - - if (result.stopReason === 'error') { - lastFailure = result.error?.message ?? 'resolver agent loop errored'; - await input.trace.event('error', 'resolver', 'textual_conflict_resolver_failed', traceData, result.error); - continue; - } - - const changedPaths = [...editedPaths].sort(); - if (changedPaths.length === 0) { - lastFailure = 'resolver completed without editing an allowed path'; - await input.trace.event('error', 'resolver', 'textual_conflict_resolver_failed', { - ...traceData, - reason: lastFailure, - }); - continue; - } - - await input.trace.event('debug', 'resolver', 'textual_conflict_resolver_repaired', { - ...traceData, - changedPaths, - }); - return { status: 'repaired', attempts: attempt, changedPaths }; - } - - return { status: 'failed', attempts: maxAttempts, reason: lastFailure }; + buildExtraTools: (context) => buildResolverExtraTools({ patchPath: input.patchPath, context }), + verify: input.verify, + noChangeFailureReason: 'resolver completed without editing an allowed path or declaring the patch redundant', + telemetryTags: { + operationName: 'ingest-isolated-diff-textual-resolver', + source: input.trace.context.sourceKey, + jobId: input.trace.context.jobId, + unitKey: input.unitKey, + }, + maxAttempts: input.maxAttempts, + stepBudget: input.stepBudget ?? 12, + abortSignal: input.abortSignal, + }); } diff --git a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts index 9e4bbbc6..91f8b24b 100644 --- a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts +++ b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts @@ -3,20 +3,16 @@ import { isAbortError } from '../../core/abort.js'; import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js'; import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js'; import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js'; +import { formatInvalidWuSources, type WuValidationResult } from './validate-wu-sources.js'; import type { WorkUnit } from '../types.js'; const MAX_WORK_UNIT_PROMPT_CHARS = 240_000; -interface TouchedValidationResult { - invalidSources: string[]; - validSources: string[]; -} - export interface WorkUnitExecutionDeps { sessionWorktreeGit: { revParseHead(): Promise }; agentRunner: AgentRunnerPort; validateWikiRefs?: (actions: MemoryAction[]) => Promise; - validateTouchedSources: (touched: TouchedSlSource[]) => Promise; + validateTouchedSources: (touched: TouchedSlSource[]) => Promise; resetHardTo: (targetSha: string) => Promise; buildSystemPrompt: (wu: WorkUnit) => string; buildUserPrompt: (wu: WorkUnit) => string; @@ -156,7 +152,7 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit) // Spec: invalid SL writes reset the session worktree to the WU's pre-state, WU is marked failed, // its files are absent from the Stage Index. Per-source surgical revert is the // memory-agent pattern — NOT the bundle-ingest pattern. - return failWithReset(`sl_validate failed for: ${validation.invalidSources.join(', ')}`); + return failWithReset(`sl_validate failed for: ${formatInvalidWuSources(validation.invalidSources)}`); } } diff --git a/packages/cli/src/context/ingest/stages/validate-wu-sources.ts b/packages/cli/src/context/ingest/stages/validate-wu-sources.ts index 4bc3aaa0..f89e5730 100644 --- a/packages/cli/src/context/ingest/stages/validate-wu-sources.ts +++ b/packages/cli/src/context/ingest/stages/validate-wu-sources.ts @@ -1,24 +1,153 @@ +import { findMissingJoinTargets, formatMissingJoinTarget } from '../../../context/sl/semantic-layer.service.js'; import type { SlValidationDeps } from '../../../context/sl/tools/sl-warehouse-validation.js'; import type { SlValidatorPort } from '../../../context/sl/sl-validator.port.js'; import type { TouchedSlSource } from '../../../context/tools/touched-sl-sources.js'; +export interface InvalidWuSource { + /** `${connectionId}:${sourceName}` */ + source: string; + errors: string[]; +} + export interface WuValidationResult { validSources: string[]; - invalidSources: string[]; + invalidSources: InvalidWuSource[]; +} + +export function formatInvalidWuSources(invalid: InvalidWuSource[]): string { + return invalid.map((entry) => `${entry.source} (${entry.errors.join('; ')})`).join(', '); +} + +type LoadedSource = Awaited>['sources'][number]; + +function uniqueTouchedSources(sources: TouchedSlSource[]): TouchedSlSource[] { + const seen = new Set(); + const unique: TouchedSlSource[] = []; + for (const source of sources) { + const key = `${source.connectionId}:${source.sourceName}`; + if (seen.has(key)) { + continue; + } + seen.add(key); + unique.push(source); + } + return unique.sort((left, right) => { + const byConnection = left.connectionId.localeCompare(right.connectionId); + return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection; + }); +} + +/** + * Expand the touched set with direct join neighbors that exist: targets the + * touched sources join to, and existing sources that join to a touched one. + * Missing targets are not added here — they are reported as join-target + * errors on the source that declares them, so the failure names the file + * that must change instead of the phantom neighbor. + */ +function expandWithExistingJoinNeighbors( + touched: TouchedSlSource[], + sourcesByConnection: Map, +): TouchedSlSource[] { + const expanded = [...touched]; + const touchedByConnection = new Map>(); + for (const source of touched) { + const bucket = touchedByConnection.get(source.connectionId) ?? new Set(); + bucket.add(source.sourceName); + touchedByConnection.set(source.connectionId, bucket); + } + + for (const [connectionId, sources] of sourcesByConnection) { + const touchedNames = touchedByConnection.get(connectionId); + if (!touchedNames || touchedNames.size === 0) { + continue; + } + const existingNames = new Set(sources.map((source) => source.name)); + for (const source of sources) { + if (touchedNames.has(source.name)) { + for (const join of source.joins ?? []) { + if (existingNames.has(join.to)) { + expanded.push({ connectionId, sourceName: join.to }); + } + } + } + if ((source.joins ?? []).some((join) => touchedNames.has(join.to))) { + expanded.push({ connectionId, sourceName: source.name }); + } + } + } + + return uniqueTouchedSources(expanded); +} + +/** + * Join-target errors attributable to this change set: every join declared by + * a touched source must resolve, and no source may be left joining to a name + * this change set removed. Pre-existing dangling joins on untouched sources + * are out of scope — they must not block unrelated work. Resolution is the + * Python engine's: exact source-name match within the connection. + */ +function findJoinTargetErrors( + touched: TouchedSlSource[], + sourcesByConnection: Map, +): Map { + const errorsBySource = new Map(); + const touchedByConnection = new Map>(); + for (const source of touched) { + const bucket = touchedByConnection.get(source.connectionId) ?? new Set(); + bucket.add(source.sourceName); + touchedByConnection.set(source.connectionId, bucket); + } + + for (const [connectionId, sources] of sourcesByConnection) { + const touchedNames = touchedByConnection.get(connectionId); + if (!touchedNames || touchedNames.size === 0) { + continue; + } + const existingNames = sources.map((source) => source.name); + for (const source of sources) { + const sourceIsTouched = touchedNames.has(source.name); + const candidateJoins = sourceIsTouched + ? source.joins + : (source.joins ?? []).filter((join) => touchedNames.has(join.to)); + const missing = findMissingJoinTargets(candidateJoins, existingNames); + if (missing.length === 0) { + continue; + } + const key = `${connectionId}:${source.name}`; + const messages = missing.map(formatMissingJoinTarget); + errorsBySource.set(key, [...(errorsBySource.get(key) ?? []), ...messages]); + } + } + return errorsBySource; } export async function validateWuTouchedSources( deps: SlValidationDeps & { slValidator: SlValidatorPort }, touched: TouchedSlSource[], ): Promise { + if (touched.length === 0) { + return { validSources: [], invalidSources: [] }; + } + + const sourcesByConnection = new Map(); + for (const connectionId of new Set(touched.map((source) => source.connectionId))) { + const { sources } = await deps.semanticLayerService.loadAllSources(connectionId); + sourcesByConnection.set(connectionId, sources); + } + + const expanded = expandWithExistingJoinNeighbors(touched, sourcesByConnection); + const joinTargetErrors = findJoinTargetErrors(touched, sourcesByConnection); + const valid: string[] = []; - const invalid: string[] = []; - for (const source of touched) { + const invalid: InvalidWuSource[] = []; + for (const source of expanded) { + const key = `${source.connectionId}:${source.sourceName}`; const result = await deps.slValidator.validateSingleSource(deps, source.connectionId, source.sourceName); - if (result.errors.length === 0) { - valid.push(`${source.connectionId}:${source.sourceName}`); + const errors = [...result.errors, ...(joinTargetErrors.get(key) ?? [])]; + if (errors.length === 0) { + valid.push(key); } else { - invalid.push(`${source.connectionId}:${source.sourceName}`); + invalid.push({ source: key, errors }); } } return { validSources: valid, invalidSources: invalid }; diff --git a/packages/cli/src/context/sl/semantic-layer.service.ts b/packages/cli/src/context/sl/semantic-layer.service.ts index 56a082c8..e81a28ac 100644 --- a/packages/cli/src/context/sl/semantic-layer.service.ts +++ b/packages/cli/src/context/sl/semantic-layer.service.ts @@ -521,6 +521,7 @@ export class SemanticLayerService { return null; } + async validatePhysicalTableReferences( connectionId: string, sources: SemanticLayerSource[], @@ -759,6 +760,23 @@ export class SemanticLayerService { } merged.push(toPush); + // A join target the engine cannot resolve fails every downstream gate and + // query with the error attributed to the phantom target. Reject it here, + // on the source that declares it, while the writing agent can still fix it. + const missingJoinTargets = findMissingJoinTargets( + toPush.joins, + merged.map((s) => s.name), + ); + const joinTargetErrors = missingJoinTargets.map( + (missing) => + `${toPush.name}: ${formatMissingJoinTarget(missing)}. Declare joins only to existing ` + + `semantic-layer sources in this connection, or drop the join and keep the relationship ` + + `in a column description.`, + ); + if (joinTargetErrors.length > 0) { + return { errors: [...loadErrors, ...joinTargetErrors], warnings: [], perSourceWarnings: {} }; + } + const validatable = merged.filter((s) => s.table != null || s.sql != null); if (validatable.length === 0) { return { errors: loadErrors, warnings: [], perSourceWarnings: {} }; @@ -1427,6 +1445,47 @@ function parseJoinColumns( return { localColumn: left.column, targetColumn: right.column }; } +export interface MissingJoinTarget { + to: string; + /** Source whose name matches only case-insensitively, if any — the usual authoring mistake. */ + caseMismatch: string | null; +} + +/** + * Join targets that do not exactly match a known source name. The Python + * engine resolves `joins[].to` by exact name within one connection's source + * set (`engine._collect_orphan_join_target_errors`) and `query()` raises on a + * miss, so anything looser here — case-insensitive matches, table refs, + * sources in other connections — would pass this gate and then fail + * query/validation as an orphan join target. + */ +export function findMissingJoinTargets( + joins: Array<{ to: string }> | undefined, + knownSourceNames: Iterable, +): MissingJoinTarget[] { + const known = new Set(); + const canonicalByLower = new Map(); + for (const name of knownSourceNames) { + known.add(name); + canonicalByLower.set(name.toLowerCase(), name); + } + const missing: MissingJoinTarget[] = []; + for (const join of joins ?? []) { + if (known.has(join.to)) { + continue; + } + missing.push({ to: join.to, caseMismatch: canonicalByLower.get(join.to.toLowerCase()) ?? null }); + } + return missing; +} + +export function formatMissingJoinTarget(missing: MissingJoinTarget): string { + const hint = missing.caseMismatch + ? `; join targets are case-sensitive — the source is named "${missing.caseMismatch}"` + : ''; + return `join target "${missing.to}" does not exist${hint}`; +} + /** * Returns one message per measure-level segment reference that doesn't resolve to * a segment defined on the source. Array is empty when every reference checks out. diff --git a/packages/cli/test/context/ingest/artifact-gates.test.ts b/packages/cli/test/context/ingest/artifact-gates.test.ts index c93a24e5..473595b2 100644 --- a/packages/cli/test/context/ingest/artifact-gates.test.ts +++ b/packages/cli/test/context/ingest/artifact-gates.test.ts @@ -108,57 +108,39 @@ describe('artifact gates', () => { ).rejects.toThrow(/unknown sl_refs entity mart_account_segments\.total_contract_arr_cents/); }); - it('validates direct declared-join neighbors of touched semantic-layer sources', async () => { + it('passes touched sources to the shared validation path and surfaces its reasons', async () => { + // Join-neighbor expansion lives inside validateTouchedSources (the same + // path work units use); the gate hands over the raw touched set and must + // carry the per-source reasons into the failure it throws. const semanticLayerService = { - loadAllSources: vi.fn().mockResolvedValue({ - sources: [ - { - name: 'orders', - grain: ['order_id'], - columns: [ - { name: 'order_id', type: 'string' }, - { name: 'account_id', type: 'string' }, - ], - joins: [{ to: 'accounts', on: 'orders.account_id = accounts.account_id', relationship: 'many_to_one' }], - measures: [{ name: 'order_count', expr: 'count(*)' }], - }, - { - name: 'accounts', - grain: ['account_id'], - columns: [{ name: 'account_id', type: 'string' }], - joins: [], - measures: [{ name: 'account_count', expr: 'count(*)' }], - }, - { - name: 'segments', - grain: ['segment_id'], - columns: [ - { name: 'segment_id', type: 'string' }, - { name: 'account_id', type: 'string' }, - ], - joins: [{ to: 'accounts', on: 'segments.account_id = accounts.account_id', relationship: 'many_to_one' }], - measures: [], - }, - ], - loadErrors: [], - }), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), }; - const validateTouchedSources = vi.fn().mockResolvedValue({ invalidSources: [], validSources: [] }); - - await validateFinalIngestArtifacts({ - connectionIds: ['warehouse'], - changedWikiPageKeys: [], - touchedSlSources: [{ connectionId: 'warehouse', sourceName: 'accounts' }], - wikiService: { readPage: vi.fn() } as never, - semanticLayerService: semanticLayerService as never, - validateTouchedSources, - tableExists: async () => true, + const validateTouchedSources = vi.fn().mockResolvedValue({ + validSources: [], + invalidSources: [ + { + source: 'warehouse:mart_account_segments', + errors: ['join target "accounts" does not exist'], + }, + ], }); + await expect( + validateFinalIngestArtifacts({ + connectionIds: ['warehouse'], + changedWikiPageKeys: [], + touchedSlSources: [{ connectionId: 'warehouse', sourceName: 'mart_account_segments' }], + wikiService: { readPage: vi.fn() } as never, + semanticLayerService: semanticLayerService as never, + validateTouchedSources, + tableExists: async () => true, + }), + ).rejects.toThrow( + /semantic-layer validation failed for warehouse:mart_account_segments: join target "accounts" does not exist/, + ); + expect(validateTouchedSources).toHaveBeenCalledWith([ - { connectionId: 'warehouse', sourceName: 'accounts' }, - { connectionId: 'warehouse', sourceName: 'orders' }, - { connectionId: 'warehouse', sourceName: 'segments' }, + { connectionId: 'warehouse', sourceName: 'mart_account_segments' }, ]); }); diff --git a/packages/cli/test/context/ingest/final-gate-repair.test.ts b/packages/cli/test/context/ingest/final-gate-repair.test.ts index 8a21eab6..d711c5bd 100644 --- a/packages/cli/test/context/ingest/final-gate-repair.test.ts +++ b/packages/cli/test/context/ingest/final-gate-repair.test.ts @@ -53,7 +53,7 @@ describe('finalGateRepairPaths', () => { }); describe('repairFinalGateFailure', () => { - it('lets the repair agent read gate errors and edit only allowed files', async () => { + it('lets the repair agent read gate errors, edit only allowed files, and verifies the gate', async () => { const { workdir, trace } = await makeHarness(); const agentRunner = { runLoop: vi.fn(async (params: any) => { @@ -70,7 +70,7 @@ describe('repairFinalGateFailure', () => { path: 'wiki/global/other.md', content: 'not allowed', }), - ).rejects.toThrow(/gate repair path not allowed/); + ).rejects.toThrow(/repair path not allowed/); await params.toolSet.write_repair_file.execute({ path: 'wiki/global/account-segments.md', @@ -79,6 +79,7 @@ describe('repairFinalGateFailure', () => { return { stopReason: 'natural' as const }; }), }; + const verify = vi.fn(async () => ({ ok: true as const })); const result = await repairFinalGateFailure({ agentRunner, @@ -88,6 +89,7 @@ describe('repairFinalGateFailure', () => { allowedPaths: ['wiki/global/account-segments.md'], trace, repairKind: 'final_artifact_gate', + verify, maxAttempts: 1, stepBudget: 8, }); @@ -97,6 +99,7 @@ describe('repairFinalGateFailure', () => { attempts: 1, changedPaths: ['wiki/global/account-segments.md'], }); + expect(verify).toHaveBeenCalledWith(['wiki/global/account-segments.md']); await expect(readFile(join(workdir, 'wiki/global/account-segments.md'), 'utf-8')).resolves.toContain( 'total_contract_arr', ); @@ -115,6 +118,7 @@ describe('repairFinalGateFailure', () => { it('returns failed when the repair agent edits no allowed file', async () => { const { workdir, trace } = await makeHarness(); + const verify = vi.fn(async () => ({ ok: true as const })); const result = await repairFinalGateFailure({ agentRunner: { runLoop: vi.fn(async () => ({ stopReason: 'natural' as const })) }, workdir, @@ -122,6 +126,7 @@ describe('repairFinalGateFailure', () => { allowedPaths: ['wiki/global/account-segments.md'], trace, repairKind: 'final_artifact_gate', + verify, maxAttempts: 1, stepBudget: 8, }); @@ -131,6 +136,52 @@ describe('repairFinalGateFailure', () => { attempts: 1, reason: 'gate repair completed without editing an allowed path', }); + expect(verify).not.toHaveBeenCalled(); await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('gate_repair_failed'); }); + + it('does not report repaired when edits fail gate verification', async () => { + // Regression: the repair agent edited allowed files but left a dangling + // join in place. The old loop reported "repaired" because a file changed; + // success must come from the gate re-check instead. + const { workdir, trace } = await makeHarness(); + const agentRunner = { + runLoop: vi.fn(async (params: any) => { + await params.toolSet.write_repair_file.execute({ + path: 'wiki/global/account-segments.md', + content: 'an edit that does not fix the gate\n', + }); + return { stopReason: 'natural' as const }; + }), + }; + const verify = vi + .fn() + .mockResolvedValueOnce({ + ok: false, + reason: 'final artifact gates failed:\nsemantic-layer validation failed for warehouse:accounts', + }) + .mockResolvedValueOnce({ ok: true }); + + const result = await repairFinalGateFailure({ + agentRunner, + workdir, + gateError: 'final artifact gates failed:\nsemantic-layer validation failed for warehouse:accounts', + allowedPaths: ['wiki/global/account-segments.md'], + trace, + repairKind: 'patch_semantic_gate', + verify, + maxAttempts: 2, + stepBudget: 8, + }); + + expect(result).toEqual({ + status: 'repaired', + attempts: 2, + changedPaths: ['wiki/global/account-segments.md'], + }); + expect(verify).toHaveBeenCalledTimes(2); + const secondPrompt = agentRunner.runLoop.mock.calls[1][0].userPrompt as string; + expect(secondPrompt).toContain('semantic-layer validation failed for warehouse:accounts'); + expect(secondPrompt).toContain('Previous attempt did not pass the gate'); + }); }); diff --git a/packages/cli/test/context/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/cli/test/context/ingest/ingest-bundle.runner.isolated-diff.test.ts index 55e1bc08..b9485684 100644 --- a/packages/cli/test/context/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/cli/test/context/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -1901,13 +1901,13 @@ describe('IngestBundleRunner isolated diff path', () => { }); deps.agentRunner.runLoop = vi.fn(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-isolated-diff-textual-resolver') { - const current = await params.toolSet.read_integration_file.execute({ + const current = await params.toolSet.read_repair_file.execute({ path: 'semantic-layer/warehouse/mart_account_segments.yaml', }); expect(current.markdown).toContain('total_contract_arr_cents'); const patch = await params.toolSet.read_failed_patch.execute({}); expect(patch.markdown).toContain('account_count'); - await params.toolSet.write_integration_file.execute({ + await params.toolSet.write_repair_file.execute({ path: 'semantic-layer/warehouse/mart_account_segments.yaml', content: 'name: mart_account_segments\n' + @@ -2105,7 +2105,6 @@ describe('IngestBundleRunner isolated diff path', () => { }); const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-final-gate-repair/trace.jsonl'), 'utf-8'); expect(trace).toContain('gate_repair_repaired'); - expect(trace).toContain('final_artifact_gates_after_gate_repair_finished'); expect(trace).toContain('final_gate_repair_committed'); } finally { await rm(runtime.homeDir, { recursive: true, force: true }); @@ -2191,7 +2190,8 @@ describe('IngestBundleRunner isolated diff path', () => { const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0] as any; expect(reportCreate.body.status).toBe('failed'); expect(reportCreate.body.isolatedDiff).toMatchObject({ - gateRepairAttempts: 1, + // Both attempts of the verify-based repair loop ran without an edit. + gateRepairAttempts: 2, gateRepairs: 0, gateRepairFailures: 1, }); diff --git a/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts b/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts index 545b2ba0..43590bc7 100644 --- a/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts +++ b/packages/cli/test/context/ingest/isolated-diff/patch-integrator.test.ts @@ -221,6 +221,7 @@ describe('integrateWorkUnitPatch', () => { touchedPaths: ['wiki/global/a.md'], }); await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8'); + await expect(context.verify(['wiki/global/a.md'])).resolves.toEqual({ ok: true }); return { status: 'repaired' as const, attempts: 1, @@ -336,6 +337,7 @@ describe('integrateWorkUnitPatch', () => { touchedPaths: ['wiki/global/a.md'], }); await writeFile(join(configDir, 'wiki/global/a.md'), 'repaired semantic ref\n', 'utf-8'); + await expect(context.verify(['wiki/global/a.md'])).resolves.toEqual({ ok: true }); return { status: 'repaired' as const, attempts: 1, @@ -402,71 +404,56 @@ describe('integrateWorkUnitPatch', () => { await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('old\n'); }); - it('repairs a semantic gate failure after a textual conflict is resolved', async () => { - const { homeDir, configDir, git } = await makeRepo(); - await mkdir(join(configDir, 'wiki/global'), { recursive: true }); - await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8'); - await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com'); - const conflictBase = await git.revParseHead(); + it('accepts a redundant duplicate-creation patch as subsumed without committing', async () => { + // Regression: two work units each emitted a creation patch for the same + // wiki page. The second creation patch conflicts with the page already in + // the tree; the resolver verifies a no-change resolution and the source + // must not fail. + const { homeDir, configDir, git, baseSha } = await makeRepo(); + await writeFile(join(configDir, 'wiki/global/b.md'), 'page from the first work unit\n', 'utf-8'); + await git.commitFiles(['wiki/global/b.md'], 'first creation', 'System User', 'system@example.com'); + const acceptedHead = await git.revParseHead(); - await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8'); - await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com'); - - const childDir = join(homeDir, 'child-conflict-repair'); - await git.addWorktree(childDir, 'child-conflict-repair', conflictBase); + const childDir = join(homeDir, 'child-duplicate'); + await git.addWorktree(childDir, 'child-duplicate', baseSha); const childGit = git.forWorktree(childDir); - await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8'); - await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com'); - const patchPath = join(homeDir, 'proposal-repair.patch'); - await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath); + await writeFile(join(childDir, 'wiki/global/b.md'), 'duplicate page from the second work unit\n', 'utf-8'); + await childGit.commitFiles(['wiki/global/b.md'], 'second creation', 'System User', 'system@example.com'); + const patchPath = join(homeDir, 'duplicate-creation.patch'); + await childGit.writeBinaryNoRenamePatch(baseSha, 'HEAD', patchPath); const trace = new FileIngestTraceWriter({ - tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-repair/trace.jsonl'), - jobId: 'job-resolver-repair', + tracePath: join(homeDir, '.ktx/ingest-traces/job-subsumed/trace.jsonl'), + jobId: 'job-subsumed', connectionId: 'warehouse', - sourceKey: 'metabase', + sourceKey: 'notion', level: 'trace', }); - // Gate fails on the resolver's merged tree, then passes after the repair edit. - const validateAppliedTree = vi - .fn() - .mockRejectedValueOnce( - new Error('final artifact gates failed:\narr-definition: unknown sl_refs entity mart_arr_daily.arr_dollars'), - ) - .mockResolvedValueOnce(undefined); - - const repairGateFailure = vi.fn(async (context: { unitKey: string; touchedPaths: string[] }) => { - expect(context).toMatchObject({ unitKey: 'wu-conflict-repair', touchedPaths: ['wiki/global/a.md'] }); - await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal repaired\n', 'utf-8'); - return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] }; - }); - const result = await integrateWorkUnitPatch({ - unitKey: 'wu-conflict-repair', + unitKey: 'wu-duplicate', patchPath, integrationGit: git, trace, author: { name: 'System User', email: 'system@example.com' }, slDisallowed: false, allowedTargetConnectionIds: new Set(['warehouse']), - validateAppliedTree, - resolveTextualConflict: vi.fn(async () => { - await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\nproposal\n', 'utf-8'); - return { status: 'repaired' as const, attempts: 1, changedPaths: ['wiki/global/a.md'] }; + validateAppliedTree: vi.fn(async () => {}), + resolveTextualConflict: vi.fn(async (context) => { + await expect(context.verify([])).resolves.toEqual({ ok: true }); + return { status: 'repaired' as const, attempts: 1, changedPaths: [] }; }), - repairGateFailure, }); expect(result).toMatchObject({ status: 'accepted', - touchedPaths: ['wiki/global/a.md'], - textualResolution: { status: 'repaired' }, - gateRepair: { status: 'repaired', attempts: 1, changedPaths: ['wiki/global/a.md'] }, + touchedPaths: [], + textualResolution: { status: 'repaired', attempts: 1, changedPaths: [] }, }); - expect(validateAppliedTree).toHaveBeenCalledTimes(2); - expect(repairGateFailure).toHaveBeenCalledOnce(); - await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\nproposal repaired\n'); - await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_accepted_after_textual_resolution'); + expect(await git.revParseHead()).toBe(acceptedHead); + await expect(readFile(join(configDir, 'wiki/global/b.md'), 'utf-8')).resolves.toBe( + 'page from the first work unit\n', + ); + await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('patch_subsumed_after_textual_resolution'); }); }); diff --git a/packages/cli/test/context/ingest/isolated-diff/textual-conflict-resolver.test.ts b/packages/cli/test/context/ingest/isolated-diff/textual-conflict-resolver.test.ts index a03eb66d..a2aa3e20 100644 --- a/packages/cli/test/context/ingest/isolated-diff/textual-conflict-resolver.test.ts +++ b/packages/cli/test/context/ingest/isolated-diff/textual-conflict-resolver.test.ts @@ -42,7 +42,7 @@ describe('resolveTextualConflict', () => { const { workdir, patchPath, trace } = await makeHarness(); const agentRunner = { runLoop: vi.fn(async (params: any) => { - const current = await params.toolSet.read_integration_file.execute({ path: 'wiki/global/account.md' }); + const current = await params.toolSet.read_repair_file.execute({ path: 'wiki/global/account.md' }); expect(current.structured).toEqual({ path: 'wiki/global/account.md', exists: true }); expect(current.markdown).toContain('accepted line'); @@ -50,19 +50,20 @@ describe('resolveTextualConflict', () => { expect(patch.markdown).toContain('proposal line'); await expect( - params.toolSet.write_integration_file.execute({ + params.toolSet.write_repair_file.execute({ path: 'wiki/global/not-allowed.md', content: 'bad\n', }), - ).rejects.toThrow(/resolver path not allowed/); + ).rejects.toThrow(/repair path not allowed/); - await params.toolSet.write_integration_file.execute({ + await params.toolSet.write_repair_file.execute({ path: 'wiki/global/account.md', content: 'accepted line\nproposal line\n', }); return { stopReason: 'natural' as const }; }), }; + const verify = vi.fn(async () => ({ ok: true as const })); const result = await resolveTextualConflict({ agentRunner, @@ -72,6 +73,7 @@ describe('resolveTextualConflict', () => { touchedPaths: ['wiki/global/account.md'], trace, reason: 'patch failed: wiki/global/account.md', + verify, maxAttempts: 1, stepBudget: 8, }); @@ -81,6 +83,7 @@ describe('resolveTextualConflict', () => { attempts: 1, changedPaths: ['wiki/global/account.md'], }); + expect(verify).toHaveBeenCalledWith(['wiki/global/account.md']); await expect(readFile(join(workdir, 'wiki/global/account.md'), 'utf-8')).resolves.toBe( 'accepted line\nproposal line\n', ); @@ -97,8 +100,9 @@ describe('resolveTextualConflict', () => { ); }); - it('fails when the repair agent completes without editing any touched path', async () => { + it('fails when the repair agent neither edits nor declares the patch redundant', async () => { const { workdir, patchPath, trace } = await makeHarness(); + const verify = vi.fn(async () => ({ ok: true as const })); const result = await resolveTextualConflict({ agentRunner: { runLoop: vi.fn(async () => ({ stopReason: 'natural' as const })) }, workdir, @@ -107,6 +111,7 @@ describe('resolveTextualConflict', () => { touchedPaths: ['wiki/global/account.md'], trace, reason: 'patch failed: wiki/global/account.md', + verify, maxAttempts: 1, stepBudget: 8, }); @@ -114,7 +119,112 @@ describe('resolveTextualConflict', () => { expect(result).toEqual({ status: 'failed', attempts: 1, - reason: 'resolver completed without editing an allowed path', + reason: 'resolver completed without editing an allowed path or declaring the patch redundant', }); + expect(verify).not.toHaveBeenCalled(); + }); + + it('succeeds without edits when the agent declares the patch redundant and the gates verify', async () => { + // Regression: two Notion pages produced creation patches for the same + // wiki key. The second patch conflicts, the integration tree already + // holds a complete page, and the correct resolution is no edit at all. + const { workdir, patchPath, trace } = await makeHarness(); + const agentRunner = { + runLoop: vi.fn(async (params: any) => { + const declared = await params.toolSet.declare_patch_redundant.execute({ + reason: 'wiki/global/account.md already documents this page', + }); + expect(declared.structured).toEqual({ reason: 'wiki/global/account.md already documents this page' }); + return { stopReason: 'natural' as const }; + }), + }; + const verify = vi.fn(async () => ({ ok: true as const })); + + const result = await resolveTextualConflict({ + agentRunner, + workdir, + unitKey: 'wu-duplicate', + patchPath, + touchedPaths: ['wiki/global/account.md'], + trace, + reason: 'patch failed: wiki/global/account.md', + verify, + maxAttempts: 1, + stepBudget: 8, + }); + + expect(result).toEqual({ status: 'repaired', attempts: 1, changedPaths: [] }); + expect(verify).toHaveBeenCalledWith([]); + await expect(readFile(join(workdir, 'wiki/global/account.md'), 'utf-8')).resolves.toBe('accepted line\n'); + await expect(readFile(trace.tracePath, 'utf-8')).resolves.toContain('textual_conflict_resolver_repaired'); + }); + + it('retries with the gate failure when verification rejects the first resolution', async () => { + const { workdir, patchPath, trace } = await makeHarness(); + const agentRunner = { + runLoop: vi.fn(async (params: any) => { + await params.toolSet.write_repair_file.execute({ + path: 'wiki/global/account.md', + content: 'accepted line\nproposal line\n', + }); + return { stopReason: 'natural' as const }; + }), + }; + const verify = vi + .fn() + .mockResolvedValueOnce({ ok: false, reason: 'final artifact gates failed: stale sl_refs entry' }) + .mockResolvedValueOnce({ ok: true }); + + const result = await resolveTextualConflict({ + agentRunner, + workdir, + unitKey: 'wu-retry', + patchPath, + touchedPaths: ['wiki/global/account.md'], + trace, + reason: 'patch failed: wiki/global/account.md', + verify, + maxAttempts: 2, + stepBudget: 8, + }); + + expect(result).toEqual({ + status: 'repaired', + attempts: 2, + changedPaths: ['wiki/global/account.md'], + }); + expect(agentRunner.runLoop).toHaveBeenCalledTimes(2); + const secondPrompt = agentRunner.runLoop.mock.calls[1][0].userPrompt as string; + expect(secondPrompt).toContain('final artifact gates failed: stale sl_refs entry'); + }); + + it('fails when edits never pass verification', async () => { + const { workdir, patchPath, trace } = await makeHarness(); + const agentRunner = { + runLoop: vi.fn(async (params: any) => { + await params.toolSet.write_repair_file.execute({ + path: 'wiki/global/account.md', + content: 'still wrong\n', + }); + return { stopReason: 'natural' as const }; + }), + }; + const verify = vi.fn(async () => ({ ok: false as const, reason: 'final artifact gates failed' })); + + const result = await resolveTextualConflict({ + agentRunner, + workdir, + unitKey: 'wu-never-passes', + patchPath, + touchedPaths: ['wiki/global/account.md'], + trace, + reason: 'patch failed: wiki/global/account.md', + verify, + maxAttempts: 2, + stepBudget: 8, + }); + + expect(result).toEqual({ status: 'failed', attempts: 2, reason: 'final artifact gates failed' }); + expect(verify).toHaveBeenCalledTimes(2); }); }); diff --git a/packages/cli/test/context/ingest/stages/stage-3-work-units.test.ts b/packages/cli/test/context/ingest/stages/stage-3-work-units.test.ts index 6d6deccd..f4ec9ae9 100644 --- a/packages/cli/test/context/ingest/stages/stage-3-work-units.test.ts +++ b/packages/cli/test/context/ingest/stages/stage-3-work-units.test.ts @@ -85,12 +85,13 @@ describe('Stage 3 — executeWorkUnit', () => { addTouchedSlSource(deps.captureSession.touchedSlSources, 'c1', 'src_good'); return Promise.resolve({ stopReason: 'natural' }); }); - deps.validateTouchedSources = vi - .fn() - .mockResolvedValue({ validSources: ['c1:src_good'], invalidSources: ['c1:src_bad'] }); + deps.validateTouchedSources = vi.fn().mockResolvedValue({ + validSources: ['c1:src_good'], + invalidSources: [{ source: 'c1:src_bad', errors: ['join target "accounts" does not exist'] }], + }); const outcome = await executeWorkUnit(deps, makeWu()); expect(outcome.status).toBe('failed'); - expect(outcome.reason).toMatch(/src_bad/); + expect(outcome.reason).toMatch(/src_bad \(join target "accounts" does not exist\)/); expect(outcome.actions).toEqual([]); expect(outcome.touchedSlSources).toEqual([]); expect(deps.resetHardTo).toHaveBeenCalledWith('pre'); diff --git a/packages/cli/test/context/ingest/stages/validate-wu-sources.test.ts b/packages/cli/test/context/ingest/stages/validate-wu-sources.test.ts index 807a8b10..51508510 100644 --- a/packages/cli/test/context/ingest/stages/validate-wu-sources.test.ts +++ b/packages/cli/test/context/ingest/stages/validate-wu-sources.test.ts @@ -1,8 +1,17 @@ import { describe, expect, it, vi } from 'vitest'; -import { validateWuTouchedSources } from '../../../../src/context/ingest/stages/validate-wu-sources.js'; +import { formatInvalidWuSources, validateWuTouchedSources } from '../../../../src/context/ingest/stages/validate-wu-sources.js'; + +function makeSemanticLayerService(sourcesByConnection: Record }>>) { + return { + loadAllSources: vi.fn(async (connectionId: string) => ({ + sources: sourcesByConnection[connectionId] ?? [], + loadErrors: [], + })), + }; +} describe('validateWuTouchedSources', () => { - it('validates each touched source against its own connection', async () => { + it('validates each touched source against its own connection and carries validator errors', async () => { const validateSingleSource = vi .fn() .mockImplementation((_deps: unknown, conn: string, name: string) => @@ -12,7 +21,13 @@ describe('validateWuTouchedSources', () => { : { errors: ['invalid measure'], warnings: [] }, ), ); - const deps = { slValidator: { validateSingleSource } } as any; + const deps = { + semanticLayerService: makeSemanticLayerService({ + 'warehouse-a': [{ name: 'good' }], + 'warehouse-b': [{ name: 'bad' }], + }), + slValidator: { validateSingleSource }, + } as any; const result = await validateWuTouchedSources(deps, [ { connectionId: 'warehouse-a', sourceName: 'good' }, @@ -20,16 +35,137 @@ describe('validateWuTouchedSources', () => { ]); expect(result.validSources).toEqual(['warehouse-a:good']); - expect(result.invalidSources).toEqual(['warehouse-b:bad']); - expect(validateSingleSource).toHaveBeenNthCalledWith(1, deps, 'warehouse-a', 'good'); - expect(validateSingleSource).toHaveBeenNthCalledWith(2, deps, 'warehouse-b', 'bad'); + expect(result.invalidSources).toEqual([{ source: 'warehouse-b:bad', errors: ['invalid measure'] }]); }); it('returns empty arrays when no sources are touched', async () => { const validateSingleSource = vi.fn(); - const deps = { slValidator: { validateSingleSource } } as any; + const semanticLayerService = makeSemanticLayerService({}); + const deps = { semanticLayerService, slValidator: { validateSingleSource } } as any; const result = await validateWuTouchedSources(deps, []); expect(result).toEqual({ validSources: [], invalidSources: [] }); expect(validateSingleSource).not.toHaveBeenCalled(); + expect(semanticLayerService.loadAllSources).not.toHaveBeenCalled(); + }); + + it('expands the validated set with existing join neighbors in both directions', async () => { + const validateSingleSource = vi.fn().mockResolvedValue({ errors: [], warnings: [] }); + const deps = { + semanticLayerService: makeSemanticLayerService({ + warehouse: [ + { name: 'accounts', joins: [] }, + { name: 'orders', joins: [{ to: 'accounts' }] }, + { name: 'segments', joins: [{ to: 'accounts' }] }, + { name: 'unrelated', joins: [] }, + ], + }), + slValidator: { validateSingleSource }, + } as any; + + const result = await validateWuTouchedSources(deps, [{ connectionId: 'warehouse', sourceName: 'accounts' }]); + + expect(result.validSources).toEqual(['warehouse:accounts', 'warehouse:orders', 'warehouse:segments']); + expect(validateSingleSource.mock.calls.map((call) => call[2])).toEqual(['accounts', 'orders', 'segments']); + }); + + it('reports a dangling join target as an error on the source that declares it', async () => { + // Regression: a Metabase work unit wrote mart_account_segments with + // `joins: [{to: accounts}]` while no `accounts` source exists anywhere. + // The error must name the declaring source, not the phantom neighbor. + const validateSingleSource = vi.fn().mockResolvedValue({ errors: [], warnings: [] }); + const deps = { + semanticLayerService: makeSemanticLayerService({ + warehouse: [{ name: 'mart_account_segments', joins: [{ to: 'accounts' }] }], + }), + slValidator: { validateSingleSource }, + } as any; + + const result = await validateWuTouchedSources(deps, [ + { connectionId: 'warehouse', sourceName: 'mart_account_segments' }, + ]); + + expect(result.validSources).toEqual([]); + expect(result.invalidSources).toEqual([ + { + source: 'warehouse:mart_account_segments', + errors: ['join target "accounts" does not exist'], + }, + ]); + // The phantom target is not validated as a source of its own. + expect(validateSingleSource.mock.calls.map((call) => call[2])).toEqual(['mart_account_segments']); + }); + + it('reports a join left dangling by a deletion on the surviving source', async () => { + const validateSingleSource = vi.fn().mockResolvedValue({ errors: [], warnings: [] }); + const deps = { + semanticLayerService: makeSemanticLayerService({ + // `accounts` was deleted by this work unit: touched but absent from + // the loaded sources. `orders` still joins to it. + warehouse: [{ name: 'orders', joins: [{ to: 'accounts' }] }], + }), + slValidator: { validateSingleSource }, + } as any; + + const result = await validateWuTouchedSources(deps, [{ connectionId: 'warehouse', sourceName: 'accounts' }]); + + expect(result.invalidSources).toContainEqual({ + source: 'warehouse:orders', + errors: ['join target "accounts" does not exist'], + }); + }); + + it('rejects join targets that match a source name only case-insensitively', async () => { + // The Python engine resolves joins[].to by exact name; a case mismatch + // would pass a lenient gate and then fail every query as an orphan. + const validateSingleSource = vi.fn().mockResolvedValue({ errors: [], warnings: [] }); + const deps = { + semanticLayerService: makeSemanticLayerService({ + warehouse: [{ name: 'SIGNED_UP' }, { name: 'orders', joins: [{ to: 'signed_up' }] }], + }), + slValidator: { validateSingleSource }, + } as any; + + const result = await validateWuTouchedSources(deps, [{ connectionId: 'warehouse', sourceName: 'orders' }]); + + expect(result.invalidSources).toEqual([ + { + source: 'warehouse:orders', + errors: [ + 'join target "signed_up" does not exist; join targets are case-sensitive — the source is named "SIGNED_UP"', + ], + }, + ]); + }); + + it('ignores pre-existing dangling joins on sources unrelated to this change set', async () => { + const validateSingleSource = vi.fn().mockResolvedValue({ errors: [], warnings: [] }); + const deps = { + semanticLayerService: makeSemanticLayerService({ + warehouse: [ + { name: 'touched_source', joins: [] }, + { name: 'legacy', joins: [{ to: 'phantom' }] }, + ], + }), + slValidator: { validateSingleSource }, + } as any; + + const result = await validateWuTouchedSources(deps, [{ connectionId: 'warehouse', sourceName: 'touched_source' }]); + + expect(result.invalidSources).toEqual([]); + expect(result.validSources).toEqual(['warehouse:touched_source']); + }); +}); + +describe('formatInvalidWuSources', () => { + it('joins each source with its reasons', () => { + expect( + formatInvalidWuSources([ + { source: 'warehouse:mart_account_segments', errors: ['join target "accounts" does not exist'] }, + { source: 'warehouse:bad', errors: ['invalid YAML', 'duplicate measure'] }, + ]), + ).toBe( + 'warehouse:mart_account_segments (join target "accounts" does not exist), ' + + 'warehouse:bad (invalid YAML; duplicate measure)', + ); }); }); diff --git a/packages/cli/test/context/sl/semantic-layer.service.test.ts b/packages/cli/test/context/sl/semantic-layer.service.test.ts index 8e61ec22..edc31ce1 100644 --- a/packages/cli/test/context/sl/semantic-layer.service.test.ts +++ b/packages/cli/test/context/sl/semantic-layer.service.test.ts @@ -1191,17 +1191,11 @@ describe('validateWithProposedSource', () => { }); it('rejects join keys that are absent from matched physical sources', async () => { - const schemaPath = 'semantic-layer/postgres-warehouse/_schema/orbit_analytics.yaml'; + const schemaPath = 'semantic-layer/dbt-main/_schema/orbit_analytics.yaml'; configService.listFiles.mockImplementation((dir: string) => { - if (dir === 'semantic-layer/dbt-main') { - return Promise.resolve({ files: [] }); - } - if (dir === 'semantic-layer') { + if (dir === 'semantic-layer/dbt-main' || dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer') { return Promise.resolve({ files: [schemaPath] }); } - if (dir === 'semantic-layer/dbt-main/_schema' || dir === 'semantic-layer/postgres-warehouse/_schema') { - return Promise.resolve({ files: dir.endsWith('postgres-warehouse/_schema') ? [schemaPath] : [] }); - } return Promise.resolve({ files: [] }); }); configService.readFile.mockResolvedValue({ @@ -1233,6 +1227,103 @@ describe('validateWithProposedSource', () => { expect(result.errors.join('\n')).toMatch(/local column "account_name"/); expect(result.errors.join('\n')).toMatch(/target column "account_uuid"/); }); + + it('rejects joins whose target resolves to no source and no manifest entry anywhere', async () => { + // Regression: a Metabase work unit wrote `joins: [{to: accounts}]` while + // no `accounts` source or manifest table existed in the project. The + // write tool must reject the source so the agent can fix its own join. + configService.listFiles.mockResolvedValue({ files: [] }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('conn-1', { + name: 'mart_account_segments', + table: 'orbit_analytics.mart_account_segments', + grain: ['account_id'], + columns: [{ name: 'account_id', type: 'string' }], + joins: [ + { to: 'accounts', on: 'mart_account_segments.account_id = accounts.account_id', relationship: 'many_to_one' }, + ], + measures: [], + }); + + expect(result.errors.join('\n')).toMatch(/mart_account_segments: join target "accounts" does not exist/); + expect(pythonPort.validateSources).not.toHaveBeenCalled(); + }); + + it('rejects join targets that differ from the source name only by case', async () => { + // The Python engine resolves joins[].to by exact name + // (engine._collect_orphan_join_target_errors), so a case-insensitive + // acceptance here would let the source pass gates and fail every query. + const schemaPath = 'semantic-layer/conn-1/_schema/core.yaml'; + configService.listFiles.mockImplementation((dir: string) => { + if (dir === 'semantic-layer/conn-1' || dir === 'semantic-layer/conn-1/_schema' || dir === 'semantic-layer') { + return Promise.resolve({ files: [schemaPath] }); + } + return Promise.resolve({ files: [] }); + }); + configService.readFile.mockResolvedValue({ + content: ['tables:', ' SIGNED_UP:', ' table: analytics.SIGNED_UP', ' columns:', ' - { name: account_id, type: string }'].join( + '\n', + ), + }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('conn-1', { + name: 'orders', + table: 'analytics.orders', + grain: ['account_id'], + columns: [{ name: 'account_id', type: 'string' }], + joins: [{ to: 'signed_up', on: 'orders.account_id = signed_up.account_id', relationship: 'many_to_one' }], + measures: [], + }); + + expect(result.errors.join('\n')).toMatch( + /orders: join target "signed_up" does not exist; join targets are case-sensitive — the source is named "SIGNED_UP"/, + ); + expect(pythonPort.validateSources).not.toHaveBeenCalled(); + }); + + it('rejects join targets written as table refs even when a manifest table matches', async () => { + // `joins[].to` must be the source NAME ("accounts"), not the physical + // table ref ("orbit_analytics.accounts") — the engine keys sources by name. + const schemaPath = 'semantic-layer/conn-1/_schema/core.yaml'; + configService.listFiles.mockImplementation((dir: string) => { + if (dir === 'semantic-layer/conn-1' || dir === 'semantic-layer/conn-1/_schema' || dir === 'semantic-layer') { + return Promise.resolve({ files: [schemaPath] }); + } + return Promise.resolve({ files: [] }); + }); + configService.readFile.mockResolvedValue({ + content: ['tables:', ' accounts:', ' table: orbit_analytics.accounts', ' columns:', ' - { name: account_id, type: string }'].join( + '\n', + ), + }); + pythonPort.validateSources.mockResolvedValue({ + data: { errors: [], warnings: [] }, + }); + + const result = await service.validateWithProposedSource('conn-1', { + name: 'orders', + table: 'orbit_analytics.orders', + grain: ['account_id'], + columns: [{ name: 'account_id', type: 'string' }], + joins: [ + { + to: 'orbit_analytics.accounts', + on: 'orders.account_id = orbit_analytics.accounts.account_id', + relationship: 'many_to_one', + }, + ], + measures: [], + }); + + expect(result.errors.join('\n')).toMatch(/orders: join target "orbit_analytics.accounts" does not exist/); + expect(pythonPort.validateSources).not.toHaveBeenCalled(); + }); }); describe('findDanglingSegmentRefs', () => {