fix(ingest): validate scan sources and wiki refs

This commit is contained in:
Andrey Avtomonov 2026-05-18 00:11:11 +02:00
parent 199c916549
commit 7cc9f0e70a
6 changed files with 300 additions and 46 deletions

View file

@ -40,6 +40,8 @@ The design has these goals:
- Run all agent-authored durable writes in isolated per-work-unit worktrees.
- Treat each work unit's git diff as its proposal artifact.
- Integrate accepted diffs through a shared artifact-aware merge path.
- Resolve expected cross-work-unit overlap with bounded agent repair before
failing the run.
- Run final global semantic gates before any changes reach the main project
worktree.
- Keep connector variance minimal and source-shaped, not pipeline-shaped.
@ -257,16 +259,72 @@ Integration has three conflict classes:
- Semantic conflict: the patch applies textually but creates an invalid or
inconsistent artifact.
Textual conflicts are resolved before semantic gates run only when deterministic
artifact-aware merge helpers can prove a valid result. Version one has no
interactive, CLI, or agent-driven resolver.
Textual conflicts are resolved before semantic gates run when a bounded
resolver agent can produce a valid result. Overlapping work-unit writes are
normal, especially for Metabase cards that target the same semantic-layer marts
from different collections. The runner must treat overlap as an integration
case, not as a reason to fail immediately.
In version one, routing to a resolver means fail-fast resolver behavior. The
runner stops the run with a structured conflict error, marks the ingest failed,
preserves the integration worktree for inspection, retains work-unit
diffs/transcripts and existing report metadata, and does not squash anything
back to the project main worktree. Interactive prompts, waiting for human input,
retry loops, and agent-driven conflict repair are future work.
Version one is agent-first. If `git apply --3way --index` leaves conflicts,
the runner starts a resolver agent in the integration worktree. The resolver
receives only the failed patch, already-applied patches, conflicted files,
relevant work-unit transcripts, raw evidence paths, and the final-gate rules.
The resolver must preserve all non-conflicting accepted content, resolve
duplicate or competing artifact entries from evidence, and edit only files
touched by the failed patch or already-applied overlapping patches.
The runner then reruns artifact gates for the changed files and continues with
the remaining patches if validation passes. Resolver attempts are capped to
avoid an unbounded repair loop. A run fails only after the bounded resolver
attempts cannot produce a valid integration tree.
Deterministic semantic merge is a later optimization, not a version-one
requirement. After measuring resolver latency, cost, and failure modes, KTX can
add merge helpers for common semantic-layer YAML cases, such as additive
`measures`, `segments`, `columns`, `joins`, and `descriptions` updates keyed by
their stable logical identifiers. Those helpers can replace agent calls for
mechanical merges once the measured v1 behavior justifies the added complexity.
The integration worktree is preserved on failure with conflict markers or
resolver edits, work-unit patches, transcripts, trace events, and the failure
report. The runner never squashes a failed or partially repaired integration
tree back to the project main worktree.
### Gate repair stage
The gate repair stage handles cases where patches apply cleanly but the
combined tree fails final semantic or wiki gates. This is distinct from textual
conflict resolution: the tree is textually valid, but the artifacts violate KTX
contracts.
After each patch integration and after reconciliation, the runner runs final
artifact gates for the affected scope. If gates fail, the runner classifies the
errors before deciding whether to repair or fail.
Repairable gate errors include:
- stale wiki body references to renamed semantic-layer entities;
- invalid `sl_refs` entries that point to entities instead of sources;
- inline prose that accidentally uses explicit SL reference syntax;
- duplicate measures, segments, or joins with equivalent definitions;
- missing or stale wiki references created by accepted patches; and
- join or source references that can be corrected from the composed manifest
and work-unit evidence.
High-risk gate errors fail without automatic repair unless a later
implementation adds a stronger evidence contract:
- two work units define the same measure with different business meaning;
- a required warehouse table or column does not exist;
- a SQL source fails execution and no obvious localized rewrite exists; or
- the repair would require choosing between conflicting facts without evidence.
For repairable errors, the runner starts a gate repair agent with the exact
gate errors, changed files, relevant work-unit transcripts, raw evidence paths,
and final-gate rules. The agent may edit only the files involved in the gate
failure. The runner reruns gates after each repair attempt and caps attempts to
one or two passes per integration stage. If the tree still fails, the run stops
with the final gate report and preserved integration worktree.
### Reconciliation in the new flow
@ -297,8 +355,9 @@ write only to target connections authorized by the adapter for the ingest run,
but it is not subject to any single work unit's `slDisallowed` scope. The final
global gates validate the combined tree after reconciliation. If reconciliation
introduces an invalid wiki or semantic-layer reference, touches an unauthorized
target, or records an unresolvable artifact conflict, the run stops under
version-one resolver behavior before squash.
target, or records an unresolvable artifact conflict, the runner sends
repairable failures through the gate repair stage and stops before squash only
when bounded repair cannot produce a valid tree.
## Artifact-aware integration
@ -331,11 +390,12 @@ outputs emitted through `emit_artifact_resolution` as
`ArtifactResolutionRecord` stage-index records. They are in-memory stage
records, not worktree files, and they feed the provenance gate.
Artifact-aware integration can start with validation-only behavior. It does not
need to auto-merge every semantic conflict in version one. If two diffs contest
the same source YAML or wiki page and the merge cannot prove correctness, the
runner must stop under version-one resolver behavior rather than silently
accepting stale references.
Artifact-aware integration starts with validation plus bounded agent repair.
It does not need semantic-layer YAML merge helpers in version one. If two diffs
contest the same source YAML or wiki page and bounded agent repair cannot prove
correctness, the runner must stop rather than silently accepting stale
references. Deterministic semantic merge helpers can be added after v1 metrics
show which conflicts are frequent, mechanical, and worth optimizing.
## Global semantic gates
@ -367,20 +427,23 @@ The wiki body gate needs a narrow grammar so ordinary prose does not become a
semantic-layer reference. In version one, an explicit body reference is one of
these Markdown forms outside fenced code blocks:
- an inline code token in the form `source.entity`, where `source` matches a
visible semantic-layer source and `entity` must match one of that source's
measures, dimensions, or segments;
- an inline code token in the form `connectionId/source.entity`, which validates
against that specific target connection;
- an inline code token in the form `source.entity`, where both parts are plain
identifier tokens, `source` matches a visible semantic-layer source, and
`entity` must match one of that source's measures, dimensions, or segments;
- an inline code token in the form `connectionId/source.entity`, where
`source.entity` follows the same plain-identifier rule and validates against
that specific target connection;
- an inline code token in the form `source:source_name`, which validates a
source-level semantic-layer reference; or
- an inline code token in the form `table:qualified_table_name`, which validates
a raw warehouse table reference against the visible raw table/catalog sources.
The parser ignores unformatted prose, fenced SQL examples, and unprefixed
single-token inline code. Two-part inline code that does not name a visible
semantic-layer source is not treated as an SL entity reference; use the
`table:` prefix for raw warehouse table references.
The parser ignores unformatted prose, fenced SQL examples, wildcard patterns
such as `mart_nrr_quarterly.*_arr_cents`, inline SQL predicates such as
`users.is_internal = false`, and unprefixed single-token inline code. Two-part
inline code that does not name a visible semantic-layer source is not treated
as an SL entity reference; use the `table:` prefix for raw warehouse table
references.
The `total_contract_arr_cents` incident is the regression case for this gate:
the integrated tree must fail if a wiki page references
@ -501,8 +564,17 @@ starting from the same base:
Additional tests cover:
- two work units editing different wiki pages without conflict;
- two work units editing the same semantic-layer source with a textual conflict,
where the run stops under version-one resolver behavior before squash;
- two work units editing the same semantic-layer overlay with additive changes,
where the resolver agent preserves both changes and gates the repaired file;
- two work units editing the same semantic-layer overlay with incompatible
definitions, where the resolver agent receives the conflict context and the
run fails only after bounded repair attempts cannot prove a result;
- a textual conflict in a wiki page where the resolver agent preserves
non-conflicting accepted content and gates the repaired page before squash;
- a cleanly merged tree that fails final gates, where the gate repair agent
fixes a stale wiki reference and the run continues;
- an unrepairable final-gate failure, such as a missing warehouse column, where
the runner stops with a preserved integration worktree and report;
- a hybrid adapter case where deterministic projector outputs are visible in a
child worktree before work-unit wiki synthesis, and the final global gate
catches any stale reference to a non-existent projected semantic-layer entity;
@ -522,13 +594,18 @@ configuration knob.
1. Add the per-work-unit worktree executor behind that internal runner setting.
2. Add diff collection and deterministic integration in the existing runner.
3. Add final global wiki and semantic-layer reference gates, including the wiki
3. Add bounded resolver-agent handling for textual conflicts.
4. Add final global wiki and semantic-layer reference gates, including the wiki
body reference parser defined above.
4. Migrate Metabase to the new execution path first.
5. Migrate Notion, LookML, Looker, dbt, and MetricFlow.
6. Promote the new path to the default after the Metabase regression test and
5. Add bounded gate-repair-agent handling for repairable final-gate failures.
6. Instrument resolver latency, attempts, repaired files, and failure classes.
7. Migrate Metabase to the new execution path first.
8. Migrate Notion, LookML, Looker, dbt, and MetricFlow.
9. Add deterministic semantic merge helpers only after v1 metrics show which
agent repairs are frequent and mechanical enough to justify optimization.
10. Promote the new path to the default after the Metabase regression test and
at least one non-Metabase connector pass.
7. Remove the old shared-worktree work-unit execution path.
11. Remove the old shared-worktree work-unit execution path.
The rollout is complete when every connector that permits agent-authored durable
writes uses isolated diffs and all integrations pass the same final global

View file

@ -168,6 +168,7 @@ const makeDeps = () => {
};
const wikiService = {
forWorktree: vi.fn(),
listPageKeys: vi.fn().mockResolvedValue([]),
readPage: vi.fn().mockResolvedValue(null),
syncFromCommit: vi.fn().mockResolvedValue(undefined),
};
@ -1573,6 +1574,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }],
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']);
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);

View file

@ -17,6 +17,18 @@ type RuntimeWithConnectionDeps = {
};
};
type RuntimeWithSlValidationDeps = {
deps: {
slValidator: {
validateSingleSource(
deps: unknown,
connectionId: string,
sourceName: string,
): Promise<{ errors: string[]; warnings: string[] }>;
};
};
};
function testAgentRunner(): AgentRunnerPort {
return { runLoop: vi.fn().mockResolvedValue({ stopReason: 'natural' as const }) };
}
@ -144,6 +156,77 @@ describe('createLocalBundleIngestRuntime', () => {
]);
});
it('validates manifest-backed scan sources during local ingest gates', async () => {
await project.fileStore.writeFile(
'semantic-layer/warehouse/_schema/public.yaml',
[
'tables:',
' payments:',
' table: public.payments',
' columns:',
' - name: payment_id',
' type: string',
' - name: amount',
' type: number',
'',
].join('\n'),
'ktx',
'ktx@example.com',
'Add warehouse manifest',
);
const agentRunner = testAgentRunner();
const runtime = createLocalBundleIngestRuntime({
project,
adapters: [new FakeSourceAdapter()],
agentRunner,
});
const deps = (runtime.runner as unknown as RuntimeWithSlValidationDeps).deps;
await expect(deps.slValidator.validateSingleSource(deps, 'warehouse', 'payments')).resolves.toEqual({
errors: [],
warnings: expect.any(Array),
});
});
it('does not mask malformed direct overlays with manifest-backed fallback validation', async () => {
await project.fileStore.writeFile(
'semantic-layer/warehouse/_schema/public.yaml',
[
'tables:',
' payments:',
' table: public.payments',
' columns:',
' - name: payment_id',
' type: string',
'',
].join('\n'),
'ktx',
'ktx@example.com',
'Add warehouse manifest',
);
await project.fileStore.writeFile(
'semantic-layer/warehouse/payments.yaml',
['name: payments', 'columns:', ' - [', ''].join('\n'),
'ktx',
'ktx@example.com',
'Add malformed overlay',
);
const agentRunner = testAgentRunner();
const runtime = createLocalBundleIngestRuntime({
project,
adapters: [new FakeSourceAdapter()],
agentRunner,
});
const deps = (runtime.runner as unknown as RuntimeWithSlValidationDeps).deps;
await expect(deps.slValidator.validateSingleSource(deps, 'warehouse', 'payments')).resolves.toEqual({
errors: [expect.stringContaining('invalid YAML')],
warnings: [],
});
});
it('passes project connection config to local ingest query executors', async () => {
const agentRunner = testAgentRunner();
const queryExecutor = {

View file

@ -24,7 +24,6 @@ import {
type KtxConnectionInfo,
type KtxQueryResult,
SemanticLayerService,
type SemanticLayerSource,
type SlConnectionCatalogPort,
SlDiscoverTool,
SlEditSourceTool,
@ -248,22 +247,63 @@ class LocalSlPythonPort implements SlPythonPort {
}
class LocalShapeOnlySlValidator implements SlValidatorPort<SlValidationDeps> {
private validateParsedSource(sourceName: string, parsed: Record<string, unknown>) {
const isOverlay = parsed.table == null && parsed.sql == null;
const result = (isOverlay ? sourceOverlaySchema : sourceDefinitionSchema).safeParse(parsed);
return result.success
? { errors: [], warnings: [LOCAL_SHAPE_WARNING] }
: {
errors: result.error.issues.map(
(issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`,
),
warnings: [],
};
}
private async validateComposedSource(
deps: SlValidationDeps,
connectionId: string,
sourceName: string,
readError: unknown,
) {
try {
const { sources, loadErrors } = await deps.semanticLayerService.loadAllSources(connectionId);
const source = sources.find((candidate) => candidate.name === sourceName);
if (source) {
return this.validateParsedSource(sourceName, source as unknown as Record<string, unknown>);
}
const detail =
loadErrors.length > 0
? loadErrors.join('; ')
: readError instanceof Error
? readError.message
: String(readError);
return { errors: [`${sourceName}: ${detail}`], warnings: [] };
} catch (fallbackError) {
return {
errors: [`${sourceName}: ${fallbackError instanceof Error ? fallbackError.message : String(fallbackError)}`],
warnings: [],
};
}
}
async validateSingleSource(deps: SlValidationDeps, connectionId: string, sourceName: string) {
let content: string;
try {
const file = await deps.semanticLayerService.readSourceFile(connectionId, sourceName);
const parsed = YAML.parse(file.content) as SemanticLayerSource;
const isOverlay = parsed.table == null && parsed.sql == null;
const result = (isOverlay ? sourceOverlaySchema : sourceDefinitionSchema).safeParse(parsed);
return result.success
? { errors: [], warnings: [LOCAL_SHAPE_WARNING] }
: {
errors: result.error.issues.map(
(issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`,
),
warnings: [],
};
content = file.content;
} catch (error) {
return { errors: [`${sourceName}: ${error instanceof Error ? error.message : String(error)}`], warnings: [] };
return this.validateComposedSource(deps, connectionId, sourceName, error);
}
try {
const parsed = YAML.parse(content) as unknown as Record<string, unknown>;
return this.validateParsedSource(sourceName, parsed);
} catch (error) {
return {
errors: [`${sourceName}: invalid YAML — ${error instanceof Error ? error.message : String(error)}`],
warnings: [],
};
}
}
}

View file

@ -23,6 +23,8 @@ describe('wiki body refs', () => {
'Also `warehouse/mart_account_segments.segment` and `table:analytics.mart_account_segments`.',
'Ignore prose mart_account_segments.total_contract_arr_cents.',
'Ignore `single_token`.',
'Ignore wildcard pattern `mart_nrr_quarterly.*_arr_cents`.',
'Ignore condition `users.is_internal = false`.',
'```sql',
'select `mart_account_segments.total_contract_arr_cents`',
'```',
@ -50,6 +52,52 @@ describe('wiki body refs', () => {
]);
});
it('does not treat wildcard inline-code patterns as exact semantic-layer entity references', async () => {
const invalid = await findInvalidWikiBodyRefs({
pageKey: 'revenue-metrics-encoding',
body: 'Cents columns include `mart_nrr_quarterly.*_arr_cents` and `mart_retention_movement_breakout.*_arr_cents`.',
visibleConnectionIds: ['warehouse'],
loadSources: async () => [
{ name: 'mart_nrr_quarterly', grain: [], columns: [], joins: [], measures: [], table: 'analytics.mart_nrr_quarterly' },
{
name: 'mart_retention_movement_breakout',
grain: [],
columns: [],
joins: [],
measures: [],
table: 'analytics.mart_retention_movement_breakout',
},
],
tableExists: async () => true,
});
expect(invalid).toEqual([]);
});
it('does not treat inline-code SQL predicates as exact semantic-layer entity references', async () => {
const invalid = await findInvalidWikiBodyRefs({
pageKey: 'account-reporting-exclusions',
body: 'Exclude internal users with `users.is_internal = false` and test users with `users.is_test = false`.',
visibleConnectionIds: ['warehouse'],
loadSources: async () => [
{
name: 'users',
grain: [],
columns: [
{ name: 'is_internal', type: 'boolean' },
{ name: 'is_test', type: 'boolean' },
],
joins: [],
measures: [],
table: 'analytics.users',
},
],
tableExists: async () => true,
});
expect(invalid).toEqual([]);
});
it('validates source, dimension, segment, measure, and table references', async () => {
const invalid = await findInvalidWikiBodyRefs({
pageKey: 'account-segments',

View file

@ -38,6 +38,10 @@ function parseConnectionScoped(value: string): { connectionId: string | null; bo
return { connectionId: value.slice(0, slash), body: value.slice(slash + 1) };
}
function isIdentifierToken(value: string): boolean {
return /^[A-Za-z_][A-Za-z0-9_]*$/.test(value);
}
export function parseWikiBodyRefs(body: string): WikiBodyRef[] {
const refs: WikiBodyRef[] = [];
for (const line of visibleLinesOutsideFences(body)) {
@ -62,7 +66,7 @@ export function parseWikiBodyRefs(body: string): WikiBodyRef[] {
continue;
}
const parts = scoped.body.split('.');
if (parts.length === 2 && parts[0] && parts[1]) {
if (parts.length === 2 && isIdentifierToken(parts[0] ?? '') && isIdentifierToken(parts[1] ?? '')) {
refs.push({
kind: 'sl_entity',
connectionId: scoped.connectionId,