From 7cc9f0e70a645d68d77dfd79f18a354dadd11e06 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 18 May 2026 00:11:11 +0200 Subject: [PATCH] fix(ingest): validate scan sources and wiki refs --- ...26-05-17-isolated-diff-ingestion-design.md | 141 ++++++++++++++---- .../src/ingest/ingest-bundle.runner.test.ts | 2 + .../src/ingest/local-bundle-runtime.test.ts | 83 +++++++++++ .../src/ingest/local-bundle-runtime.ts | 66 ++++++-- .../context/src/ingest/wiki-body-refs.test.ts | 48 ++++++ packages/context/src/ingest/wiki-body-refs.ts | 6 +- 6 files changed, 300 insertions(+), 46 deletions(-) diff --git a/docs/superpowers/specs/2026-05-17-isolated-diff-ingestion-design.md b/docs/superpowers/specs/2026-05-17-isolated-diff-ingestion-design.md index f09d3aee..b2428ae5 100644 --- a/docs/superpowers/specs/2026-05-17-isolated-diff-ingestion-design.md +++ b/docs/superpowers/specs/2026-05-17-isolated-diff-ingestion-design.md @@ -40,6 +40,8 @@ The design has these goals: - Run all agent-authored durable writes in isolated per-work-unit worktrees. - Treat each work unit's git diff as its proposal artifact. - Integrate accepted diffs through a shared artifact-aware merge path. +- Resolve expected cross-work-unit overlap with bounded agent repair before + failing the run. - Run final global semantic gates before any changes reach the main project worktree. - Keep connector variance minimal and source-shaped, not pipeline-shaped. @@ -257,16 +259,72 @@ Integration has three conflict classes: - Semantic conflict: the patch applies textually but creates an invalid or inconsistent artifact. -Textual conflicts are resolved before semantic gates run only when deterministic -artifact-aware merge helpers can prove a valid result. Version one has no -interactive, CLI, or agent-driven resolver. +Textual conflicts are resolved before semantic gates run when a bounded +resolver agent can produce a valid result. Overlapping work-unit writes are +normal, especially for Metabase cards that target the same semantic-layer marts +from different collections. The runner must treat overlap as an integration +case, not as a reason to fail immediately. -In version one, routing to a resolver means fail-fast resolver behavior. The -runner stops the run with a structured conflict error, marks the ingest failed, -preserves the integration worktree for inspection, retains work-unit -diffs/transcripts and existing report metadata, and does not squash anything -back to the project main worktree. Interactive prompts, waiting for human input, -retry loops, and agent-driven conflict repair are future work. +Version one is agent-first. If `git apply --3way --index` leaves conflicts, +the runner starts a resolver agent in the integration worktree. The resolver +receives only the failed patch, already-applied patches, conflicted files, +relevant work-unit transcripts, raw evidence paths, and the final-gate rules. +The resolver must preserve all non-conflicting accepted content, resolve +duplicate or competing artifact entries from evidence, and edit only files +touched by the failed patch or already-applied overlapping patches. + +The runner then reruns artifact gates for the changed files and continues with +the remaining patches if validation passes. Resolver attempts are capped to +avoid an unbounded repair loop. A run fails only after the bounded resolver +attempts cannot produce a valid integration tree. + +Deterministic semantic merge is a later optimization, not a version-one +requirement. After measuring resolver latency, cost, and failure modes, KTX can +add merge helpers for common semantic-layer YAML cases, such as additive +`measures`, `segments`, `columns`, `joins`, and `descriptions` updates keyed by +their stable logical identifiers. Those helpers can replace agent calls for +mechanical merges once the measured v1 behavior justifies the added complexity. + +The integration worktree is preserved on failure with conflict markers or +resolver edits, work-unit patches, transcripts, trace events, and the failure +report. The runner never squashes a failed or partially repaired integration +tree back to the project main worktree. + +### Gate repair stage + +The gate repair stage handles cases where patches apply cleanly but the +combined tree fails final semantic or wiki gates. This is distinct from textual +conflict resolution: the tree is textually valid, but the artifacts violate KTX +contracts. + +After each patch integration and after reconciliation, the runner runs final +artifact gates for the affected scope. If gates fail, the runner classifies the +errors before deciding whether to repair or fail. + +Repairable gate errors include: + +- stale wiki body references to renamed semantic-layer entities; +- invalid `sl_refs` entries that point to entities instead of sources; +- inline prose that accidentally uses explicit SL reference syntax; +- duplicate measures, segments, or joins with equivalent definitions; +- missing or stale wiki references created by accepted patches; and +- join or source references that can be corrected from the composed manifest + and work-unit evidence. + +High-risk gate errors fail without automatic repair unless a later +implementation adds a stronger evidence contract: + +- two work units define the same measure with different business meaning; +- a required warehouse table or column does not exist; +- a SQL source fails execution and no obvious localized rewrite exists; or +- the repair would require choosing between conflicting facts without evidence. + +For repairable errors, the runner starts a gate repair agent with the exact +gate errors, changed files, relevant work-unit transcripts, raw evidence paths, +and final-gate rules. The agent may edit only the files involved in the gate +failure. The runner reruns gates after each repair attempt and caps attempts to +one or two passes per integration stage. If the tree still fails, the run stops +with the final gate report and preserved integration worktree. ### Reconciliation in the new flow @@ -297,8 +355,9 @@ write only to target connections authorized by the adapter for the ingest run, but it is not subject to any single work unit's `slDisallowed` scope. The final global gates validate the combined tree after reconciliation. If reconciliation introduces an invalid wiki or semantic-layer reference, touches an unauthorized -target, or records an unresolvable artifact conflict, the run stops under -version-one resolver behavior before squash. +target, or records an unresolvable artifact conflict, the runner sends +repairable failures through the gate repair stage and stops before squash only +when bounded repair cannot produce a valid tree. ## Artifact-aware integration @@ -331,11 +390,12 @@ outputs emitted through `emit_artifact_resolution` as `ArtifactResolutionRecord` stage-index records. They are in-memory stage records, not worktree files, and they feed the provenance gate. -Artifact-aware integration can start with validation-only behavior. It does not -need to auto-merge every semantic conflict in version one. If two diffs contest -the same source YAML or wiki page and the merge cannot prove correctness, the -runner must stop under version-one resolver behavior rather than silently -accepting stale references. +Artifact-aware integration starts with validation plus bounded agent repair. +It does not need semantic-layer YAML merge helpers in version one. If two diffs +contest the same source YAML or wiki page and bounded agent repair cannot prove +correctness, the runner must stop rather than silently accepting stale +references. Deterministic semantic merge helpers can be added after v1 metrics +show which conflicts are frequent, mechanical, and worth optimizing. ## Global semantic gates @@ -367,20 +427,23 @@ The wiki body gate needs a narrow grammar so ordinary prose does not become a semantic-layer reference. In version one, an explicit body reference is one of these Markdown forms outside fenced code blocks: -- an inline code token in the form `source.entity`, where `source` matches a - visible semantic-layer source and `entity` must match one of that source's - measures, dimensions, or segments; -- an inline code token in the form `connectionId/source.entity`, which validates - against that specific target connection; +- an inline code token in the form `source.entity`, where both parts are plain + identifier tokens, `source` matches a visible semantic-layer source, and + `entity` must match one of that source's measures, dimensions, or segments; +- an inline code token in the form `connectionId/source.entity`, where + `source.entity` follows the same plain-identifier rule and validates against + that specific target connection; - an inline code token in the form `source:source_name`, which validates a source-level semantic-layer reference; or - an inline code token in the form `table:qualified_table_name`, which validates a raw warehouse table reference against the visible raw table/catalog sources. -The parser ignores unformatted prose, fenced SQL examples, and unprefixed -single-token inline code. Two-part inline code that does not name a visible -semantic-layer source is not treated as an SL entity reference; use the -`table:` prefix for raw warehouse table references. +The parser ignores unformatted prose, fenced SQL examples, wildcard patterns +such as `mart_nrr_quarterly.*_arr_cents`, inline SQL predicates such as +`users.is_internal = false`, and unprefixed single-token inline code. Two-part +inline code that does not name a visible semantic-layer source is not treated +as an SL entity reference; use the `table:` prefix for raw warehouse table +references. The `total_contract_arr_cents` incident is the regression case for this gate: the integrated tree must fail if a wiki page references @@ -501,8 +564,17 @@ starting from the same base: Additional tests cover: - two work units editing different wiki pages without conflict; -- two work units editing the same semantic-layer source with a textual conflict, - where the run stops under version-one resolver behavior before squash; +- two work units editing the same semantic-layer overlay with additive changes, + where the resolver agent preserves both changes and gates the repaired file; +- two work units editing the same semantic-layer overlay with incompatible + definitions, where the resolver agent receives the conflict context and the + run fails only after bounded repair attempts cannot prove a result; +- a textual conflict in a wiki page where the resolver agent preserves + non-conflicting accepted content and gates the repaired page before squash; +- a cleanly merged tree that fails final gates, where the gate repair agent + fixes a stale wiki reference and the run continues; +- an unrepairable final-gate failure, such as a missing warehouse column, where + the runner stops with a preserved integration worktree and report; - a hybrid adapter case where deterministic projector outputs are visible in a child worktree before work-unit wiki synthesis, and the final global gate catches any stale reference to a non-existent projected semantic-layer entity; @@ -522,13 +594,18 @@ configuration knob. 1. Add the per-work-unit worktree executor behind that internal runner setting. 2. Add diff collection and deterministic integration in the existing runner. -3. Add final global wiki and semantic-layer reference gates, including the wiki +3. Add bounded resolver-agent handling for textual conflicts. +4. Add final global wiki and semantic-layer reference gates, including the wiki body reference parser defined above. -4. Migrate Metabase to the new execution path first. -5. Migrate Notion, LookML, Looker, dbt, and MetricFlow. -6. Promote the new path to the default after the Metabase regression test and +5. Add bounded gate-repair-agent handling for repairable final-gate failures. +6. Instrument resolver latency, attempts, repaired files, and failure classes. +7. Migrate Metabase to the new execution path first. +8. Migrate Notion, LookML, Looker, dbt, and MetricFlow. +9. Add deterministic semantic merge helpers only after v1 metrics show which + agent repairs are frequent and mechanical enough to justify optimization. +10. Promote the new path to the default after the Metabase regression test and at least one non-Metabase connector pass. -7. Remove the old shared-worktree work-unit execution path. +11. Remove the old shared-worktree work-unit execution path. The rollout is complete when every connector that permits agent-authored durable writes uses isolated diffs and all integrations pass the same final global diff --git a/packages/context/src/ingest/ingest-bundle.runner.test.ts b/packages/context/src/ingest/ingest-bundle.runner.test.ts index ccd9ede7..59e85058 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.test.ts @@ -168,6 +168,7 @@ const makeDeps = () => { }; const wikiService = { forWorktree: vi.fn(), + listPageKeys: vi.fn().mockResolvedValue([]), readPage: vi.fn().mockResolvedValue(null), syncFromCommit: vi.fn().mockResolvedValue(undefined), }; @@ -1573,6 +1574,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { workUnits: [{ unitKey: 'u1', rawFiles: ['semantic_models.yml'], peerFileIndex: [], dependencyPaths: [] }], parseArtifacts: { semanticModels: [{ name: 'orders' }] }, }); + deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['warehouse-2']); deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) => Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }), ); diff --git a/packages/context/src/ingest/local-bundle-runtime.test.ts b/packages/context/src/ingest/local-bundle-runtime.test.ts index 71e08817..8287df71 100644 --- a/packages/context/src/ingest/local-bundle-runtime.test.ts +++ b/packages/context/src/ingest/local-bundle-runtime.test.ts @@ -17,6 +17,18 @@ type RuntimeWithConnectionDeps = { }; }; +type RuntimeWithSlValidationDeps = { + deps: { + slValidator: { + validateSingleSource( + deps: unknown, + connectionId: string, + sourceName: string, + ): Promise<{ errors: string[]; warnings: string[] }>; + }; + }; +}; + function testAgentRunner(): AgentRunnerPort { return { runLoop: vi.fn().mockResolvedValue({ stopReason: 'natural' as const }) }; } @@ -144,6 +156,77 @@ describe('createLocalBundleIngestRuntime', () => { ]); }); + it('validates manifest-backed scan sources during local ingest gates', async () => { + await project.fileStore.writeFile( + 'semantic-layer/warehouse/_schema/public.yaml', + [ + 'tables:', + ' payments:', + ' table: public.payments', + ' columns:', + ' - name: payment_id', + ' type: string', + ' - name: amount', + ' type: number', + '', + ].join('\n'), + 'ktx', + 'ktx@example.com', + 'Add warehouse manifest', + ); + const agentRunner = testAgentRunner(); + + const runtime = createLocalBundleIngestRuntime({ + project, + adapters: [new FakeSourceAdapter()], + agentRunner, + }); + const deps = (runtime.runner as unknown as RuntimeWithSlValidationDeps).deps; + + await expect(deps.slValidator.validateSingleSource(deps, 'warehouse', 'payments')).resolves.toEqual({ + errors: [], + warnings: expect.any(Array), + }); + }); + + it('does not mask malformed direct overlays with manifest-backed fallback validation', async () => { + await project.fileStore.writeFile( + 'semantic-layer/warehouse/_schema/public.yaml', + [ + 'tables:', + ' payments:', + ' table: public.payments', + ' columns:', + ' - name: payment_id', + ' type: string', + '', + ].join('\n'), + 'ktx', + 'ktx@example.com', + 'Add warehouse manifest', + ); + await project.fileStore.writeFile( + 'semantic-layer/warehouse/payments.yaml', + ['name: payments', 'columns:', ' - [', ''].join('\n'), + 'ktx', + 'ktx@example.com', + 'Add malformed overlay', + ); + const agentRunner = testAgentRunner(); + + const runtime = createLocalBundleIngestRuntime({ + project, + adapters: [new FakeSourceAdapter()], + agentRunner, + }); + const deps = (runtime.runner as unknown as RuntimeWithSlValidationDeps).deps; + + await expect(deps.slValidator.validateSingleSource(deps, 'warehouse', 'payments')).resolves.toEqual({ + errors: [expect.stringContaining('invalid YAML')], + warnings: [], + }); + }); + it('passes project connection config to local ingest query executors', async () => { const agentRunner = testAgentRunner(); const queryExecutor = { diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 5600fa90..74e3bd52 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -24,7 +24,6 @@ import { type KtxConnectionInfo, type KtxQueryResult, SemanticLayerService, - type SemanticLayerSource, type SlConnectionCatalogPort, SlDiscoverTool, SlEditSourceTool, @@ -248,22 +247,63 @@ class LocalSlPythonPort implements SlPythonPort { } class LocalShapeOnlySlValidator implements SlValidatorPort { + private validateParsedSource(sourceName: string, parsed: Record) { + const isOverlay = parsed.table == null && parsed.sql == null; + const result = (isOverlay ? sourceOverlaySchema : sourceDefinitionSchema).safeParse(parsed); + return result.success + ? { errors: [], warnings: [LOCAL_SHAPE_WARNING] } + : { + errors: result.error.issues.map( + (issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`, + ), + warnings: [], + }; + } + + private async validateComposedSource( + deps: SlValidationDeps, + connectionId: string, + sourceName: string, + readError: unknown, + ) { + try { + const { sources, loadErrors } = await deps.semanticLayerService.loadAllSources(connectionId); + const source = sources.find((candidate) => candidate.name === sourceName); + if (source) { + return this.validateParsedSource(sourceName, source as unknown as Record); + } + const detail = + loadErrors.length > 0 + ? loadErrors.join('; ') + : readError instanceof Error + ? readError.message + : String(readError); + return { errors: [`${sourceName}: ${detail}`], warnings: [] }; + } catch (fallbackError) { + return { + errors: [`${sourceName}: ${fallbackError instanceof Error ? fallbackError.message : String(fallbackError)}`], + warnings: [], + }; + } + } + async validateSingleSource(deps: SlValidationDeps, connectionId: string, sourceName: string) { + let content: string; try { const file = await deps.semanticLayerService.readSourceFile(connectionId, sourceName); - const parsed = YAML.parse(file.content) as SemanticLayerSource; - const isOverlay = parsed.table == null && parsed.sql == null; - const result = (isOverlay ? sourceOverlaySchema : sourceDefinitionSchema).safeParse(parsed); - return result.success - ? { errors: [], warnings: [LOCAL_SHAPE_WARNING] } - : { - errors: result.error.issues.map( - (issue) => `${sourceName}: ${issue.path.join('.') || 'source'} ${issue.message}`, - ), - warnings: [], - }; + content = file.content; } catch (error) { - return { errors: [`${sourceName}: ${error instanceof Error ? error.message : String(error)}`], warnings: [] }; + return this.validateComposedSource(deps, connectionId, sourceName, error); + } + + try { + const parsed = YAML.parse(content) as unknown as Record; + return this.validateParsedSource(sourceName, parsed); + } catch (error) { + return { + errors: [`${sourceName}: invalid YAML — ${error instanceof Error ? error.message : String(error)}`], + warnings: [], + }; } } } diff --git a/packages/context/src/ingest/wiki-body-refs.test.ts b/packages/context/src/ingest/wiki-body-refs.test.ts index 8a209581..2af8935f 100644 --- a/packages/context/src/ingest/wiki-body-refs.test.ts +++ b/packages/context/src/ingest/wiki-body-refs.test.ts @@ -23,6 +23,8 @@ describe('wiki body refs', () => { 'Also `warehouse/mart_account_segments.segment` and `table:analytics.mart_account_segments`.', 'Ignore prose mart_account_segments.total_contract_arr_cents.', 'Ignore `single_token`.', + 'Ignore wildcard pattern `mart_nrr_quarterly.*_arr_cents`.', + 'Ignore condition `users.is_internal = false`.', '```sql', 'select `mart_account_segments.total_contract_arr_cents`', '```', @@ -50,6 +52,52 @@ describe('wiki body refs', () => { ]); }); + it('does not treat wildcard inline-code patterns as exact semantic-layer entity references', async () => { + const invalid = await findInvalidWikiBodyRefs({ + pageKey: 'revenue-metrics-encoding', + body: 'Cents columns include `mart_nrr_quarterly.*_arr_cents` and `mart_retention_movement_breakout.*_arr_cents`.', + visibleConnectionIds: ['warehouse'], + loadSources: async () => [ + { name: 'mart_nrr_quarterly', grain: [], columns: [], joins: [], measures: [], table: 'analytics.mart_nrr_quarterly' }, + { + name: 'mart_retention_movement_breakout', + grain: [], + columns: [], + joins: [], + measures: [], + table: 'analytics.mart_retention_movement_breakout', + }, + ], + tableExists: async () => true, + }); + + expect(invalid).toEqual([]); + }); + + it('does not treat inline-code SQL predicates as exact semantic-layer entity references', async () => { + const invalid = await findInvalidWikiBodyRefs({ + pageKey: 'account-reporting-exclusions', + body: 'Exclude internal users with `users.is_internal = false` and test users with `users.is_test = false`.', + visibleConnectionIds: ['warehouse'], + loadSources: async () => [ + { + name: 'users', + grain: [], + columns: [ + { name: 'is_internal', type: 'boolean' }, + { name: 'is_test', type: 'boolean' }, + ], + joins: [], + measures: [], + table: 'analytics.users', + }, + ], + tableExists: async () => true, + }); + + expect(invalid).toEqual([]); + }); + it('validates source, dimension, segment, measure, and table references', async () => { const invalid = await findInvalidWikiBodyRefs({ pageKey: 'account-segments', diff --git a/packages/context/src/ingest/wiki-body-refs.ts b/packages/context/src/ingest/wiki-body-refs.ts index cb43e172..25f25eb3 100644 --- a/packages/context/src/ingest/wiki-body-refs.ts +++ b/packages/context/src/ingest/wiki-body-refs.ts @@ -38,6 +38,10 @@ function parseConnectionScoped(value: string): { connectionId: string | null; bo return { connectionId: value.slice(0, slash), body: value.slice(slash + 1) }; } +function isIdentifierToken(value: string): boolean { + return /^[A-Za-z_][A-Za-z0-9_]*$/.test(value); +} + export function parseWikiBodyRefs(body: string): WikiBodyRef[] { const refs: WikiBodyRef[] = []; for (const line of visibleLinesOutsideFences(body)) { @@ -62,7 +66,7 @@ export function parseWikiBodyRefs(body: string): WikiBodyRef[] { continue; } const parts = scoped.body.split('.'); - if (parts.length === 2 && parts[0] && parts[1]) { + if (parts.length === 2 && isIdentifierToken(parts[0] ?? '') && isIdentifierToken(parts[1] ?? '')) { refs.push({ kind: 'sl_entity', connectionId: scoped.connectionId,