mirror of
https://github.com/Kaelio/ktx.git
synced 2026-07-01 08:59:39 +02:00
feat(ingest): default local ingest to isolated diffs (#128)
* docs: add isolated-diff ingestion design * Refine isolated-diff ingestion design after adversarial review iteration 1 * Refine isolated-diff ingestion design after adversarial review iteration 2 * Refine isolated-diff ingestion design after adversarial review iteration 3 * feat: persist ingest trace events * feat: add isolated ingest patch helpers * feat: validate wiki body semantic references * feat: add final ingest artifact gates * feat: execute ingest work units in child worktrees * feat: integrate isolated work unit patches * feat: route selected ingest sources through isolated diffs * test: cover isolated diff ingestion regressions * feat: add isolated diff ingestion v1 core * docs: document ingest trace inspection * docs: add isolated diff ingestion v1 core plan * fix(ingest): tighten final artifact gates * fix(ingest): gate isolated final integration tree * fix(ingest): persist postmortem failure traces * fix(ingest): trace policy conflicts and cleanup child worktrees * test(ingest): verify isolated diff postmortem coverage * docs: add isolated diff ingestion gates and trace closure plan * fix(ingest): gate provenance before isolated diff squash * docs: add isolated diff ingestion provenance gate closure plan * fix(ingest): gate final wiki references * fix(ingest): enforce SL target connection scope * fix(ingest): trace isolated SL target policy gates * test(ingest): cover isolated diff reference and target gates * chore(ingest): verify isolated diff gate closure * docs: add isolated diff ingestion reference and target gate closure plan * fix(ingest): gate global wiki references * docs: add isolated diff ingestion global wiki reference gate closure plan * fix(ingest): validate scan sources and wiki refs * test(ingest): cover isolated diff textual conflict resolver * test(ingest): cover isolated diff resolver integration * feat(ingest): repair isolated diff textual conflicts * feat(ingest): report isolated diff resolver outcomes * test(ingest): verify isolated diff textual conflict repair * test(ingest): align textual conflict failure coverage * docs: add isolated diff textual conflict resolver plan * test(ingest): cover isolated diff gate repair * feat(ingest): add isolated diff gate repair agent * feat(ingest): repair isolated diff semantic gate failures * feat(ingest): wire isolated diff gate repair * test(ingest): verify isolated diff final gate repair * chore(ingest): verify isolated diff gate repair * docs: add isolated diff gate repair plan * Improve ingest progress updates * feat(ingest): route direct-write connectors through isolated diffs * test(ingest): cover non-metabase isolated diff routing * feat(ingest): project metricflow semantic models before work units * test(ingest): verify metricflow isolated projection path * chore(ingest): verify isolated diff connector migration * docs: add isolated diff connector migration plan * feat(ingest): make isolated diff routing the private default * feat(ingest): promote isolated diff to default runner path * feat(ingest): default local ingest to isolated diffs * chore(ingest): remove isolated diff allowlist references * fix(ingest): preserve transient evidence for isolated work units * docs: add isolated diff default promotion plan * refactor(ingest): remove shared worktree WorkUnit path * docs(ingest): align WorkUnit prompts with isolated diffs * test(ingest): drop unused runner import * docs: add isolated diff shared worktree removal plan * docs: add isolated diff gate repair classification plan * fix: restrict claude-code mcp servers * docs: align ingest trace guidance with public CLI --------- Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
parent
d1c84e5564
commit
e64da5a85d
66 changed files with 22346 additions and 514 deletions
|
|
@ -138,6 +138,52 @@ describe('fetchMetabaseBundle', () => {
|
|||
expect(warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('emits memory-flow progress while fetching Metabase cards', async () => {
|
||||
const events: unknown[] = [];
|
||||
|
||||
await fetchMetabaseBundle({
|
||||
pullConfig: { metabaseConnectionId, metabaseDatabaseId: 42 },
|
||||
stagedDir,
|
||||
ctx: {
|
||||
...makeFetchContext(),
|
||||
memoryFlow: {
|
||||
emit: (event) => events.push(event),
|
||||
update: vi.fn(),
|
||||
finish: vi.fn(),
|
||||
snapshot: vi.fn(),
|
||||
},
|
||||
},
|
||||
clientFactory,
|
||||
sourceStateReader,
|
||||
});
|
||||
|
||||
expect(events).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
type: 'stage_progress',
|
||||
stage: 'source',
|
||||
message: 'Fetching Metabase database 42 metadata',
|
||||
}),
|
||||
expect.objectContaining({
|
||||
type: 'stage_progress',
|
||||
stage: 'source',
|
||||
message: 'Fetching 1 Metabase card for database 42',
|
||||
}),
|
||||
expect.objectContaining({
|
||||
type: 'stage_progress',
|
||||
stage: 'source',
|
||||
message: 'Checked 1/1 Metabase cards for database 42; wrote 1',
|
||||
transient: true,
|
||||
}),
|
||||
expect.objectContaining({
|
||||
type: 'stage_progress',
|
||||
stage: 'source',
|
||||
message: 'Fetched Metabase database 42: 1 cards, 0 unresolved',
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('routes Metabase fetch warnings through the injected logger', async () => {
|
||||
const logger = {
|
||||
log: vi.fn(),
|
||||
|
|
|
|||
|
|
@ -83,6 +83,15 @@ function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId:
|
|||
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
|
||||
const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
|
||||
const logger = params.logger ?? noopMetabaseFetchLogger;
|
||||
const emitFetchProgress = (percent: number, message: string, transient = false): void => {
|
||||
params.ctx.memoryFlow?.emit({
|
||||
type: 'stage_progress',
|
||||
stage: 'source',
|
||||
percent,
|
||||
message,
|
||||
...(transient ? { transient } : {}),
|
||||
});
|
||||
};
|
||||
const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
|
||||
const mapping = syncState.mappings.find(
|
||||
(m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,
|
||||
|
|
@ -100,6 +109,7 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr
|
|||
|
||||
const client = await params.clientFactory.createClient(pullConfig, params.ctx);
|
||||
try {
|
||||
emitFetchProgress(26, `Fetching Metabase database ${pullConfig.metabaseDatabaseId} metadata`);
|
||||
let mappingDatabaseName = mapping.metabaseDatabaseName;
|
||||
let mappingEngine = mapping.metabaseEngine;
|
||||
if (mappingDatabaseName === null) {
|
||||
|
|
@ -133,6 +143,12 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr
|
|||
await mkdir(join(params.stagedDir, STAGED_FILES.databasesDir), { recursive: true });
|
||||
|
||||
const cardIdsToFetch = await resolveCardIdsToFetch(client, scope, pullConfig.metabaseDatabaseId, logger);
|
||||
emitFetchProgress(
|
||||
28,
|
||||
`Fetching ${cardIdsToFetch.length} Metabase card${cardIdsToFetch.length === 1 ? '' : 's'} for database ${
|
||||
pullConfig.metabaseDatabaseId
|
||||
}`,
|
||||
);
|
||||
|
||||
const referencedCollectionIds = new Set<number>();
|
||||
let writtenCards = 0;
|
||||
|
|
@ -212,7 +228,19 @@ export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Pr
|
|||
}
|
||||
}
|
||||
}
|
||||
const knownTotal = Math.max(cardIdsToFetch.length, fetched.size + queue.length);
|
||||
if (fetched.size === 1 || fetched.size % 10 === 0 || queue.length === 0) {
|
||||
emitFetchProgress(
|
||||
30,
|
||||
`Checked ${fetched.size}/${knownTotal} Metabase cards for database ${pullConfig.metabaseDatabaseId}; wrote ${writtenCards}`,
|
||||
true,
|
||||
);
|
||||
}
|
||||
}
|
||||
emitFetchProgress(
|
||||
32,
|
||||
`Fetched Metabase database ${pullConfig.metabaseDatabaseId}: ${writtenCards} cards, ${unresolvedCards.length} unresolved`,
|
||||
);
|
||||
|
||||
for (const colId of referencedCollectionIds) {
|
||||
const node = collectionIndex.get(colId);
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { makeLocalGitRepo } from '../../../test/make-local-git-repo.js';
|
||||
import type { SourceAdapter } from '../../types.js';
|
||||
import type { MetricFlowParseResult } from './deep-parse.js';
|
||||
import { MetricflowSourceAdapter } from './metricflow.adapter.js';
|
||||
import { readMetricflowProjectionConfig, writeMetricflowProjectionConfig } from './projection-config.js';
|
||||
|
||||
function compileOnlyRequiredDepsCheck(): void {
|
||||
// @ts-expect-error MetricflowSourceAdapter requires an explicit cache home.
|
||||
|
|
@ -22,6 +24,25 @@ async function makeRepo(tmpRoot: string, files: Record<string, string>) {
|
|||
return makeLocalGitRepo(fixtureDir, join(tmpRoot, 'origin'));
|
||||
}
|
||||
|
||||
function metricflowParseResult(): MetricFlowParseResult {
|
||||
return {
|
||||
semanticModels: [
|
||||
{
|
||||
name: 'orders',
|
||||
description: 'Orders',
|
||||
modelRef: 'orders',
|
||||
dimensions: [{ name: 'status', column: 'status', type: 'string', label: 'Status' }],
|
||||
measures: [{ type: 'simple', name: 'order_count', column: 'id', aggregation: 'count' }],
|
||||
entities: [{ name: 'customer', type: 'foreign', expr: 'customer_id' }],
|
||||
defaultTimeDimension: null,
|
||||
},
|
||||
],
|
||||
crossModelMetrics: [],
|
||||
relationships: [],
|
||||
warnings: ['parser warning'],
|
||||
};
|
||||
}
|
||||
|
||||
describe('MetricflowSourceAdapter', () => {
|
||||
let tmpRoot: string;
|
||||
let stagedDir: string;
|
||||
|
|
@ -127,4 +148,119 @@ describe('MetricflowSourceAdapter', () => {
|
|||
await expect(readFile(join(stagedDir, 'models/orders.yml'), 'utf-8')).resolves.toContain('semantic_models');
|
||||
expect(await adapter.detect(stagedDir)).toBe(true);
|
||||
});
|
||||
|
||||
it('persists parsed target tables for deterministic projection during fetch', async () => {
|
||||
const repo = await makeRepo(tmpRoot, {
|
||||
'dbt_project.yml': 'name: analytics\n',
|
||||
'models/orders.yml': 'semantic_models:\n - name: orders\n model: ref("orders")\n',
|
||||
});
|
||||
|
||||
await adapter.fetch?.(
|
||||
{
|
||||
repoUrl: repo.repoUrl,
|
||||
branch: 'main',
|
||||
path: null,
|
||||
authToken: null,
|
||||
parsedTargetTables: {
|
||||
orders: {
|
||||
ok: true,
|
||||
catalog: null,
|
||||
schema: 'analytics',
|
||||
name: 'orders',
|
||||
canonicalTable: 'analytics.orders',
|
||||
},
|
||||
},
|
||||
},
|
||||
stagedDir,
|
||||
{ connectionId: 'warehouse-1', sourceKey: 'metricflow' },
|
||||
);
|
||||
|
||||
await expect(readMetricflowProjectionConfig(stagedDir)).resolves.toMatchObject({
|
||||
parsedTargetTables: {
|
||||
orders: {
|
||||
ok: true,
|
||||
schema: 'analytics',
|
||||
name: 'orders',
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('projects parsed MetricFlow semantic models in the integration worktree', async () => {
|
||||
await writeMetricflowProjectionConfig(stagedDir, {
|
||||
parsedTargetTables: {
|
||||
orders: {
|
||||
ok: true,
|
||||
catalog: null,
|
||||
schema: 'analytics',
|
||||
name: 'orders',
|
||||
canonicalTable: 'analytics.orders',
|
||||
},
|
||||
},
|
||||
});
|
||||
const scoped = {
|
||||
getManifestEntry: vi.fn().mockResolvedValue(null),
|
||||
isManifestBacked: vi.fn().mockResolvedValue(false),
|
||||
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
|
||||
loadSource: vi.fn().mockResolvedValue(null),
|
||||
writeSource: vi.fn().mockResolvedValue({ warnings: [] }),
|
||||
};
|
||||
const semanticLayerService = {
|
||||
forWorktree: vi.fn().mockReturnValue(scoped),
|
||||
getManifestEntry: vi.fn(),
|
||||
isManifestBacked: vi.fn(),
|
||||
loadAllSources: vi.fn(),
|
||||
loadSource: vi.fn(),
|
||||
writeSource: vi.fn(),
|
||||
};
|
||||
|
||||
const result = await adapter.project?.({
|
||||
connectionId: 'warehouse-1',
|
||||
sourceKey: 'metricflow',
|
||||
syncId: 'sync-1',
|
||||
jobId: 'job-1',
|
||||
runId: 'run-1',
|
||||
stagedDir,
|
||||
workdir: '/tmp/metricflow-integration',
|
||||
parseArtifacts: metricflowParseResult(),
|
||||
semanticLayerService: semanticLayerService as never,
|
||||
});
|
||||
|
||||
expect(semanticLayerService.forWorktree).toHaveBeenCalledWith('/tmp/metricflow-integration');
|
||||
expect(scoped.writeSource).toHaveBeenCalledWith(
|
||||
'warehouse-1',
|
||||
expect.objectContaining({ name: 'orders' }),
|
||||
'dbt MetricFlow',
|
||||
expect.any(String),
|
||||
'dbt MetricFlow sync: create source orders',
|
||||
{ skipValidation: true },
|
||||
);
|
||||
expect(result).toMatchObject({
|
||||
warnings: ['parser warning'],
|
||||
errors: [],
|
||||
touchedSources: [{ connectionId: 'warehouse-1', sourceName: 'orders' }],
|
||||
changedWikiPageKeys: [],
|
||||
});
|
||||
});
|
||||
|
||||
it('returns a projection error when parse artifacts are missing', async () => {
|
||||
const result = await adapter.project?.({
|
||||
connectionId: 'warehouse-1',
|
||||
sourceKey: 'metricflow',
|
||||
syncId: 'sync-1',
|
||||
jobId: 'job-1',
|
||||
runId: 'run-1',
|
||||
stagedDir,
|
||||
workdir: '/tmp/metricflow-integration',
|
||||
parseArtifacts: undefined,
|
||||
semanticLayerService: {} as never,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
warnings: [],
|
||||
errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'],
|
||||
touchedSources: [],
|
||||
changedWikiPageKeys: [],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,10 +1,23 @@
|
|||
import { join } from 'node:path';
|
||||
import type { ChunkResult, DiffSet, FetchContext, SourceAdapter } from '../../types.js';
|
||||
import type {
|
||||
ChunkResult,
|
||||
DeterministicProjectionContext,
|
||||
DiffSet,
|
||||
FetchContext,
|
||||
ProjectionResult,
|
||||
SourceAdapter,
|
||||
} from '../../types.js';
|
||||
import { chunkMetricFlowProject } from './chunk.js';
|
||||
import { detectMetricFlowStagedDir } from './detect.js';
|
||||
import { parseMetricflowFiles, type MetricFlowParseResult } from './deep-parse.js';
|
||||
import { fetchMetricflowRepo } from './fetch.js';
|
||||
import { importMetricflowSemanticModels } from './import-semantic-models.js';
|
||||
import { parseMetricFlowStagedDir, type ParsedMetricFlowProject } from './parse.js';
|
||||
import {
|
||||
metricflowHostTablesFromParsedTargets,
|
||||
readMetricflowProjectionConfig,
|
||||
writeMetricflowProjectionConfig,
|
||||
} from './projection-config.js';
|
||||
import { parseMetricflowPullConfig } from './pull-config.js';
|
||||
|
||||
export interface MetricflowSourceAdapterDeps {
|
||||
|
|
@ -33,6 +46,9 @@ export class MetricflowSourceAdapter implements SourceAdapter {
|
|||
cacheDir: this.resolveCacheDir(ctx.connectionId),
|
||||
stagedDir,
|
||||
});
|
||||
await writeMetricflowProjectionConfig(stagedDir, {
|
||||
parsedTargetTables: config.parsedTargetTables,
|
||||
});
|
||||
}
|
||||
|
||||
async listTargetConnectionIds(_stagedDir: string): Promise<string[]> {
|
||||
|
|
@ -46,6 +62,37 @@ export class MetricflowSourceAdapter implements SourceAdapter {
|
|||
return { ...chunk, parseArtifacts };
|
||||
}
|
||||
|
||||
async project(ctx: DeterministicProjectionContext): Promise<ProjectionResult> {
|
||||
if (!isMetricFlowParseResult(ctx.parseArtifacts)) {
|
||||
return {
|
||||
warnings: [],
|
||||
errors: ['MetricFlow deterministic projection requires parseArtifacts from chunk()'],
|
||||
touchedSources: [],
|
||||
changedWikiPageKeys: [],
|
||||
};
|
||||
}
|
||||
|
||||
const projectionConfig = await readMetricflowProjectionConfig(ctx.stagedDir);
|
||||
const result = await importMetricflowSemanticModels(
|
||||
{ semanticLayerService: ctx.semanticLayerService },
|
||||
{
|
||||
connectionId: ctx.connectionId,
|
||||
parseResult: ctx.parseArtifacts,
|
||||
targetSchema: null,
|
||||
hostTables: metricflowHostTablesFromParsedTargets(projectionConfig.parsedTargetTables),
|
||||
workdir: ctx.workdir,
|
||||
},
|
||||
);
|
||||
|
||||
return {
|
||||
result,
|
||||
warnings: result.warnings,
|
||||
errors: result.errors,
|
||||
touchedSources: result.touchedSources,
|
||||
changedWikiPageKeys: [],
|
||||
};
|
||||
}
|
||||
|
||||
private resolveCacheDir(connectionId: string): string {
|
||||
return join(this.deps.homeDir, 'ingest-metricflow-repos', connectionId);
|
||||
}
|
||||
|
|
@ -54,3 +101,16 @@ export class MetricflowSourceAdapter implements SourceAdapter {
|
|||
function parseMetricflowStagedDirForImport(project: ParsedMetricFlowProject): MetricFlowParseResult {
|
||||
return parseMetricflowFiles(project.files);
|
||||
}
|
||||
|
||||
function isMetricFlowParseResult(value: unknown): value is MetricFlowParseResult {
|
||||
if (!value || typeof value !== 'object') {
|
||||
return false;
|
||||
}
|
||||
const candidate = value as Partial<MetricFlowParseResult>;
|
||||
return (
|
||||
Array.isArray(candidate.semanticModels) &&
|
||||
Array.isArray(candidate.crossModelMetrics) &&
|
||||
Array.isArray(candidate.relationships) &&
|
||||
Array.isArray(candidate.warnings)
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
import { mkdir, readFile, writeFile } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import { z } from 'zod';
|
||||
import { parsedTargetTableSchema, type ParsedTargetTable } from '../../parsed-target-table.js';
|
||||
import type { MetricflowHostTable } from './semantic-models.js';
|
||||
|
||||
const METRICFLOW_PROJECTION_CONFIG_FILE = 'sync-config.json';
|
||||
|
||||
const metricflowProjectionConfigSchema = z.object({
|
||||
parsedTargetTables: z.record(z.string(), parsedTargetTableSchema).default({}),
|
||||
});
|
||||
|
||||
export type MetricflowProjectionConfig = z.infer<typeof metricflowProjectionConfigSchema>;
|
||||
|
||||
export async function writeMetricflowProjectionConfig(
|
||||
stagedDir: string,
|
||||
config: MetricflowProjectionConfig,
|
||||
): Promise<void> {
|
||||
const parsed = metricflowProjectionConfigSchema.parse(config);
|
||||
await mkdir(stagedDir, { recursive: true });
|
||||
await writeFile(join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE), `${JSON.stringify(parsed, null, 2)}\n`, 'utf-8');
|
||||
}
|
||||
|
||||
export async function readMetricflowProjectionConfig(stagedDir: string): Promise<MetricflowProjectionConfig> {
|
||||
const path = join(stagedDir, METRICFLOW_PROJECTION_CONFIG_FILE);
|
||||
try {
|
||||
return metricflowProjectionConfigSchema.parse(JSON.parse(await readFile(path, 'utf-8')));
|
||||
} catch (error) {
|
||||
if (error && typeof error === 'object' && 'code' in error && error.code === 'ENOENT') {
|
||||
return { parsedTargetTables: {} };
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export function metricflowHostTablesFromParsedTargets(
|
||||
parsedTargetTables: Record<string, ParsedTargetTable>,
|
||||
): MetricflowHostTable[] {
|
||||
return Object.entries(parsedTargetTables)
|
||||
.flatMap(([id, table]) =>
|
||||
table.ok
|
||||
? [
|
||||
{
|
||||
id,
|
||||
name: table.name,
|
||||
catalog: table.catalog,
|
||||
db: table.schema,
|
||||
columns: [],
|
||||
},
|
||||
]
|
||||
: [],
|
||||
)
|
||||
.sort((left, right) => left.id.localeCompare(right.id));
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue