Tighten ingest reconciliation guidance

This commit is contained in:
Luca Martial 2026-05-11 17:20:02 -07:00
parent 9f91c26752
commit 6d00cbbc2e
16 changed files with 382 additions and 20 deletions

View file

@ -242,6 +242,35 @@ describe('setup sources step', () => {
});
});
// Regression test: the args deliberately combine `notionCrawlMode: 'all_accessible'`
// with a non-empty `notionRootPageIds`, and the saved connection is expected to
// record `crawl_mode: 'selected_roots'` together with the supplied root page ids.
it('uses selected Notion roots when root page ids are provided even if crawl mode says all accessible', async () => {
await addPrimarySource();
// Stubbed Notion validator that always reports success, so no real API call is needed.
const validateNotion = vi.fn(async () => ({ ok: true as const, detail: 'roots=1' }));
await expect(
runKtxSetupSourcesStep(
{
projectDir,
inputMode: 'disabled',
source: 'notion',
sourceConnectionId: 'notion-main',
sourceApiKeyRef: 'env:NOTION_TOKEN',
notionCrawlMode: 'all_accessible',
notionRootPageIds: ['page-1'],
runInitialSourceIngest: false,
skipSources: false,
},
makeIo().io,
{ validateNotion },
),
).resolves.toEqual({ status: 'ready', projectDir, connectionIds: ['notion-main'] });
// The persisted config is the source of truth for the effective crawl mode.
expect((await readConfig()).connections['notion-main']).toMatchObject({
driver: 'notion',
root_page_ids: ['page-1'],
crawl_mode: 'selected_roots',
});
});
it('defaults interactive Metabase and Looker source setup to the only warehouse connection', async () => {
await addPrimarySource();
const cases: Array<{

View file

@ -510,8 +510,8 @@ function buildLookmlConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionC
}
function buildNotionConnection(args: KtxSetupSourcesArgs): KtxProjectConnectionConfig {
const crawlMode = args.notionCrawlMode ?? 'selected_roots';
const rootPageIds = args.notionRootPageIds ?? [];
const crawlMode = rootPageIds.length > 0 ? 'selected_roots' : (args.notionCrawlMode ?? 'selected_roots');
if (crawlMode === 'selected_roots' && rootPageIds.length === 0) {
throw new Error('Notion selected_roots requires --notion-root-page-id.');
}
@ -1189,8 +1189,8 @@ async function promptForInteractiveSource(
const crawlMode = await prompts.select({
message: 'Which Notion pages should KTX ingest?',
options: [
{ value: 'all_accessible', label: 'All pages the integration can access' },
{ value: 'selected_roots', label: 'Specific pages and their subpages (you\'ll paste page IDs)' },
{ value: 'all_accessible', label: 'All pages the integration can access' },
{ value: 'back', label: 'Back' },
],
});

View file

@ -10,10 +10,10 @@ Parsimonious. Stage 3 WUs already loaded `ingest_triage` and handled conflicts t
1. Load `ingest_triage`, then `sl_capture` + `knowledge_capture`.
2. Call `stage_list()` for the full index of this job's writes. If it is empty AND you have no evictions, exit — the runner short-circuits this case but the skill still teaches you to bail fast.
3. If the system prompt includes `<canonical_pins>`, apply those pins before flagging a same-name or near-duplicate conflict. A pinned `canonicalArtifactKey` keeps the contested name when it is present in the Stage Index; competing variants keep or receive disambiguated names.
4. For each pair of WUs that wrote overlapping SL source names or wiki keys, call `stage_diff` to see the actual difference. If they're the same content, leave it. If they differ per `ingest_triage` rules, apply the correct resolution (rename + capture; election of canonical; silent replace for expression-only re-ingest change; or pinned canonical), then call `emit_conflict_resolution` with the artifact key and decision.
4. Sweep both exact-key conflicts and near-duplicate writes. Compare WUs that wrote overlapping SL source names, overlapping wiki keys, the same `tables:` or `sl_refs:` action details, or obviously equivalent topic titles under different wiki keys. Call `stage_diff` to see the actual difference, and use `wiki_read`/`sl_read_source` when two different keys appear to describe the same table, metric, or source-of-truth mapping. If they're the same content, leave one canonical artifact and record the duplicate as subsumed. If they differ per `ingest_triage` rules, apply the correct resolution (rename + capture; election of canonical; silent replace for expression-only re-ingest change; or pinned canonical), then call `emit_conflict_resolution` with the artifact key and decision.
5. Call `eviction_list()` for deleted raw paths. For each eviction: if inbound refs are empty, remove the artifact (`sl_delete`, `wiki_remove`); if inbound refs exist, retain with a deprecation marker. Then call `emit_eviction_decision` for every removed or retained artifact.
6. If the Stage 4 sweep discovers a raw file whose only honest outcome is standalone SQL, wiki-only capture, or a human flag, call `emit_unmapped_fallback` with the raw path, reason, and fallback kind.
7. Use `read_raw_span` to zoom into specific raw files when you need to resolve what two contested measures actually compute.
7. Use `read_raw_span` to zoom into specific raw files when you need to resolve what two contested measures or wiki pages actually describe.
8. Exit when you've processed every item.
</workflow>

View file

@ -57,7 +57,7 @@ For each card:
- Trivial query (`SELECT *`, simple `COUNT(*)` with no business logic) → do nothing; the runner will record this card as `action_type='skipped'`.
- Duplicate of an existing measure → same as trivial; do nothing for this card.
**Manifest-only names need an overlay first.** If `sl_discover` shows a source name with `Type: table` but `sl_read_source` returns "Source not found", the source lives only in the schema manifest (no standalone overlay yet). `sl_edit_source` cannot edit manifest-only names — you must bootstrap an overlay with `sl_write_source` using the overlay shape:
**Manifest-only names need an overlay first.** If `sl_discover` shows a source name with `Type: table` but `sl_read_source` returns "Source not found", the source lives only in the schema manifest (no standalone overlay yet). `sl_edit_source` cannot edit manifest-only names, and a full standalone `sl_write_source` for that name would shadow manifest columns and joins. Bootstrap an overlay with `sl_write_source` using the overlay shape:
```yaml
name: <SOURCE_NAME>
@ -68,7 +68,7 @@ measures:
Overlay shape: `name:` plus any of `measures:`, `segments:`, `description:`, `joins:`, `disable_joins:`. Never include `sql:`, `table:`, `grain:`, or `columns:` on a manifest-backed name — those would shadow the manifest's schema and drop its joins. Overlay `joins:` are merged additively with the manifest's joins (deduped by `to` + `on`); use `disable_joins: ["<on-clause>"]` to suppress a specific manifest join. After the overlay exists, use `sl_edit_source` for further tweaks. See `sl_capture` skill for the canonical overlay rule.
**Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '<table>' })` before writing. The matching manifest entry's `name` is the value you put in `joins: [- to: <name>]`. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse.
**Join discovery:** When your card's SQL references warehouse tables (e.g. in `FROM` or `JOIN` clauses), call `sl_discover({ query: '<table>' })` before writing. Use the matching manifest entry's `name` as the `to:` value of a `joins:` entry, but only when the card output exposes a local key that matches the target source grain (for example `account_id = mart_account_segments.account_id`). Do not declare a KTX join just because the card SQL joins that table internally. If the output only exposes display fields such as `account_name`, either keep the SQL source self-contained (no `joins:` entry) or project the key column before adding the join. Use `many_to_one` for FK-to-dimension joins, `one_to_many` for the reverse.
## priorProvenance
@ -162,7 +162,7 @@ After Steps A and B, your SQL must:
- Reference no aliases that aren't defined inside the SQL itself.
- Be valid as a standalone subquery (the validator runs `SELECT * FROM (your_sql) LIMIT 1`).
If `resolutionStatus: "fallback"` and the SQL is still complex enough that you can't confidently translate it, **skip the card** rather than writing broken SQL. Call `emit_unmapped_fallback` with the staged card path as `rawPath`, `reason: "metabase_sql_untranslated"`, and `fallback: "flagged"`.
If `resolutionStatus: "fallback"` and the SQL is still complex enough that you can't confidently translate it, **skip the card** rather than writing broken SQL. Call `emit_unmapped_fallback` with the staged card path as `rawPath`, `reason: "parse_error"`, `detail: "metabase_sql_untranslated"`, and `fallback: "flagged"`.
## Join-graph connectivity
@ -171,8 +171,9 @@ For `source_type: table`:
- Match column names ending in `_id` against existing sources' grain columns.
For `source_type: sql`:
- The validator parses your SQL and **rejects the write** if any FROM/JOIN table has a manifest entry that you did not declare in `joins:`. The error names every missing join target — declare a `many_to_one` join for each and reissue.
- Tables outside the manifest (schemas not covered by this connection — e.g. `staging.*` referenced from a MARTS source) are not flagged. For those, write a single-line `wiki_write` with key `unmapped-table-<table_name>` so the gap is documented, then call `emit_unmapped_fallback` with the staged card path as `rawPath`, `reason: "table_outside_manifest"`, and `fallback: "wiki_only"`.
- The validator parses your SQL and rejects the write when a referenced manifest table has a viable projected local key but no declared `joins:` entry. Add the join only after confirming the output key and target grain match.
- If `sl_discover` resolves the table, it is not outside the manifest. Do not write an `unmapped-table-*` fallback for resolved `orbit_raw`, `mart`, or other manifest-backed sources just because they appear inside card SQL.
- If `sl_discover` cannot resolve a referenced table at all, write a single-line `wiki_write` with key `unmapped-table-<table_name>` so the gap is documented, then call `emit_unmapped_fallback` with the staged card path as `rawPath`, `reason: "missing_target_table"`, and `fallback: "wiki_only"`.
Joins on manifest-backed names compose: the manifest's joins are inherited automatically, and any overlay `joins:` are merged on top (deduped by `to` + `on`). Use `disable_joins: ["<on-clause>"]` in the overlay to suppress a specific manifest join. If `sl_discover` shows a manifest-backed source with `Joins: 0` and the warehouse FK metadata is genuinely absent, declaring application-level joins via the overlay is fair game — bootstrap with `sl_write_source` (overlay shape above), then refine via `sl_edit_source`.

View file

@ -46,6 +46,8 @@ Prefer fewer, stronger entries. Every wiki entry must cite at least one Notion p
If a clustered WorkUnit includes several related pages, synthesize the shared rule or concept instead of writing one thin page per source. For oversized page spans, read only the assigned span unless the WorkUnit explicitly asks for neighboring context.
Search existing wiki pages for the same `tables:` or `sl_refs:` frontmatter and for source-of-truth aliases before creating a new page. If an existing page already documents the same warehouse object or business concept, update it instead of creating a differently named duplicate.
## Citation Style
```md
@ -61,6 +63,7 @@ If a clustered WorkUnit includes several related pages, synthesize the shared ru
- Discover existing sources first with `sl_discover`; read existing source YAML before editing.
- Prefer overlays on manifest-backed sources over standalone SQL.
- If Notion describes a dashboard or metric but does not define executable logic, write a wiki page and attach `sl_refs` only after confirming the referenced source exists.
- Notion `dataSourceCount` counts Notion databases/data sources only. It does not prove that a warehouse/dbt table has or lacks a mapped semantic-layer source.
- Do not create SL sources under the Notion connection just because a page mentions a warehouse, dbt, Looker, or Metabase object. Use the mapped warehouse/source connection after discovery, or emit an unmapped fallback and write wiki-only.
- Distinguish fallback reasons precisely: if a non-Notion warehouse/dbt connection exists but `sl_discover` cannot find the named table/source, use `no_physical_table`; reserve `no_connection_mapping` for cases where there is no plausible non-Notion target connection at all.

View file

@ -8,7 +8,7 @@ const MAX_NOTION_WORK_UNIT_CHARS = 40_000;
export const NOTION_ORG_KNOWLEDGE_WARNING =
'Anything accessible to this Notion integration can become organization knowledge.';
const NOTION_SL_WRITE_GUIDANCE =
'Write wiki entries with wiki_write. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. If a warehouse/dbt connection exists but the named table or source is absent, use reason no_physical_table rather than no_connection_mapping. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
'Write wiki entries with wiki_write. Search existing wiki pages for the same tables or sl_refs before creating a new page. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. Notion dataSourceCount counts Notion databases/data sources only, not warehouse/dbt mappings. If a warehouse/dbt connection exists but the named table or source is absent, use reason no_physical_table rather than no_connection_mapping. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
async function walk(root: string): Promise<string[]> {
const entries = await readdir(root, { withFileTypes: true, recursive: true });
@ -117,6 +117,8 @@ export async function chunkNotionStagedDir(stagedDir: string, diffSet?: DiffSet)
reconcileNotes: [
`Notion maxKnowledgeCreatesPerRun=${manifest.maxKnowledgeCreatesPerRun}`,
`Notion maxKnowledgeUpdatesPerRun=${manifest.maxKnowledgeUpdatesPerRun}`,
'Notion dataSourceCount is Notion-only; use sl_discover for warehouse/dbt mapping decisions.',
'Reconcile Notion wiki pages sharing tables/sl_refs before creating distinct artifacts.',
],
contextReport: {
capped: manifest.capped,

View file

@ -84,6 +84,22 @@ describe('clusterNotionWorkUnits', () => {
}
});
// Boundary case: exactly MIN_PAGES_TO_CLUSTER pages go in. The expectation is a
// single merged work unit keyed 'notion-cluster-1' that still covers every input
// raw file and carries the SL-write guidance text in its notes.
test('merges pages into one synthesis unit at the clustering threshold', async () => {
const pages = Array.from({ length: MIN_PAGES_TO_CLUSTER }, (_, i) => ({
id: `p${i}`,
title: `Customer source reference ${i}`,
body: `Customer source reference maps to orbit_analytics.customer ${i}`.repeat(10),
}));
const stagedDir = await makeStaged(pages);
const wus = makeWorkUnits(pages);
const out = await clusterNotionWorkUnits({ workUnits: wus, stagedDir, embedding: mockEmbed });
expect(out).toHaveLength(1);
expect(out[0].unitKey).toBe('notion-cluster-1');
// Compared as sets: coverage matters here, ordering of rawFiles does not.
expect(new Set(out[0].rawFiles)).toEqual(new Set(wus.flatMap((wu) => wu.rawFiles)));
expect(out[0].notes).toContain('emit_unmapped_fallback');
expect(out[0].notes).toContain('Do not create SL sources under the Notion connection');
});
test('preserves coverage: every input rawFile appears in some cluster', async () => {
const pages = Array.from({ length: 12 }, (_, i) => ({
id: `p${i}`,

View file

@ -9,7 +9,7 @@ export const MIN_PAGES_TO_CLUSTER = 5;
const CLUSTER_TEXT_BODY_CHARS = 1024;
const CLUSTER_SEED = 42;
const NOTION_CLUSTER_SL_WRITE_GUIDANCE =
'Write wiki entries directly with wiki_write. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. If a warehouse/dbt connection exists but the named table or source is absent, use reason no_physical_table rather than no_connection_mapping. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
'Write wiki entries directly with wiki_write. Search existing wiki pages for the same tables or sl_refs before creating a new page. Only write or edit SL sources after sl_discover/sl_read_source confirms a mapped non-Notion target source; if no mapped target exists, emit_unmapped_fallback and keep the fact wiki-only. Notion dataSourceCount counts Notion databases/data sources only, not warehouse/dbt mappings. If a warehouse/dbt connection exists but the named table or source is absent, use reason no_physical_table rather than no_connection_mapping. Do not create SL sources under the Notion connection just because a page mentions a warehouse table.';
interface ClusterNotionWorkUnitsArgs {
workUnits: WorkUnit[];
@ -74,7 +74,7 @@ export async function clusterNotionWorkUnits(args: ClusterNotionWorkUnitsArgs):
const { workUnits, stagedDir, embedding } = args;
if (workUnits.length < MIN_PAGES_TO_CLUSTER) return workUnits;
const k = pickK(workUnits.length);
if (k <= 1) return workUnits;
if (k <= 1) return [mergeWorkUnits(workUnits, 0)];
const texts = await Promise.all(workUnits.map((wu) => buildClusterText(wu, stagedDir)));
let vectors: number[][];
try {

View file

@ -247,6 +247,8 @@ describe('NotionSourceAdapter', () => {
expect(result.reconcileNotes).toEqual([
'Notion maxKnowledgeCreatesPerRun=25',
'Notion maxKnowledgeUpdatesPerRun=20',
'Notion dataSourceCount is Notion-only; use sl_discover for warehouse/dbt mapping decisions.',
'Reconcile Notion wiki pages sharing tables/sl_refs before creating distinct artifacts.',
]);
expect(result.contextReport).toEqual({ capped: false, warnings: [NOTION_ORG_KNOWLEDGE_WARNING] });
});

View file

@ -1,5 +1,5 @@
import { describe, expect, it, vi } from 'vitest';
import { buildReconcileSystemPrompt, buildReconcileToolSet } from './build-reconcile-context.js';
import { buildReconcileSystemPrompt, buildReconcileToolSet, buildReconcileUserPrompt } from './build-reconcile-context.js';
describe('buildReconcileSystemPrompt', () => {
it('appends canonical pins when relevant pins are supplied', () => {
@ -39,6 +39,40 @@ describe('buildReconcileSystemPrompt', () => {
});
});
// Verifies that the reconcile user prompt surfaces each stage action's `detail`
// text (here a `tables:` line) next to its key, not just the key alone.
describe('buildReconcileUserPrompt', () => {
it('includes action details so reconciliation can compare different keys for the same table', () => {
const prompt = buildReconcileUserPrompt(
{
jobId: 'j1',
connectionId: 'notion',
workUnits: [
{
unitKey: 'notion-a',
rawFiles: ['pages/a/page.md'],
status: 'success',
actions: [
{
target: 'wiki',
type: 'created',
key: 'orbit-customer-source-reference',
detail: 'tables: orbit_analytics.customer',
},
],
touchedSlSources: [],
},
],
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
},
// No eviction unit is supplied for this case.
undefined,
);
expect(prompt).toContain('orbit-customer-source-reference');
expect(prompt).toContain('tables: orbit_analytics.customer');
});
});
describe('buildReconcileToolSet', () => {
it('includes emit_unmapped_fallback with the reconciliation tools', () => {
const toolSet = buildReconcileToolSet({

View file

@ -104,6 +104,10 @@ function curatorPassStateSummary(runState?: ReconcilePromptRunState): string {
].join('\n');
}
/**
 * Flattens a stage action detail onto a single line.
 *
 * Leading and trailing whitespace is dropped and every internal run of
 * whitespace (spaces, tabs, newlines) becomes exactly one space.
 *
 * @param detail - Raw detail text recorded for a stage action.
 * @returns The single-line form; an empty string when `detail` is blank.
 */
function formatStageActionDetail(detail: string): string {
  // Splitting on whitespace runs yields empty edge tokens when the text starts
  // or ends with whitespace; dropping them is what trims the result.
  const words = detail.split(/\s+/).filter((word) => word.length > 0);
  return words.join(' ');
}
export function buildReconcileUserPrompt(
stageIndex: StageIndex,
ev: EvictionUnit | undefined,
@ -119,7 +123,14 @@ export function buildReconcileUserPrompt(
const actions =
wu.actions.length === 0
? ' actions: (none)'
: wu.actions.map((a) => ` - ${a.target}:${a.type} ${a.key}`).join('\n');
: wu.actions
.map((a) => {
const detail = formatStageActionDetail(a.detail);
return detail.length > 0
? ` - ${a.target}:${a.type} ${a.key}; detail: ${detail}`
: ` - ${a.target}:${a.type} ${a.key}`;
})
.join('\n');
return `- unitKey: ${wu.unitKey} (status=${wu.status})\n${actions}`;
})
.join('\n');

View file

@ -19,7 +19,14 @@ describe('stage_list tool', () => {
unitKey: 'u2',
rawFiles: ['b.yml'],
status: 'success',
actions: [{ target: 'wiki', type: 'created', key: 'page_b', detail: '' }],
actions: [
{
target: 'wiki',
type: 'created',
key: 'page_b',
detail: 'tables: orbit_analytics.customer',
},
],
touchedSlSources: [],
},
],
@ -36,6 +43,7 @@ describe('stage_list tool', () => {
expect(out).toContain('src_a');
expect(out).toContain('u2');
expect(out).toContain('page_b');
expect(out).toContain('tables: orbit_analytics.customer');
});
it('says empty when no writes', async () => {

View file

@ -6,6 +6,10 @@ export interface StageListDeps {
stageIndex: StageIndex;
}
/**
 * Normalizes an action detail for one-line display in the stage listing.
 *
 * @param detail - Raw detail text attached to a stage action.
 * @returns `detail` with each whitespace run collapsed to one space and no
 *   leading or trailing whitespace.
 */
function formatActionDetail(detail: string): string {
  // Collapse first, then strip the (at most one) space left on either edge.
  const collapsed = detail.replace(/\s+/g, ' ');
  return collapsed.trim();
}
export function createStageListTool(deps: StageListDeps) {
return tool({
description:
@ -20,7 +24,14 @@ export function createStageListTool(deps: StageListDeps) {
const actions =
wu.actions.length === 0
? ' (no actions)'
: wu.actions.map((a) => ` - ${a.target}:${a.type} ${a.key}`).join('\n');
: wu.actions
.map((a) => {
const detail = formatActionDetail(a.detail);
return detail.length > 0
? ` - ${a.target}:${a.type} ${a.key}; detail: ${detail}`
: ` - ${a.target}:${a.type} ${a.key}`;
})
.join('\n');
return `- unitKey: ${wu.unitKey} (status=${wu.status})\n rawFiles: ${wu.rawFiles.join(', ') || '(none)'}\n actions:\n${actions}`;
})
.join('\n');

View file

@ -90,6 +90,25 @@ describe('memory runtime assets', () => {
expect(body).not.toContain('a standalone SL source only when raw evidence contains enough table or SQL structure');
});
// Asset check for the packaged metabase_ingest skill: the new join and fallback
// wording must be present, and the two superseded phrases must be gone.
it('ships Metabase guidance that avoids invalid joins for SQL-only card outputs', async () => {
const body = await readFile(join(skillsDir, 'metabase_ingest', 'SKILL.md'), 'utf-8');
expect(body).toContain('Do not declare a KTX join just because the card SQL joins that table internally');
expect(body).toContain('only when the card output exposes a local key that matches the target source grain');
expect(body).toContain('If `sl_discover` resolves the table, it is not outside the manifest');
expect(body).toContain('reason: "parse_error"');
// Negative assertions guard against the old guidance being shipped again.
expect(body).not.toContain('Tables outside the manifest');
expect(body).not.toContain('reason: "metabase_sql_untranslated"');
});
// Asset check for the packaged notion_synthesize skill: it must explain what
// `dataSourceCount` covers, tell the agent to search existing wiki pages before
// creating one, and mention the `no_physical_table` fallback reason.
it('ships Notion guidance for physical-table fallbacks and duplicate wiki reconciliation', async () => {
const body = await readFile(join(skillsDir, 'notion_synthesize', 'SKILL.md'), 'utf-8');
expect(body).toContain('Notion `dataSourceCount` counts Notion databases/data sources only');
expect(body).toContain('Search existing wiki pages for the same `tables:` or `sl_refs:` frontmatter');
expect(body).toContain('no_physical_table');
});
it('packages LookML connection-mismatch SL gate guidance', async () => {
const body = await readFile(join(skillsDir, 'lookml_ingest', 'SKILL.md'), 'utf-8');

View file

@ -62,6 +62,7 @@ class SemanticEngine:
report = ValidationReport()
self._check_orphan_join_targets(report)
self._check_invalid_grain(report)
self._check_join_columns(report)
self._check_sql_join_coverage(report, recently_touched=recently_touched)
self._check_disconnected_components(report, recently_touched=recently_touched)
return report
@ -91,6 +92,99 @@ class SemanticEngine:
f"that is not in its columns list"
)
def _check_join_columns(self, report: ValidationReport) -> None:
for source in self.sources.values():
source_columns = {c.name for c in source.columns}
for join in source.joins:
target = self.sources.get(join.to)
if target is None:
continue
target_columns = {c.name for c in target.columns}
try:
local_raw, target_raw = self.graph._parse_on(join.on, join.to)
except ValueError as exc:
report.errors.append(
f"Source '{source.name}' has invalid join to '{join.to}': {exc}"
)
continue
local_cols = [col.strip() for col in local_raw.split(",") if col.strip()]
target_cols = [
col.strip() for col in target_raw.split(",") if col.strip()
]
for local_col in local_cols:
if local_col not in source_columns:
report.errors.append(
f"Source '{source.name}' joins to '{join.to}' on "
f"local column '{local_col}', but '{local_col}' is not "
f"in '{source.name}' columns list"
)
for target_col in target_cols:
if target_col not in target_columns:
report.errors.append(
f"Source '{source.name}' joins to '{join.to}' on "
f"target column '{target_col}', but '{target_col}' is not "
f"in '{join.to}' columns list"
)
if join.relationship not in {"many_to_one", "one_to_one"}:
continue
for local_col, target_col in zip(local_cols, target_cols, strict=False):
if (
local_col in source_columns
and target_col in target_columns
and target_col in target.grain
and self._looks_like_display_value_to_identifier(
local_col, target_col
)
):
report.errors.append(
f"Source '{source.name}' joins '{local_col}' to "
f"'{join.to}.{target_col}', but '{local_col}' looks like "
"a display value and the target column is an identifier "
"grain. Project the matching key column or omit this join."
)
@staticmethod
def _looks_like_display_value_to_identifier(
local_col: str, target_col: str
) -> bool:
if target_col != "id" and not target_col.endswith("_id"):
return False
display_names = {"name", "email", "label", "title", "description"}
display_suffixes = (
"_name",
"_email",
"_label",
"_title",
"_description",
)
return local_col in display_names or local_col.endswith(display_suffixes)
@staticmethod
def _source_exposes_join_key(
source: SourceDefinition, target: SourceDefinition
) -> bool:
source_columns = {c.name.lower() for c in source.columns}
target_name = target.name.lower()
target_name_singular = (
target_name[:-1] if target_name.endswith("s") else target_name
)
for grain_col in target.grain:
grain = grain_col.lower()
if grain in source_columns:
return True
if any(col.endswith(f"_{grain}") for col in source_columns):
return True
if grain == "id":
candidates = {
f"{target_name}_id",
f"{target_name_singular}_id",
}
if source_columns.intersection(candidates):
return True
return False
def _check_sql_join_coverage(
self,
report: ValidationReport,
@ -135,6 +229,8 @@ class SemanticEngine:
continue
if hit_name.lower() in declared:
continue
if not self._source_exposes_join_key(source, self.sources[hit_name]):
continue
if hit_name not in missing:
missing.append(hit_name)
@ -148,11 +244,12 @@ class SemanticEngine:
)
msg = (
f"Source '{source.name}' SQL joins manifest table(s) [{ref_list}] "
f"that are not declared in joins[]. Add a join entry for each, "
f"that have projected key columns but are not declared in joins[]. "
f"Add a join entry for each, "
f"e.g. {{to: {example}, on: '{source.name}.<your_fk> = "
f"{example}.{grain_col}', relationship: many_to_one}}. If a "
f"reference is intentionally absent, document it with a "
f"`unmapped-table-*` wiki note and remove the SQL reference."
f"{example}.{grain_col}', relationship: many_to_one}}. If the "
"SQL intentionally keeps a referenced table internal, omit "
"that table's key column from the SQL source output."
)
report.errors.append(msg)

View file

@ -120,6 +120,135 @@ class TestInvalidGrain:
assert any("bad" in e and "nonexistent_col" in e for e in report.errors)
# Tests for join validation: column existence on declared joins, the
# display-value-to-identifier heuristic, and SQL join coverage with and without
# a projected key column.
class TestJoinValidation:
# A join whose local ON column ('customer_id') is absent from the source's
# columns must produce an error mentioning the source, the column and
# "columns list".
def test_join_local_column_must_exist(self):
orders = _src(
"orders",
columns=["id"],
joins=[
JoinDeclaration(
to="customers",
on="customer_id = customers.id",
relationship="many_to_one",
)
],
)
customers = _src("customers")
engine = SemanticEngine.from_sources({"orders": orders, "customers": customers})
report = engine.validate()
assert not report.valid
assert any(
"orders" in e and "customer_id" in e and "columns list" in e
for e in report.errors
)
# Both columns exist, but a many_to_one join pairs a display field
# ('account_name') with the target's identifier grain ('account_id'); the
# validator is expected to reject it.
def test_many_to_one_join_rejects_display_name_to_id_grain(self):
requesters = _src(
"large_contract_requesters",
columns=["account_name", "requester_email"],
grain=["requester_email"],
joins=[
JoinDeclaration(
to="mart_account_segments",
on="account_name = mart_account_segments.account_id",
relationship="many_to_one",
)
],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate()
assert not report.valid
assert any(
"large_contract_requesters" in e
and "account_name" in e
and "mart_account_segments.account_id" in e
for e in report.errors
)
# The SQL joins mart_account_segments internally but only projects
# 'account_name' and 'requester_email' (no 'account_id'), and declares no
# joins; validation of the touched source must report no errors.
def test_sql_join_coverage_does_not_require_join_without_projected_key(self):
requesters = SourceDefinition(
name="large_contract_requesters",
sql="""
select accounts.account_name, users.email as requester_email
from orbit_raw.requests requests
join public.mart_account_segments accounts
on requests.account_id = accounts.account_id
join orbit_raw.users users
on requests.user_id = users.user_id
""",
grain=["requester_email"],
columns=[
SourceColumn(name="account_name", type="string"),
SourceColumn(name="requester_email", type="string"),
],
joins=[],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate(recently_touched={"large_contract_requesters"})
assert report.errors == []
# Same SQL shape, but 'account_id' is projected this time; with no joins
# declared, the validator is expected to flag the missing joins[] entry for
# mart_account_segments.
def test_sql_join_coverage_requires_join_when_projected_key_exists(self):
requesters = SourceDefinition(
name="large_contract_requesters",
sql="""
select accounts.account_id, users.email as requester_email
from orbit_raw.requests requests
join public.mart_account_segments accounts
on requests.account_id = accounts.account_id
join orbit_raw.users users
on requests.user_id = users.user_id
""",
grain=["requester_email"],
columns=[
SourceColumn(name="account_id", type="string"),
SourceColumn(name="requester_email", type="string"),
],
joins=[],
)
accounts = _src(
"mart_account_segments",
columns=["account_id", "account_name"],
grain=["account_id"],
)
engine = SemanticEngine.from_sources(
{
"large_contract_requesters": requesters,
"mart_account_segments": accounts,
}
)
report = engine.validate(recently_touched={"large_contract_requesters"})
assert not report.valid
assert any("mart_account_segments" in e and "joins[]" in e for e in report.errors)
class TestDisconnectedComponents:
def test_two_components_produce_warning_not_error(self):
a = _src("a")