From 6536c5da26a1d6c11a734dd52fd4452f05b1d323 Mon Sep 17 00:00:00 2001 From: Luca Martial Date: Mon, 11 May 2026 20:39:07 -0700 Subject: [PATCH] Harden semantic layer source validation --- .../context/skills/metabase_ingest/SKILL.md | 4 +- .../adapters/historic-sql/projection.ts | 4 +- .../context/src/mcp/local-project-ports.ts | 2 +- .../src/scan/local-enrichment-artifacts.ts | 2 +- packages/context/src/sl/local-sl.test.ts | 2 +- packages/context/src/sl/local-sl.ts | 2 +- packages/context/src/sl/schemas.ts | 23 +++- .../src/sl/semantic-layer.service.test.ts | 28 +++++ .../context/src/sl/semantic-layer.service.ts | 2 +- .../src/sl/tools/sl-edit-source.tool.ts | 2 +- .../src/sl/tools/sl-write-source.tool.ts | 2 +- python/ktx-sl/semantic_layer/engine.py | 38 +++++- .../semantic_layer/sql_table_extractor.py | 27 +++++ python/ktx-sl/tests/test_validator.py | 114 +++++++++++++++++- 14 files changed, 235 insertions(+), 17 deletions(-) diff --git a/packages/context/skills/metabase_ingest/SKILL.md b/packages/context/skills/metabase_ingest/SKILL.md index 713def8c..8b1c9338 100644 --- a/packages/context/skills/metabase_ingest/SKILL.md +++ b/packages/context/skills/metabase_ingest/SKILL.md @@ -48,7 +48,7 @@ Use `resultMetadata` to: For each card: 1. Analyze `resolvedSql` + `resultMetadata`: identify base tables, aggregations, joins, filters, column types. -2. Check `sl_discover` and `sl_read_source` for existing sources that overlap. +2. **REQUIRED before any write**: call `sl_discover` for every candidate target source name. The response tells you whether the name is manifest-backed (`Type: table` or `Type: sql`). For manifest-backed names you MUST use the overlay shape (`name:` + `measures:`/`segments:`/`description:` only — no `sql:`, `table:`, `grain:`, or `columns:`); the tool will reject a standalone write and you'll have wasted the call. If `sl_discover` returns nothing for the name, you can write a standalone source. Also call `sl_read_source` on existing sources you intend to extend so you don't duplicate measures. 3. Decide: - Simple aggregation on a table that already has a source → `sl_edit_source` to add a measure. - Join between tables that should be linked in the SL graph → `sl_edit_source` to add a join. @@ -70,6 +70,8 @@ Overlay shape: `name:` plus any of `measures:`, `segments:`, `description:`, `jo **Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '' })` before writing. The matching manifest entry's `name` is the value you use in `joins: [- to: ]` only when the card output exposes a local key that matches the target source grain (for example `account_id = mart_account_segments.account_id`). Do not declare a KTX join just because the card SQL joins that table internally. If the output only exposes display fields such as `account_name`, keep the SQL source self-contained or project the key before adding the join. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse. +**Hard rule on join columns (prevents broken joins):** For every join you declare, the local column on the left of `on:` MUST be (a) present in your source's projected output and (b) a key/ID column, never a display value. If the natural FK isn't in your SELECT, add it to SELECT before declaring the join. Joining `account_name = mart_account_segments.account_id` is always wrong — names are not identifiers and the equality produces zero matches. The validator rejects this with a "display value to identifier" error; the tool will refuse to save it. Add `account_id` to your SELECT and join on `account_id = mart_account_segments.account_id`, or omit the join entirely. + ## priorProvenance If the WU prompt includes a `priorProvenance` section for a card, it tells you what happened on prior ingest syncs. Treat it as advisory: diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.ts b/packages/context/src/ingest/adapters/historic-sql/projection.ts index ca24a67f..366b98f3 100644 --- a/packages/context/src/ingest/adapters/historic-sql/projection.ts +++ b/packages/context/src/ingest/adapters/historic-sql/projection.ts @@ -74,7 +74,7 @@ async function readJson(path: string): Promise { async function writeYamlAtomic(path: string, value: unknown): Promise { await mkdir(dirname(path), { recursive: true }); const tmp = `${path}.tmp`; - await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0 }), 'utf-8'); + await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0, version: '1.1' }), 'utf-8'); await rename(tmp, path); } @@ -270,7 +270,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp } } } - const after = YAML.stringify(shard, { indent: 2, lineWidth: 0 }); + const after = YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' }); if (after !== before) { await writeYamlAtomic(path, shard); } diff --git a/packages/context/src/mcp/local-project-ports.ts b/packages/context/src/mcp/local-project-ports.ts index 41d1f916..0c325453 100644 --- a/packages/context/src/mcp/local-project-ports.ts +++ b/packages/context/src/mcp/local-project-ports.ts @@ -512,7 +512,7 @@ export function createLocalProjectMcpContextPorts( } const yaml = - input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0 }); + input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' }); parseYamlRecord(yaml); await project.fileStore.writeFile( path, diff --git a/packages/context/src/scan/local-enrichment-artifacts.ts b/packages/context/src/scan/local-enrichment-artifacts.ts index 101d062e..c25049c2 100644 --- a/packages/context/src/scan/local-enrichment-artifacts.ts +++ b/packages/context/src/scan/local-enrichment-artifacts.ts @@ -291,7 +291,7 @@ export async function writeLocalScanManifestShards( const path = `${schemaDir(input.connectionId)}/${shardKey}.yaml`; await input.project.fileStore.writeFile( path, - YAML.stringify(shard, { indent: 2, lineWidth: 0 }), + YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' }), LOCAL_AUTHOR, LOCAL_AUTHOR_EMAIL, `scan(${LIVE_DATABASE_ADAPTER}): write manifest shard ${shardKey} syncId=${input.syncId}`, diff --git a/packages/context/src/sl/local-sl.test.ts b/packages/context/src/sl/local-sl.test.ts index 3cdfaefe..99d20c35 100644 --- a/packages/context/src/sl/local-sl.test.ts +++ b/packages/context/src/sl/local-sl.test.ts @@ -345,7 +345,7 @@ describe('local semantic-layer helpers', () => { await expect(validateLocalSlSource(invalidYaml)).resolves.toMatchObject({ valid: false, - errors: [expect.stringContaining('grain')], + errors: expect.arrayContaining([expect.stringContaining('grain')]), }); await expect( diff --git a/packages/context/src/sl/local-sl.ts b/packages/context/src/sl/local-sl.ts index 14559ffe..fe201d3f 100644 --- a/packages/context/src/sl/local-sl.ts +++ b/packages/context/src/sl/local-sl.ts @@ -157,7 +157,7 @@ function summarizeSource(args: { connectionId: string; path: string; raw: string } function sourceToYaml(source: SemanticLayerSource): string { - return YAML.stringify(source, { indent: 2, lineWidth: 0 }); + return YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' }); } function summarizeSemanticSource(args: { diff --git a/packages/context/src/sl/schemas.ts b/packages/context/src/sl/schemas.ts index 706a4add..a42ecc87 100644 --- a/packages/context/src/sl/schemas.ts +++ b/packages/context/src/sl/schemas.ts @@ -63,6 +63,14 @@ const sourceFreshnessSchema = z.object({ dbt: freshnessDbtSchema.optional(), }); +// Identifiers (grain entries, column names) must be unqualified output-column +// names. A dot would mean the agent emitted a table-qualified reference like +// `activity.account_id` — those break SQL generation and grain semantics. +const unqualifiedNameSchema = z + .string() + .min(1) + .regex(/^[^.]+$/, "must be unqualified (no '.') — use the output column name"); + const joinDeclarationSchema = z.object({ to: z.string().min(1), on: z.string().min(1), @@ -71,7 +79,7 @@ const joinDeclarationSchema = z.object({ }); const sourceColumnSchema = z.object({ - name: z.string().min(1), + name: unqualifiedNameSchema, // type/description optional on standalone sources: compose-time enrichment fills them // from the manifest entry named in `inherits_columns_from`. If the agent does not set // `inherits_columns_from`, or the column is not in the manifest, type must be present @@ -90,7 +98,7 @@ const sourceColumnSchema = z.object({ /** Overlay column: type requires expr (structural types are inherited from manifest). */ const overlayColumnSchema = z .object({ - name: z.string().min(1), + name: unqualifiedNameSchema, type: z.enum(columnTypeValues).optional(), role: z.enum(columnRoleValues).optional(), visibility: z.enum(columnVisibilityValues).optional(), @@ -118,8 +126,13 @@ export const sourceDefinitionSchema = z // agent write `columns: [{name: FOO}]` instead of redeclaring known fields. // Lookup is fuzzy: bare key, fully-qualified table path, or any suffix all match. inherits_columns_from: z.string().optional(), - grain: z.array(z.string()).min(1), - columns: z.array(sourceColumnSchema).default([]), + grain: z.array(unqualifiedNameSchema).min(1), + // Standalone sources MUST declare columns. An empty columns array means + // there's nothing to query or join against and breaks grain validation + // (the grain must reference declared columns). Inheritance from a manifest + // via `inherits_columns_from` only fills in type/description on declared + // columns — the column names themselves must be listed here. + columns: z.array(sourceColumnSchema).min(1), joins: z.array(joinDeclarationSchema).default([]), measures: z.array(slMeasureDefinitionSchema).default([]), segments: z.array(segmentDefinitionSchema).optional(), @@ -139,7 +152,7 @@ export const sourceOverlaySchema = z name: z.string().min(1), description: z.string().optional(), descriptions: z.record(z.string(), z.string()).optional(), - grain: z.array(z.string()).optional(), + grain: z.array(unqualifiedNameSchema).optional(), columns: z.array(overlayColumnSchema).optional(), joins: z.array(joinDeclarationSchema).optional(), measures: z.array(slMeasureDefinitionSchema).optional(), diff --git a/packages/context/src/sl/semantic-layer.service.test.ts b/packages/context/src/sl/semantic-layer.service.test.ts index 3adde085..78f5accf 100644 --- a/packages/context/src/sl/semantic-layer.service.test.ts +++ b/packages/context/src/sl/semantic-layer.service.test.ts @@ -363,6 +363,34 @@ describe('sourceDefinitionSchema', () => { externalOwner: 'analytics', }); }); + + it("rejects qualified grain names (e.g. 'activity.account_id')", () => { + const result = sourceDefinitionSchema.safeParse({ + name: 'activity', + table: 'public.activity', + grain: ['activity.account_id'], + columns: [{ name: 'account_id', type: 'number' }], + joins: [], + measures: [], + }); + expect(result.success).toBe(false); + if (result.success) return; + expect(result.error.issues.some((i) => i.path.join('.').startsWith('grain'))).toBe(true); + }); + + it('rejects qualified column names', () => { + const result = sourceDefinitionSchema.safeParse({ + name: 'activity', + table: 'public.activity', + grain: ['account_id'], + columns: [{ name: 'activity.account_id', type: 'number' }], + joins: [], + measures: [], + }); + expect(result.success).toBe(false); + if (result.success) return; + expect(result.error.issues.some((i) => i.path.join('.').startsWith('columns'))).toBe(true); + }); }); describe('projectManifestEntry', () => { diff --git a/packages/context/src/sl/semantic-layer.service.ts b/packages/context/src/sl/semantic-layer.service.ts index ffae0b12..33a44d11 100644 --- a/packages/context/src/sl/semantic-layer.service.ts +++ b/packages/context/src/sl/semantic-layer.service.ts @@ -133,7 +133,7 @@ export class SemanticLayerService { const path = this.sourcePath(connectionId, source.name); const normalizedSource = normalizeSemanticLayerDescriptions(source); - const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0 }); + const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' }); const message = commitMessage ?? `Update semantic layer source: ${source.name}`; const result = await this.configService.writeFile(path, content, author, authorEmail, message, { skipLock: options?.skipLock, diff --git a/packages/context/src/sl/tools/sl-edit-source.tool.ts b/packages/context/src/sl/tools/sl-edit-source.tool.ts index 17a85990..d27bb113 100644 --- a/packages/context/src/sl/tools/sl-edit-source.tool.ts +++ b/packages/context/src/sl/tools/sl-edit-source.tool.ts @@ -151,7 +151,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t source = normalizeSemanticLayerDescriptions(source, { fillMissing: !!context.session?.ingest }); // Re-serialize and write - const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0 }); + const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' }); const { errors: validationErrors, warnings: validationWarnings } = await semanticLayerService.validateWithProposedSource(connectionId, source); diff --git a/packages/context/src/sl/tools/sl-write-source.tool.ts b/packages/context/src/sl/tools/sl-write-source.tool.ts index 638b130e..de59f6bb 100644 --- a/packages/context/src/sl/tools/sl-write-source.tool.ts +++ b/packages/context/src/sl/tools/sl-write-source.tool.ts @@ -164,7 +164,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co ? `${isOverlay ? 'Update overlay' : 'Rewrite source'}: ${sourceName}` : `${isOverlay ? 'Create overlay' : 'Create source'}: ${sourceName}`; - const yamlContent = YAML.stringify(normalizedSource); + const yamlContent = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' }); const orphanError = await this.rejectOrphanOverlay(semanticLayerService, connectionId, sourceName, yamlContent); if (orphanError) { diff --git a/python/ktx-sl/semantic_layer/engine.py b/python/ktx-sl/semantic_layer/engine.py index 0e17419c..b45d89ac 100644 --- a/python/ktx-sl/semantic_layer/engine.py +++ b/python/ktx-sl/semantic_layer/engine.py @@ -12,6 +12,7 @@ from semantic_layer.models import ( ) from semantic_layer.planner import QueryPlanner from semantic_layer.sql_table_extractor import ( + extract_projected_columns, extract_table_refs, ref_matches_source_table, ) @@ -83,15 +84,48 @@ class SemanticEngine: report.errors.extend(self._collect_orphan_join_target_errors()) def _check_invalid_grain(self, report: ValidationReport) -> None: + dialect = getattr(self.generator, "dialect", "postgres") for source in self.sources.values(): + qualified_grain: set[str] = set() + for grain_col in source.grain: + if "." in grain_col: + qualified_grain.add(grain_col) + report.errors.append( + f"Source '{source.name}' grain entry '{grain_col}' is a " + f"qualified name. Grain must use unqualified output column " + f"names (e.g. 'account_id', not 'activity.account_id')." + ) + + for col in source.columns: + if "." in col.name: + report.errors.append( + f"Source '{source.name}' column name '{col.name}' contains " + f"'.'. Column names must be unqualified." + ) + column_names = {c.name for c in source.columns} for grain_col in source.grain: + if grain_col in qualified_grain: + continue if grain_col not in column_names: report.errors.append( f"Source '{source.name}' has grain column '{grain_col}' " f"that is not in its columns list" ) + if source.is_sql_source and source.sql: + projected = extract_projected_columns(source.sql, dialect=dialect) + if projected is not None: + for grain_col in source.grain: + if grain_col in qualified_grain: + continue + if grain_col not in projected: + report.errors.append( + f"Source '{source.name}' grain column '{grain_col}' " + f"is not in the SQL SELECT projection. Add it to the " + f"SELECT list (or remove it from grain)." + ) + def _check_join_columns(self, report: ValidationReport) -> None: for source in self.sources.values(): source_columns = {c.name for c in source.columns} @@ -108,7 +142,9 @@ class SemanticEngine: ) continue - local_cols = [col.strip() for col in local_raw.split(",") if col.strip()] + local_cols = [ + col.strip() for col in local_raw.split(",") if col.strip() + ] target_cols = [ col.strip() for col in target_raw.split(",") if col.strip() ] diff --git a/python/ktx-sl/semantic_layer/sql_table_extractor.py b/python/ktx-sl/semantic_layer/sql_table_extractor.py index 008c53ad..9b9efa6e 100644 --- a/python/ktx-sl/semantic_layer/sql_table_extractor.py +++ b/python/ktx-sl/semantic_layer/sql_table_extractor.py @@ -70,3 +70,30 @@ def ref_matches_source_table(ref: tuple[str, ...], source_table: str) -> bool: if len(ref) > len(src): return False return src[-len(ref) :] == ref + + +def extract_projected_columns(sql: str, dialect: str = "postgres") -> set[str] | None: + """Return the set of output column names projected by `sql`. + + Returns None if the projection cannot be statically determined — when + SELECT * (or qualified `t.*`) is present, or when parsing fails. Callers + should treat None as "unknown projection" and skip projection-dependent + checks rather than reporting a false-positive error. + """ + try: + tree = sqlglot.parse_one(sql, read=dialect) + except Exception as e: + logger.debug("extract_projected_columns: parse failed (%s); skipping", e) + return None + + if not isinstance(tree, exp.Select): + return None + + for projection in tree.expressions: + # Bare `*` or `t.*` — projection list is opaque. + if isinstance(projection, exp.Star): + return None + if isinstance(projection, exp.Column) and isinstance(projection.this, exp.Star): + return None + + return {name for name in tree.named_selects if name} diff --git a/python/ktx-sl/tests/test_validator.py b/python/ktx-sl/tests/test_validator.py index 723e9636..909d5a6c 100644 --- a/python/ktx-sl/tests/test_validator.py +++ b/python/ktx-sl/tests/test_validator.py @@ -119,6 +119,116 @@ class TestInvalidGrain: assert not report.valid assert any("bad" in e and "nonexistent_col" in e for e in report.errors) + def test_qualified_grain_name_is_rejected(self): + bad = _src( + "activity", + columns=["account_id"], + grain=["activity.account_id"], + ) + engine = SemanticEngine.from_sources({"activity": bad}) + + report = engine.validate() + + assert not report.valid + assert any( + "activity" in e and "activity.account_id" in e and "qualified" in e + for e in report.errors + ) + + def test_qualified_column_name_is_rejected(self): + bad = SourceDefinition( + name="activity", + table="public.activity", + grain=["account_id"], + columns=[ + SourceColumn(name="account_id", type="number"), + SourceColumn(name="activity.user_id", type="number"), + ], + ) + engine = SemanticEngine.from_sources({"activity": bad}) + + report = engine.validate() + + assert not report.valid + assert any( + "activity" in e and "activity.user_id" in e and "unqualified" in e + for e in report.errors + ) + + def test_sql_source_grain_missing_from_projection(self): + bad = SourceDefinition( + name="large_contract_requesters", + sql=( + "select account.account_name, requester.email as requester_email " + "from orbit_raw.actions activity " + "join orbit_raw.accounts account " + " on account.account_id = activity.account_id " + "join orbit_raw.users requester " + " on requester.user_id = activity.user_id" + ), + grain=["account_id", "user_id"], + columns=[ + SourceColumn(name="account_id", type="number"), + SourceColumn(name="user_id", type="number"), + SourceColumn(name="account_name", type="string"), + SourceColumn(name="requester_email", type="string"), + ], + ) + engine = SemanticEngine.from_sources({"large_contract_requesters": bad}) + + report = engine.validate() + + assert not report.valid + assert any( + "large_contract_requesters" in e + and "account_id" in e + and "SELECT projection" in e + for e in report.errors + ) + + def test_sql_source_grain_in_projection_passes(self): + good = SourceDefinition( + name="contract_requesters", + sql=( + "select activity.account_id, activity.user_id, " + "account.account_name, requester.email as requester_email " + "from orbit_raw.actions activity " + "join orbit_raw.accounts account " + " on account.account_id = activity.account_id " + "join orbit_raw.users requester " + " on requester.user_id = activity.user_id" + ), + grain=["account_id", "user_id"], + columns=[ + SourceColumn(name="account_id", type="number"), + SourceColumn(name="user_id", type="number"), + SourceColumn(name="account_name", type="string"), + SourceColumn(name="requester_email", type="string"), + ], + ) + engine = SemanticEngine.from_sources({"contract_requesters": good}) + + report = engine.validate() + + # No grain-related errors. (Other validators may emit unrelated + # warnings — we just assert the grain check is clean.) + assert not any("grain" in e or "SELECT projection" in e for e in report.errors) + + def test_sql_source_with_select_star_skips_projection_check(self): + # SELECT * means we can't statically know projected columns; + # the projection check must skip rather than false-fail. + src = SourceDefinition( + name="opaque", + sql="select * from public.events", + grain=["event_id"], + columns=[SourceColumn(name="event_id", type="number")], + ) + engine = SemanticEngine.from_sources({"opaque": src}) + + report = engine.validate() + + assert not any("SELECT projection" in e for e in report.errors) + class TestJoinValidation: def test_join_local_column_must_exist(self): @@ -246,7 +356,9 @@ class TestJoinValidation: report = engine.validate(recently_touched={"large_contract_requesters"}) assert not report.valid - assert any("mart_account_segments" in e and "joins[]" in e for e in report.errors) + assert any( + "mart_account_segments" in e and "joins[]" in e for e in report.errors + ) class TestDisconnectedComponents: