diff --git a/packages/context/skills/metabase_ingest/SKILL.md b/packages/context/skills/metabase_ingest/SKILL.md
index 713def8c..8b1c9338 100644
--- a/packages/context/skills/metabase_ingest/SKILL.md
+++ b/packages/context/skills/metabase_ingest/SKILL.md
@@ -48,7 +48,7 @@ Use `resultMetadata` to:
For each card:
1. Analyze `resolvedSql` + `resultMetadata`: identify base tables, aggregations, joins, filters, column types.
-2. Check `sl_discover` and `sl_read_source` for existing sources that overlap.
+2. **REQUIRED before any write**: call `sl_discover` for every candidate target source name. The response tells you whether the name is manifest-backed (`Type: table` or `Type: sql`). For manifest-backed names you MUST use the overlay shape (`name:` + `measures:`/`segments:`/`description:` only — no `sql:`, `table:`, `grain:`, or `columns:`); the tool will reject a standalone write and you'll have wasted the call. If `sl_discover` returns nothing for the name, you can write a standalone source. Also call `sl_read_source` on existing sources you intend to extend so you don't duplicate measures.
3. Decide:
- Simple aggregation on a table that already has a source → `sl_edit_source` to add a measure.
- Join between tables that should be linked in the SL graph → `sl_edit_source` to add a join.
@@ -70,6 +70,8 @@ Overlay shape: `name:` plus any of `measures:`, `segments:`, `description:`, `jo
**Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '
' })` before writing. The matching manifest entry's `name` is the value you use in `joins: [- to: ]` only when the card output exposes a local key that matches the target source grain (for example `account_id = mart_account_segments.account_id`). Do not declare a KTX join just because the card SQL joins that table internally. If the output only exposes display fields such as `account_name`, keep the SQL source self-contained or project the key before adding the join. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse.
+**Hard rule on join columns (prevents broken joins):** For every join you declare, the local column on the left of `on:` MUST be (a) present in your source's projected output and (b) a key/ID column, never a display value. If the natural FK isn't in your SELECT, add it to SELECT before declaring the join. Joining `account_name = mart_account_segments.account_id` is always wrong — names are not identifiers and the equality produces zero matches. The validator rejects this with a "display value to identifier" error; the tool will refuse to save it. Add `account_id` to your SELECT and join on `account_id = mart_account_segments.account_id`, or omit the join entirely.
+
## priorProvenance
If the WU prompt includes a `priorProvenance` section for a card, it tells you what happened on prior ingest syncs. Treat it as advisory:
diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.ts b/packages/context/src/ingest/adapters/historic-sql/projection.ts
index ca24a67f..366b98f3 100644
--- a/packages/context/src/ingest/adapters/historic-sql/projection.ts
+++ b/packages/context/src/ingest/adapters/historic-sql/projection.ts
@@ -74,7 +74,7 @@ async function readJson(path: string): Promise {
async function writeYamlAtomic(path: string, value: unknown): Promise {
await mkdir(dirname(path), { recursive: true });
const tmp = `${path}.tmp`;
- await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0 }), 'utf-8');
+ await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0, version: '1.1' }), 'utf-8');
await rename(tmp, path);
}
@@ -270,7 +270,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
}
}
}
- const after = YAML.stringify(shard, { indent: 2, lineWidth: 0 });
+ const after = YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' });
if (after !== before) {
await writeYamlAtomic(path, shard);
}
diff --git a/packages/context/src/mcp/local-project-ports.ts b/packages/context/src/mcp/local-project-ports.ts
index 41d1f916..0c325453 100644
--- a/packages/context/src/mcp/local-project-ports.ts
+++ b/packages/context/src/mcp/local-project-ports.ts
@@ -512,7 +512,7 @@ export function createLocalProjectMcpContextPorts(
}
const yaml =
- input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0 });
+ input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' });
parseYamlRecord(yaml);
await project.fileStore.writeFile(
path,
diff --git a/packages/context/src/scan/local-enrichment-artifacts.ts b/packages/context/src/scan/local-enrichment-artifacts.ts
index 101d062e..c25049c2 100644
--- a/packages/context/src/scan/local-enrichment-artifacts.ts
+++ b/packages/context/src/scan/local-enrichment-artifacts.ts
@@ -291,7 +291,7 @@ export async function writeLocalScanManifestShards(
const path = `${schemaDir(input.connectionId)}/${shardKey}.yaml`;
await input.project.fileStore.writeFile(
path,
- YAML.stringify(shard, { indent: 2, lineWidth: 0 }),
+ YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' }),
LOCAL_AUTHOR,
LOCAL_AUTHOR_EMAIL,
`scan(${LIVE_DATABASE_ADAPTER}): write manifest shard ${shardKey} syncId=${input.syncId}`,
diff --git a/packages/context/src/sl/local-sl.test.ts b/packages/context/src/sl/local-sl.test.ts
index 3cdfaefe..99d20c35 100644
--- a/packages/context/src/sl/local-sl.test.ts
+++ b/packages/context/src/sl/local-sl.test.ts
@@ -345,7 +345,7 @@ describe('local semantic-layer helpers', () => {
await expect(validateLocalSlSource(invalidYaml)).resolves.toMatchObject({
valid: false,
- errors: [expect.stringContaining('grain')],
+ errors: expect.arrayContaining([expect.stringContaining('grain')]),
});
await expect(
diff --git a/packages/context/src/sl/local-sl.ts b/packages/context/src/sl/local-sl.ts
index 14559ffe..fe201d3f 100644
--- a/packages/context/src/sl/local-sl.ts
+++ b/packages/context/src/sl/local-sl.ts
@@ -157,7 +157,7 @@ function summarizeSource(args: { connectionId: string; path: string; raw: string
}
function sourceToYaml(source: SemanticLayerSource): string {
- return YAML.stringify(source, { indent: 2, lineWidth: 0 });
+ return YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' });
}
function summarizeSemanticSource(args: {
diff --git a/packages/context/src/sl/schemas.ts b/packages/context/src/sl/schemas.ts
index 706a4add..a42ecc87 100644
--- a/packages/context/src/sl/schemas.ts
+++ b/packages/context/src/sl/schemas.ts
@@ -63,6 +63,14 @@ const sourceFreshnessSchema = z.object({
dbt: freshnessDbtSchema.optional(),
});
+// Identifiers (grain entries, column names) must be unqualified output-column
+// names. A dot would mean the agent emitted a table-qualified reference like
+// `activity.account_id` — those break SQL generation and grain semantics.
+const unqualifiedNameSchema = z
+ .string()
+ .min(1)
+ .regex(/^[^.]+$/, "must be unqualified (no '.') — use the output column name");
+
const joinDeclarationSchema = z.object({
to: z.string().min(1),
on: z.string().min(1),
@@ -71,7 +79,7 @@ const joinDeclarationSchema = z.object({
});
const sourceColumnSchema = z.object({
- name: z.string().min(1),
+ name: unqualifiedNameSchema,
// type/description optional on standalone sources: compose-time enrichment fills them
// from the manifest entry named in `inherits_columns_from`. If the agent does not set
// `inherits_columns_from`, or the column is not in the manifest, type must be present
@@ -90,7 +98,7 @@ const sourceColumnSchema = z.object({
/** Overlay column: type requires expr (structural types are inherited from manifest). */
const overlayColumnSchema = z
.object({
- name: z.string().min(1),
+ name: unqualifiedNameSchema,
type: z.enum(columnTypeValues).optional(),
role: z.enum(columnRoleValues).optional(),
visibility: z.enum(columnVisibilityValues).optional(),
@@ -118,8 +126,13 @@ export const sourceDefinitionSchema = z
// agent write `columns: [{name: FOO}]` instead of redeclaring known fields.
// Lookup is fuzzy: bare key, fully-qualified table path, or any suffix all match.
inherits_columns_from: z.string().optional(),
- grain: z.array(z.string()).min(1),
- columns: z.array(sourceColumnSchema).default([]),
+ grain: z.array(unqualifiedNameSchema).min(1),
+ // Standalone sources MUST declare columns. An empty columns array means
+ // there's nothing to query or join against and breaks grain validation
+ // (the grain must reference declared columns). Inheritance from a manifest
+ // via `inherits_columns_from` only fills in type/description on declared
+ // columns — the column names themselves must be listed here.
+ columns: z.array(sourceColumnSchema).min(1),
joins: z.array(joinDeclarationSchema).default([]),
measures: z.array(slMeasureDefinitionSchema).default([]),
segments: z.array(segmentDefinitionSchema).optional(),
@@ -139,7 +152,7 @@ export const sourceOverlaySchema = z
name: z.string().min(1),
description: z.string().optional(),
descriptions: z.record(z.string(), z.string()).optional(),
- grain: z.array(z.string()).optional(),
+ grain: z.array(unqualifiedNameSchema).optional(),
columns: z.array(overlayColumnSchema).optional(),
joins: z.array(joinDeclarationSchema).optional(),
measures: z.array(slMeasureDefinitionSchema).optional(),
diff --git a/packages/context/src/sl/semantic-layer.service.test.ts b/packages/context/src/sl/semantic-layer.service.test.ts
index 3adde085..78f5accf 100644
--- a/packages/context/src/sl/semantic-layer.service.test.ts
+++ b/packages/context/src/sl/semantic-layer.service.test.ts
@@ -363,6 +363,34 @@ describe('sourceDefinitionSchema', () => {
externalOwner: 'analytics',
});
});
+
+ it("rejects qualified grain names (e.g. 'activity.account_id')", () => {
+ const result = sourceDefinitionSchema.safeParse({
+ name: 'activity',
+ table: 'public.activity',
+ grain: ['activity.account_id'],
+ columns: [{ name: 'account_id', type: 'number' }],
+ joins: [],
+ measures: [],
+ });
+ expect(result.success).toBe(false);
+ if (result.success) return;
+ expect(result.error.issues.some((i) => i.path.join('.').startsWith('grain'))).toBe(true);
+ });
+
+ it('rejects qualified column names', () => {
+ const result = sourceDefinitionSchema.safeParse({
+ name: 'activity',
+ table: 'public.activity',
+ grain: ['account_id'],
+ columns: [{ name: 'activity.account_id', type: 'number' }],
+ joins: [],
+ measures: [],
+ });
+ expect(result.success).toBe(false);
+ if (result.success) return;
+ expect(result.error.issues.some((i) => i.path.join('.').startsWith('columns'))).toBe(true);
+ });
});
describe('projectManifestEntry', () => {
diff --git a/packages/context/src/sl/semantic-layer.service.ts b/packages/context/src/sl/semantic-layer.service.ts
index ffae0b12..33a44d11 100644
--- a/packages/context/src/sl/semantic-layer.service.ts
+++ b/packages/context/src/sl/semantic-layer.service.ts
@@ -133,7 +133,7 @@ export class SemanticLayerService {
const path = this.sourcePath(connectionId, source.name);
const normalizedSource = normalizeSemanticLayerDescriptions(source);
- const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0 });
+ const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' });
const message = commitMessage ?? `Update semantic layer source: ${source.name}`;
const result = await this.configService.writeFile(path, content, author, authorEmail, message, {
skipLock: options?.skipLock,
diff --git a/packages/context/src/sl/tools/sl-edit-source.tool.ts b/packages/context/src/sl/tools/sl-edit-source.tool.ts
index 17a85990..d27bb113 100644
--- a/packages/context/src/sl/tools/sl-edit-source.tool.ts
+++ b/packages/context/src/sl/tools/sl-edit-source.tool.ts
@@ -151,7 +151,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t
source = normalizeSemanticLayerDescriptions(source, { fillMissing: !!context.session?.ingest });
// Re-serialize and write
- const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0 });
+ const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' });
const { errors: validationErrors, warnings: validationWarnings } =
await semanticLayerService.validateWithProposedSource(connectionId, source);
diff --git a/packages/context/src/sl/tools/sl-write-source.tool.ts b/packages/context/src/sl/tools/sl-write-source.tool.ts
index 638b130e..de59f6bb 100644
--- a/packages/context/src/sl/tools/sl-write-source.tool.ts
+++ b/packages/context/src/sl/tools/sl-write-source.tool.ts
@@ -164,7 +164,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
? `${isOverlay ? 'Update overlay' : 'Rewrite source'}: ${sourceName}`
: `${isOverlay ? 'Create overlay' : 'Create source'}: ${sourceName}`;
- const yamlContent = YAML.stringify(normalizedSource);
+ const yamlContent = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' });
const orphanError = await this.rejectOrphanOverlay(semanticLayerService, connectionId, sourceName, yamlContent);
if (orphanError) {
diff --git a/python/ktx-sl/semantic_layer/engine.py b/python/ktx-sl/semantic_layer/engine.py
index 0e17419c..b45d89ac 100644
--- a/python/ktx-sl/semantic_layer/engine.py
+++ b/python/ktx-sl/semantic_layer/engine.py
@@ -12,6 +12,7 @@ from semantic_layer.models import (
)
from semantic_layer.planner import QueryPlanner
from semantic_layer.sql_table_extractor import (
+ extract_projected_columns,
extract_table_refs,
ref_matches_source_table,
)
@@ -83,15 +84,48 @@ class SemanticEngine:
report.errors.extend(self._collect_orphan_join_target_errors())
def _check_invalid_grain(self, report: ValidationReport) -> None:
+ dialect = getattr(self.generator, "dialect", "postgres")
for source in self.sources.values():
+ qualified_grain: set[str] = set()
+ for grain_col in source.grain:
+ if "." in grain_col:
+ qualified_grain.add(grain_col)
+ report.errors.append(
+ f"Source '{source.name}' grain entry '{grain_col}' is a "
+ f"qualified name. Grain must use unqualified output column "
+ f"names (e.g. 'account_id', not 'activity.account_id')."
+ )
+
+ for col in source.columns:
+ if "." in col.name:
+ report.errors.append(
+ f"Source '{source.name}' column name '{col.name}' contains "
+ f"'.'. Column names must be unqualified."
+ )
+
column_names = {c.name for c in source.columns}
for grain_col in source.grain:
+ if grain_col in qualified_grain:
+ continue
if grain_col not in column_names:
report.errors.append(
f"Source '{source.name}' has grain column '{grain_col}' "
f"that is not in its columns list"
)
+ if source.is_sql_source and source.sql:
+ projected = extract_projected_columns(source.sql, dialect=dialect)
+ if projected is not None:
+ for grain_col in source.grain:
+ if grain_col in qualified_grain:
+ continue
+ if grain_col not in projected:
+ report.errors.append(
+ f"Source '{source.name}' grain column '{grain_col}' "
+ f"is not in the SQL SELECT projection. Add it to the "
+ f"SELECT list (or remove it from grain)."
+ )
+
def _check_join_columns(self, report: ValidationReport) -> None:
for source in self.sources.values():
source_columns = {c.name for c in source.columns}
@@ -108,7 +142,9 @@ class SemanticEngine:
)
continue
- local_cols = [col.strip() for col in local_raw.split(",") if col.strip()]
+ local_cols = [
+ col.strip() for col in local_raw.split(",") if col.strip()
+ ]
target_cols = [
col.strip() for col in target_raw.split(",") if col.strip()
]
diff --git a/python/ktx-sl/semantic_layer/sql_table_extractor.py b/python/ktx-sl/semantic_layer/sql_table_extractor.py
index 008c53ad..9b9efa6e 100644
--- a/python/ktx-sl/semantic_layer/sql_table_extractor.py
+++ b/python/ktx-sl/semantic_layer/sql_table_extractor.py
@@ -70,3 +70,30 @@ def ref_matches_source_table(ref: tuple[str, ...], source_table: str) -> bool:
if len(ref) > len(src):
return False
return src[-len(ref) :] == ref
+
+
+def extract_projected_columns(sql: str, dialect: str = "postgres") -> set[str] | None:
+ """Return the set of output column names projected by `sql`.
+
+ Returns None if the projection cannot be statically determined — when
+ SELECT * (or qualified `t.*`) is present, or when parsing fails. Callers
+ should treat None as "unknown projection" and skip projection-dependent
+ checks rather than reporting a false-positive error.
+ """
+ try:
+ tree = sqlglot.parse_one(sql, read=dialect)
+ except Exception as e:
+ logger.debug("extract_projected_columns: parse failed (%s); skipping", e)
+ return None
+
+ if not isinstance(tree, exp.Select):
+ return None
+
+ for projection in tree.expressions:
+ # Bare `*` or `t.*` — projection list is opaque.
+ if isinstance(projection, exp.Star):
+ return None
+ if isinstance(projection, exp.Column) and isinstance(projection.this, exp.Star):
+ return None
+
+ return {name for name in tree.named_selects if name}
diff --git a/python/ktx-sl/tests/test_validator.py b/python/ktx-sl/tests/test_validator.py
index 723e9636..909d5a6c 100644
--- a/python/ktx-sl/tests/test_validator.py
+++ b/python/ktx-sl/tests/test_validator.py
@@ -119,6 +119,116 @@ class TestInvalidGrain:
assert not report.valid
assert any("bad" in e and "nonexistent_col" in e for e in report.errors)
+ def test_qualified_grain_name_is_rejected(self):
+ bad = _src(
+ "activity",
+ columns=["account_id"],
+ grain=["activity.account_id"],
+ )
+ engine = SemanticEngine.from_sources({"activity": bad})
+
+ report = engine.validate()
+
+ assert not report.valid
+ assert any(
+ "activity" in e and "activity.account_id" in e and "qualified" in e
+ for e in report.errors
+ )
+
+ def test_qualified_column_name_is_rejected(self):
+ bad = SourceDefinition(
+ name="activity",
+ table="public.activity",
+ grain=["account_id"],
+ columns=[
+ SourceColumn(name="account_id", type="number"),
+ SourceColumn(name="activity.user_id", type="number"),
+ ],
+ )
+ engine = SemanticEngine.from_sources({"activity": bad})
+
+ report = engine.validate()
+
+ assert not report.valid
+ assert any(
+ "activity" in e and "activity.user_id" in e and "unqualified" in e
+ for e in report.errors
+ )
+
+ def test_sql_source_grain_missing_from_projection(self):
+ bad = SourceDefinition(
+ name="large_contract_requesters",
+ sql=(
+ "select account.account_name, requester.email as requester_email "
+ "from orbit_raw.actions activity "
+ "join orbit_raw.accounts account "
+ " on account.account_id = activity.account_id "
+ "join orbit_raw.users requester "
+ " on requester.user_id = activity.user_id"
+ ),
+ grain=["account_id", "user_id"],
+ columns=[
+ SourceColumn(name="account_id", type="number"),
+ SourceColumn(name="user_id", type="number"),
+ SourceColumn(name="account_name", type="string"),
+ SourceColumn(name="requester_email", type="string"),
+ ],
+ )
+ engine = SemanticEngine.from_sources({"large_contract_requesters": bad})
+
+ report = engine.validate()
+
+ assert not report.valid
+ assert any(
+ "large_contract_requesters" in e
+ and "account_id" in e
+ and "SELECT projection" in e
+ for e in report.errors
+ )
+
+ def test_sql_source_grain_in_projection_passes(self):
+ good = SourceDefinition(
+ name="contract_requesters",
+ sql=(
+ "select activity.account_id, activity.user_id, "
+ "account.account_name, requester.email as requester_email "
+ "from orbit_raw.actions activity "
+ "join orbit_raw.accounts account "
+ " on account.account_id = activity.account_id "
+ "join orbit_raw.users requester "
+ " on requester.user_id = activity.user_id"
+ ),
+ grain=["account_id", "user_id"],
+ columns=[
+ SourceColumn(name="account_id", type="number"),
+ SourceColumn(name="user_id", type="number"),
+ SourceColumn(name="account_name", type="string"),
+ SourceColumn(name="requester_email", type="string"),
+ ],
+ )
+ engine = SemanticEngine.from_sources({"contract_requesters": good})
+
+ report = engine.validate()
+
+ # No grain-related errors. (Other validators may emit unrelated
+ # warnings — we just assert the grain check is clean.)
+ assert not any("grain" in e or "SELECT projection" in e for e in report.errors)
+
+ def test_sql_source_with_select_star_skips_projection_check(self):
+ # SELECT * means we can't statically know projected columns;
+ # the projection check must skip rather than false-fail.
+ src = SourceDefinition(
+ name="opaque",
+ sql="select * from public.events",
+ grain=["event_id"],
+ columns=[SourceColumn(name="event_id", type="number")],
+ )
+ engine = SemanticEngine.from_sources({"opaque": src})
+
+ report = engine.validate()
+
+ assert not any("SELECT projection" in e for e in report.errors)
+
class TestJoinValidation:
def test_join_local_column_must_exist(self):
@@ -246,7 +356,9 @@ class TestJoinValidation:
report = engine.validate(recently_touched={"large_contract_requesters"})
assert not report.valid
- assert any("mart_account_segments" in e and "joins[]" in e for e in report.errors)
+ assert any(
+ "mart_account_segments" in e and "joins[]" in e for e in report.errors
+ )
class TestDisconnectedComponents: