fix(ingest): verify repair outcomes and reject dangling join targets (#292)

One ingest integration hiccup no longer discards a whole source:

- Replace the duplicated gate-repair and textual-resolver loops with one
  shared constrained-repair loop whose success criterion is re-running
  the failed check (verify), not whether the agent edited files. Verify
  failures feed the retry prompt; maxAttempts is 2.
- Let the resolver declare a conflicting patch redundant: a verified
  no-change resolution is accepted as subsumed instead of failing the
  source (duplicate wiki-page creation from parallel work units).
- Carry per-source validation errors through validateWuTouchedSources
  into gate messages and work-unit failure reasons instead of
  discarding them.
- Move join-neighbor expansion into the shared validation path so
  work-unit validation and integration gates check the same set.
- Reject joins whose target resolves to no source, at sl_write time and
  in the gates, attributed to the declaring source. Resolution mirrors
  the Python engine exactly (case-sensitive name within the
  connection), with a case-mismatch hint for the writing agent.
This commit is contained in:
Andrey Avtomonov 2026-06-11 14:39:51 +02:00 committed by GitHub
parent 00cdf2de90
commit a278d2f7d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 1119 additions and 705 deletions

View file

@ -2,20 +2,16 @@ import type { SemanticLayerService } from '../../context/sl/semantic-layer.servi
import type { TouchedSlSource } from '../../context/tools/touched-sl-sources.js';
import type { KnowledgeWikiService } from '../../context/wiki/knowledge-wiki.service.js';
import { findMissingWikiRefs } from '../wiki/wiki-ref-validation.js';
import type { WuValidationResult } from './stages/validate-wu-sources.js';
import { findInvalidWikiBodyRefs } from './wiki-body-refs.js';
interface TouchedValidationResult {
invalidSources: string[];
validSources: string[];
}
export interface FinalArtifactGateInput {
connectionIds: string[];
changedWikiPageKeys: string[];
touchedSlSources: TouchedSlSource[];
wikiService: KnowledgeWikiService;
semanticLayerService: SemanticLayerService;
validateTouchedSources(touched: TouchedSlSource[]): Promise<TouchedValidationResult>;
validateTouchedSources(touched: TouchedSlSource[]): Promise<WuValidationResult>;
tableExists(connectionId: string, tableRef: string): Promise<boolean>;
}
@ -40,54 +36,6 @@ function slEntityNames(source: Awaited<ReturnType<SemanticLayerService['loadAllS
]);
}
function uniqueTouchedSources(sources: TouchedSlSource[]): TouchedSlSource[] {
const seen = new Set<string>();
const unique: TouchedSlSource[] = [];
for (const source of sources) {
const key = `${source.connectionId}:${source.sourceName}`;
if (seen.has(key)) {
continue;
}
seen.add(key);
unique.push(source);
}
return unique.sort((left, right) => {
const byConnection = left.connectionId.localeCompare(right.connectionId);
return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection;
});
}
async function expandTouchedSlSourcesWithDirectJoinNeighbors(input: FinalArtifactGateInput): Promise<TouchedSlSource[]> {
const expanded = [...input.touchedSlSources];
const touchedByConnection = new Map<string, Set<string>>();
for (const source of input.touchedSlSources) {
const bucket = touchedByConnection.get(source.connectionId) ?? new Set<string>();
bucket.add(source.sourceName);
touchedByConnection.set(source.connectionId, bucket);
}
for (const connectionId of input.connectionIds) {
const touched = touchedByConnection.get(connectionId);
if (!touched || touched.size === 0) {
continue;
}
const { sources } = await input.semanticLayerService.loadAllSources(connectionId);
for (const source of sources) {
const sourceIsTouched = touched.has(source.name);
if (sourceIsTouched) {
for (const join of source.joins ?? []) {
expanded.push({ connectionId, sourceName: join.to });
}
}
if ((source.joins ?? []).some((join) => touched.has(join.to))) {
expanded.push({ connectionId, sourceName: source.name });
}
}
}
return uniqueTouchedSources(expanded);
}
async function validateWikiSlRefs(input: FinalArtifactGateInput): Promise<string[]> {
const errors: string[] = [];
const sourcesByConnection = new Map<string, Awaited<ReturnType<SemanticLayerService['loadAllSources']>>['sources']>();
@ -146,9 +94,13 @@ async function validateWikiRefs(input: FinalArtifactGateInput): Promise<string[]
}
export async function validateFinalIngestArtifacts(input: FinalArtifactGateInput): Promise<void> {
const touchedWithDependencies = await expandTouchedSlSourcesWithDirectJoinNeighbors(input);
const validation = await input.validateTouchedSources(touchedWithDependencies);
const errors: string[] = validation.invalidSources.map((source) => `semantic-layer validation failed for ${source}`);
// Join-neighbor expansion happens inside validateTouchedSources so work-unit
// validation and this gate check the same set — a source that passes one
// passes the other.
const validation = await input.validateTouchedSources(input.touchedSlSources);
const errors: string[] = validation.invalidSources.map(
(invalid) => `semantic-layer validation failed for ${invalid.source}: ${invalid.errors.join('; ')}`,
);
errors.push(...(await validateWikiSlRefs(input)));
const danglingWikiRefs = await validateWikiRefs(input);
if (danglingWikiRefs.length > 0) {

View file

@ -0,0 +1,225 @@
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { z } from 'zod';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../context/llm/runtime-port.js';
import type { IngestTraceWriter } from './ingest-trace.js';
import { traceTimed } from './ingest-trace.js';
/**
* Shared loop for the two integration-time repair agents (semantic gate
* repair, textual conflict resolution). Success is decided by re-running the
* failed check `verify` never by whether the agent edited files: an
* ineffective edit fails, and an explicit no-change declaration that verifies
* succeeds.
*/
export type RepairVerification = { ok: true } | { ok: false; reason: string };
export type ConstrainedRepairResult =
| { status: 'repaired'; attempts: number; changedPaths: string[] }
| { status: 'failed'; attempts: number; reason: string };
export interface ConstrainedRepairToolContext {
workdir: string;
allowedPaths: ReadonlySet<string>;
editedPaths: Set<string>;
declareNoChange(reason: string): void;
}
export interface ConstrainedRepairLoopInput {
agentRunner: AgentRunnerPort;
workdir: string;
allowedPaths: string[];
trace: IngestTraceWriter;
tracePhase: string;
traceEventName: string;
traceData: Record<string, unknown>;
systemPrompt: string;
buildUserPrompt(input: { attempt: number; maxAttempts: number; previousFailure: string | null }): string;
buildExtraTools?(context: ConstrainedRepairToolContext): KtxRuntimeToolSet;
verify(changedPaths: string[]): Promise<RepairVerification>;
/** Failure reason when an attempt neither edits nor declares no-change. */
noChangeFailureReason: string;
telemetryTags: Record<string, string>;
maxAttempts?: number;
stepBudget?: number;
abortSignal?: AbortSignal;
}
const readRepairFileSchema = z.object({
path: z.string().min(1),
});
const writeRepairFileSchema = z.object({
path: z.string().min(1),
content: z.string(),
});
function normalizeRepoPath(path: string): string {
const normalized = path.replace(/\\/g, '/').replace(/^\/+/, '');
const parts = normalized.split('/').filter((part) => part.length > 0);
if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) {
throw new Error(`repair path must be a repository-relative path: ${path}`);
}
return parts.join('/');
}
function assertAllowedPath(path: string, allowedPaths: ReadonlySet<string>): string {
const normalized = normalizeRepoPath(path);
if (!allowedPaths.has(normalized)) {
throw new Error(`repair path not allowed: ${normalized}`);
}
return normalized;
}
async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> {
try {
return { exists: true, content: await readFile(path, 'utf-8') };
} catch (error) {
if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') {
return { exists: false, content: '' };
}
throw error;
}
}
function buildRepairFileTools(context: ConstrainedRepairToolContext): KtxRuntimeToolSet {
return {
read_repair_file: {
name: 'read_repair_file',
description: 'Read one allowed file from the integration worktree.',
inputSchema: readRepairFileSchema,
execute: async ({ path }: z.infer<typeof readRepairFileSchema>) => {
const normalized = assertAllowedPath(path, context.allowedPaths);
const file = await readOptionalFile(join(context.workdir, normalized));
return {
markdown: file.exists ? file.content : `(missing file: ${normalized})`,
structured: { path: normalized, exists: file.exists },
};
},
},
write_repair_file: {
name: 'write_repair_file',
description: 'Replace one allowed integration worktree file with repaired text content.',
inputSchema: writeRepairFileSchema,
execute: async ({ path, content }: z.infer<typeof writeRepairFileSchema>) => {
const normalized = assertAllowedPath(path, context.allowedPaths);
const fullPath = join(context.workdir, normalized);
await mkdir(dirname(fullPath), { recursive: true });
await writeFile(fullPath, content, 'utf-8');
context.editedPaths.add(normalized);
return {
markdown: `Wrote ${normalized}`,
structured: { path: normalized, bytes: Buffer.byteLength(content) },
};
},
},
};
}
export function buildDeleteRepairFileTool(context: ConstrainedRepairToolContext): KtxRuntimeToolSet {
const deleteRepairFileSchema = z.object({
path: z.string().min(1),
});
return {
delete_repair_file: {
name: 'delete_repair_file',
description: 'Delete one allowed integration worktree file when the failed patch proves the deletion is correct.',
inputSchema: deleteRepairFileSchema,
execute: async ({ path }: z.infer<typeof deleteRepairFileSchema>) => {
const normalized = assertAllowedPath(path, context.allowedPaths);
await rm(join(context.workdir, normalized), { force: true });
context.editedPaths.add(normalized);
return {
markdown: `Deleted ${normalized}`,
structured: { path: normalized },
};
},
},
};
}
export async function runConstrainedRepairLoop(input: ConstrainedRepairLoopInput): Promise<ConstrainedRepairResult> {
const allowedPaths = new Set(input.allowedPaths.map(normalizeRepoPath));
const sortedAllowedPaths = [...allowedPaths].sort();
const maxAttempts = input.maxAttempts ?? 2;
const stepBudget = input.stepBudget ?? 16;
// Edits persist in the worktree across attempts, so the verified set and the
// reported changedPaths accumulate over the whole loop.
const editedPaths = new Set<string>();
let lastFailure = 'repair did not run';
let previousFailure: string | null = null;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
let noChangeDeclaration: string | null = null;
const toolContext: ConstrainedRepairToolContext = {
workdir: input.workdir,
allowedPaths,
editedPaths,
declareNoChange: (reason: string) => {
noChangeDeclaration = reason;
},
};
const traceData = {
...input.traceData,
attempt,
maxAttempts,
allowedPaths: sortedAllowedPaths,
};
const result = await traceTimed(input.trace, input.tracePhase, input.traceEventName, traceData, async () =>
input.agentRunner.runLoop({
modelRole: 'repair',
systemPrompt: input.systemPrompt,
userPrompt: input.buildUserPrompt({ attempt, maxAttempts, previousFailure }),
toolSet: {
...buildRepairFileTools(toolContext),
...(input.buildExtraTools?.(toolContext) ?? {}),
},
stepBudget,
telemetryTags: input.telemetryTags,
abortSignal: input.abortSignal,
}),
);
if (result.stopReason === 'error') {
lastFailure = result.error?.message ?? 'repair agent loop errored';
previousFailure = lastFailure;
await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, traceData, result.error);
continue;
}
const changedPaths = [...editedPaths].sort();
if (changedPaths.length === 0 && noChangeDeclaration === null) {
// Nothing changed and nothing was claimed: the failed check would fail
// identically, so skip verification and retry.
lastFailure = input.noChangeFailureReason;
previousFailure = lastFailure;
await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, {
...traceData,
reason: lastFailure,
});
continue;
}
const verification = await input.verify(changedPaths);
if (!verification.ok) {
lastFailure = verification.reason;
previousFailure = lastFailure;
await input.trace.event('error', input.tracePhase, `${input.traceEventName}_failed`, {
...traceData,
changedPaths,
reason: lastFailure,
});
continue;
}
await input.trace.event('debug', input.tracePhase, `${input.traceEventName}_repaired`, {
...traceData,
changedPaths,
...(noChangeDeclaration !== null ? { noChangeDeclaration } : {}),
});
return { status: 'repaired', attempts: attempt, changedPaths };
}
return { status: 'failed', attempts: maxAttempts, reason: lastFailure };
}

View file

@ -1,15 +1,12 @@
import { mkdir, readFile, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { z } from 'zod';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../context/llm/runtime-port.js';
import type { ConstrainedRepairResult, RepairVerification } from './constrained-repair.js';
import { runConstrainedRepairLoop } from './constrained-repair.js';
import type { IngestTraceWriter } from './ingest-trace.js';
import { traceTimed } from './ingest-trace.js';
type FinalGateRepairKind = 'patch_semantic_gate' | 'final_artifact_gate';
export type FinalGateRepairResult =
| { status: 'repaired'; attempts: number; changedPaths: string[] }
| { status: 'failed'; attempts: number; reason: string };
export type FinalGateRepairResult = ConstrainedRepairResult;
export interface RepairFinalGateFailureInput {
agentRunner: AgentRunnerPort;
@ -18,48 +15,17 @@ export interface RepairFinalGateFailureInput {
allowedPaths: string[];
trace: IngestTraceWriter;
repairKind: FinalGateRepairKind;
/**
* Re-runs the failed gate against the current worktree. The repair counts
* as successful only when this passes editing files is not the success
* signal.
*/
verify(changedPaths: string[]): Promise<RepairVerification>;
maxAttempts?: number;
stepBudget?: number;
abortSignal?: AbortSignal;
}
const readRepairFileSchema = z.object({
path: z.string().min(1),
});
const writeRepairFileSchema = z.object({
path: z.string().min(1),
content: z.string(),
});
function normalizeRepoPath(path: string): string {
const normalized = path.replace(/\\/g, '/').replace(/^\/+/, '');
const parts = normalized.split('/').filter((part) => part.length > 0);
if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) {
throw new Error(`gate repair path must be a repository-relative path: ${path}`);
}
return parts.join('/');
}
function assertAllowedPath(path: string, allowedPaths: ReadonlySet<string>): string {
const normalized = normalizeRepoPath(path);
if (!allowedPaths.has(normalized)) {
throw new Error(`gate repair path not allowed: ${normalized}`);
}
return normalized;
}
async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> {
try {
return { exists: true, content: await readFile(path, 'utf-8') };
} catch (error) {
if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') {
return { exists: false, content: '' };
}
throw error;
}
}
function buildGateRepairSystemPrompt(): string {
return `<role>
You repair one ktx isolated-diff artifact gate failure inside the integration worktree.
@ -82,7 +48,11 @@ function buildGateRepairUserPrompt(input: {
repairKind: FinalGateRepairKind;
attempt: number;
maxAttempts: number;
previousFailure: string | null;
}): string {
const previousFailureBlock = input.previousFailure
? `\nPrevious attempt did not pass the gate:\n${input.previousFailure}\n`
: '';
return `Repair isolated-diff artifact gates.
Repair kind: ${input.repairKind}
@ -93,56 +63,22 @@ ${input.allowedPaths.map((path) => `- ${path}`).join('\n')}
Gate error:
${input.gateError}
${previousFailureBlock}
Use read_gate_error first. Then inspect only the allowed files, write the
minimal repaired content, and stop.`;
}
function buildToolSet(input: {
workdir: string;
gateError: string;
allowedPaths: ReadonlySet<string>;
editedPaths: Set<string>;
}): KtxRuntimeToolSet {
function buildReadGateErrorTool(gateError: string): KtxRuntimeToolSet {
return {
read_gate_error: {
name: 'read_gate_error',
description: 'Read the artifact gate failure that must be repaired.',
inputSchema: z.object({}),
execute: async () => ({
markdown: input.gateError,
structured: { gateError: input.gateError },
markdown: gateError,
structured: { gateError },
}),
},
read_repair_file: {
name: 'read_repair_file',
description: 'Read one allowed file from the integration worktree.',
inputSchema: readRepairFileSchema,
execute: async ({ path }: z.infer<typeof readRepairFileSchema>) => {
const normalized = assertAllowedPath(path, input.allowedPaths);
const file = await readOptionalFile(join(input.workdir, normalized));
return {
markdown: file.exists ? file.content : `(missing file: ${normalized})`,
structured: { path: normalized, exists: file.exists },
};
},
},
write_repair_file: {
name: 'write_repair_file',
description: 'Replace one allowed integration worktree file with repaired text content.',
inputSchema: writeRepairFileSchema,
execute: async ({ path, content }: z.infer<typeof writeRepairFileSchema>) => {
const normalized = assertAllowedPath(path, input.allowedPaths);
const fullPath = join(input.workdir, normalized);
await mkdir(dirname(fullPath), { recursive: true });
await writeFile(fullPath, content, 'utf-8');
input.editedPaths.add(normalized);
return {
markdown: `Wrote ${normalized}`,
structured: { path: normalized, bytes: Buffer.byteLength(content) },
};
},
},
};
}
@ -163,71 +99,38 @@ export function finalGateRepairPaths(input: {
export async function repairFinalGateFailure(
input: RepairFinalGateFailureInput,
): Promise<FinalGateRepairResult> {
const allowedPaths = new Set(input.allowedPaths.map(normalizeRepoPath));
const maxAttempts = input.maxAttempts ?? 1;
const stepBudget = input.stepBudget ?? 16;
let lastFailure = 'gate repair did not run';
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
const editedPaths = new Set<string>();
const sortedAllowedPaths = [...allowedPaths].sort();
const traceData = {
return runConstrainedRepairLoop({
agentRunner: input.agentRunner,
workdir: input.workdir,
allowedPaths: input.allowedPaths,
trace: input.trace,
tracePhase: 'gate_repair',
traceEventName: 'gate_repair',
traceData: {
repairKind: input.repairKind,
attempt,
maxAttempts,
allowedPaths: sortedAllowedPaths,
gateError: input.gateError,
};
const result = await traceTimed(input.trace, 'gate_repair', 'gate_repair', traceData, async () =>
input.agentRunner.runLoop({
modelRole: 'repair',
systemPrompt: buildGateRepairSystemPrompt(),
userPrompt: buildGateRepairUserPrompt({
gateError: input.gateError,
allowedPaths: sortedAllowedPaths,
repairKind: input.repairKind,
attempt,
maxAttempts,
}),
toolSet: buildToolSet({
workdir: input.workdir,
gateError: input.gateError,
allowedPaths,
editedPaths,
}),
stepBudget,
telemetryTags: {
operationName: 'ingest-isolated-diff-gate-repair',
source: input.trace.context.sourceKey,
jobId: input.trace.context.jobId,
repairKind: input.repairKind,
},
abortSignal: input.abortSignal,
},
systemPrompt: buildGateRepairSystemPrompt(),
buildUserPrompt: ({ attempt, maxAttempts, previousFailure }) =>
buildGateRepairUserPrompt({
gateError: input.gateError,
allowedPaths: [...input.allowedPaths].sort(),
repairKind: input.repairKind,
attempt,
maxAttempts,
previousFailure,
}),
);
if (result.stopReason === 'error') {
lastFailure = result.error?.message ?? 'gate repair agent loop errored';
await input.trace.event('error', 'gate_repair', 'gate_repair_failed', traceData, result.error);
continue;
}
const changedPaths = [...editedPaths].sort();
if (changedPaths.length === 0) {
lastFailure = 'gate repair completed without editing an allowed path';
await input.trace.event('error', 'gate_repair', 'gate_repair_failed', {
...traceData,
reason: lastFailure,
});
continue;
}
await input.trace.event('debug', 'gate_repair', 'gate_repair_repaired', {
...traceData,
changedPaths,
});
return { status: 'repaired', attempts: attempt, changedPaths };
}
return { status: 'failed', attempts: maxAttempts, reason: lastFailure };
buildExtraTools: () => buildReadGateErrorTool(input.gateError),
verify: input.verify,
noChangeFailureReason: 'gate repair completed without editing an allowed path',
telemetryTags: {
operationName: 'ingest-isolated-diff-gate-repair',
source: input.trace.context.sourceKey,
jobId: input.trace.context.jobId,
repairKind: input.repairKind,
},
maxAttempts: input.maxAttempts,
stepBudget: input.stepBudget ?? 16,
abortSignal: input.abortSignal,
});
}

View file

@ -1817,7 +1817,8 @@ export class IngestBundleRunner {
touchedPaths: context.touchedPaths,
trace: runTrace,
reason: context.reason,
maxAttempts: 1,
verify: context.verify,
maxAttempts: 2,
stepBudget: 12,
abortSignal: ctx?.abortSignal,
});
@ -1839,7 +1840,8 @@ export class IngestBundleRunner {
allowedPaths: context.touchedPaths,
trace: runTrace,
repairKind: 'patch_semantic_gate',
maxAttempts: 1,
verify: context.verify,
maxAttempts: 2,
stepBudget: 16,
abortSignal: ctx?.abortSignal,
});
@ -2546,40 +2548,41 @@ export class IngestBundleRunner {
activePhase = 'final_gates';
activeFailureDetails = finalArtifactGateTraceData;
emitStageProgress('final_gates', 89, 'Running final artifact gates');
const runFinalArtifactGates = async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
};
try {
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates',
finalArtifactGateTraceData,
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
runFinalArtifactGates,
);
} catch (error) {
const gateError = this.errorMessage(error);
@ -2595,7 +2598,15 @@ export class IngestBundleRunner {
allowedPaths: repairPaths,
trace: runTrace,
repairKind: 'final_artifact_gate',
maxAttempts: 1,
verify: async () => {
try {
await runFinalArtifactGates();
return { ok: true };
} catch (verifyError) {
return { ok: false, reason: this.errorMessage(verifyError) };
}
},
maxAttempts: 2,
stepBudget: 16,
abortSignal: ctx?.abortSignal,
});
@ -2611,44 +2622,9 @@ export class IngestBundleRunner {
throw new Error(`${gateError}\ngate repair failed: ${gateRepair.reason}`);
}
// The repair loop re-ran the gates via `verify` before reporting
// success, so a repaired status here means the tree already passed.
isolatedDiffSummary.gateRepairs += 1;
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates_after_gate_repair',
{
...finalArtifactGateTraceData,
repairedPaths: gateRepair.changedPaths,
},
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
const repairCommit = await sessionWorktree.git.commitFiles(
gateRepair.changedPaths,

View file

@ -1,35 +1,32 @@
import { readFile } from 'node:fs/promises';
import type { GitService } from '../../../context/core/git.service.js';
import type { RepairVerification } from '../constrained-repair.js';
import type { FinalGateRepairResult } from '../final-gate-repair.js';
import type { IngestTraceWriter } from '../ingest-trace.js';
import { traceTimed } from '../ingest-trace.js';
import { assertPatchAllowedForWorkUnit, parsePatchTouchedPaths } from './git-patch.js';
import type { TextualConflictResolutionResult } from './textual-conflict-resolver.js';
type PatchIntegrationTextualResolution =
| { status: 'repaired'; attempts: number; changedPaths: string[] }
| { status: 'failed'; attempts: number; reason: string };
export type PatchIntegrationResult =
| {
status: 'accepted';
commitSha: string;
touchedPaths: string[];
textualResolution?: PatchIntegrationTextualResolution;
textualResolution?: TextualConflictResolutionResult;
gateRepair?: FinalGateRepairResult;
}
| {
status: 'textual_conflict';
reason: string;
touchedPaths: string[];
textualResolution?: PatchIntegrationTextualResolution;
textualResolution?: TextualConflictResolutionResult;
gateRepair?: FinalGateRepairResult;
}
| {
status: 'semantic_conflict';
reason: string;
touchedPaths: string[];
textualResolution?: PatchIntegrationTextualResolution;
textualResolution?: TextualConflictResolutionResult;
gateRepair?: FinalGateRepairResult;
};
@ -47,12 +44,14 @@ export interface IntegrateWorkUnitPatchInput {
patchPath: string;
touchedPaths: string[];
reason: string;
verify(changedPaths: string[]): Promise<RepairVerification>;
}): Promise<TextualConflictResolutionResult>;
repairGateFailure?(input: {
unitKey: string;
patchPath: string;
touchedPaths: string[];
reason: string;
verify(changedPaths: string[]): Promise<RepairVerification>;
}): Promise<FinalGateRepairResult>;
}
@ -94,6 +93,19 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
};
}
// Repair and resolution success is decided by this check, not by whether
// the repair agent edited files: the gates re-run over the union of the
// patch's paths and everything the agent changed.
const verifyAppliedTree = async (changedPaths: string[]): Promise<RepairVerification> => {
const paths = [...new Set([...touchedPaths, ...changedPaths])].sort();
try {
await input.validateAppliedTree(paths);
return { ok: true };
} catch (error) {
return { ok: false, reason: errorMessage(error) };
}
};
try {
await traceTimed(
input.trace,
@ -130,6 +142,7 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
patchPath: input.patchPath,
touchedPaths,
reason,
verify: verifyAppliedTree,
});
if (textualResolution.status === 'failed') {
@ -144,115 +157,20 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
};
}
try {
await traceTimed(
input.trace,
'integration',
'semantic_gate_after_textual_resolution',
{ unitKey: input.unitKey, touchedPaths: textualResolution.changedPaths },
async () => {
await input.validateAppliedTree(textualResolution.changedPaths);
},
);
} catch (semanticError) {
const reason = errorMessage(semanticError);
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
if (textualResolution.changedPaths.length === 0) {
// The resolver declared the patch redundant and the gates verified the
// current tree: the integration worktree already represents this work
// unit's content (e.g. a duplicate page created by another work unit).
await input.trace.event('debug', 'integration', 'patch_subsumed_after_textual_resolution', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: textualResolution.changedPaths,
reason,
touchedPaths,
attempts: textualResolution.attempts,
});
// A textual conflict and a semantic-gate failure can co-occur: the resolver
// reconciles the text but can leave wiki sl_refs pointing at measures the
// merged source no longer defines. Recover via the same gate repair the
// clean-apply branch uses, instead of hard-failing the whole job.
if (input.repairGateFailure) {
const gateRepair = await input.repairGateFailure({
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: textualResolution.changedPaths,
reason,
});
if (gateRepair.status !== 'failed') {
// The resolver wrote its merge to the worktree (unstaged); the repair
// edited a subset on top. Commit the union so neither is dropped.
const resolvedAndRepairedPaths = [
...new Set([...textualResolution.changedPaths, ...gateRepair.changedPaths]),
].sort();
try {
await traceTimed(
input.trace,
'integration',
'semantic_gate_after_gate_repair',
{ unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths },
async () => {
await input.validateAppliedTree(gateRepair.changedPaths);
},
);
const commit = await input.integrationGit.commitFiles(
resolvedAndRepairedPaths,
`ingest: resolve WorkUnit ${input.unitKey} conflict`,
input.author.name,
input.author.email,
);
if (commit.created) {
await input.trace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', {
unitKey: input.unitKey,
commitSha: commit.commitHash,
touchedPaths: resolvedAndRepairedPaths,
attempts: textualResolution.attempts,
gateRepairAttempts: gateRepair.attempts,
});
return {
status: 'accepted',
commitSha: commit.commitHash,
touchedPaths: resolvedAndRepairedPaths,
textualResolution,
gateRepair,
};
}
} catch (repairValidationError) {
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
await input.trace.event('error', 'integration', 'patch_semantic_conflict_after_textual_resolution', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: gateRepair.changedPaths,
reason: errorMessage(repairValidationError),
});
return {
status: 'semantic_conflict',
reason: errorMessage(repairValidationError),
touchedPaths: gateRepair.changedPaths,
textualResolution,
gateRepair,
};
}
}
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
return {
status: 'semantic_conflict',
reason: gateRepair.status === 'failed' ? gateRepair.reason : reason,
touchedPaths: textualResolution.changedPaths,
textualResolution,
gateRepair,
};
}
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
return {
status: 'semantic_conflict',
reason,
touchedPaths: textualResolution.changedPaths,
status: 'accepted',
commitSha: preApplyHead ?? '',
touchedPaths: [],
textualResolution,
};
}
@ -264,19 +182,18 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
input.author.email,
);
if (!commit.created) {
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
const noChangeReason = 'textual resolver produced no committable changes';
await input.trace.event('error', 'integration', 'textual_conflict_resolver_noop', {
// The resolver's writes left the tree byte-identical to the accepted
// state, and the gates verified it — the patch is represented already.
await input.trace.event('debug', 'integration', 'patch_subsumed_after_textual_resolution', {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: textualResolution.changedPaths,
attempts: textualResolution.attempts,
});
return {
status: 'textual_conflict',
reason: noChangeReason,
touchedPaths: textualResolution.changedPaths,
status: 'accepted',
commitSha: preApplyHead ?? '',
touchedPaths: [],
textualResolution,
};
}
@ -314,6 +231,7 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
patchPath: input.patchPath,
touchedPaths,
reason,
verify: verifyAppliedTree,
});
if (gateRepair.status === 'failed') {
@ -328,28 +246,6 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
};
}
try {
await traceTimed(
input.trace,
'integration',
'semantic_gate_after_gate_repair',
{ unitKey: input.unitKey, touchedPaths: gateRepair.changedPaths },
async () => {
await input.validateAppliedTree(gateRepair.changedPaths);
},
);
} catch (repairValidationError) {
if (preApplyHead) {
await input.integrationGit.resetHardTo(preApplyHead);
}
return {
status: 'semantic_conflict',
reason: errorMessage(repairValidationError),
touchedPaths: gateRepair.changedPaths,
gateRepair,
};
}
const commit = await input.integrationGit.commitFiles(
gateRepair.changedPaths,
`ingest: repair WorkUnit ${input.unitKey} gates`,

View file

@ -1,13 +1,15 @@
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { readFile } from 'node:fs/promises';
import { z } from 'zod';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js';
import type {
ConstrainedRepairResult,
ConstrainedRepairToolContext,
RepairVerification,
} from '../constrained-repair.js';
import { buildDeleteRepairFileTool, runConstrainedRepairLoop } from '../constrained-repair.js';
import type { IngestTraceWriter } from '../ingest-trace.js';
import { traceTimed } from '../ingest-trace.js';
export type TextualConflictResolutionResult =
| { status: 'repaired'; attempts: number; changedPaths: string[] }
| { status: 'failed'; attempts: number; reason: string };
export type TextualConflictResolutionResult = ConstrainedRepairResult;
export interface ResolveTextualConflictInput {
agentRunner: AgentRunnerPort;
@ -17,52 +19,17 @@ export interface ResolveTextualConflictInput {
touchedPaths: string[];
trace: IngestTraceWriter;
reason: string;
/**
* Re-runs the artifact gates against the current worktree. A resolution
* including an explicit no-change declaration for a redundant patch
* counts as successful only when this passes.
*/
verify(changedPaths: string[]): Promise<RepairVerification>;
maxAttempts?: number;
stepBudget?: number;
abortSignal?: AbortSignal;
}
const readIntegrationFileSchema = z.object({
path: z.string().min(1),
});
const writeIntegrationFileSchema = z.object({
path: z.string().min(1),
content: z.string(),
});
const deleteIntegrationFileSchema = z.object({
path: z.string().min(1),
});
function normalizeRepoPath(path: string): string {
const normalized = path.replace(/\\/g, '/').replace(/^\/+/, '');
const parts = normalized.split('/').filter((part) => part.length > 0);
if (parts.length === 0 || parts.some((part) => part === '.' || part === '..')) {
throw new Error(`resolver path must be a repository-relative path: ${path}`);
}
return parts.join('/');
}
function assertAllowedPath(path: string, allowedPaths: ReadonlySet<string>): string {
const normalized = normalizeRepoPath(path);
if (!allowedPaths.has(normalized)) {
throw new Error(`resolver path not allowed: ${normalized}`);
}
return normalized;
}
async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> {
try {
return { exists: true, content: await readFile(path, 'utf-8') };
} catch (error) {
if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') {
return { exists: false, content: '' };
}
throw error;
}
}
function buildResolverSystemPrompt(): string {
return `<role>
You repair one failed ktx isolated-diff patch inside the integration worktree.
@ -71,10 +38,12 @@ You repair one failed ktx isolated-diff patch inside the integration worktree.
<rules>
- Preserve accepted integration content that is unrelated to the failed patch.
- Incorporate the failed patch only when the patch evidence is compatible with the current file.
- If the current file already represents everything the failed patch contributes (for example a
duplicate page created by another work unit), call declare_patch_redundant instead of editing.
- Edit only paths exposed by the resolver tools.
- Prefer the smallest text edit that makes the composed artifact coherent.
- Do not create new facts that are absent from the current file or failed patch.
- Stop after writing the repaired file content.
- Stop after writing the repaired file content or declaring the patch redundant.
</rules>`;
}
@ -85,7 +54,11 @@ function buildResolverUserPrompt(input: {
reason: string;
attempt: number;
maxAttempts: number;
previousFailure: string | null;
}): string {
const previousFailureBlock = input.previousFailure
? `\nPrevious attempt did not pass the artifact gates:\n${input.previousFailure}\n`
: '';
return `Repair isolated-diff textual conflict.
WorkUnit: ${input.unitKey}
@ -96,17 +69,22 @@ ${input.touchedPaths.map((path) => `- ${path}`).join('\n')}
Git apply failure:
${input.reason}
Use read_failed_patch first. Then read the touched integration files, write the
repaired content, and stop.`;
${previousFailureBlock}
Use read_failed_patch first. Then read the touched integration files and either
write the repaired content or, when the patch adds nothing the current files do
not already cover, call declare_patch_redundant. Then stop.`;
}
function buildToolSet(input: {
workdir: string;
function buildResolverExtraTools(input: {
patchPath: string;
allowedPaths: ReadonlySet<string>;
editedPaths: Set<string>;
context: ConstrainedRepairToolContext;
}): KtxRuntimeToolSet {
const declareSchema = z.object({
reason: z
.string()
.min(1)
.describe('Why the integration tree already represents everything this patch contributes.'),
});
return {
read_failed_patch: {
name: 'read_failed_patch',
@ -120,46 +98,18 @@ function buildToolSet(input: {
};
},
},
read_integration_file: {
name: 'read_integration_file',
description: 'Read one allowed file from the current integration worktree.',
inputSchema: readIntegrationFileSchema,
execute: async ({ path }: z.infer<typeof readIntegrationFileSchema>) => {
const normalized = assertAllowedPath(path, input.allowedPaths);
const file = await readOptionalFile(join(input.workdir, normalized));
...buildDeleteRepairFileTool(input.context),
declare_patch_redundant: {
name: 'declare_patch_redundant',
description:
'Declare that the failed patch needs no integration because the current worktree already ' +
'represents its content (for example a duplicate page created by another work unit).',
inputSchema: declareSchema,
execute: async ({ reason }: z.infer<typeof declareSchema>) => {
input.context.declareNoChange(reason);
return {
markdown: file.exists ? file.content : `(missing file: ${normalized})`,
structured: { path: normalized, exists: file.exists },
};
},
},
write_integration_file: {
name: 'write_integration_file',
description: 'Replace one allowed integration worktree file with repaired text content.',
inputSchema: writeIntegrationFileSchema,
execute: async ({ path, content }: z.infer<typeof writeIntegrationFileSchema>) => {
const normalized = assertAllowedPath(path, input.allowedPaths);
const fullPath = join(input.workdir, normalized);
await mkdir(dirname(fullPath), { recursive: true });
await writeFile(fullPath, content, 'utf-8');
input.editedPaths.add(normalized);
return {
markdown: `Wrote ${normalized}`,
structured: { path: normalized, bytes: Buffer.byteLength(content) },
};
},
},
delete_integration_file: {
name: 'delete_integration_file',
description: 'Delete one allowed integration worktree file when the failed patch proves the deletion is correct.',
inputSchema: deleteIntegrationFileSchema,
execute: async ({ path }: z.infer<typeof deleteIntegrationFileSchema>) => {
const normalized = assertAllowedPath(path, input.allowedPaths);
await rm(join(input.workdir, normalized), { force: true });
input.editedPaths.add(normalized);
return {
markdown: `Deleted ${normalized}`,
structured: { path: normalized },
markdown: `Declared patch redundant: ${reason}`,
structured: { reason },
};
},
},
@ -169,72 +119,42 @@ function buildToolSet(input: {
export async function resolveTextualConflict(
input: ResolveTextualConflictInput,
): Promise<TextualConflictResolutionResult> {
const allowedPaths = new Set(input.touchedPaths.map(normalizeRepoPath));
const maxAttempts = input.maxAttempts ?? 1;
const stepBudget = input.stepBudget ?? 12;
let lastFailure = 'resolver did not run';
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
const editedPaths = new Set<string>();
const traceData = {
const sortedTouchedPaths = [...input.touchedPaths].sort();
return runConstrainedRepairLoop({
agentRunner: input.agentRunner,
workdir: input.workdir,
allowedPaths: input.touchedPaths,
trace: input.trace,
tracePhase: 'resolver',
traceEventName: 'textual_conflict_resolver',
traceData: {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: [...allowedPaths].sort(),
attempt,
maxAttempts,
touchedPaths: sortedTouchedPaths,
reason: input.reason,
};
const result = await traceTimed(input.trace, 'resolver', 'textual_conflict_resolver', traceData, async () =>
input.agentRunner.runLoop({
modelRole: 'repair',
systemPrompt: buildResolverSystemPrompt(),
userPrompt: buildResolverUserPrompt({
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: [...allowedPaths].sort(),
reason: input.reason,
attempt,
maxAttempts,
}),
toolSet: buildToolSet({
workdir: input.workdir,
patchPath: input.patchPath,
allowedPaths,
editedPaths,
}),
stepBudget,
telemetryTags: {
operationName: 'ingest-isolated-diff-textual-resolver',
source: input.trace.context.sourceKey,
jobId: input.trace.context.jobId,
unitKey: input.unitKey,
},
abortSignal: input.abortSignal,
},
systemPrompt: buildResolverSystemPrompt(),
buildUserPrompt: ({ attempt, maxAttempts, previousFailure }) =>
buildResolverUserPrompt({
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths: sortedTouchedPaths,
reason: input.reason,
attempt,
maxAttempts,
previousFailure,
}),
);
if (result.stopReason === 'error') {
lastFailure = result.error?.message ?? 'resolver agent loop errored';
await input.trace.event('error', 'resolver', 'textual_conflict_resolver_failed', traceData, result.error);
continue;
}
const changedPaths = [...editedPaths].sort();
if (changedPaths.length === 0) {
lastFailure = 'resolver completed without editing an allowed path';
await input.trace.event('error', 'resolver', 'textual_conflict_resolver_failed', {
...traceData,
reason: lastFailure,
});
continue;
}
await input.trace.event('debug', 'resolver', 'textual_conflict_resolver_repaired', {
...traceData,
changedPaths,
});
return { status: 'repaired', attempts: attempt, changedPaths };
}
return { status: 'failed', attempts: maxAttempts, reason: lastFailure };
buildExtraTools: (context) => buildResolverExtraTools({ patchPath: input.patchPath, context }),
verify: input.verify,
noChangeFailureReason: 'resolver completed without editing an allowed path or declaring the patch redundant',
telemetryTags: {
operationName: 'ingest-isolated-diff-textual-resolver',
source: input.trace.context.sourceKey,
jobId: input.trace.context.jobId,
unitKey: input.unitKey,
},
maxAttempts: input.maxAttempts,
stepBudget: input.stepBudget ?? 12,
abortSignal: input.abortSignal,
});
}

View file

@ -3,20 +3,16 @@ import { isAbortError } from '../../core/abort.js';
import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js';
import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js';
import { formatInvalidWuSources, type WuValidationResult } from './validate-wu-sources.js';
import type { WorkUnit } from '../types.js';
const MAX_WORK_UNIT_PROMPT_CHARS = 240_000;
interface TouchedValidationResult {
invalidSources: string[];
validSources: string[];
}
export interface WorkUnitExecutionDeps {
sessionWorktreeGit: { revParseHead(): Promise<string | null> };
agentRunner: AgentRunnerPort;
validateWikiRefs?: (actions: MemoryAction[]) => Promise<string[]>;
validateTouchedSources: (touched: TouchedSlSource[]) => Promise<TouchedValidationResult>;
validateTouchedSources: (touched: TouchedSlSource[]) => Promise<WuValidationResult>;
resetHardTo: (targetSha: string) => Promise<void>;
buildSystemPrompt: (wu: WorkUnit) => string;
buildUserPrompt: (wu: WorkUnit) => string;
@ -156,7 +152,7 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
// Spec: invalid SL writes reset the session worktree to the WU's pre-state, WU is marked failed,
// its files are absent from the Stage Index. Per-source surgical revert is the
// memory-agent pattern — NOT the bundle-ingest pattern.
return failWithReset(`sl_validate failed for: ${validation.invalidSources.join(', ')}`);
return failWithReset(`sl_validate failed for: ${formatInvalidWuSources(validation.invalidSources)}`);
}
}

View file

@ -1,24 +1,153 @@
import { findMissingJoinTargets, formatMissingJoinTarget } from '../../../context/sl/semantic-layer.service.js';
import type { SlValidationDeps } from '../../../context/sl/tools/sl-warehouse-validation.js';
import type { SlValidatorPort } from '../../../context/sl/sl-validator.port.js';
import type { TouchedSlSource } from '../../../context/tools/touched-sl-sources.js';
export interface InvalidWuSource {
/** `${connectionId}:${sourceName}` */
source: string;
errors: string[];
}
export interface WuValidationResult {
validSources: string[];
invalidSources: string[];
invalidSources: InvalidWuSource[];
}
export function formatInvalidWuSources(invalid: InvalidWuSource[]): string {
return invalid.map((entry) => `${entry.source} (${entry.errors.join('; ')})`).join(', ');
}
type LoadedSource = Awaited<ReturnType<SlValidationDeps['semanticLayerService']['loadAllSources']>>['sources'][number];
function uniqueTouchedSources(sources: TouchedSlSource[]): TouchedSlSource[] {
const seen = new Set<string>();
const unique: TouchedSlSource[] = [];
for (const source of sources) {
const key = `${source.connectionId}:${source.sourceName}`;
if (seen.has(key)) {
continue;
}
seen.add(key);
unique.push(source);
}
return unique.sort((left, right) => {
const byConnection = left.connectionId.localeCompare(right.connectionId);
return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection;
});
}
/**
* Expand the touched set with direct join neighbors that exist: targets the
* touched sources join to, and existing sources that join to a touched one.
* Missing targets are not added here they are reported as join-target
* errors on the source that declares them, so the failure names the file
* that must change instead of the phantom neighbor.
*/
function expandWithExistingJoinNeighbors(
touched: TouchedSlSource[],
sourcesByConnection: Map<string, LoadedSource[]>,
): TouchedSlSource[] {
const expanded = [...touched];
const touchedByConnection = new Map<string, Set<string>>();
for (const source of touched) {
const bucket = touchedByConnection.get(source.connectionId) ?? new Set<string>();
bucket.add(source.sourceName);
touchedByConnection.set(source.connectionId, bucket);
}
for (const [connectionId, sources] of sourcesByConnection) {
const touchedNames = touchedByConnection.get(connectionId);
if (!touchedNames || touchedNames.size === 0) {
continue;
}
const existingNames = new Set(sources.map((source) => source.name));
for (const source of sources) {
if (touchedNames.has(source.name)) {
for (const join of source.joins ?? []) {
if (existingNames.has(join.to)) {
expanded.push({ connectionId, sourceName: join.to });
}
}
}
if ((source.joins ?? []).some((join) => touchedNames.has(join.to))) {
expanded.push({ connectionId, sourceName: source.name });
}
}
}
return uniqueTouchedSources(expanded);
}
/**
* Join-target errors attributable to this change set: every join declared by
* a touched source must resolve, and no source may be left joining to a name
* this change set removed. Pre-existing dangling joins on untouched sources
* are out of scope they must not block unrelated work. Resolution is the
* Python engine's: exact source-name match within the connection.
*/
function findJoinTargetErrors(
touched: TouchedSlSource[],
sourcesByConnection: Map<string, LoadedSource[]>,
): Map<string, string[]> {
const errorsBySource = new Map<string, string[]>();
const touchedByConnection = new Map<string, Set<string>>();
for (const source of touched) {
const bucket = touchedByConnection.get(source.connectionId) ?? new Set<string>();
bucket.add(source.sourceName);
touchedByConnection.set(source.connectionId, bucket);
}
for (const [connectionId, sources] of sourcesByConnection) {
const touchedNames = touchedByConnection.get(connectionId);
if (!touchedNames || touchedNames.size === 0) {
continue;
}
const existingNames = sources.map((source) => source.name);
for (const source of sources) {
const sourceIsTouched = touchedNames.has(source.name);
const candidateJoins = sourceIsTouched
? source.joins
: (source.joins ?? []).filter((join) => touchedNames.has(join.to));
const missing = findMissingJoinTargets(candidateJoins, existingNames);
if (missing.length === 0) {
continue;
}
const key = `${connectionId}:${source.name}`;
const messages = missing.map(formatMissingJoinTarget);
errorsBySource.set(key, [...(errorsBySource.get(key) ?? []), ...messages]);
}
}
return errorsBySource;
}
export async function validateWuTouchedSources(
deps: SlValidationDeps & { slValidator: SlValidatorPort<SlValidationDeps> },
touched: TouchedSlSource[],
): Promise<WuValidationResult> {
if (touched.length === 0) {
return { validSources: [], invalidSources: [] };
}
const sourcesByConnection = new Map<string, LoadedSource[]>();
for (const connectionId of new Set(touched.map((source) => source.connectionId))) {
const { sources } = await deps.semanticLayerService.loadAllSources(connectionId);
sourcesByConnection.set(connectionId, sources);
}
const expanded = expandWithExistingJoinNeighbors(touched, sourcesByConnection);
const joinTargetErrors = findJoinTargetErrors(touched, sourcesByConnection);
const valid: string[] = [];
const invalid: string[] = [];
for (const source of touched) {
const invalid: InvalidWuSource[] = [];
for (const source of expanded) {
const key = `${source.connectionId}:${source.sourceName}`;
const result = await deps.slValidator.validateSingleSource(deps, source.connectionId, source.sourceName);
if (result.errors.length === 0) {
valid.push(`${source.connectionId}:${source.sourceName}`);
const errors = [...result.errors, ...(joinTargetErrors.get(key) ?? [])];
if (errors.length === 0) {
valid.push(key);
} else {
invalid.push(`${source.connectionId}:${source.sourceName}`);
invalid.push({ source: key, errors });
}
}
return { validSources: valid, invalidSources: invalid };

View file

@ -521,6 +521,7 @@ export class SemanticLayerService {
return null;
}
async validatePhysicalTableReferences(
connectionId: string,
sources: SemanticLayerSource[],
@ -759,6 +760,23 @@ export class SemanticLayerService {
}
merged.push(toPush);
// A join target the engine cannot resolve fails every downstream gate and
// query with the error attributed to the phantom target. Reject it here,
// on the source that declares it, while the writing agent can still fix it.
const missingJoinTargets = findMissingJoinTargets(
toPush.joins,
merged.map((s) => s.name),
);
const joinTargetErrors = missingJoinTargets.map(
(missing) =>
`${toPush.name}: ${formatMissingJoinTarget(missing)}. Declare joins only to existing ` +
`semantic-layer sources in this connection, or drop the join and keep the relationship ` +
`in a column description.`,
);
if (joinTargetErrors.length > 0) {
return { errors: [...loadErrors, ...joinTargetErrors], warnings: [], perSourceWarnings: {} };
}
const validatable = merged.filter((s) => s.table != null || s.sql != null);
if (validatable.length === 0) {
return { errors: loadErrors, warnings: [], perSourceWarnings: {} };
@ -1427,6 +1445,47 @@ function parseJoinColumns(
return { localColumn: left.column, targetColumn: right.column };
}
export interface MissingJoinTarget {
to: string;
/** Source whose name matches only case-insensitively, if any — the usual authoring mistake. */
caseMismatch: string | null;
}
/**
* Join targets that do not exactly match a known source name. The Python
* engine resolves `joins[].to` by exact name within one connection's source
* set (`engine._collect_orphan_join_target_errors`) and `query()` raises on a
* miss, so anything looser here case-insensitive matches, table refs,
* sources in other connections would pass this gate and then fail
* query/validation as an orphan join target.
*/
export function findMissingJoinTargets(
joins: Array<{ to: string }> | undefined,
knownSourceNames: Iterable<string>,
): MissingJoinTarget[] {
const known = new Set<string>();
const canonicalByLower = new Map<string, string>();
for (const name of knownSourceNames) {
known.add(name);
canonicalByLower.set(name.toLowerCase(), name);
}
const missing: MissingJoinTarget[] = [];
for (const join of joins ?? []) {
if (known.has(join.to)) {
continue;
}
missing.push({ to: join.to, caseMismatch: canonicalByLower.get(join.to.toLowerCase()) ?? null });
}
return missing;
}
export function formatMissingJoinTarget(missing: MissingJoinTarget): string {
const hint = missing.caseMismatch
? `; join targets are case-sensitive — the source is named "${missing.caseMismatch}"`
: '';
return `join target "${missing.to}" does not exist${hint}`;
}
/**
* Returns one message per measure-level segment reference that doesn't resolve to
* a segment defined on the source. Array is empty when every reference checks out.