Improve schema setup and Notion ingest UX (#14)

* Improve schema setup and Notion ingest UX

* Handle Postgres network scan failures

* WIP: save local changes before main merge

* Refine setup prompt choices

* Tighten ingest reconciliation guidance

* Commit setup config updates

* Canonicalize unmapped fallback details

* Count reconciliation actions in reports

* Harden semantic layer source validation

* Return wiki content after edits

* Validate SL sources against manifests

* Validate wiki refs before writes

* Simplify CLI next steps

* Clarify agent setup summary

* Surface dbt target SL sources

* Recover SL write fallbacks

* Preserve failed context build metadata

* Track raw paths for ingest actions

* test(cli): update seeded demo expectations

* fix(ingest): scope fallback recovery checks

* fix(sl): tighten source validation guards

* fix(wiki): ignore empty embedding vectors

* Improve Notion ingest UX

* Enforce flat wiki keys

* test(context): update wiki key assertion

---------

Co-authored-by: Andrey Avtomonov <andreybavt@gmail.com>
This commit is contained in:
Luca Martial 2026-05-12 16:56:58 -04:00 committed by GitHub
parent 866d33e71a
commit 60457e9407
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
116 changed files with 4177 additions and 610 deletions

View file

@ -12,6 +12,7 @@ from semantic_layer.models import (
)
from semantic_layer.planner import QueryPlanner
from semantic_layer.sql_table_extractor import (
extract_projected_columns,
extract_table_refs,
ref_matches_source_table,
)
@ -62,6 +63,7 @@ class SemanticEngine:
report = ValidationReport()
self._check_orphan_join_targets(report)
self._check_invalid_grain(report)
self._check_join_columns(report)
self._check_sql_join_coverage(report, recently_touched=recently_touched)
self._check_disconnected_components(report, recently_touched=recently_touched)
return report
@ -82,15 +84,144 @@ class SemanticEngine:
report.errors.extend(self._collect_orphan_join_target_errors())
def _check_invalid_grain(self, report: ValidationReport) -> None:
dialect = getattr(self.generator, "dialect", "postgres")
for source in self.sources.values():
qualified_grain: set[str] = set()
for grain_col in source.grain:
if "." in grain_col:
qualified_grain.add(grain_col)
report.errors.append(
f"Source '{source.name}' grain entry '{grain_col}' is a "
f"qualified name. Grain must use unqualified output column "
f"names (e.g. 'account_id', not 'activity.account_id')."
)
for col in source.columns:
if "." in col.name:
report.errors.append(
f"Source '{source.name}' column name '{col.name}' contains "
f"'.'. Column names must be unqualified."
)
column_names = {c.name for c in source.columns}
for grain_col in source.grain:
if grain_col in qualified_grain:
continue
if grain_col not in column_names:
report.errors.append(
f"Source '{source.name}' has grain column '{grain_col}' "
f"that is not in its columns list"
)
if source.is_sql_source and source.sql:
projected = extract_projected_columns(source.sql, dialect=dialect)
if projected is not None:
for grain_col in source.grain:
if grain_col in qualified_grain:
continue
if grain_col not in projected:
report.errors.append(
f"Source '{source.name}' grain column '{grain_col}' "
f"is not in the SQL SELECT projection. Add it to the "
f"SELECT list (or remove it from grain)."
)
def _check_join_columns(self, report: ValidationReport) -> None:
for source in self.sources.values():
source_columns = {c.name for c in source.columns}
for join in source.joins:
target = self.sources.get(join.to)
if target is None:
continue
target_columns = {c.name for c in target.columns}
try:
local_raw, target_raw = self.graph._parse_on(join.on, join.to)
except ValueError as exc:
report.errors.append(
f"Source '{source.name}' has invalid join to '{join.to}': {exc}"
)
continue
local_cols = [
col.strip() for col in local_raw.split(",") if col.strip()
]
target_cols = [
col.strip() for col in target_raw.split(",") if col.strip()
]
for local_col in local_cols:
if local_col not in source_columns:
report.errors.append(
f"Source '{source.name}' joins to '{join.to}' on "
f"local column '{local_col}', but '{local_col}' is not "
f"in '{source.name}' columns list"
)
for target_col in target_cols:
if target_col not in target_columns:
report.errors.append(
f"Source '{source.name}' joins to '{join.to}' on "
f"target column '{target_col}', but '{target_col}' is not "
f"in '{join.to}' columns list"
)
if join.relationship not in {"many_to_one", "one_to_one"}:
continue
for local_col, target_col in zip(local_cols, target_cols, strict=False):
if (
local_col in source_columns
and target_col in target_columns
and target_col in target.grain
and self._looks_like_display_value_to_identifier(
local_col, target_col
)
):
report.errors.append(
f"Source '{source.name}' joins '{local_col}' to "
f"'{join.to}.{target_col}', but '{local_col}' looks like "
"a display value and the target column is an identifier "
"grain. Project the matching key column or omit this join."
)
@staticmethod
def _looks_like_display_value_to_identifier(
local_col: str, target_col: str
) -> bool:
if target_col != "id" and not target_col.endswith("_id"):
return False
display_names = {"name", "email", "label", "title", "description"}
display_suffixes = (
"_name",
"_email",
"_label",
"_title",
"_description",
)
return local_col in display_names or local_col.endswith(display_suffixes)
@staticmethod
def _source_exposes_join_key(
source: SourceDefinition, target: SourceDefinition
) -> bool:
source_columns = {c.name.lower() for c in source.columns}
target_name = target.name.lower()
target_name_singular = (
target_name[:-1] if target_name.endswith("s") else target_name
)
for grain_col in target.grain:
grain = grain_col.lower()
if grain in source_columns:
return True
if grain == "id":
candidates = {
f"{target_name}_id",
f"{target_name_singular}_id",
}
if source_columns.intersection(candidates):
return True
continue
if any(col.endswith(f"_{grain}") for col in source_columns):
return True
return False
def _check_sql_join_coverage(
self,
report: ValidationReport,
@ -135,6 +266,8 @@ class SemanticEngine:
continue
if hit_name.lower() in declared:
continue
if not self._source_exposes_join_key(source, self.sources[hit_name]):
continue
if hit_name not in missing:
missing.append(hit_name)
@ -148,11 +281,12 @@ class SemanticEngine:
)
msg = (
f"Source '{source.name}' SQL joins manifest table(s) [{ref_list}] "
f"that are not declared in joins[]. Add a join entry for each, "
f"that have projected key columns but are not declared in joins[]. "
f"Add a join entry for each, "
f"e.g. {{to: {example}, on: '{source.name}.<your_fk> = "
f"{example}.{grain_col}', relationship: many_to_one}}. If a "
f"reference is intentionally absent, document it with a "
f"`unmapped-table-*` wiki note and remove the SQL reference."
f"{example}.{grain_col}', relationship: many_to_one}}. If the "
"SQL intentionally keeps a referenced table internal, omit "
"that table's key column from the SQL source output."
)
report.errors.append(msg)

View file

@ -70,3 +70,30 @@ def ref_matches_source_table(ref: tuple[str, ...], source_table: str) -> bool:
if len(ref) > len(src):
return False
return src[-len(ref) :] == ref
def extract_projected_columns(sql: str, dialect: str = "postgres") -> set[str] | None:
"""Return the set of output column names projected by `sql`.
Returns None if the projection cannot be statically determined when
SELECT * (or qualified `t.*`) is present, or when parsing fails. Callers
should treat None as "unknown projection" and skip projection-dependent
checks rather than reporting a false-positive error.
"""
try:
tree = sqlglot.parse_one(sql, read=dialect)
except Exception as e:
logger.debug("extract_projected_columns: parse failed (%s); skipping", e)
return None
if not isinstance(tree, exp.Select):
return None
for projection in tree.expressions:
# Bare `*` or `t.*` — projection list is opaque.
if isinstance(projection, exp.Star):
return None
if isinstance(projection, exp.Column) and isinstance(projection.this, exp.Star):
return None
return {name for name in tree.named_selects if name}

View file

@ -119,6 +119,275 @@ class TestInvalidGrain:
assert not report.valid
assert any("bad" in e and "nonexistent_col" in e for e in report.errors)
def test_qualified_grain_name_is_rejected(self):
bad = _src(
"activity",
columns=["account_id"],
grain=["activity.account_id"],
)
engine = SemanticEngine.from_sources({"activity": bad})
report = engine.validate()
assert not report.valid
assert any(
"activity" in e and "activity.account_id" in e and "qualified" in e
for e in report.errors
)
def test_qualified_column_name_is_rejected(self):
bad = SourceDefinition(
name="activity",
table="public.activity",
grain=["account_id"],
columns=[
SourceColumn(name="account_id", type="number"),
SourceColumn(name="activity.user_id", type="number"),
],
)
engine = SemanticEngine.from_sources({"activity": bad})
report = engine.validate()
assert not report.valid
assert any(
"activity" in e and "activity.user_id" in e and "unqualified" in e
for e in report.errors
)
def test_sql_source_grain_missing_from_projection(self):
bad = SourceDefinition(
name="large_contract_requesters",
sql=(
"select account.account_name, requester.email as requester_email "
"from orbit_raw.actions activity "
"join orbit_raw.accounts account "
" on account.account_id = activity.account_id "
"join orbit_raw.users requester "
" on requester.user_id = activity.user_id"
),
grain=["account_id", "user_id"],
columns=[
SourceColumn(name="account_id", type="number"),
SourceColumn(name="user_id", type="number"),
SourceColumn(name="account_name", type="string"),
SourceColumn(name="requester_email", type="string"),
],
)
engine = SemanticEngine.from_sources({"large_contract_requesters": bad})
report = engine.validate()
assert not report.valid
assert any(
"large_contract_requesters" in e
and "account_id" in e
and "SELECT projection" in e
for e in report.errors
)
def test_sql_source_grain_in_projection_passes(self):
good = SourceDefinition(
name="contract_requesters",
sql=(
"select activity.account_id, activity.user_id, "
"account.account_name, requester.email as requester_email "
"from orbit_raw.actions activity "
"join orbit_raw.accounts account "
" on account.account_id = activity.account_id "
"join orbit_raw.users requester "
" on requester.user_id = activity.user_id"
),
grain=["account_id", "user_id"],
columns=[
SourceColumn(name="account_id", type="number"),
SourceColumn(name="user_id", type="number"),
SourceColumn(name="account_name", type="string"),
SourceColumn(name="requester_email", type="string"),
],
)
engine = SemanticEngine.from_sources({"contract_requesters": good})
report = engine.validate()
# No grain-related errors. (Other validators may emit unrelated
# warnings — we just assert the grain check is clean.)
assert not any("grain" in e or "SELECT projection" in e for e in report.errors)
def test_sql_source_with_select_star_skips_projection_check(self):
# SELECT * means we can't statically know projected columns;
# the projection check must skip rather than false-fail.
src = SourceDefinition(
name="opaque",
sql="select * from public.events",
grain=["event_id"],
columns=[SourceColumn(name="event_id", type="number")],
)
engine = SemanticEngine.from_sources({"opaque": src})
report = engine.validate()
assert not any("SELECT projection" in e for e in report.errors)
class TestJoinValidation:
def test_join_local_column_must_exist(self):
orders = _src(
"orders",
columns=["id"],
joins=[
JoinDeclaration(
to="customers",
on="customer_id = customers.id",
relationship="many_to_one",
)
],
)
customers = _src("customers")
engine = SemanticEngine.from_sources({"orders": orders, "customers": customers})
report = engine.validate()
assert not report.valid
assert any(
"orders" in e and "customer_id" in e and "columns list" in e
for e in report.errors
)
def test_many_to_one_join_rejects_display_name_to_id_grain(self):
requesters = _src(
"large_contract_requesters",
columns=["account_name", "requester_email"],
grain=["requester_email"],
joins=[
JoinDeclaration(
to="mart_account_segments",
on="account_name = mart_account_segments.account_id",
relationship="many_to_one",
)
],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate()
assert not report.valid
assert any(
"large_contract_requesters" in e
and "account_name" in e
and "mart_account_segments.account_id" in e
for e in report.errors
)
def test_sql_join_coverage_does_not_require_join_without_projected_key(self):
requesters = SourceDefinition(
name="large_contract_requesters",
sql="""
select accounts.account_name, users.email as requester_email
from orbit_raw.requests requests
join public.mart_account_segments accounts
on requests.account_id = accounts.account_id
join orbit_raw.users users
on requests.user_id = users.user_id
""",
grain=["requester_email"],
columns=[
SourceColumn(name="account_name", type="string"),
SourceColumn(name="requester_email", type="string"),
],
joins=[],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate(recently_touched={"large_contract_requesters"})
assert report.errors == []
def test_sql_join_coverage_does_not_treat_unrelated_id_suffix_as_id_key(self):
requesters = SourceDefinition(
name="large_contract_requesters",
sql="""
select accounts.account_name, requests.user_id
from orbit_raw.requests requests
join public.accounts accounts
on requests.account_id = accounts.id
""",
grain=["user_id"],
columns=[
SourceColumn(name="account_name", type="string"),
SourceColumn(name="user_id", type="string"),
],
joins=[],
)
accounts = _src("accounts", columns=["id", "account_name"], grain=["id"])
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"accounts": accounts,
}
)
report = engine.validate(recently_touched={"large_contract_requesters"})
assert report.errors == []
def test_sql_join_coverage_requires_join_when_projected_key_exists(self):
requesters = SourceDefinition(
name="large_contract_requesters",
sql="""
select accounts.account_id, users.email as requester_email
from orbit_raw.requests requests
join public.mart_account_segments accounts
on requests.account_id = accounts.account_id
join orbit_raw.users users
on requests.user_id = users.user_id
""",
grain=["requester_email"],
columns=[
SourceColumn(name="account_id", type="string"),
SourceColumn(name="requester_email", type="string"),
],
joins=[],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate(recently_touched={"large_contract_requesters"})
assert not report.valid
assert any(
"mart_account_segments" in e and "joins[]" in e for e in report.errors
)
class TestDisconnectedComponents:
def test_two_components_produce_warning_not_error(self):