mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
Harden semantic layer source validation
This commit is contained in:
parent
66109caa1d
commit
6536c5da26
14 changed files with 235 additions and 17 deletions
|
|
@ -48,7 +48,7 @@ Use `resultMetadata` to:
|
|||
|
||||
For each card:
|
||||
1. Analyze `resolvedSql` + `resultMetadata`: identify base tables, aggregations, joins, filters, column types.
|
||||
2. Check `sl_discover` and `sl_read_source` for existing sources that overlap.
|
||||
2. **REQUIRED before any write**: call `sl_discover` for every candidate target source name. The response tells you whether the name is manifest-backed (`Type: table` or `Type: sql`). For manifest-backed names you MUST use the overlay shape (`name:` + `measures:`/`segments:`/`description:` only — no `sql:`, `table:`, `grain:`, or `columns:`); the tool will reject a standalone write and you'll have wasted the call. If `sl_discover` returns nothing for the name, you can write a standalone source. Also call `sl_read_source` on existing sources you intend to extend so you don't duplicate measures.
|
||||
3. Decide:
|
||||
- Simple aggregation on a table that already has a source → `sl_edit_source` to add a measure.
|
||||
- Join between tables that should be linked in the SL graph → `sl_edit_source` to add a join.
|
||||
|
|
@ -70,6 +70,8 @@ Overlay shape: `name:` plus any of `measures:`, `segments:`, `description:`, `jo
|
|||
|
||||
**Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '<table>' })` before writing. The matching manifest entry's `name` is the value you use in `joins: [- to: <name>]` only when the card output exposes a local key that matches the target source grain (for example `account_id = mart_account_segments.account_id`). Do not declare a KTX join just because the card SQL joins that table internally. If the output only exposes display fields such as `account_name`, keep the SQL source self-contained or project the key before adding the join. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse.
|
||||
|
||||
**Hard rule on join columns (prevents broken joins):** For every join you declare, the local column on the left of `on:` MUST be (a) present in your source's projected output and (b) a key/ID column, never a display value. If the natural FK isn't in your SELECT, add it to SELECT before declaring the join. Joining `account_name = mart_account_segments.account_id` is always wrong — names are not identifiers and the equality produces zero matches. The validator rejects this with a "display value to identifier" error; the tool will refuse to save it. Add `account_id` to your SELECT and join on `account_id = mart_account_segments.account_id`, or omit the join entirely.
|
||||
|
||||
## priorProvenance
|
||||
|
||||
If the WU prompt includes a `priorProvenance` section for a card, it tells you what happened on prior ingest syncs. Treat it as advisory:
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ async function readJson(path: string): Promise<unknown> {
|
|||
async function writeYamlAtomic(path: string, value: unknown): Promise<void> {
|
||||
await mkdir(dirname(path), { recursive: true });
|
||||
const tmp = `${path}.tmp`;
|
||||
await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0 }), 'utf-8');
|
||||
await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0, version: '1.1' }), 'utf-8');
|
||||
await rename(tmp, path);
|
||||
}
|
||||
|
||||
|
|
@ -270,7 +270,7 @@ export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInp
|
|||
}
|
||||
}
|
||||
}
|
||||
const after = YAML.stringify(shard, { indent: 2, lineWidth: 0 });
|
||||
const after = YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
if (after !== before) {
|
||||
await writeYamlAtomic(path, shard);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -512,7 +512,7 @@ export function createLocalProjectMcpContextPorts(
|
|||
}
|
||||
|
||||
const yaml =
|
||||
input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0 });
|
||||
input.yaml ?? YAML.stringify({ ...input.source, name: input.sourceName }, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
parseYamlRecord(yaml);
|
||||
await project.fileStore.writeFile(
|
||||
path,
|
||||
|
|
|
|||
|
|
@ -291,7 +291,7 @@ export async function writeLocalScanManifestShards(
|
|||
const path = `${schemaDir(input.connectionId)}/${shardKey}.yaml`;
|
||||
await input.project.fileStore.writeFile(
|
||||
path,
|
||||
YAML.stringify(shard, { indent: 2, lineWidth: 0 }),
|
||||
YAML.stringify(shard, { indent: 2, lineWidth: 0, version: '1.1' }),
|
||||
LOCAL_AUTHOR,
|
||||
LOCAL_AUTHOR_EMAIL,
|
||||
`scan(${LIVE_DATABASE_ADAPTER}): write manifest shard ${shardKey} syncId=${input.syncId}`,
|
||||
|
|
|
|||
|
|
@ -345,7 +345,7 @@ describe('local semantic-layer helpers', () => {
|
|||
|
||||
await expect(validateLocalSlSource(invalidYaml)).resolves.toMatchObject({
|
||||
valid: false,
|
||||
errors: [expect.stringContaining('grain')],
|
||||
errors: expect.arrayContaining([expect.stringContaining('grain')]),
|
||||
});
|
||||
|
||||
await expect(
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ function summarizeSource(args: { connectionId: string; path: string; raw: string
|
|||
}
|
||||
|
||||
function sourceToYaml(source: SemanticLayerSource): string {
|
||||
return YAML.stringify(source, { indent: 2, lineWidth: 0 });
|
||||
return YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
}
|
||||
|
||||
function summarizeSemanticSource(args: {
|
||||
|
|
|
|||
|
|
@ -63,6 +63,14 @@ const sourceFreshnessSchema = z.object({
|
|||
dbt: freshnessDbtSchema.optional(),
|
||||
});
|
||||
|
||||
// Identifiers (grain entries, column names) must be unqualified output-column
|
||||
// names. A dot would mean the agent emitted a table-qualified reference like
|
||||
// `activity.account_id` — those break SQL generation and grain semantics.
|
||||
const unqualifiedNameSchema = z
|
||||
.string()
|
||||
.min(1)
|
||||
.regex(/^[^.]+$/, "must be unqualified (no '.') — use the output column name");
|
||||
|
||||
const joinDeclarationSchema = z.object({
|
||||
to: z.string().min(1),
|
||||
on: z.string().min(1),
|
||||
|
|
@ -71,7 +79,7 @@ const joinDeclarationSchema = z.object({
|
|||
});
|
||||
|
||||
const sourceColumnSchema = z.object({
|
||||
name: z.string().min(1),
|
||||
name: unqualifiedNameSchema,
|
||||
// type/description optional on standalone sources: compose-time enrichment fills them
|
||||
// from the manifest entry named in `inherits_columns_from`. If the agent does not set
|
||||
// `inherits_columns_from`, or the column is not in the manifest, type must be present
|
||||
|
|
@ -90,7 +98,7 @@ const sourceColumnSchema = z.object({
|
|||
/** Overlay column: type requires expr (structural types are inherited from manifest). */
|
||||
const overlayColumnSchema = z
|
||||
.object({
|
||||
name: z.string().min(1),
|
||||
name: unqualifiedNameSchema,
|
||||
type: z.enum(columnTypeValues).optional(),
|
||||
role: z.enum(columnRoleValues).optional(),
|
||||
visibility: z.enum(columnVisibilityValues).optional(),
|
||||
|
|
@ -118,8 +126,13 @@ export const sourceDefinitionSchema = z
|
|||
// agent write `columns: [{name: FOO}]` instead of redeclaring known fields.
|
||||
// Lookup is fuzzy: bare key, fully-qualified table path, or any suffix all match.
|
||||
inherits_columns_from: z.string().optional(),
|
||||
grain: z.array(z.string()).min(1),
|
||||
columns: z.array(sourceColumnSchema).default([]),
|
||||
grain: z.array(unqualifiedNameSchema).min(1),
|
||||
// Standalone sources MUST declare columns. An empty columns array means
|
||||
// there's nothing to query or join against and breaks grain validation
|
||||
// (the grain must reference declared columns). Inheritance from a manifest
|
||||
// via `inherits_columns_from` only fills in type/description on declared
|
||||
// columns — the column names themselves must be listed here.
|
||||
columns: z.array(sourceColumnSchema).min(1),
|
||||
joins: z.array(joinDeclarationSchema).default([]),
|
||||
measures: z.array(slMeasureDefinitionSchema).default([]),
|
||||
segments: z.array(segmentDefinitionSchema).optional(),
|
||||
|
|
@ -139,7 +152,7 @@ export const sourceOverlaySchema = z
|
|||
name: z.string().min(1),
|
||||
description: z.string().optional(),
|
||||
descriptions: z.record(z.string(), z.string()).optional(),
|
||||
grain: z.array(z.string()).optional(),
|
||||
grain: z.array(unqualifiedNameSchema).optional(),
|
||||
columns: z.array(overlayColumnSchema).optional(),
|
||||
joins: z.array(joinDeclarationSchema).optional(),
|
||||
measures: z.array(slMeasureDefinitionSchema).optional(),
|
||||
|
|
|
|||
|
|
@ -363,6 +363,34 @@ describe('sourceDefinitionSchema', () => {
|
|||
externalOwner: 'analytics',
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects qualified grain names (e.g. 'activity.account_id')", () => {
|
||||
const result = sourceDefinitionSchema.safeParse({
|
||||
name: 'activity',
|
||||
table: 'public.activity',
|
||||
grain: ['activity.account_id'],
|
||||
columns: [{ name: 'account_id', type: 'number' }],
|
||||
joins: [],
|
||||
measures: [],
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
if (result.success) return;
|
||||
expect(result.error.issues.some((i) => i.path.join('.').startsWith('grain'))).toBe(true);
|
||||
});
|
||||
|
||||
it('rejects qualified column names', () => {
|
||||
const result = sourceDefinitionSchema.safeParse({
|
||||
name: 'activity',
|
||||
table: 'public.activity',
|
||||
grain: ['account_id'],
|
||||
columns: [{ name: 'activity.account_id', type: 'number' }],
|
||||
joins: [],
|
||||
measures: [],
|
||||
});
|
||||
expect(result.success).toBe(false);
|
||||
if (result.success) return;
|
||||
expect(result.error.issues.some((i) => i.path.join('.').startsWith('columns'))).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('projectManifestEntry', () => {
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ export class SemanticLayerService {
|
|||
|
||||
const path = this.sourcePath(connectionId, source.name);
|
||||
const normalizedSource = normalizeSemanticLayerDescriptions(source);
|
||||
const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0 });
|
||||
const content = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
const message = commitMessage ?? `Update semantic layer source: ${source.name}`;
|
||||
const result = await this.configService.writeFile(path, content, author, authorEmail, message, {
|
||||
skipLock: options?.skipLock,
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ If no source exists yet, use sl_write_source instead — this tool will reject t
|
|||
source = normalizeSemanticLayerDescriptions(source, { fillMissing: !!context.session?.ingest });
|
||||
|
||||
// Re-serialize and write
|
||||
const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0 });
|
||||
const updatedYaml = YAML.stringify(source, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
|
||||
const { errors: validationErrors, warnings: validationWarnings } =
|
||||
await semanticLayerService.validateWithProposedSource(connectionId, source);
|
||||
|
|
|
|||
|
|
@ -164,7 +164,7 @@ Do NOT join back to a table that the SQL already aggregates from if the grain co
|
|||
? `${isOverlay ? 'Update overlay' : 'Rewrite source'}: ${sourceName}`
|
||||
: `${isOverlay ? 'Create overlay' : 'Create source'}: ${sourceName}`;
|
||||
|
||||
const yamlContent = YAML.stringify(normalizedSource);
|
||||
const yamlContent = YAML.stringify(normalizedSource, { indent: 2, lineWidth: 0, version: '1.1' });
|
||||
|
||||
const orphanError = await this.rejectOrphanOverlay(semanticLayerService, connectionId, sourceName, yamlContent);
|
||||
if (orphanError) {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from semantic_layer.models import (
|
|||
)
|
||||
from semantic_layer.planner import QueryPlanner
|
||||
from semantic_layer.sql_table_extractor import (
|
||||
extract_projected_columns,
|
||||
extract_table_refs,
|
||||
ref_matches_source_table,
|
||||
)
|
||||
|
|
@ -83,15 +84,48 @@ class SemanticEngine:
|
|||
report.errors.extend(self._collect_orphan_join_target_errors())
|
||||
|
||||
def _check_invalid_grain(self, report: ValidationReport) -> None:
|
||||
dialect = getattr(self.generator, "dialect", "postgres")
|
||||
for source in self.sources.values():
|
||||
qualified_grain: set[str] = set()
|
||||
for grain_col in source.grain:
|
||||
if "." in grain_col:
|
||||
qualified_grain.add(grain_col)
|
||||
report.errors.append(
|
||||
f"Source '{source.name}' grain entry '{grain_col}' is a "
|
||||
f"qualified name. Grain must use unqualified output column "
|
||||
f"names (e.g. 'account_id', not 'activity.account_id')."
|
||||
)
|
||||
|
||||
for col in source.columns:
|
||||
if "." in col.name:
|
||||
report.errors.append(
|
||||
f"Source '{source.name}' column name '{col.name}' contains "
|
||||
f"'.'. Column names must be unqualified."
|
||||
)
|
||||
|
||||
column_names = {c.name for c in source.columns}
|
||||
for grain_col in source.grain:
|
||||
if grain_col in qualified_grain:
|
||||
continue
|
||||
if grain_col not in column_names:
|
||||
report.errors.append(
|
||||
f"Source '{source.name}' has grain column '{grain_col}' "
|
||||
f"that is not in its columns list"
|
||||
)
|
||||
|
||||
if source.is_sql_source and source.sql:
|
||||
projected = extract_projected_columns(source.sql, dialect=dialect)
|
||||
if projected is not None:
|
||||
for grain_col in source.grain:
|
||||
if grain_col in qualified_grain:
|
||||
continue
|
||||
if grain_col not in projected:
|
||||
report.errors.append(
|
||||
f"Source '{source.name}' grain column '{grain_col}' "
|
||||
f"is not in the SQL SELECT projection. Add it to the "
|
||||
f"SELECT list (or remove it from grain)."
|
||||
)
|
||||
|
||||
def _check_join_columns(self, report: ValidationReport) -> None:
|
||||
for source in self.sources.values():
|
||||
source_columns = {c.name for c in source.columns}
|
||||
|
|
@ -108,7 +142,9 @@ class SemanticEngine:
|
|||
)
|
||||
continue
|
||||
|
||||
local_cols = [col.strip() for col in local_raw.split(",") if col.strip()]
|
||||
local_cols = [
|
||||
col.strip() for col in local_raw.split(",") if col.strip()
|
||||
]
|
||||
target_cols = [
|
||||
col.strip() for col in target_raw.split(",") if col.strip()
|
||||
]
|
||||
|
|
|
|||
|
|
@ -70,3 +70,30 @@ def ref_matches_source_table(ref: tuple[str, ...], source_table: str) -> bool:
|
|||
if len(ref) > len(src):
|
||||
return False
|
||||
return src[-len(ref) :] == ref
|
||||
|
||||
|
||||
def extract_projected_columns(sql: str, dialect: str = "postgres") -> set[str] | None:
|
||||
"""Return the set of output column names projected by `sql`.
|
||||
|
||||
Returns None if the projection cannot be statically determined — when
|
||||
SELECT * (or qualified `t.*`) is present, or when parsing fails. Callers
|
||||
should treat None as "unknown projection" and skip projection-dependent
|
||||
checks rather than reporting a false-positive error.
|
||||
"""
|
||||
try:
|
||||
tree = sqlglot.parse_one(sql, read=dialect)
|
||||
except Exception as e:
|
||||
logger.debug("extract_projected_columns: parse failed (%s); skipping", e)
|
||||
return None
|
||||
|
||||
if not isinstance(tree, exp.Select):
|
||||
return None
|
||||
|
||||
for projection in tree.expressions:
|
||||
# Bare `*` or `t.*` — projection list is opaque.
|
||||
if isinstance(projection, exp.Star):
|
||||
return None
|
||||
if isinstance(projection, exp.Column) and isinstance(projection.this, exp.Star):
|
||||
return None
|
||||
|
||||
return {name for name in tree.named_selects if name}
|
||||
|
|
|
|||
|
|
@ -119,6 +119,116 @@ class TestInvalidGrain:
|
|||
assert not report.valid
|
||||
assert any("bad" in e and "nonexistent_col" in e for e in report.errors)
|
||||
|
||||
def test_qualified_grain_name_is_rejected(self):
|
||||
bad = _src(
|
||||
"activity",
|
||||
columns=["account_id"],
|
||||
grain=["activity.account_id"],
|
||||
)
|
||||
engine = SemanticEngine.from_sources({"activity": bad})
|
||||
|
||||
report = engine.validate()
|
||||
|
||||
assert not report.valid
|
||||
assert any(
|
||||
"activity" in e and "activity.account_id" in e and "qualified" in e
|
||||
for e in report.errors
|
||||
)
|
||||
|
||||
def test_qualified_column_name_is_rejected(self):
|
||||
bad = SourceDefinition(
|
||||
name="activity",
|
||||
table="public.activity",
|
||||
grain=["account_id"],
|
||||
columns=[
|
||||
SourceColumn(name="account_id", type="number"),
|
||||
SourceColumn(name="activity.user_id", type="number"),
|
||||
],
|
||||
)
|
||||
engine = SemanticEngine.from_sources({"activity": bad})
|
||||
|
||||
report = engine.validate()
|
||||
|
||||
assert not report.valid
|
||||
assert any(
|
||||
"activity" in e and "activity.user_id" in e and "unqualified" in e
|
||||
for e in report.errors
|
||||
)
|
||||
|
||||
def test_sql_source_grain_missing_from_projection(self):
|
||||
bad = SourceDefinition(
|
||||
name="large_contract_requesters",
|
||||
sql=(
|
||||
"select account.account_name, requester.email as requester_email "
|
||||
"from orbit_raw.actions activity "
|
||||
"join orbit_raw.accounts account "
|
||||
" on account.account_id = activity.account_id "
|
||||
"join orbit_raw.users requester "
|
||||
" on requester.user_id = activity.user_id"
|
||||
),
|
||||
grain=["account_id", "user_id"],
|
||||
columns=[
|
||||
SourceColumn(name="account_id", type="number"),
|
||||
SourceColumn(name="user_id", type="number"),
|
||||
SourceColumn(name="account_name", type="string"),
|
||||
SourceColumn(name="requester_email", type="string"),
|
||||
],
|
||||
)
|
||||
engine = SemanticEngine.from_sources({"large_contract_requesters": bad})
|
||||
|
||||
report = engine.validate()
|
||||
|
||||
assert not report.valid
|
||||
assert any(
|
||||
"large_contract_requesters" in e
|
||||
and "account_id" in e
|
||||
and "SELECT projection" in e
|
||||
for e in report.errors
|
||||
)
|
||||
|
||||
def test_sql_source_grain_in_projection_passes(self):
|
||||
good = SourceDefinition(
|
||||
name="contract_requesters",
|
||||
sql=(
|
||||
"select activity.account_id, activity.user_id, "
|
||||
"account.account_name, requester.email as requester_email "
|
||||
"from orbit_raw.actions activity "
|
||||
"join orbit_raw.accounts account "
|
||||
" on account.account_id = activity.account_id "
|
||||
"join orbit_raw.users requester "
|
||||
" on requester.user_id = activity.user_id"
|
||||
),
|
||||
grain=["account_id", "user_id"],
|
||||
columns=[
|
||||
SourceColumn(name="account_id", type="number"),
|
||||
SourceColumn(name="user_id", type="number"),
|
||||
SourceColumn(name="account_name", type="string"),
|
||||
SourceColumn(name="requester_email", type="string"),
|
||||
],
|
||||
)
|
||||
engine = SemanticEngine.from_sources({"contract_requesters": good})
|
||||
|
||||
report = engine.validate()
|
||||
|
||||
# No grain-related errors. (Other validators may emit unrelated
|
||||
# warnings — we just assert the grain check is clean.)
|
||||
assert not any("grain" in e or "SELECT projection" in e for e in report.errors)
|
||||
|
||||
def test_sql_source_with_select_star_skips_projection_check(self):
|
||||
# SELECT * means we can't statically know projected columns;
|
||||
# the projection check must skip rather than false-fail.
|
||||
src = SourceDefinition(
|
||||
name="opaque",
|
||||
sql="select * from public.events",
|
||||
grain=["event_id"],
|
||||
columns=[SourceColumn(name="event_id", type="number")],
|
||||
)
|
||||
engine = SemanticEngine.from_sources({"opaque": src})
|
||||
|
||||
report = engine.validate()
|
||||
|
||||
assert not any("SELECT projection" in e for e in report.errors)
|
||||
|
||||
|
||||
class TestJoinValidation:
|
||||
def test_join_local_column_must_exist(self):
|
||||
|
|
@ -246,7 +356,9 @@ class TestJoinValidation:
|
|||
report = engine.validate(recently_touched={"large_contract_requesters"})
|
||||
|
||||
assert not report.valid
|
||||
assert any("mart_account_segments" in e and "joins[]" in e for e in report.errors)
|
||||
assert any(
|
||||
"mart_account_segments" in e and "joins[]" in e for e in report.errors
|
||||
)
|
||||
|
||||
|
||||
class TestDisconnectedComponents:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue