From b6b63123f8575fff0db22284b51d6db3d1bc2c05 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 18 May 2026 02:39:01 +0200 Subject: [PATCH] feat(ingest): project metricflow semantic models before work units --- .../metricflow/metricflow.adapter.test.ts | 138 +++++++++++++++++- .../adapters/metricflow/metricflow.adapter.ts | 62 +++++++- .../adapters/metricflow/projection-config.ts | 54 +++++++ .../src/ingest/ingest-bundle.runner.ts | 1 + packages/context/src/ingest/types.ts | 2 + 5 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 packages/context/src/ingest/adapters/metricflow/projection-config.ts diff --git a/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.test.ts b/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.test.ts index 19bb6cdc..232624a5 100644 --- a/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.test.ts +++ b/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.test.ts @@ -1,10 +1,12 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { makeLocalGitRepo } from '../../../test/make-local-git-repo.js'; import type { SourceAdapter } from '../../types.js'; +import type { MetricFlowParseResult } from './deep-parse.js'; import { MetricflowSourceAdapter } from './metricflow.adapter.js'; +import { readMetricflowProjectionConfig, writeMetricflowProjectionConfig } from './projection-config.js'; function compileOnlyRequiredDepsCheck(): void { // @ts-expect-error MetricflowSourceAdapter requires an explicit cache home. @@ -22,6 +24,25 @@ async function makeRepo(tmpRoot: string, files: Record) { return makeLocalGitRepo(fixtureDir, join(tmpRoot, 'origin')); } +function metricflowParseResult(): MetricFlowParseResult { + return { + semanticModels: [ + { + name: 'orders', + description: 'Orders', + modelRef: 'orders', + dimensions: [{ name: 'status', column: 'status', type: 'string', label: 'Status' }], + measures: [{ type: 'simple', name: 'order_count', column: 'id', aggregation: 'count' }], + entities: [{ name: 'customer', type: 'foreign', expr: 'customer_id' }], + defaultTimeDimension: null, + }, + ], + crossModelMetrics: [], + relationships: [], + warnings: ['parser warning'], + }; +} + describe('MetricflowSourceAdapter', () => { let tmpRoot: string; let stagedDir: string; @@ -127,4 +148,119 @@ describe('MetricflowSourceAdapter', () => { await expect(readFile(join(stagedDir, 'models/orders.yml'), 'utf-8')).resolves.toContain('semantic_models'); expect(await adapter.detect(stagedDir)).toBe(true); }); + + it('persists parsed target tables for deterministic projection during fetch', async () => { + const repo = await makeRepo(tmpRoot, { + 'dbt_project.yml': 'name: analytics\n', + 'models/orders.yml': 'semantic_models:\n - name: orders\n model: ref("orders")\n', + }); + + await adapter.fetch?.( + { + repoUrl: repo.repoUrl, + branch: 'main', + path: null, + authToken: null, + parsedTargetTables: { + orders: { + ok: true, + catalog: null, + schema: 'analytics', + name: 'orders', + canonicalTable: 'analytics.orders', + }, + }, + }, + stagedDir, + { connectionId: 'warehouse-1', sourceKey: 'metricflow' }, + ); + + await expect(readMetricflowProjectionConfig(stagedDir)).resolves.toMatchObject({ + parsedTargetTables: { + orders: { + ok: true, + schema: 'analytics', + name: 'orders', + }, + }, + }); + }); + + it('projects parsed MetricFlow semantic models in the integration worktree', async () => { + await writeMetricflowProjectionConfig(stagedDir, { + parsedTargetTables: { + orders: { + ok: true, + catalog: null, + schema: 'analytics', + name: 'orders', + canonicalTable: 'analytics.orders', + }, + }, + }); + const scoped = { + getManifestEntry: vi.fn().mockResolvedValue(null), + isManifestBacked: vi.fn().mockResolvedValue(false), + loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }), + loadSource: vi.fn().mockResolvedValue(null), + writeSource: vi.fn().mockResolvedValue({ warnings: [] }), + }; + const semanticLayerService = { + forWorktree: vi.fn().mockReturnValue(scoped), + getManifestEntry: vi.fn(), + isManifestBacked: vi.fn(), + loadAllSources: vi.fn(), + loadSource: vi.fn(), + writeSource: vi.fn(), + }; + + const result = await adapter.project?.({ + connectionId: 'warehouse-1', + sourceKey: 'metricflow', + syncId: 'sync-1', + jobId: 'job-1', + runId: 'run-1', + stagedDir, + workdir: '/tmp/metricflow-integration', + parseArtifacts: metricflowParseResult(), + semanticLayerService: semanticLayerService as never, + }); + + expect(semanticLayerService.forWorktree).toHaveBeenCalledWith('/tmp/metricflow-integration'); + expect(scoped.writeSource).toHaveBeenCalledWith( + 'warehouse-1', + expect.objectContaining({ name: 'orders' }), + 'dbt MetricFlow', + expect.any(String), + 'dbt MetricFlow sync: create source orders', + { skipValidation: true }, + ); + expect(result).toMatchObject({ + warnings: ['parser warning'], + errors: [], + touchedSources: [{ connectionId: 'warehouse-1', sourceName: 'orders' }], + changedWikiPageKeys: [], + }); + }); + + it('returns a projection error when parse artifacts are missing', async () => { + const result = await adapter.project?.({ + connectionId: 'warehouse-1', + sourceKey: 'metricflow', + syncId: 'sync-1', + jobId: 'job-1', + runId: 'run-1', + stagedDir, + workdir: '/tmp/metricflow-integration', + parseArtifacts: undefined, + semanticLayerService: {} as never, + }); + + expect(result).toMatchObject({ + warnings: [], + errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'], + touchedSources: [], + changedWikiPageKeys: [], + }); + }); }); diff --git a/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.ts b/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.ts index c8182ed8..8aae1df7 100644 --- a/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.ts +++ b/packages/context/src/ingest/adapters/metricflow/metricflow.adapter.ts @@ -1,10 +1,23 @@ import { join } from 'node:path'; -import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js'; +import type { + ChunkResult, + DeterministicProjectionContext, + DiffSet, + FetchContext, + ProjectionResult, + SourceAdapter, +} from '../../types.js'; import { chunkMetricFlowProject } from './chunk.js'; import { detectMetricFlowStagedDir } from './detect.js'; import { parseMetricflowFiles, type MetricFlowParseResult } from './deep-parse.js'; import { fetchMetricflowRepo } from './fetch.js'; +import { importMetricflowSemanticModels } from './import-semantic-models.js'; import { parseMetricFlowStagedDir, type ParsedMetricFlowProject } from './parse.js'; +import { + metricflowHostTablesFromParsedTargets, + readMetricflowProjectionConfig, + writeMetricflowProjectionConfig, +} from './projection-config.js'; import { parseMetricflowPullConfig } from './pull-config.js'; export interface MetricflowSourceAdapterDeps { @@ -33,6 +46,9 @@ export class MetricflowSourceAdapter implements SourceAdapter { cacheDir: this.resolveCacheDir(ctx.connectionId), stagedDir, }); + await writeMetricflowProjectionConfig(stagedDir, { + parsedTargetTables: config.parsedTargetTables, + }); } async listTargetConnectionIds(_stagedDir: string): Promise { @@ -46,6 +62,37 @@ export class MetricflowSourceAdapter implements SourceAdapter { return { ...chunk, parseArtifacts }; } + async project(ctx: DeterministicProjectionContext): Promise { + if (!isMetricFlowParseResult(ctx.parseArtifacts)) { + return { + warnings: [], + errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'], + touchedSources: [], + changedWikiPageKeys: [], + }; + } + + const projectionConfig = await readMetricflowProjectionConfig(ctx.stagedDir); + const result = await importMetricflowSemanticModels( + { semanticLayerService: ctx.semanticLayerService }, + { + connectionId: ctx.connectionId, + parseResult: ctx.parseArtifacts, + targetSchema: null, + hostTables: metricflowHostTablesFromParsedTargets(projectionConfig.parsedTargetTables), + workdir: ctx.workdir, + }, + ); + + return { + result, + warnings: result.warnings, + errors: result.errors, + touchedSources: result.touchedSources, + changedWikiPageKeys: [], + }; + } + private resolveCacheDir(connectionId: string): string { return join(this.deps.homeDir, 'ingest-metricflow-repos', connectionId); } @@ -54,3 +101,16 @@ export class MetricflowSourceAdapter implements SourceAdapter { function parseMetricflowStagedDirForImport(project: ParsedMetricFlowProject): MetricFlowParseResult { return parseMetricflowFiles(project.files); } + +function isMetricFlowParseResult(value: unknown): value is MetricFlowParseResult { + if (!value || typeof value !== 'object') { + return false; + } + const candidate = value as Partial; + return ( + Array.isArray(candidate.semanticModels) && + Array.isArray(candidate.crossModelMetrics) && + Array.isArray(candidate.relationships) && + Array.isArray(candidate.warnings) + ); +} diff --git a/packages/context/src/ingest/adapters/metricflow/projection-config.ts b/packages/context/src/ingest/adapters/metricflow/projection-config.ts new file mode 100644 index 00000000..2d72f774 --- /dev/null +++ b/packages/context/src/ingest/adapters/metricflow/projection-config.ts @@ -0,0 +1,54 @@ +import { mkdir, readFile, writeFile } from 'node:fs/promises'; +import { join } from 'node:path'; +import { z } from 'zod'; +import { parsedTargetTableSchema, type ParsedTargetTable } from '../../parsed-target-table.js'; +import type { MetricflowHostTable } from './semantic-models.js'; + +export const METRICFLOW_PROJECTION_CONFIG_FILE = 'sync-config.json'; + +export const metricflowProjectionConfigSchema = z.object({ + parsedTargetTables: z.record(z.string(), parsedTargetTableSchema).default({}), +}); + +export type MetricflowProjectionConfig = z.infer; + +export async function writeMetricflowProjectionConfig( + stagedDir: string, + config: MetricflowProjectionConfig, +): Promise { + const parsed = metricflowProjectionConfigSchema.parse(config); + await mkdir(stagedDir, { recursive: true }); + await writeFile(join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE), `${JSON.stringify(parsed, null, 2)}\n`, 'utf-8'); +} + +export async function readMetricflowProjectionConfig(stagedDir: string): Promise { + const path = join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE); + try { + return metricflowProjectionConfigSchema.parse(JSON.parse(await readFile(path, 'utf-8'))); + } catch (error) { + if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') { + return { parsedTargetTables: {} }; + } + throw error; + } +} + +export function metricflowHostTablesFromParsedTargets( + parsedTargetTables: Record, +): MetricflowHostTable[] { + return Object.entries(parsedTargetTables) + .flatMap(([id, table]) => + table.ok + ? [ + { + id, + name: table.name, + catalog: table.catalog, + db: table.schema, + columns: [], + }, + ] + : [], + ) + .sort((left, right) => left.id.localeCompare(right.id)); +} diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 21fc82de..2036f478 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -1352,6 +1352,7 @@ export class IngestBundleRunner { stagedDir, workdir: sessionWorktree.workdir, parseArtifacts, + semanticLayerService: this.deps.semanticLayerService, }), ); if (projection.errors.length > 0) { diff --git a/packages/context/src/ingest/types.ts b/packages/context/src/ingest/types.ts index fb8938d3..370c7511 100644 --- a/packages/context/src/ingest/types.ts +++ b/packages/context/src/ingest/types.ts @@ -1,4 +1,5 @@ import type { KtxEmbeddingPort } from '../core/embedding.js'; +import type { SemanticLayerService } from '../sl/index.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; export type IngestTrigger = 'upload' | 'scheduled_pull' | 'manual_resync' | 'manual_override'; @@ -106,6 +107,7 @@ export interface DeterministicProjectionContext { stagedDir: string; workdir: string; parseArtifacts?: unknown; + semanticLayerService: SemanticLayerService; } export interface ProjectionResult {