mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
docs: standardize fanout terminology (#218)
This commit is contained in:
parent
4827437f3a
commit
924868841d
19 changed files with 66 additions and 66 deletions
|
|
@ -253,7 +253,7 @@ const engine: EngineNode = {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
index: 3,
|
index: 3,
|
||||||
title: "Detect fan-out",
|
title: "Detect fanout",
|
||||||
detail: "group measures by source, flag chasm traps",
|
detail: "group measures by source, flag chasm traps",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import { SemanticLayerFlow } from "@/components/semantic-layer-flow";
|
||||||
**ktx**'s semantic layer is a compiler that turns intent into SQL. The agent
|
**ktx**'s semantic layer is a compiler that turns intent into SQL. The agent
|
||||||
declares _what_ it wants - measures, dimensions, filters - in a small
|
declares _what_ it wants - measures, dimensions, filters - in a small
|
||||||
semantic query. **ktx** figures out the _how_: which tables to join, what
|
semantic query. **ktx** figures out the _how_: which tables to join, what
|
||||||
grain to aggregate at, how to keep fan-out from inflating measures, and
|
grain to aggregate at, how to keep fanout from inflating measures, and
|
||||||
what dialect the warehouse speaks.
|
what dialect the warehouse speaks.
|
||||||
|
|
||||||
This page covers four mechanics:
|
This page covers four mechanics:
|
||||||
|
|
@ -16,7 +16,7 @@ This page covers four mechanics:
|
||||||
- The semantic query contract agents send to the compiler.
|
- The semantic query contract agents send to the compiler.
|
||||||
- The planner steps that turn a semantic query into SQL.
|
- The planner steps that turn a semantic query into SQL.
|
||||||
- The join graph that backs those steps, and how it's built.
|
- The join graph that backs those steps, and how it's built.
|
||||||
- The fan-out failure mode the compiler is designed to prevent.
|
- The fanout failure mode the compiler is designed to prevent.
|
||||||
|
|
||||||
## Imperative SQL vs declarative semantic querying
|
## Imperative SQL vs declarative semantic querying
|
||||||
|
|
||||||
|
|
@ -84,14 +84,14 @@ same ordered steps before any SQL is emitted.
|
||||||
2. **Pick an anchor and build the join tree.** Choose the largest measure
|
2. **Pick an anchor and build the join tree.** Choose the largest measure
|
||||||
source as the root, then run a shortest-path search across the typed
|
source as the root, then run a shortest-path search across the typed
|
||||||
join graph to reach every required source.
|
join graph to reach every required source.
|
||||||
3. **Detect fan-out.** Group measures by their owning source. If more
|
3. **Detect fanout.** Group measures by their owning source. If more
|
||||||
than one group exists, the planner marks the query as a chasm trap
|
than one group exists, the planner marks the query as a chasm trap
|
||||||
and switches to aggregate-locality compilation.
|
and switches to aggregate-locality compilation.
|
||||||
4. **Classify filters.** Split predicates into row-level (`WHERE`) and
|
4. **Classify filters.** Split predicates into row-level (`WHERE`) and
|
||||||
aggregate-level (`HAVING`) based on whether they reference a measure.
|
aggregate-level (`HAVING`) based on whether they reference a measure.
|
||||||
5. **Generate SQL.** Emit Postgres-shaped SQL with the right shape:
|
5. **Generate SQL.** Emit Postgres-shaped SQL with the right shape:
|
||||||
single-source aggregation when the query is safe, per-source CTEs
|
single-source aggregation when the query is safe, per-source CTEs
|
||||||
when fan-out is present.
|
when fanout is present.
|
||||||
6. **Transpile to the target dialect.** Run the result through `sqlglot`
|
6. **Transpile to the target dialect.** Run the result through `sqlglot`
|
||||||
so the warehouse receives syntax it understands.
|
so the warehouse receives syntax it understands.
|
||||||
|
|
||||||
|
|
@ -107,7 +107,7 @@ inverted, so the planner can traverse from any anchor.
|
||||||
| Relationship | Planning impact |
|
| Relationship | Planning impact |
|
||||||
|--------------|-----------------|
|
|--------------|-----------------|
|
||||||
| `many_to_one` | Safe direction for adding dimensions |
|
| `many_to_one` | Safe direction for adding dimensions |
|
||||||
| `one_to_many` | Multiplies measures and triggers fan-out handling |
|
| `one_to_many` | Multiplies measures and triggers fanout handling |
|
||||||
| `one_to_one` | Safe in either direction when keys match |
|
| `one_to_one` | Safe in either direction when keys match |
|
||||||
| Equal-cost paths | Treated as ambiguous; aliases or explicit joins resolve them |
|
| Equal-cost paths | Treated as ambiguous; aliases or explicit joins resolve them |
|
||||||
|
|
||||||
|
|
@ -286,9 +286,9 @@ inference. Each input contributes a different kind of authority.
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
## Fan-out and aggregate locality
|
## Fanout and aggregate locality
|
||||||
|
|
||||||
Fan-out is the classic analytics failure mode. Two fact tables join to a
|
Fanout is the classic analytics failure mode. Two fact tables join to a
|
||||||
shared dimension. A naive query joins them all together first, so each
|
shared dimension. A naive query joins them all together first, so each
|
||||||
row from one fact is multiplied by the matching rows from the other.
|
row from one fact is multiplied by the matching rows from the other.
|
||||||
Measures duplicate, numbers go wrong, and the agent doesn't notice.
|
Measures duplicate, numbers go wrong, and the agent doesn't notice.
|
||||||
|
|
@ -336,5 +336,5 @@ different from what the agent first proposed.
|
||||||
| Explain the semantic query shape | The semantic query contract | [ktx sl](/docs/cli-reference/ktx-sl) |
|
| Explain the semantic query shape | The semantic query contract | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||||
| Describe what the planner does between query and SQL | What the planner does | [ktx sl](/docs/cli-reference/ktx-sl) |
|
| Describe what the planner does between query and SQL | What the planner does | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||||
| Explain why **ktx** asks for grain and relationship types | The join graph | [Writing context](/docs/guides/writing-context) |
|
| Explain why **ktx** asks for grain and relationship types | The join graph | [Writing context](/docs/guides/writing-context) |
|
||||||
| Diagnose duplicated measures after a join | Fan-out and aggregate locality | [ktx sl](/docs/cli-reference/ktx-sl) |
|
| Diagnose duplicated measures after a join | Fanout and aggregate locality | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||||
| Describe how semantic context stays current | Building and maintaining the graph | [Reviewing Context](/docs/guides/reviewing-context) |
|
| Describe how semantic context stays current | Building and maintaining the graph | [Reviewing Context](/docs/guides/reviewing-context) |
|
||||||
|
|
|
||||||
|
|
@ -156,7 +156,7 @@ joins:
|
||||||
relationship: many_to_one
|
relationship: many_to_one
|
||||||
```
|
```
|
||||||
|
|
||||||
For how the compiler walks the join graph, handles fan-out, and transpiles
|
For how the compiler walks the join graph, handles fanout, and transpiles
|
||||||
dialects, read [Semantic querying](/docs/concepts/semantic-layer-internals).
|
dialects, read [Semantic querying](/docs/concepts/semantic-layer-internals).
|
||||||
|
|
||||||
## Wiki pages
|
## Wiki pages
|
||||||
|
|
@ -240,7 +240,7 @@ models every time the warehouse changes.
|
||||||
| **Surface** | Indexed docs and chats | Modeling language or runtime | YAML and Markdown files |
|
| **Surface** | Indexed docs and chats | Modeling language or runtime | YAML and Markdown files |
|
||||||
| **Data-stack awareness** | None - treats data tools as text | High for declared metrics, none for the surrounding warehouse | Built in: scans schemas, dbt, BI tools, and query history |
|
| **Data-stack awareness** | None - treats data tools as text | High for declared metrics, none for the surrounding warehouse | Built in: scans schemas, dbt, BI tools, and query history |
|
||||||
| **Maintenance** | Manual page authoring | Manual modeling, model-per-change | Auto-maintained: reconciles evidence with accepted files |
|
| **Maintenance** | Manual page authoring | Manual modeling, model-per-change | Auto-maintained: reconciles evidence with accepted files |
|
||||||
| **SQL safety** | None - generates plausible text | Compiled, dialect-correct | Compiled with join-graph and fan-out handling |
|
| **SQL safety** | None - generates plausible text | Compiled, dialect-correct | Compiled with join-graph and fanout handling |
|
||||||
| **Agent edit loop** | Text-only | Tied to the modeling workflow | First-class: patch files, validate, review diffs |
|
| **Agent edit loop** | Text-only | Tied to the modeling workflow | First-class: patch files, validate, review diffs |
|
||||||
|
|
||||||
If you already use MetricFlow, LookML, dbt, or BI tools, **ktx** can ingest that
|
If you already use MetricFlow, LookML, dbt, or BI tools, **ktx** can ingest that
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ referent (e.g., body of a `Semantic sources` page, or `sourceName` as a CLI arg)
|
||||||
| Wiki surface as a whole | **wiki** | "wiki context" |
|
| Wiki surface as a whole | **wiki** | "wiki context" |
|
||||||
| A single Markdown file | **wiki page** | — |
|
| A single Markdown file | **wiki page** | — |
|
||||||
| YAML vs Markdown contrast | **wiki Markdown** (only when contrasting with **semantic source YAML**) | — |
|
| YAML vs Markdown contrast | **wiki Markdown** (only when contrasting with **semantic source YAML**) | — |
|
||||||
| Joins multiplying rows (generic) | **fan-out** | — |
|
| Joins multiplying rows (generic) | **fanout** | — |
|
||||||
| The two named patterns | **chasm trap** / **fan trap** | — |
|
| The two named patterns | **chasm trap** / **fan trap** | — |
|
||||||
| Casual gloss in user prose | **double-count** | (avoid in technical/internals prose) |
|
| Casual gloss in user prose | **double-count** | (avoid in technical/internals prose) |
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,7 @@ describe('local ingest adapters', () => {
|
||||||
expect(looker?.fetch).toBeTypeOf('function');
|
expect(looker?.fetch).toBeTypeOf('function');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('returns the explicit Metabase fan-out boundary before runner construction', async () => {
|
it('returns the explicit Metabase fanout boundary before runner construction', async () => {
|
||||||
const metabase = createDefaultLocalIngestAdapters(project).find((adapter) => adapter.source === 'metabase');
|
const metabase = createDefaultLocalIngestAdapters(project).find((adapter) => adapter.source === 'metabase');
|
||||||
|
|
||||||
await expect(localPullConfigForAdapter(project, metabase!, 'warehouse')).rejects.toThrow(
|
await expect(localPullConfigForAdapter(project, metabase!, 'warehouse')).rejects.toThrow(
|
||||||
|
|
|
||||||
|
|
@ -336,7 +336,7 @@ export async function runLocalMetabaseIngest(
|
||||||
options: RunLocalMetabaseIngestOptions,
|
options: RunLocalMetabaseIngestOptions,
|
||||||
): Promise<LocalMetabaseFanoutResult> {
|
): Promise<LocalMetabaseFanoutResult> {
|
||||||
if ((options as RunLocalMetabaseIngestOptions & { sourceDir?: string }).sourceDir) {
|
if ((options as RunLocalMetabaseIngestOptions & { sourceDir?: string }).sourceDir) {
|
||||||
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
|
throw new Error('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||||
}
|
}
|
||||||
|
|
||||||
const metabaseConnectionId = safeSegment('metabase connection id', options.metabaseConnectionId);
|
const metabaseConnectionId = safeSegment('metabase connection id', options.metabaseConnectionId);
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ describe('runLocalMetabaseIngest', () => {
|
||||||
).rejects.toThrow('no sync-enabled mappings with a target connection');
|
).rejects.toThrow('no sync-enabled mappings with a target connection');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => {
|
it('seeds yaml-only Metabase mappings before the unhydrated fanout preflight', async () => {
|
||||||
project.config.connections['prod-metabase'].mappings = {
|
project.config.connections['prod-metabase'].mappings = {
|
||||||
databaseMappings: { '1': 'warehouse_a' },
|
databaseMappings: { '1': 'warehouse_a' },
|
||||||
syncEnabled: { '1': true },
|
syncEnabled: { '1': true },
|
||||||
|
|
@ -172,7 +172,7 @@ describe('runLocalMetabaseIngest', () => {
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('rejects source-dir uploads through the Metabase fan-out runner', async () => {
|
it('rejects source-dir uploads through the Metabase fanout runner', async () => {
|
||||||
await expect(
|
await expect(
|
||||||
runLocalMetabaseIngest({
|
runLocalMetabaseIngest({
|
||||||
project,
|
project,
|
||||||
|
|
@ -181,7 +181,7 @@ describe('runLocalMetabaseIngest', () => {
|
||||||
agentRunner: new TestAgentRunner(),
|
agentRunner: new TestAgentRunner(),
|
||||||
sourceDir: tempDir,
|
sourceDir: tempDir,
|
||||||
} as Parameters<typeof runLocalMetabaseIngest>[0] & { sourceDir: string }),
|
} as Parameters<typeof runLocalMetabaseIngest>[0] & { sourceDir: string }),
|
||||||
).rejects.toThrow('source-dir uploads are not supported for the Metabase fan-out adapter');
|
).rejects.toThrow('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('reports partial failure when a child job fails', async () => {
|
it('reports partial failure when a child job fails', async () => {
|
||||||
|
|
|
||||||
|
|
@ -533,7 +533,7 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync
|
||||||
).resolves.toBe(0);
|
).resolves.toBe(0);
|
||||||
|
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||||
expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`);
|
expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`);
|
||||||
|
|
||||||
const report = await getLocalIngestStatus(project, jobId);
|
const report = await getLocalIngestStatus(project, jobId);
|
||||||
|
|
|
||||||
|
|
@ -346,7 +346,7 @@ describe('runKtxIngest', () => {
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('routes metabase scheduled pulls to the fan-out runner and prints child summaries', async () => {
|
it('routes metabase scheduled pulls to the fanout runner and prints child summaries', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -397,13 +397,13 @@ describe('runKtxIngest', () => {
|
||||||
),
|
),
|
||||||
).resolves.toBe(0);
|
).resolves.toBe(0);
|
||||||
|
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||||
expect(io.stdout()).toContain('warehouse_a');
|
expect(io.stdout()).toContain('warehouse_a');
|
||||||
expect(io.stdout()).toContain('metabase-child-1');
|
expect(io.stdout()).toContain('metabase-child-1');
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('returns a non-zero code when Metabase fan-out has failed children', async () => {
|
it('returns a non-zero code when Metabase fanout has failed children', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -467,13 +467,13 @@ describe('runKtxIngest', () => {
|
||||||
),
|
),
|
||||||
).resolves.toBe(1);
|
).resolves.toBe(1);
|
||||||
|
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: partial_failure');
|
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
||||||
expect(io.stdout()).toContain('Failed tasks: 1');
|
expect(io.stdout()).toContain('Failed tasks: 1');
|
||||||
expect(io.stdout()).toContain('status=error');
|
expect(io.stdout()).toContain('status=error');
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('prints Metabase fan-out progress before the final summary', async () => {
|
it('prints Metabase fanout progress before the final summary', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -548,11 +548,11 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stderr()).toContain('Targets: 1 mapped database');
|
expect(io.stderr()).toContain('Targets: 1 mapped database');
|
||||||
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1');
|
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1');
|
||||||
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1');
|
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1');
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||||
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('writes metabase fan-out progress to stderr and final result to stdout', async () => {
|
it('writes metabase fanout progress to stderr and final result to stdout', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo({ isTTY: true });
|
const io = makeIo({ isTTY: true });
|
||||||
|
|
@ -592,11 +592,11 @@ describe('runKtxIngest', () => {
|
||||||
|
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
expect(io.stderr()).toContain('status=running job=metabase-child-1');
|
expect(io.stderr()).toContain('status=running job=metabase-child-1');
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||||
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('emits structured progress for Metabase fan-out without writing progress to JSON output', async () => {
|
it('emits structured progress for Metabase fanout without writing progress to JSON output', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -655,7 +655,7 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('emits structured child ingest progress during Metabase fan-out', async () => {
|
it('emits structured child ingest progress during Metabase fanout', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -766,7 +766,7 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => {
|
it('runs Metabase scheduled ingest through the public CLI command path with real fanout', async () => {
|
||||||
const projectDir = join(tempDir, 'metabase-cli-project');
|
const projectDir = join(tempDir, 'metabase-cli-project');
|
||||||
await writeWarehouseConfig(projectDir);
|
await writeWarehouseConfig(projectDir);
|
||||||
await writeFile(
|
await writeFile(
|
||||||
|
|
@ -838,7 +838,7 @@ describe('runKtxIngest', () => {
|
||||||
|
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
expect(io.stderr()).toContain('Targets: 2 mapped databases');
|
expect(io.stderr()).toContain('Targets: 2 mapped databases');
|
||||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||||
expect(io.stdout()).toContain('Source: prod-metabase');
|
expect(io.stdout()).toContain('Source: prod-metabase');
|
||||||
expect(io.stdout()).toContain('Children: 2');
|
expect(io.stdout()).toContain('Children: 2');
|
||||||
expect(io.stdout()).toContain('target=warehouse_a database=1 status=done job=metabase-child-1');
|
expect(io.stdout()).toContain('target=warehouse_a database=1 status=done job=metabase-child-1');
|
||||||
|
|
@ -893,7 +893,7 @@ describe('runKtxIngest', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('prints metabase fan-out JSON results', async () => {
|
it('prints metabase fanout JSON results', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -967,7 +967,7 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stderr()).toBe('');
|
expect(io.stderr()).toBe('');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('rejects source-dir uploads through the metabase fan-out route', async () => {
|
it('rejects source-dir uploads through the metabase fanout route', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -985,13 +985,13 @@ describe('runKtxIngest', () => {
|
||||||
io.io,
|
io.io,
|
||||||
{
|
{
|
||||||
runLocalMetabaseIngest: async () => {
|
runLocalMetabaseIngest: async () => {
|
||||||
throw new Error('fan-out should not be called');
|
throw new Error('fanout should not be called');
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
).resolves.toBe(1);
|
).resolves.toBe(1);
|
||||||
|
|
||||||
expect(io.stderr()).toContain('source-dir uploads are not supported for the Metabase fan-out adapter');
|
expect(io.stderr()).toContain('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||||
expect(io.stderr()).not.toContain('ktx ingest requires llm.provider.backend');
|
expect(io.stderr()).not.toContain('ktx ingest requires llm.provider.backend');
|
||||||
expect(io.stdout()).toBe('');
|
expect(io.stdout()).toBe('');
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -222,7 +222,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
|
||||||
},
|
},
|
||||||
{ wikiCount: 0, slCount: 0 },
|
{ wikiCount: 0, slCount: 0 },
|
||||||
);
|
);
|
||||||
io.stdout.write(`Metabase fan-out: ${result.status}\n`);
|
io.stdout.write(`Metabase fanout: ${result.status}\n`);
|
||||||
io.stdout.write(`Source: ${result.metabaseConnectionId}\n`);
|
io.stdout.write(`Source: ${result.metabaseConnectionId}\n`);
|
||||||
io.stdout.write(`Children: ${result.children.length}\n`);
|
io.stdout.write(`Children: ${result.children.length}\n`);
|
||||||
if (result.totals) {
|
if (result.totals) {
|
||||||
|
|
@ -719,7 +719,7 @@ export async function runKtxIngest(
|
||||||
localIngestOptions.queryExecutor ??
|
localIngestOptions.queryExecutor ??
|
||||||
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(ingestProject);
|
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(ingestProject);
|
||||||
if (args.adapter === 'metabase' && args.sourceDir) {
|
if (args.adapter === 'metabase' && args.sourceDir) {
|
||||||
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
|
throw new Error('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||||
}
|
}
|
||||||
if (args.adapter === 'metabase') {
|
if (args.adapter === 'metabase') {
|
||||||
const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest;
|
const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest;
|
||||||
|
|
|
||||||
|
|
@ -124,7 +124,7 @@ Every standalone column requires `name` and `type`. Overlays have computed colum
|
||||||
|
|
||||||
### Grain
|
### Grain
|
||||||
|
|
||||||
`grain: [col_a, col_b]` - the set of columns that uniquely identify one row. The query engine uses grain to prevent fan-out in joins. Overlays inherit grain from the manifest unless they override.
|
`grain: [col_a, col_b]` - the set of columns that uniquely identify one row. The query engine uses grain to prevent fanout in joins. Overlays inherit grain from the manifest unless they override.
|
||||||
|
|
||||||
### Joins
|
### Joins
|
||||||
|
|
||||||
|
|
@ -177,7 +177,7 @@ The reverse edge (wiki pages that cite this source) is derived automatically fro
|
||||||
|
|
||||||
## Part 2 - Querying via `sl_query`
|
## Part 2 - Querying via `sl_query`
|
||||||
|
|
||||||
The `sl_query` tool generates correct SQL from a structured query. It handles joins, fan-out prevention, aggregation correctness, and filter classification automatically. Prefer it over writing raw SQL whenever the SL has the relevant sources.
|
The `sl_query` tool generates correct SQL from a structured query. It handles joins, fanout prevention, aggregation correctness, and filter classification automatically. Prefer it over writing raw SQL whenever the SL has the relevant sources.
|
||||||
|
|
||||||
### When to prefer sl_query over raw SQL
|
### When to prefer sl_query over raw SQL
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ uv run python -m semantic_layer.cli --model /tmp/model.yaml \
|
||||||
-q '{"measures":["orders.revenue"],"dimensions":["customers.segment"]}' --suggest
|
-q '{"measures":["orders.revenue"],"dimensions":["customers.segment"]}' --suggest
|
||||||
```
|
```
|
||||||
|
|
||||||
### 3. Test fan-out / chasm traps
|
### 3. Test fanout / chasm traps
|
||||||
|
|
||||||
Add multiple measure sources that fan out from a shared dimension hub:
|
Add multiple measure sources that fan out from a shared dimension hub:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -160,7 +160,7 @@ def print_plan(plan) -> None:
|
||||||
print(" Joins:")
|
print(" Joins:")
|
||||||
for jp in plan.join_paths:
|
for jp in plan.join_paths:
|
||||||
print(f" {jp}")
|
print(f" {jp}")
|
||||||
print(f" Fan-out: {plan.fan_out_description}")
|
print(f" Fanout: {plan.fan_out_description}")
|
||||||
if plan.aggregate_locality:
|
if plan.aggregate_locality:
|
||||||
print(" Locality:")
|
print(" Locality:")
|
||||||
for al in plan.aggregate_locality:
|
for al in plan.aggregate_locality:
|
||||||
|
|
|
||||||
|
|
@ -92,7 +92,7 @@ class SqlGenerator:
|
||||||
return "WITH " + source_header + ",\n" + rest
|
return "WITH " + source_header + ",\n" + rest
|
||||||
return "WITH " + source_header + "\n" + outer_transpiled
|
return "WITH " + source_header + "\n" + outer_transpiled
|
||||||
|
|
||||||
# ── Path A: Simple (no fan-out) ────────────────────────────────────
|
# ── Path A: Simple (no fanout) ────────────────────────────────────
|
||||||
|
|
||||||
def _generate_simple(
|
def _generate_simple(
|
||||||
self, plan: ResolvedPlan, sources: dict[str, SourceDefinition]
|
self, plan: ResolvedPlan, sources: dict[str, SourceDefinition]
|
||||||
|
|
@ -216,7 +216,7 @@ class SqlGenerator:
|
||||||
shared_dim_aliases = shared_dim_aliases or set()
|
shared_dim_aliases = shared_dim_aliases or set()
|
||||||
shared_dims = [dk for dk in all_dim_keys if dk["alias"] in shared_dim_aliases]
|
shared_dims = [dk for dk in all_dim_keys if dk["alias"] in shared_dim_aliases]
|
||||||
|
|
||||||
# Validate grain consistency: asymmetric dims cause FULL JOIN fan-out
|
# Validate grain consistency: asymmetric dims cause FULL JOIN fanout
|
||||||
if len(plan.measure_groups) > 1:
|
if len(plan.measure_groups) > 1:
|
||||||
for group in plan.measure_groups:
|
for group in plan.measure_groups:
|
||||||
cte_dim_aliases = {
|
cte_dim_aliases = {
|
||||||
|
|
|
||||||
|
|
@ -107,7 +107,7 @@ class QueryPlanner:
|
||||||
for e in tree.edges
|
for e in tree.edges
|
||||||
]
|
]
|
||||||
|
|
||||||
# 8. Detect fan-out / chasm trap
|
# 8. Detect fanout / chasm trap
|
||||||
has_fan_out, measure_groups, fan_out_desc, locality_descs = (
|
has_fan_out, measure_groups, fan_out_desc, locality_descs = (
|
||||||
self._detect_fan_out(measures, dimensions, tree, filters=query.filters)
|
self._detect_fan_out(measures, dimensions, tree, filters=query.filters)
|
||||||
)
|
)
|
||||||
|
|
@ -937,7 +937,7 @@ class QueryPlanner:
|
||||||
filters: list[str] | None = None,
|
filters: list[str] | None = None,
|
||||||
) -> tuple[bool, list[MeasureGroup], str, list[str]]:
|
) -> tuple[bool, list[MeasureGroup], str, list[str]]:
|
||||||
"""
|
"""
|
||||||
Detect fan-out and chasm traps. Group measures by source.
|
Detect fanout and chasm traps. Group measures by source.
|
||||||
If multiple measure sources exist, each needs its own pre-aggregation CTE.
|
If multiple measure sources exist, each needs its own pre-aggregation CTE.
|
||||||
Also checks filter sources — a filter forcing a one_to_many join from the
|
Also checks filter sources — a filter forcing a one_to_many join from the
|
||||||
measure source is an error (cannot be safely pre-aggregated).
|
measure source is an error (cannot be safely pre-aggregated).
|
||||||
|
|
@ -991,7 +991,7 @@ class QueryPlanner:
|
||||||
|
|
||||||
if len(groups) <= 1:
|
if len(groups) <= 1:
|
||||||
# Single measure group: check the path FROM measure source TO dimension sources.
|
# Single measure group: check the path FROM measure source TO dimension sources.
|
||||||
# Only flag fan-out if those specific paths have one_to_many edges.
|
# Only flag fanout if those specific paths have one_to_many edges.
|
||||||
if groups:
|
if groups:
|
||||||
source_name = next(iter(groups))
|
source_name = next(iter(groups))
|
||||||
source_actual = self.graph.alias_map.get(source_name, source_name)
|
source_actual = self.graph.alias_map.get(source_name, source_name)
|
||||||
|
|
@ -999,7 +999,7 @@ class QueryPlanner:
|
||||||
for dim_src in dim_sources:
|
for dim_src in dim_sources:
|
||||||
if dim_src == source_name:
|
if dim_src == source_name:
|
||||||
continue
|
continue
|
||||||
# Skip alias siblings (same underlying source — no fan-out)
|
# Skip alias siblings (same underlying source — no fanout)
|
||||||
dim_actual = self.graph.alias_map.get(dim_src, dim_src)
|
dim_actual = self.graph.alias_map.get(dim_src, dim_src)
|
||||||
if dim_actual == source_actual:
|
if dim_actual == source_actual:
|
||||||
continue
|
continue
|
||||||
|
|
@ -1008,7 +1008,7 @@ class QueryPlanner:
|
||||||
has_o2m = True
|
has_o2m = True
|
||||||
break
|
break
|
||||||
|
|
||||||
# Also check filter sources for one_to_many fan-out
|
# Also check filter sources for one_to_many fanout
|
||||||
if not has_o2m:
|
if not has_o2m:
|
||||||
for filter_src in filter_sources - dim_sources - {source_name}:
|
for filter_src in filter_sources - dim_sources - {source_name}:
|
||||||
filter_actual = self.graph.alias_map.get(filter_src, filter_src)
|
filter_actual = self.graph.alias_map.get(filter_src, filter_src)
|
||||||
|
|
@ -1019,7 +1019,7 @@ class QueryPlanner:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Filter on '{filter_src}' requires a one_to_many join "
|
f"Filter on '{filter_src}' requires a one_to_many join "
|
||||||
f"from measure source '{source_name}', which would cause "
|
f"from measure source '{source_name}', which would cause "
|
||||||
f"incorrect aggregation (fan-out). Consider rewriting the "
|
f"incorrect aggregation (fanout). Consider rewriting the "
|
||||||
f"filter as a subquery or adding the filter source as a "
|
f"filter as a subquery or adding the filter source as a "
|
||||||
f"dimension source."
|
f"dimension source."
|
||||||
)
|
)
|
||||||
|
|
@ -1033,10 +1033,10 @@ class QueryPlanner:
|
||||||
return (
|
return (
|
||||||
True,
|
True,
|
||||||
measure_groups,
|
measure_groups,
|
||||||
f"Fan-out detected: one_to_many edges from {source_name} to dimensions",
|
f"Fanout detected: one_to_many edges from {source_name} to dimensions",
|
||||||
[f"Pre-aggregate {source_name} measures before joining"],
|
[f"Pre-aggregate {source_name} measures before joining"],
|
||||||
)
|
)
|
||||||
return False, [], "No fan-out", []
|
return False, [], "No fanout", []
|
||||||
|
|
||||||
# Multiple measure sources. Only merge groups that are provably row-safe
|
# Multiple measure sources. Only merge groups that are provably row-safe
|
||||||
# (alias siblings or pure one_to_one chains). many_to_one chains are not
|
# (alias siblings or pure one_to_one chains). many_to_one chains are not
|
||||||
|
|
@ -1048,7 +1048,7 @@ class QueryPlanner:
|
||||||
# All measure sources are on the same safe join chain
|
# All measure sources are on the same safe join chain
|
||||||
if merged_groups:
|
if merged_groups:
|
||||||
mg_name, mg_measures = next(iter(merged_groups.items()))
|
mg_name, mg_measures = next(iter(merged_groups.items()))
|
||||||
# Still check if there's fan-out to dimension sources
|
# Still check if there's fanout to dimension sources
|
||||||
has_o2m = False
|
has_o2m = False
|
||||||
for dim_src in dim_sources:
|
for dim_src in dim_sources:
|
||||||
if dim_src == mg_name:
|
if dim_src == mg_name:
|
||||||
|
|
@ -1061,10 +1061,10 @@ class QueryPlanner:
|
||||||
return (
|
return (
|
||||||
True,
|
True,
|
||||||
[MeasureGroup(source_name=mg_name, measures=mg_measures)],
|
[MeasureGroup(source_name=mg_name, measures=mg_measures)],
|
||||||
f"Fan-out detected: one_to_many edges from {mg_name} to dimensions",
|
f"Fanout detected: one_to_many edges from {mg_name} to dimensions",
|
||||||
[f"Pre-aggregate {mg_name} measures before joining"],
|
[f"Pre-aggregate {mg_name} measures before joining"],
|
||||||
)
|
)
|
||||||
return False, [], "No fan-out", []
|
return False, [], "No fanout", []
|
||||||
|
|
||||||
# True chasm trap — independent measure sources that can't be safely merged.
|
# True chasm trap — independent measure sources that can't be safely merged.
|
||||||
# Before building groups, validate that all filter sources are reachable
|
# Before building groups, validate that all filter sources are reachable
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
"""Dedicated tests for aggregate locality (fan-out/chasm trap correctness)."""
|
"""Dedicated tests for aggregate locality (fanout/chasm trap correctness)."""
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import sqlglot
|
import sqlglot
|
||||||
|
|
@ -213,7 +213,7 @@ class TestNoFanOut:
|
||||||
sqlglot.parse(sql)
|
sqlglot.parse(sql)
|
||||||
|
|
||||||
def test_m2o_join_no_ctes(self, ecommerce_sources):
|
def test_m2o_join_no_ctes(self, ecommerce_sources):
|
||||||
"""orders → customers is m2o, no fan-out."""
|
"""orders → customers is m2o, no fanout."""
|
||||||
graph = JoinGraph(ecommerce_sources)
|
graph = JoinGraph(ecommerce_sources)
|
||||||
graph.build()
|
graph.build()
|
||||||
planner = QueryPlanner(ecommerce_sources, graph)
|
planner = QueryPlanner(ecommerce_sources, graph)
|
||||||
|
|
@ -540,7 +540,7 @@ class TestFactSideDimensionsInChasm:
|
||||||
"""LIMIT 1: Fact-side dimensions in chasm trap (local to one CTE only)."""
|
"""LIMIT 1: Fact-side dimensions in chasm trap (local to one CTE only)."""
|
||||||
|
|
||||||
def test_fact_side_dimension_in_chasm_raises_error(self):
|
def test_fact_side_dimension_in_chasm_raises_error(self):
|
||||||
"""Asymmetric dim from fact_a only → raises error (would cause FULL JOIN fan-out)."""
|
"""Asymmetric dim from fact_a only → raises error (would cause FULL JOIN fanout)."""
|
||||||
hub = SourceDefinition(
|
hub = SourceDefinition(
|
||||||
name="hub",
|
name="hub",
|
||||||
table="public.hub",
|
table="public.hub",
|
||||||
|
|
@ -977,7 +977,7 @@ class TestBug13_FalseChasm_AliasAggregate:
|
||||||
dimensions=["billing_customer.name", "shipping_customer.name"],
|
dimensions=["billing_customer.name", "shipping_customer.name"],
|
||||||
)
|
)
|
||||||
plan = planner.plan(query)
|
plan = planner.plan(query)
|
||||||
assert not plan.has_fan_out, "Should not detect fan-out between alias siblings"
|
assert not plan.has_fan_out, "Should not detect fanout between alias siblings"
|
||||||
sql = gen.generate(plan, sources)
|
sql = gen.generate(plan, sources)
|
||||||
sqlglot.parse(sql)
|
sqlglot.parse(sql)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -305,12 +305,12 @@ class TestPredefinedMeasureDeps:
|
||||||
assert "GROUP BY" in sql.upper()
|
assert "GROUP BY" in sql.upper()
|
||||||
|
|
||||||
|
|
||||||
# ── Planner: fan-out with one_to_many to dimension sources (lines 595-643) ──
|
# ── Planner: fanout with one_to_many to dimension sources (lines 595-643) ──
|
||||||
|
|
||||||
|
|
||||||
class TestFanOutEdgeCases:
|
class TestFanOutEdgeCases:
|
||||||
def test_single_source_fan_out_to_dimension(self):
|
def test_single_source_fan_out_to_dimension(self):
|
||||||
"""Measure source with one_to_many to dimension should trigger fan-out."""
|
"""Measure source with one_to_many to dimension should trigger fanout."""
|
||||||
hub = SourceDefinition(
|
hub = SourceDefinition(
|
||||||
name="hub",
|
name="hub",
|
||||||
table="public.hub",
|
table="public.hub",
|
||||||
|
|
|
||||||
|
|
@ -89,10 +89,10 @@ class TestCrossSourceM2O:
|
||||||
|
|
||||||
|
|
||||||
class TestFanOut:
|
class TestFanOut:
|
||||||
"""Test 3: Fan-out (aggregate locality)."""
|
"""Test 3: Fanout (aggregate locality)."""
|
||||||
|
|
||||||
def test_orders_by_region_no_fanout(self, planner, generator, ecommerce_sources):
|
def test_orders_by_region_no_fanout(self, planner, generator, ecommerce_sources):
|
||||||
"""orders → customers → regions is all m2o. No fan-out needed."""
|
"""orders → customers → regions is all m2o. No fanout needed."""
|
||||||
sql = generate_sql(
|
sql = generate_sql(
|
||||||
planner,
|
planner,
|
||||||
generator,
|
generator,
|
||||||
|
|
|
||||||
|
|
@ -200,12 +200,12 @@ class TestFanOutDetection:
|
||||||
|
|
||||||
|
|
||||||
class TestFanOutSingleSource:
|
class TestFanOutSingleSource:
|
||||||
"""Fan-out when a single measure source has o2m path to dimension source."""
|
"""Fanout when a single measure source has o2m path to dimension source."""
|
||||||
|
|
||||||
def test_reverse_path_fan_out(self):
|
def test_reverse_path_fan_out(self):
|
||||||
"""Querying from customers (dimension) with measures from orders triggers fan-out
|
"""Querying from customers (dimension) with measures from orders triggers fanout
|
||||||
when the path from the measure source (orders) to the dimension source (customers)
|
when the path from the measure source (orders) to the dimension source (customers)
|
||||||
is m2o — so no fan-out. But reversed: measure on customers, dim on orders."""
|
is m2o — so no fanout. But reversed: measure on customers, dim on orders."""
|
||||||
customers = SourceDefinition(
|
customers = SourceDefinition(
|
||||||
name="customers",
|
name="customers",
|
||||||
table="t",
|
table="t",
|
||||||
|
|
@ -248,7 +248,7 @@ class TestFanOutSingleSource:
|
||||||
assert plan.has_fan_out
|
assert plan.has_fan_out
|
||||||
|
|
||||||
def test_m2o_multi_hop_no_fan_out(self, planner):
|
def test_m2o_multi_hop_no_fan_out(self, planner):
|
||||||
"""orders → customers → regions is all m2o. No fan-out."""
|
"""orders → customers → regions is all m2o. No fanout."""
|
||||||
query = SemanticQuery(
|
query = SemanticQuery(
|
||||||
measures=["sum(orders.amount)"],
|
measures=["sum(orders.amount)"],
|
||||||
dimensions=["regions.name"],
|
dimensions=["regions.name"],
|
||||||
|
|
@ -1116,7 +1116,7 @@ class TestDerivedMeasureEdgeCases:
|
||||||
assert_valid_sql(result.sql)
|
assert_valid_sql(result.sql)
|
||||||
|
|
||||||
|
|
||||||
# ── From test_edge_cases.py: filter fan-out detection ────────────────
|
# ── From test_edge_cases.py: filter fanout detection ────────────────
|
||||||
|
|
||||||
|
|
||||||
class TestFilterFanOutDetection:
|
class TestFilterFanOutDetection:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue