mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
Merge remote-tracking branch 'origin/main' into connector-solid-review
This commit is contained in:
commit
4c303db2a4
23 changed files with 86 additions and 66 deletions
5
.github/workflows/ci.yml
vendored
5
.github/workflows/ci.yml
vendored
|
|
@ -10,6 +10,11 @@ on:
|
|||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
DO_NOT_TRACK: "1"
|
||||
KTX_TELEMETRY_DISABLED: "1"
|
||||
NEXT_TELEMETRY_DISABLED: "1"
|
||||
|
||||
concurrency:
|
||||
group: ktx-ci-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
|
|
|||
5
.github/workflows/release.yml
vendored
5
.github/workflows/release.yml
vendored
|
|
@ -26,6 +26,11 @@ permissions:
|
|||
contents: write
|
||||
id-token: write
|
||||
|
||||
env:
|
||||
DO_NOT_TRACK: "1"
|
||||
KTX_TELEMETRY_DISABLED: "1"
|
||||
NEXT_TELEMETRY_DISABLED: "1"
|
||||
|
||||
concurrency:
|
||||
group: ktx-release-${{ github.ref }}
|
||||
cancel-in-progress: false
|
||||
|
|
|
|||
5
.github/workflows/triage-issues.yml
vendored
5
.github/workflows/triage-issues.yml
vendored
|
|
@ -7,6 +7,11 @@ on:
|
|||
permissions:
|
||||
issues: write
|
||||
|
||||
env:
|
||||
DO_NOT_TRACK: "1"
|
||||
KTX_TELEMETRY_DISABLED: "1"
|
||||
NEXT_TELEMETRY_DISABLED: "1"
|
||||
|
||||
jobs:
|
||||
label-external:
|
||||
name: Add needs-triage to external issues
|
||||
|
|
|
|||
|
|
@ -24,6 +24,11 @@ database migrations, ORPC contracts, or `python-service/` layout exist here.
|
|||
- **MUST**: Keep package/public API changes intentional. Do not add compatibility
|
||||
wrappers for old **ktx** names unless the user explicitly asks for a migration
|
||||
bridge.
|
||||
- **MUST**: Avoid compatibility shims for old **ktx** features, command shapes,
|
||||
configuration formats, or internal APIs. This rule does not prohibit
|
||||
compatibility support for third-party systems and libraries, such as
|
||||
Metabase version differences. Keep the **ktx** codebase clean instead of
|
||||
preserving stale **ktx** behavior.
|
||||
- **MUST**: Treat **ktx** as having no public users unless the user says otherwise.
|
||||
Legacy support is not necessary by default; prefer clean breaking changes over
|
||||
compatibility shims, migration bridges, or preserved stale behavior.
|
||||
|
|
|
|||
|
|
@ -253,7 +253,7 @@ const engine: EngineNode = {
|
|||
},
|
||||
{
|
||||
index: 3,
|
||||
title: "Detect fan-out",
|
||||
title: "Detect fanout",
|
||||
detail: "group measures by source, flag chasm traps",
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import { SemanticLayerFlow } from "@/components/semantic-layer-flow";
|
|||
**ktx**'s semantic layer is a compiler that turns intent into SQL. The agent
|
||||
declares _what_ it wants - measures, dimensions, filters - in a small
|
||||
semantic query. **ktx** figures out the _how_: which tables to join, what
|
||||
grain to aggregate at, how to keep fan-out from inflating measures, and
|
||||
grain to aggregate at, how to keep fanout from inflating measures, and
|
||||
what dialect the warehouse speaks.
|
||||
|
||||
This page covers four mechanics:
|
||||
|
|
@ -16,7 +16,7 @@ This page covers four mechanics:
|
|||
- The semantic query contract agents send to the compiler.
|
||||
- The planner steps that turn a semantic query into SQL.
|
||||
- The join graph that backs those steps, and how it's built.
|
||||
- The fan-out failure mode the compiler is designed to prevent.
|
||||
- The fanout failure mode the compiler is designed to prevent.
|
||||
|
||||
## Imperative SQL vs declarative semantic querying
|
||||
|
||||
|
|
@ -84,14 +84,14 @@ same ordered steps before any SQL is emitted.
|
|||
2. **Pick an anchor and build the join tree.** Choose the largest measure
|
||||
source as the root, then run a shortest-path search across the typed
|
||||
join graph to reach every required source.
|
||||
3. **Detect fan-out.** Group measures by their owning source. If more
|
||||
3. **Detect fanout.** Group measures by their owning source. If more
|
||||
than one group exists, the planner marks the query as a chasm trap
|
||||
and switches to aggregate-locality compilation.
|
||||
4. **Classify filters.** Split predicates into row-level (`WHERE`) and
|
||||
aggregate-level (`HAVING`) based on whether they reference a measure.
|
||||
5. **Generate SQL.** Emit Postgres-shaped SQL with the right shape:
|
||||
single-source aggregation when the query is safe, per-source CTEs
|
||||
when fan-out is present.
|
||||
when fanout is present.
|
||||
6. **Transpile to the target dialect.** Run the result through `sqlglot`
|
||||
so the warehouse receives syntax it understands.
|
||||
|
||||
|
|
@ -107,7 +107,7 @@ inverted, so the planner can traverse from any anchor.
|
|||
| Relationship | Planning impact |
|
||||
|--------------|-----------------|
|
||||
| `many_to_one` | Safe direction for adding dimensions |
|
||||
| `one_to_many` | Multiplies measures and triggers fan-out handling |
|
||||
| `one_to_many` | Multiplies measures and triggers fanout handling |
|
||||
| `one_to_one` | Safe in either direction when keys match |
|
||||
| Equal-cost paths | Treated as ambiguous; aliases or explicit joins resolve them |
|
||||
|
||||
|
|
@ -286,9 +286,9 @@ inference. Each input contributes a different kind of authority.
|
|||
</div>
|
||||
</div>
|
||||
|
||||
## Fan-out and aggregate locality
|
||||
## Fanout and aggregate locality
|
||||
|
||||
Fan-out is the classic analytics failure mode. Two fact tables join to a
|
||||
Fanout is the classic analytics failure mode. Two fact tables join to a
|
||||
shared dimension. A naive query joins them all together first, so each
|
||||
row from one fact is multiplied by the matching rows from the other.
|
||||
Measures duplicate, numbers go wrong, and the agent doesn't notice.
|
||||
|
|
@ -336,5 +336,5 @@ different from what the agent first proposed.
|
|||
| Explain the semantic query shape | The semantic query contract | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||
| Describe what the planner does between query and SQL | What the planner does | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||
| Explain why **ktx** asks for grain and relationship types | The join graph | [Writing context](/docs/guides/writing-context) |
|
||||
| Diagnose duplicated measures after a join | Fan-out and aggregate locality | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||
| Diagnose duplicated measures after a join | Fanout and aggregate locality | [ktx sl](/docs/cli-reference/ktx-sl) |
|
||||
| Describe how semantic context stays current | Building and maintaining the graph | [Reviewing Context](/docs/guides/reviewing-context) |
|
||||
|
|
|
|||
|
|
@ -156,7 +156,7 @@ joins:
|
|||
relationship: many_to_one
|
||||
```
|
||||
|
||||
For how the compiler walks the join graph, handles fan-out, and transpiles
|
||||
For how the compiler walks the join graph, handles fanout, and transpiles
|
||||
dialects, read [Semantic querying](/docs/concepts/semantic-layer-internals).
|
||||
|
||||
## Wiki pages
|
||||
|
|
@ -240,7 +240,7 @@ models every time the warehouse changes.
|
|||
| **Surface** | Indexed docs and chats | Modeling language or runtime | YAML and Markdown files |
|
||||
| **Data-stack awareness** | None - treats data tools as text | High for declared metrics, none for the surrounding warehouse | Built in: scans schemas, dbt, BI tools, and query history |
|
||||
| **Maintenance** | Manual page authoring | Manual modeling, model-per-change | Auto-maintained: reconciles evidence with accepted files |
|
||||
| **SQL safety** | None - generates plausible text | Compiled, dialect-correct | Compiled with join-graph and fan-out handling |
|
||||
| **SQL safety** | None - generates plausible text | Compiled, dialect-correct | Compiled with join-graph and fanout handling |
|
||||
| **Agent edit loop** | Text-only | Tied to the modeling workflow | First-class: patch files, validate, review diffs |
|
||||
|
||||
If you already use MetricFlow, LookML, dbt, or BI tools, **ktx** can ingest that
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ referent (e.g., body of a `Semantic sources` page, or `sourceName` as a CLI arg)
|
|||
| Wiki surface as a whole | **wiki** | "wiki context" |
|
||||
| A single Markdown file | **wiki page** | — |
|
||||
| YAML vs Markdown contrast | **wiki Markdown** (only when contrasting with **semantic source YAML**) | — |
|
||||
| Joins multiplying rows (generic) | **fan-out** | — |
|
||||
| Joins multiplying rows (generic) | **fanout** | — |
|
||||
| The two named patterns | **chasm trap** / **fan trap** | — |
|
||||
| Casual gloss in user prose | **double-count** | (avoid in technical/internals prose) |
|
||||
|
||||
|
|
|
|||
|
|
@ -336,7 +336,7 @@ export async function runLocalMetabaseIngest(
|
|||
options: RunLocalMetabaseIngestOptions,
|
||||
): Promise<LocalMetabaseFanoutResult> {
|
||||
if ((options as RunLocalMetabaseIngestOptions & { sourceDir?: string }).sourceDir) {
|
||||
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
|
||||
throw new Error('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||
}
|
||||
|
||||
const metabaseConnectionId = safeSegment('metabase connection id', options.metabaseConnectionId);
|
||||
|
|
|
|||
|
|
@ -222,7 +222,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
|
|||
},
|
||||
{ wikiCount: 0, slCount: 0 },
|
||||
);
|
||||
io.stdout.write(`Metabase fan-out: ${result.status}\n`);
|
||||
io.stdout.write(`Metabase fanout: ${result.status}\n`);
|
||||
io.stdout.write(`Source: ${result.metabaseConnectionId}\n`);
|
||||
io.stdout.write(`Children: ${result.children.length}\n`);
|
||||
if (result.totals) {
|
||||
|
|
@ -719,7 +719,7 @@ export async function runKtxIngest(
|
|||
localIngestOptions.queryExecutor ??
|
||||
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(ingestProject);
|
||||
if (args.adapter === 'metabase' && args.sourceDir) {
|
||||
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
|
||||
throw new Error('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||
}
|
||||
if (args.adapter === 'metabase') {
|
||||
const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest;
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ Every standalone column requires `name` and `type`. Overlays have computed colum
|
|||
|
||||
### Grain
|
||||
|
||||
`grain: [col_a, col_b]` - the set of columns that uniquely identify one row. The query engine uses grain to prevent fan-out in joins. Overlays inherit grain from the manifest unless they override.
|
||||
`grain: [col_a, col_b]` - the set of columns that uniquely identify one row. The query engine uses grain to prevent fanout in joins. Overlays inherit grain from the manifest unless they override.
|
||||
|
||||
### Joins
|
||||
|
||||
|
|
@ -177,7 +177,7 @@ The reverse edge (wiki pages that cite this source) is derived automatically fro
|
|||
|
||||
## Part 2 - Querying via `sl_query`
|
||||
|
||||
The `sl_query` tool generates correct SQL from a structured query. It handles joins, fan-out prevention, aggregation correctness, and filter classification automatically. Prefer it over writing raw SQL whenever the SL has the relevant sources.
|
||||
The `sl_query` tool generates correct SQL from a structured query. It handles joins, fanout prevention, aggregation correctness, and filter classification automatically. Prefer it over writing raw SQL whenever the SL has the relevant sources.
|
||||
|
||||
### When to prefer sl_query over raw SQL
|
||||
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ describe('local ingest adapters', () => {
|
|||
expect(looker?.fetch).toBeTypeOf('function');
|
||||
});
|
||||
|
||||
it('returns the explicit Metabase fan-out boundary before runner construction', async () => {
|
||||
it('returns the explicit Metabase fanout boundary before runner construction', async () => {
|
||||
const metabase = createDefaultLocalIngestAdapters(project).find((adapter) => adapter.source === 'metabase');
|
||||
|
||||
await expect(localPullConfigForAdapter(project, metabase!, 'warehouse')).rejects.toThrow(
|
||||
|
|
|
|||
|
|
@ -148,7 +148,7 @@ describe('runLocalMetabaseIngest', () => {
|
|||
).rejects.toThrow('no sync-enabled mappings with a target connection');
|
||||
});
|
||||
|
||||
it('seeds yaml-only Metabase mappings before the unhydrated fan-out preflight', async () => {
|
||||
it('seeds yaml-only Metabase mappings before the unhydrated fanout preflight', async () => {
|
||||
project.config.connections['prod-metabase'].mappings = {
|
||||
databaseMappings: { '1': 'warehouse_a' },
|
||||
syncEnabled: { '1': true },
|
||||
|
|
@ -172,7 +172,7 @@ describe('runLocalMetabaseIngest', () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it('rejects source-dir uploads through the Metabase fan-out runner', async () => {
|
||||
it('rejects source-dir uploads through the Metabase fanout runner', async () => {
|
||||
await expect(
|
||||
runLocalMetabaseIngest({
|
||||
project,
|
||||
|
|
@ -181,7 +181,7 @@ describe('runLocalMetabaseIngest', () => {
|
|||
agentRunner: new TestAgentRunner(),
|
||||
sourceDir: tempDir,
|
||||
} as Parameters<typeof runLocalMetabaseIngest>[0] & { sourceDir: string }),
|
||||
).rejects.toThrow('source-dir uploads are not supported for the Metabase fan-out adapter');
|
||||
).rejects.toThrow('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||
});
|
||||
|
||||
it('reports partial failure when a child job fails', async () => {
|
||||
|
|
|
|||
|
|
@ -533,7 +533,7 @@ export async function runPublicMetabaseSyncModeCase(tempDir: string, input: Sync
|
|||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||
expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`);
|
||||
|
||||
const report = await getLocalIngestStatus(project, jobId);
|
||||
|
|
|
|||
|
|
@ -346,7 +346,7 @@ describe('runKtxIngest', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('routes metabase scheduled pulls to the fan-out runner and prints child summaries', async () => {
|
||||
it('routes metabase scheduled pulls to the fanout runner and prints child summaries', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -397,13 +397,13 @@ describe('runKtxIngest', () => {
|
|||
),
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||
expect(io.stdout()).toContain('warehouse_a');
|
||||
expect(io.stdout()).toContain('metabase-child-1');
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('returns a non-zero code when Metabase fan-out has failed children', async () => {
|
||||
it('returns a non-zero code when Metabase fanout has failed children', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -467,13 +467,13 @@ describe('runKtxIngest', () => {
|
|||
),
|
||||
).resolves.toBe(1);
|
||||
|
||||
expect(io.stdout()).toContain('Metabase fan-out: partial_failure');
|
||||
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
||||
expect(io.stdout()).toContain('Failed tasks: 1');
|
||||
expect(io.stdout()).toContain('status=error');
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('prints Metabase fan-out progress before the final summary', async () => {
|
||||
it('prints Metabase fanout progress before the final summary', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -548,11 +548,11 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).toContain('Targets: 1 mapped database');
|
||||
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=running job=metabase-child-1');
|
||||
expect(io.stderr()).toContain('- database=1 target=warehouse_a status=done job=metabase-child-1');
|
||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
||||
});
|
||||
|
||||
it('writes metabase fan-out progress to stderr and final result to stdout', async () => {
|
||||
it('writes metabase fanout progress to stderr and final result to stdout', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo({ isTTY: true });
|
||||
|
|
@ -592,11 +592,11 @@ describe('runKtxIngest', () => {
|
|||
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
expect(io.stderr()).toContain('status=running job=metabase-child-1');
|
||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||
expect(io.stdout()).not.toContain('status=running job=metabase-child-1');
|
||||
});
|
||||
|
||||
it('emits structured progress for Metabase fan-out without writing progress to JSON output', async () => {
|
||||
it('emits structured progress for Metabase fanout without writing progress to JSON output', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -655,7 +655,7 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('emits structured child ingest progress during Metabase fan-out', async () => {
|
||||
it('emits structured child ingest progress during Metabase fanout', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -766,7 +766,7 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => {
|
||||
it('runs Metabase scheduled ingest through the public CLI command path with real fanout', async () => {
|
||||
const projectDir = join(tempDir, 'metabase-cli-project');
|
||||
await writeWarehouseConfig(projectDir);
|
||||
await writeFile(
|
||||
|
|
@ -838,7 +838,7 @@ describe('runKtxIngest', () => {
|
|||
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
expect(io.stderr()).toContain('Targets: 2 mapped databases');
|
||||
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_succeeded');
|
||||
expect(io.stdout()).toContain('Source: prod-metabase');
|
||||
expect(io.stdout()).toContain('Children: 2');
|
||||
expect(io.stdout()).toContain('target=warehouse_a database=1 status=done job=metabase-child-1');
|
||||
|
|
@ -893,7 +893,7 @@ describe('runKtxIngest', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('prints metabase fan-out JSON results', async () => {
|
||||
it('prints metabase fanout JSON results', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -967,7 +967,7 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).toBe('');
|
||||
});
|
||||
|
||||
it('rejects source-dir uploads through the metabase fan-out route', async () => {
|
||||
it('rejects source-dir uploads through the metabase fanout route', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -985,13 +985,13 @@ describe('runKtxIngest', () => {
|
|||
io.io,
|
||||
{
|
||||
runLocalMetabaseIngest: async () => {
|
||||
throw new Error('fan-out should not be called');
|
||||
throw new Error('fanout should not be called');
|
||||
},
|
||||
},
|
||||
),
|
||||
).resolves.toBe(1);
|
||||
|
||||
expect(io.stderr()).toContain('source-dir uploads are not supported for the Metabase fan-out adapter');
|
||||
expect(io.stderr()).toContain('source-dir uploads are not supported for the Metabase fanout adapter');
|
||||
expect(io.stderr()).not.toContain('ktx ingest requires llm.provider.backend');
|
||||
expect(io.stdout()).toBe('');
|
||||
});
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ uv run python -m semantic_layer.cli --model /tmp/model.yaml \
|
|||
-q '{"measures":["orders.revenue"],"dimensions":["customers.segment"]}' --suggest
|
||||
```
|
||||
|
||||
### 3. Test fan-out / chasm traps
|
||||
### 3. Test fanout / chasm traps
|
||||
|
||||
Add multiple measure sources that fan out from a shared dimension hub:
|
||||
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ def print_plan(plan) -> None:
|
|||
print(" Joins:")
|
||||
for jp in plan.join_paths:
|
||||
print(f" {jp}")
|
||||
print(f" Fan-out: {plan.fan_out_description}")
|
||||
print(f" Fanout: {plan.fan_out_description}")
|
||||
if plan.aggregate_locality:
|
||||
print(" Locality:")
|
||||
for al in plan.aggregate_locality:
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ class SqlGenerator:
|
|||
return "WITH " + source_header + ",\n" + rest
|
||||
return "WITH " + source_header + "\n" + outer_transpiled
|
||||
|
||||
# ── Path A: Simple (no fan-out) ────────────────────────────────────
|
||||
# ── Path A: Simple (no fanout) ────────────────────────────────────
|
||||
|
||||
def _generate_simple(
|
||||
self, plan: ResolvedPlan, sources: dict[str, SourceDefinition]
|
||||
|
|
@ -216,7 +216,7 @@ class SqlGenerator:
|
|||
shared_dim_aliases = shared_dim_aliases or set()
|
||||
shared_dims = [dk for dk in all_dim_keys if dk["alias"] in shared_dim_aliases]
|
||||
|
||||
# Validate grain consistency: asymmetric dims cause FULL JOIN fan-out
|
||||
# Validate grain consistency: asymmetric dims cause FULL JOIN fanout
|
||||
if len(plan.measure_groups) > 1:
|
||||
for group in plan.measure_groups:
|
||||
cte_dim_aliases = {
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ class QueryPlanner:
|
|||
for e in tree.edges
|
||||
]
|
||||
|
||||
# 8. Detect fan-out / chasm trap
|
||||
# 8. Detect fanout / chasm trap
|
||||
has_fan_out, measure_groups, fan_out_desc, locality_descs = (
|
||||
self._detect_fan_out(measures, dimensions, tree, filters=query.filters)
|
||||
)
|
||||
|
|
@ -937,7 +937,7 @@ class QueryPlanner:
|
|||
filters: list[str] | None = None,
|
||||
) -> tuple[bool, list[MeasureGroup], str, list[str]]:
|
||||
"""
|
||||
Detect fan-out and chasm traps. Group measures by source.
|
||||
Detect fanout and chasm traps. Group measures by source.
|
||||
If multiple measure sources exist, each needs its own pre-aggregation CTE.
|
||||
Also checks filter sources — a filter forcing a one_to_many join from the
|
||||
measure source is an error (cannot be safely pre-aggregated).
|
||||
|
|
@ -991,7 +991,7 @@ class QueryPlanner:
|
|||
|
||||
if len(groups) <= 1:
|
||||
# Single measure group: check the path FROM measure source TO dimension sources.
|
||||
# Only flag fan-out if those specific paths have one_to_many edges.
|
||||
# Only flag fanout if those specific paths have one_to_many edges.
|
||||
if groups:
|
||||
source_name = next(iter(groups))
|
||||
source_actual = self.graph.alias_map.get(source_name, source_name)
|
||||
|
|
@ -999,7 +999,7 @@ class QueryPlanner:
|
|||
for dim_src in dim_sources:
|
||||
if dim_src == source_name:
|
||||
continue
|
||||
# Skip alias siblings (same underlying source — no fan-out)
|
||||
# Skip alias siblings (same underlying source — no fanout)
|
||||
dim_actual = self.graph.alias_map.get(dim_src, dim_src)
|
||||
if dim_actual == source_actual:
|
||||
continue
|
||||
|
|
@ -1008,7 +1008,7 @@ class QueryPlanner:
|
|||
has_o2m = True
|
||||
break
|
||||
|
||||
# Also check filter sources for one_to_many fan-out
|
||||
# Also check filter sources for one_to_many fanout
|
||||
if not has_o2m:
|
||||
for filter_src in filter_sources - dim_sources - {source_name}:
|
||||
filter_actual = self.graph.alias_map.get(filter_src, filter_src)
|
||||
|
|
@ -1019,7 +1019,7 @@ class QueryPlanner:
|
|||
raise ValueError(
|
||||
f"Filter on '{filter_src}' requires a one_to_many join "
|
||||
f"from measure source '{source_name}', which would cause "
|
||||
f"incorrect aggregation (fan-out). Consider rewriting the "
|
||||
f"incorrect aggregation (fanout). Consider rewriting the "
|
||||
f"filter as a subquery or adding the filter source as a "
|
||||
f"dimension source."
|
||||
)
|
||||
|
|
@ -1033,10 +1033,10 @@ class QueryPlanner:
|
|||
return (
|
||||
True,
|
||||
measure_groups,
|
||||
f"Fan-out detected: one_to_many edges from {source_name} to dimensions",
|
||||
f"Fanout detected: one_to_many edges from {source_name} to dimensions",
|
||||
[f"Pre-aggregate {source_name} measures before joining"],
|
||||
)
|
||||
return False, [], "No fan-out", []
|
||||
return False, [], "No fanout", []
|
||||
|
||||
# Multiple measure sources. Only merge groups that are provably row-safe
|
||||
# (alias siblings or pure one_to_one chains). many_to_one chains are not
|
||||
|
|
@ -1048,7 +1048,7 @@ class QueryPlanner:
|
|||
# All measure sources are on the same safe join chain
|
||||
if merged_groups:
|
||||
mg_name, mg_measures = next(iter(merged_groups.items()))
|
||||
# Still check if there's fan-out to dimension sources
|
||||
# Still check if there's fanout to dimension sources
|
||||
has_o2m = False
|
||||
for dim_src in dim_sources:
|
||||
if dim_src == mg_name:
|
||||
|
|
@ -1061,10 +1061,10 @@ class QueryPlanner:
|
|||
return (
|
||||
True,
|
||||
[MeasureGroup(source_name=mg_name, measures=mg_measures)],
|
||||
f"Fan-out detected: one_to_many edges from {mg_name} to dimensions",
|
||||
f"Fanout detected: one_to_many edges from {mg_name} to dimensions",
|
||||
[f"Pre-aggregate {mg_name} measures before joining"],
|
||||
)
|
||||
return False, [], "No fan-out", []
|
||||
return False, [], "No fanout", []
|
||||
|
||||
# True chasm trap — independent measure sources that can't be safely merged.
|
||||
# Before building groups, validate that all filter sources are reachable
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
"""Dedicated tests for aggregate locality (fan-out/chasm trap correctness)."""
|
||||
"""Dedicated tests for aggregate locality (fanout/chasm trap correctness)."""
|
||||
|
||||
import pytest
|
||||
import sqlglot
|
||||
|
|
@ -213,7 +213,7 @@ class TestNoFanOut:
|
|||
sqlglot.parse(sql)
|
||||
|
||||
def test_m2o_join_no_ctes(self, ecommerce_sources):
|
||||
"""orders → customers is m2o, no fan-out."""
|
||||
"""orders → customers is m2o, no fanout."""
|
||||
graph = JoinGraph(ecommerce_sources)
|
||||
graph.build()
|
||||
planner = QueryPlanner(ecommerce_sources, graph)
|
||||
|
|
@ -540,7 +540,7 @@ class TestFactSideDimensionsInChasm:
|
|||
"""LIMIT 1: Fact-side dimensions in chasm trap (local to one CTE only)."""
|
||||
|
||||
def test_fact_side_dimension_in_chasm_raises_error(self):
|
||||
"""Asymmetric dim from fact_a only → raises error (would cause FULL JOIN fan-out)."""
|
||||
"""Asymmetric dim from fact_a only → raises error (would cause FULL JOIN fanout)."""
|
||||
hub = SourceDefinition(
|
||||
name="hub",
|
||||
table="public.hub",
|
||||
|
|
@ -977,7 +977,7 @@ class TestBug13_FalseChasm_AliasAggregate:
|
|||
dimensions=["billing_customer.name", "shipping_customer.name"],
|
||||
)
|
||||
plan = planner.plan(query)
|
||||
assert not plan.has_fan_out, "Should not detect fan-out between alias siblings"
|
||||
assert not plan.has_fan_out, "Should not detect fanout between alias siblings"
|
||||
sql = gen.generate(plan, sources)
|
||||
sqlglot.parse(sql)
|
||||
|
||||
|
|
|
|||
|
|
@ -305,12 +305,12 @@ class TestPredefinedMeasureDeps:
|
|||
assert "GROUP BY" in sql.upper()
|
||||
|
||||
|
||||
# ── Planner: fan-out with one_to_many to dimension sources (lines 595-643) ──
|
||||
# ── Planner: fanout with one_to_many to dimension sources (lines 595-643) ──
|
||||
|
||||
|
||||
class TestFanOutEdgeCases:
|
||||
def test_single_source_fan_out_to_dimension(self):
|
||||
"""Measure source with one_to_many to dimension should trigger fan-out."""
|
||||
"""Measure source with one_to_many to dimension should trigger fanout."""
|
||||
hub = SourceDefinition(
|
||||
name="hub",
|
||||
table="public.hub",
|
||||
|
|
|
|||
|
|
@ -89,10 +89,10 @@ class TestCrossSourceM2O:
|
|||
|
||||
|
||||
class TestFanOut:
|
||||
"""Test 3: Fan-out (aggregate locality)."""
|
||||
"""Test 3: Fanout (aggregate locality)."""
|
||||
|
||||
def test_orders_by_region_no_fanout(self, planner, generator, ecommerce_sources):
|
||||
"""orders → customers → regions is all m2o. No fan-out needed."""
|
||||
"""orders → customers → regions is all m2o. No fanout needed."""
|
||||
sql = generate_sql(
|
||||
planner,
|
||||
generator,
|
||||
|
|
|
|||
|
|
@ -200,12 +200,12 @@ class TestFanOutDetection:
|
|||
|
||||
|
||||
class TestFanOutSingleSource:
|
||||
"""Fan-out when a single measure source has o2m path to dimension source."""
|
||||
"""Fanout when a single measure source has o2m path to dimension source."""
|
||||
|
||||
def test_reverse_path_fan_out(self):
|
||||
"""Querying from customers (dimension) with measures from orders triggers fan-out
|
||||
"""Querying from customers (dimension) with measures from orders triggers fanout
|
||||
when the path from the measure source (orders) to the dimension source (customers)
|
||||
is m2o — so no fan-out. But reversed: measure on customers, dim on orders."""
|
||||
is m2o — so no fanout. But reversed: measure on customers, dim on orders."""
|
||||
customers = SourceDefinition(
|
||||
name="customers",
|
||||
table="t",
|
||||
|
|
@ -248,7 +248,7 @@ class TestFanOutSingleSource:
|
|||
assert plan.has_fan_out
|
||||
|
||||
def test_m2o_multi_hop_no_fan_out(self, planner):
|
||||
"""orders → customers → regions is all m2o. No fan-out."""
|
||||
"""orders → customers → regions is all m2o. No fanout."""
|
||||
query = SemanticQuery(
|
||||
measures=["sum(orders.amount)"],
|
||||
dimensions=["regions.name"],
|
||||
|
|
@ -1116,7 +1116,7 @@ class TestDerivedMeasureEdgeCases:
|
|||
assert_valid_sql(result.sql)
|
||||
|
||||
|
||||
# ── From test_edge_cases.py: filter fan-out detection ────────────────
|
||||
# ── From test_edge_cases.py: filter fanout detection ────────────────
|
||||
|
||||
|
||||
class TestFilterFanOutDetection:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue