feat(ingest): project metricflow semantic models before work units

This commit is contained in:
Andrey Avtomonov 2026-05-18 02:39:01 +02:00
parent 0137de05a2
commit b6b63123f8
5 changed files with 255 additions and 2 deletions

View file

@ -1,10 +1,12 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { makeLocalGitRepo } from '../../../test/make-local-git-repo.js';
import type { SourceAdapter } from '../../types.js';
import type { MetricFlowParseResult } from './deep-parse.js';
import { MetricflowSourceAdapter } from './metricflow.adapter.js';
import { readMetricflowProjectionConfig, writeMetricflowProjectionConfig } from './projection-config.js';
function compileOnlyRequiredDepsCheck(): void {
// @ts-expect-error MetricflowSourceAdapter requires an explicit cache home.
@ -22,6 +24,25 @@ async function makeRepo(tmpRoot: string, files: Record<string, string>) {
return makeLocalGitRepo(fixtureDir, join(tmpRoot, 'origin'));
}
function metricflowParseResult(): MetricFlowParseResult {
return {
semanticModels: [
{
name: 'orders',
description: 'Orders',
modelRef: 'orders',
dimensions: [{ name: 'status', column: 'status', type: 'string', label: 'Status' }],
measures: [{ type: 'simple', name: 'order_count', column: 'id', aggregation: 'count' }],
entities: [{ name: 'customer', type: 'foreign', expr: 'customer_id' }],
defaultTimeDimension: null,
},
],
crossModelMetrics: [],
relationships: [],
warnings: ['parser warning'],
};
}
describe('MetricflowSourceAdapter', () => {
let tmpRoot: string;
let stagedDir: string;
@ -127,4 +148,119 @@ describe('MetricflowSourceAdapter', () => {
await expect(readFile(join(stagedDir, 'models/orders.yml'), 'utf-8')).resolves.toContain('semantic_models');
expect(await adapter.detect(stagedDir)).toBe(true);
});
it('persists parsed target tables for deterministic projection during fetch', async () => {
const repo = await makeRepo(tmpRoot, {
'dbt_project.yml': 'name: analytics\n',
'models/orders.yml': 'semantic_models:\n - name: orders\n model: ref("orders")\n',
});
await adapter.fetch?.(
{
repoUrl: repo.repoUrl,
branch: 'main',
path: null,
authToken: null,
parsedTargetTables: {
orders: {
ok: true,
catalog: null,
schema: 'analytics',
name: 'orders',
canonicalTable: 'analytics.orders',
},
},
},
stagedDir,
{ connectionId: 'warehouse-1', sourceKey: 'metricflow' },
);
await expect(readMetricflowProjectionConfig(stagedDir)).resolves.toMatchObject({
parsedTargetTables: {
orders: {
ok: true,
schema: 'analytics',
name: 'orders',
},
},
});
});
it('projects parsed MetricFlow semantic models in the integration worktree', async () => {
await writeMetricflowProjectionConfig(stagedDir, {
parsedTargetTables: {
orders: {
ok: true,
catalog: null,
schema: 'analytics',
name: 'orders',
canonicalTable: 'analytics.orders',
},
},
});
const scoped = {
getManifestEntry: vi.fn().mockResolvedValue(null),
isManifestBacked: vi.fn().mockResolvedValue(false),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi.fn().mockResolvedValue({ warnings: [] }),
};
const semanticLayerService = {
forWorktree: vi.fn().mockReturnValue(scoped),
getManifestEntry: vi.fn(),
isManifestBacked: vi.fn(),
loadAllSources: vi.fn(),
loadSource: vi.fn(),
writeSource: vi.fn(),
};
const result = await adapter.project?.({
connectionId: 'warehouse-1',
sourceKey: 'metricflow',
syncId: 'sync-1',
jobId: 'job-1',
runId: 'run-1',
stagedDir,
workdir: '/tmp/metricflow-integration',
parseArtifacts: metricflowParseResult(),
semanticLayerService: semanticLayerService as never,
});
expect(semanticLayerService.forWorktree).toHaveBeenCalledWith('/tmp/metricflow-integration');
expect(scoped.writeSource).toHaveBeenCalledWith(
'warehouse-1',
expect.objectContaining({ name: 'orders' }),
'dbt MetricFlow',
expect.any(String),
'dbt MetricFlow sync: create source orders',
{ skipValidation: true },
);
expect(result).toMatchObject({
warnings: ['parser warning'],
errors: [],
touchedSources: [{ connectionId: 'warehouse-1', sourceName: 'orders' }],
changedWikiPageKeys: [],
});
});
it('returns a projection error when parse artifacts are missing', async () => {
const result = await adapter.project?.({
connectionId: 'warehouse-1',
sourceKey: 'metricflow',
syncId: 'sync-1',
jobId: 'job-1',
runId: 'run-1',
stagedDir,
workdir: '/tmp/metricflow-integration',
parseArtifacts: undefined,
semanticLayerService: {} as never,
});
expect(result).toMatchObject({
warnings: [],
errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'],
touchedSources: [],
changedWikiPageKeys: [],
});
});
});

View file

@ -1,10 +1,23 @@
import { join } from 'node:path';
import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js';
import type {
ChunkResult,
DeterministicProjectionContext,
DiffSet,
FetchContext,
ProjectionResult,
SourceAdapter,
} from '../../types.js';
import { chunkMetricFlowProject } from './chunk.js';
import { detectMetricFlowStagedDir } from './detect.js';
import { parseMetricflowFiles, type MetricFlowParseResult } from './deep-parse.js';
import { fetchMetricflowRepo } from './fetch.js';
import { importMetricflowSemanticModels } from './import-semantic-models.js';
import { parseMetricFlowStagedDir, type ParsedMetricFlowProject } from './parse.js';
import {
metricflowHostTablesFromParsedTargets,
readMetricflowProjectionConfig,
writeMetricflowProjectionConfig,
} from './projection-config.js';
import { parseMetricflowPullConfig } from './pull-config.js';
export interface MetricflowSourceAdapterDeps {
@ -33,6 +46,9 @@ export class MetricflowSourceAdapter implements SourceAdapter {
cacheDir: this.resolveCacheDir(ctx.connectionId),
stagedDir,
});
await writeMetricflowProjectionConfig(stagedDir, {
parsedTargetTables: config.parsedTargetTables,
});
}
async listTargetConnectionIds(_stagedDir: string): Promise<string[]> {
@ -46,6 +62,37 @@ export class MetricflowSourceAdapter implements SourceAdapter {
return { ...chunk, parseArtifacts };
}
async project(ctx: DeterministicProjectionContext): Promise<ProjectionResult> {
if (!isMetricFlowParseResult(ctx.parseArtifacts)) {
return {
warnings: [],
errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'],
touchedSources: [],
changedWikiPageKeys: [],
};
}
const projectionConfig = await readMetricflowProjectionConfig(ctx.stagedDir);
const result = await importMetricflowSemanticModels(
{ semanticLayerService: ctx.semanticLayerService },
{
connectionId: ctx.connectionId,
parseResult: ctx.parseArtifacts,
targetSchema: null,
hostTables: metricflowHostTablesFromParsedTargets(projectionConfig.parsedTargetTables),
workdir: ctx.workdir,
},
);
return {
result,
warnings: result.warnings,
errors: result.errors,
touchedSources: result.touchedSources,
changedWikiPageKeys: [],
};
}
private resolveCacheDir(connectionId: string): string {
return join(this.deps.homeDir, 'ingest-metricflow-repos', connectionId);
}
@ -54,3 +101,16 @@ export class MetricflowSourceAdapter implements SourceAdapter {
function parseMetricflowStagedDirForImport(project: ParsedMetricFlowProject): MetricFlowParseResult {
return parseMetricflowFiles(project.files);
}
function isMetricFlowParseResult(value: unknown): value is MetricFlowParseResult {
if (!value || typeof value !== 'object') {
return false;
}
const candidate = value as Partial<MetricFlowParseResult>;
return (
Array.isArray(candidate.semanticModels) &&
Array.isArray(candidate.crossModelMetrics) &&
Array.isArray(candidate.relationships) &&
Array.isArray(candidate.warnings)
);
}

View file

@ -0,0 +1,54 @@
import { mkdir, readFile, writeFile } from 'node:fs/promises';
import { join } from 'node:path';
import { z } from 'zod';
import { parsedTargetTableSchema, type ParsedTargetTable } from '../../parsed-target-table.js';
import type { MetricflowHostTable } from './semantic-models.js';
export const METRICFLOW_PROJECTION_CONFIG_FILE = 'sync-config.json';
export const metricflowProjectionConfigSchema = z.object({
parsedTargetTables: z.record(z.string(), parsedTargetTableSchema).default({}),
});
export type MetricflowProjectionConfig = z.infer<typeof metricflowProjectionConfigSchema>;
export async function writeMetricflowProjectionConfig(
stagedDir: string,
config: MetricflowProjectionConfig,
): Promise<void> {
const parsed = metricflowProjectionConfigSchema.parse(config);
await mkdir(stagedDir, { recursive: true });
await writeFile(join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE), `${JSON.stringify(parsed, null, 2)}\n`, 'utf-8');
}
export async function readMetricflowProjectionConfig(stagedDir: string): Promise<MetricflowProjectionConfig> {
const path = join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE);
try {
return metricflowProjectionConfigSchema.parse(JSON.parse(await readFile(path, 'utf-8')));
} catch (error) {
if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') {
return { parsedTargetTables: {} };
}
throw error;
}
}
export function metricflowHostTablesFromParsedTargets(
parsedTargetTables: Record<string, ParsedTargetTable>,
): MetricflowHostTable[] {
return Object.entries(parsedTargetTables)
.flatMap(([id, table]) =>
table.ok
? [
{
id,
name: table.name,
catalog: table.catalog,
db: table.schema,
columns: [],
},
]
: [],
)
.sort((left, right) => left.id.localeCompare(right.id));
}

View file

@ -1352,6 +1352,7 @@ export class IngestBundleRunner {
stagedDir,
workdir: sessionWorktree.workdir,
parseArtifacts,
semanticLayerService: this.deps.semanticLayerService,
}),
);
if (projection.errors.length > 0) {

View file

@ -1,4 +1,5 @@
import type { KtxEmbeddingPort } from '../core/embedding.js';
import type { SemanticLayerService } from '../sl/index.js';
import type { MemoryFlowEventSink } from './memory-flow/types.js';
export type IngestTrigger = 'upload' | 'scheduled_pull' | 'manual_resync' | 'manual_override';
@ -106,6 +107,7 @@ export interface DeterministicProjectionContext {
stagedDir: string;
workdir: string;
parseArtifacts?: unknown;
semanticLayerService: SemanticLayerService;
}
export interface ProjectionResult {