mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-28 08:49:38 +02:00
* docs: add isolated-diff ingestion design * Refine isolated-diff ingestion design after adversarial review iteration 1 * Refine isolated-diff ingestion design after adversarial review iteration 2 * Refine isolated-diff ingestion design after adversarial review iteration 3 * feat: persist ingest trace events * feat: add isolated ingest patch helpers * feat: validate wiki body semantic references * feat: add final ingest artifact gates * feat: execute ingest work units in child worktrees * feat: integrate isolated work unit patches * feat: route selected ingest sources through isolated diffs * test: cover isolated diff ingestion regressions * feat: add isolated diff ingestion v1 core * docs: document ingest trace inspection * docs: add isolated diff ingestion v1 core plan * fix(ingest): tighten final artifact gates * fix(ingest): gate isolated final integration tree * fix(ingest): persist postmortem failure traces * fix(ingest): trace policy conflicts and cleanup child worktrees * test(ingest): verify isolated diff postmortem coverage * docs: add isolated diff ingestion gates and trace closure plan * fix(ingest): gate provenance before isolated diff squash * docs: add isolated diff ingestion provenance gate closure plan * fix(ingest): gate final wiki references * fix(ingest): enforce SL target connection scope * fix(ingest): trace isolated SL target policy gates * test(ingest): cover isolated diff reference and target gates * chore(ingest): verify isolated diff gate closure * docs: add isolated diff ingestion reference and target gate closure plan * fix(ingest): gate global wiki references * docs: add isolated diff ingestion global wiki reference gate closure plan * fix(ingest): validate scan sources and wiki refs * test(ingest): cover isolated diff textual conflict resolver * test(ingest): cover isolated diff resolver integration * feat(ingest): repair isolated diff textual conflicts * feat(ingest): report isolated diff resolver outcomes * test(ingest): verify 
isolated diff textual conflict repair * test(ingest): align textual conflict failure coverage * docs: add isolated diff textual conflict resolver plan * test(ingest): cover isolated diff gate repair * feat(ingest): add isolated diff gate repair agent * feat(ingest): repair isolated diff semantic gate failures * feat(ingest): wire isolated diff gate repair * test(ingest): verify isolated diff final gate repair * chore(ingest): verify isolated diff gate repair * docs: add isolated diff gate repair plan * Improve ingest progress updates * feat(ingest): route direct-write connectors through isolated diffs * test(ingest): cover non-metabase isolated diff routing * feat(ingest): project metricflow semantic models before work units * test(ingest): verify metricflow isolated projection path * chore(ingest): verify isolated diff connector migration * docs: add isolated diff connector migration plan * feat(ingest): make isolated diff routing the private default * feat(ingest): promote isolated diff to default runner path * feat(ingest): default local ingest to isolated diffs * chore(ingest): remove isolated diff allowlist references * fix(ingest): preserve transient evidence for isolated work units * docs: add isolated diff default promotion plan * refactor(ingest): remove shared worktree WorkUnit path * docs(ingest): align WorkUnit prompts with isolated diffs * test(ingest): drop unused runner import * docs: add isolated diff shared worktree removal plan * docs: add isolated diff gate repair classification plan * fix: restrict claude-code mcp servers * docs: align ingest trace guidance with public CLI --------- Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
351 lines
12 KiB
TypeScript
351 lines
12 KiB
TypeScript
import { mkdir, writeFile } from 'node:fs/promises';
|
|
import { join } from 'node:path';
|
|
import type { FetchContext, UnresolvedCardInfo } from '../../types.js';
|
|
import type { MetabaseClientFactory, MetabaseRuntimeClient } from './client-port.js';
|
|
import { computeFetchScope, type FetchScope } from './fetch-scope.js';
|
|
import { serializeCard } from './serialize-card.js';
|
|
import type { MetabaseSourceStateReader } from './source-state-port.js';
|
|
import {
|
|
type MetabasePullConfig,
|
|
parseMetabasePullConfig,
|
|
STAGED_FILES,
|
|
type StagedCollectionFile,
|
|
type StagedDatabaseFile,
|
|
type StagedSyncConfig,
|
|
} from './types.js';
|
|
|
|
/**
 * Error raised when the inputs to an ingest run are invalid.
 *
 * In this module it is thrown by `fetchMetabaseBundle` when no sync-enabled
 * mapping exists for the requested Metabase database, or when that mapping
 * targets a different connection than the one being ingested.
 */
class IngestInputError extends Error {
  /**
   * @param message Human-readable description of what is wrong with the input.
   */
  constructor(message: string) {
    super(message);
    // Set the name explicitly so `error.name` identifies this class instead of the generic "Error".
    this.name = 'IngestInputError';
  }
}
|
|
|
|
/**
 * Minimal logging sink accepted by the Metabase fetch step.
 *
 * Any object exposing these two methods can be passed in.
 */
export interface MetabaseFetchLogger {
  /** Receives informational messages, such as the end-of-run summary. */
  log(message: string): void;
  /** Receives non-fatal problems, such as a card or collection that could not be loaded. */
  warn(message: string): void;
}
|
|
|
|
/**
 * Fallback logger used when the caller does not supply one.
 * Both methods discard their message and return `undefined`.
 */
const noopMetabaseFetchLogger: MetabaseFetchLogger = {
  log: () => undefined,
  warn: () => undefined,
};
|
|
|
|
/** Inputs for `fetchMetabaseBundle`. */
export interface FetchMetabaseBundleParams {
  /** Raw pull configuration; validated with `parseMetabasePullConfig` before use. */
  pullConfig: unknown;
  /** Directory under which the staged JSON files and sub-directories are written. */
  stagedDir: string;
  /**
   * Fetch context. Its `connectionId` must match the mapping's target
   * connection, and its optional `memoryFlow` receives progress events.
   */
  ctx: FetchContext;
  /** Creates the Metabase client used for all API calls during the fetch. */
  clientFactory: MetabaseClientFactory;
  /** Supplies the sync state (mappings, selections, sync mode, default tags) for the Metabase connection. */
  sourceStateReader: MetabaseSourceStateReader;
  /** Optional logger; when omitted, messages are discarded. */
  logger?: MetabaseFetchLogger;
}
|
|
|
|
/** One entry of the flattened Metabase collection tree built by `buildCollectionIndex`. */
interface CollectionNode {
  /** Collection id as reported by the collection tree. */
  id: number | 'root';
  /** Collection display name; used as one segment of a card's collection path. */
  name: string;
  /** Id of the enclosing collection, or `null` for nodes at the top level of the tree. */
  parentId: number | 'root' | null;
}
|
|
|
|
/**
 * Flatten the nested collection tree into a lookup table keyed by collection id.
 *
 * Each entry records the collection's name and the id of its parent
 * (`null` for nodes at the top level of `tree`), which lets `resolvePath`
 * walk from any collection up to the top.
 *
 * @param tree Collection tree as returned by the client's `getCollectionTree()`.
 * @returns Map from collection id to its flattened node.
 */
function buildCollectionIndex(
  tree: Awaited<ReturnType<MetabaseRuntimeClient['getCollectionTree']>>,
): Map<number | 'root', CollectionNode> {
  const index = new Map<number | 'root', CollectionNode>();

  // Depth-first walk: register each node, then descend with that node as the parent.
  const visit = (nodes: typeof tree, parentId: number | 'root' | null): void => {
    for (const node of nodes) {
      index.set(node.id, { id: node.id, name: node.name, parentId });
      // A missing `children` is treated as "no children".
      const children = (node.children ?? []) as typeof tree;
      visit(children, node.id);
    }
  };

  visit(tree, null);
  return index;
}
|
|
|
|
/**
 * Build the list of collection names leading to `collectionId`, outermost first.
 *
 * The walk follows `parentId` links and stops when it reaches `null`, reaches
 * `'root'`, meets an id that is not in `index`, or revisits an id (a cycle).
 * Whatever was collected up to that point is returned.
 *
 * @param index Lookup table produced by `buildCollectionIndex`.
 * @param collectionId Collection to start from; `'root'` yields an empty path.
 * @returns Collection names ordered from the outermost ancestor down to `collectionId`.
 */
function resolvePath(index: Map<number | 'root', CollectionNode>, collectionId: number | 'root'): string[] {
  const names: string[] = [];
  const seen = new Set<number | 'root'>();
  let current: number | 'root' | null = collectionId;

  // `seen` guards against parent links that loop back on themselves.
  while (current !== null && current !== 'root' && !seen.has(current)) {
    seen.add(current);
    const node = index.get(current);
    if (node === undefined) {
      break;
    }
    // Names are collected innermost-first and reversed once at the end.
    names.push(node.name);
    current = node.parentId;
  }

  return names.reverse();
}
|
|
|
|
/**
 * Fetch the cards of one Metabase database and stage them as JSON files under
 * `params.stagedDir`.
 *
 * Steps, in order:
 * 1. Parse the pull config and load the sync state for the Metabase connection.
 * 2. Require a sync-enabled mapping for the database whose target connection
 *    equals `params.ctx.connectionId`.
 * 3. Compute the fetch scope, load the collection tree, and resolve the initial
 *    card ids.
 * 4. Load each card, resolve its SQL, and write one file per card. Cards whose
 *    SQL cannot be resolved are recorded as unresolved instead of being written.
 * 5. Write the referenced collections, the database file, the sync config and,
 *    when non-empty, the unresolved-card list.
 *
 * The client is always cleaned up, whether the fetch succeeds or throws.
 *
 * @param params See `FetchMetabaseBundleParams`.
 * @throws IngestInputError when no sync-enabled mapping exists for the database,
 *   or the mapping targets a different connection than `params.ctx.connectionId`.
 */
export async function fetchMetabaseBundle(params: FetchMetabaseBundleParams): Promise<void> {
  const pullConfig: MetabasePullConfig = parseMetabasePullConfig(params.pullConfig);
  const logger = params.logger ?? noopMetabaseFetchLogger;

  // Progress goes to the optional memory flow; `transient` is only attached when true.
  const emitFetchProgress = (percent: number, message: string, transient = false): void => {
    params.ctx.memoryFlow?.emit({
      type: 'stage_progress',
      stage: 'source',
      percent,
      message,
      ...(transient ? { transient } : {}),
    });
  };

  const syncState = await params.sourceStateReader.getSourceState(pullConfig.metabaseConnectionId);
  const mapping = syncState.mappings.find(
    (m) => m.metabaseDatabaseId === pullConfig.metabaseDatabaseId && m.syncEnabled,
  );
  if (!mapping?.targetConnectionId) {
    throw new IngestInputError(
      `no sync-enabled mapping for database ${pullConfig.metabaseDatabaseId} on Metabase connection ${pullConfig.metabaseConnectionId}`,
    );
  }
  if (mapping.targetConnectionId !== params.ctx.connectionId) {
    throw new IngestInputError(
      `mapping for database ${pullConfig.metabaseDatabaseId} does not point to connection ${params.ctx.connectionId} (points to ${mapping.targetConnectionId})`,
    );
  }

  const client = await params.clientFactory.createClient(pullConfig, params.ctx);
  try {
    emitFetchProgress(26, `Fetching Metabase database ${pullConfig.metabaseDatabaseId} metadata`);

    // Fall back to the live database metadata when the mapping has no stored name.
    let mappingDatabaseName = mapping.metabaseDatabaseName;
    let mappingEngine = mapping.metabaseEngine;
    if (mappingDatabaseName === null) {
      const database = await client.getDatabase(pullConfig.metabaseDatabaseId);
      mappingDatabaseName = database.name;
      mappingEngine = database.engine ?? null;
    }

    // This object both drives scope computation and is written out as the staged sync config.
    const stagedForScope: StagedSyncConfig = {
      metabaseConnectionId: pullConfig.metabaseConnectionId,
      metabaseDatabaseId: pullConfig.metabaseDatabaseId,
      syncMode: syncState.syncMode,
      selections: syncState.selections.map((s) => ({
        selectionType: s.selectionType,
        metabaseObjectId: s.metabaseObjectId,
      })),
      defaultTagNames: syncState.defaultTagNames,
      mapping: {
        metabaseDatabaseId: mapping.metabaseDatabaseId,
        metabaseDatabaseName: mappingDatabaseName,
        metabaseEngine: mappingEngine,
        targetConnectionId: mapping.targetConnectionId,
      },
    };
    const scope = computeFetchScope(stagedForScope);

    const collectionTree = await client.getCollectionTree();
    const collectionIndex = buildCollectionIndex(collectionTree);

    await mkdir(join(params.stagedDir, STAGED_FILES.cardsDir), { recursive: true });
    await mkdir(join(params.stagedDir, STAGED_FILES.collectionsDir), { recursive: true });
    await mkdir(join(params.stagedDir, STAGED_FILES.databasesDir), { recursive: true });

    const cardIdsToFetch = await resolveCardIdsToFetch(client, scope, pullConfig.metabaseDatabaseId, logger);
    emitFetchProgress(
      28,
      `Fetching ${cardIdsToFetch.length} Metabase card${cardIdsToFetch.length === 1 ? '' : 's'} for database ${
        pullConfig.metabaseDatabaseId
      }`,
    );

    const referencedCollectionIds = new Set<number>();
    let writtenCards = 0;
    // `fetched` holds every id that has been attempted, so each card is processed at most once.
    const fetched = new Set<number>();
    const queue: number[] = [...cardIdsToFetch];
    const unresolvedCards: UnresolvedCardInfo[] = [];

    while (queue.length > 0) {
      const cardId = queue.shift();
      if (cardId === undefined) {
        continue;
      }
      if (fetched.has(cardId)) {
        continue;
      }
      fetched.add(cardId);

      let fullCard: Awaited<ReturnType<MetabaseRuntimeClient['getCard']>>;
      try {
        fullCard = await client.getCard(cardId);
      } catch (e) {
        // A single card failing to load is logged and skipped; the run continues.
        logger.warn(`failed to load card ${cardId}: ${e instanceof Error ? e.message : String(e)}`);
        continue;
      }
      // Cards from other databases (possible via references) and archived cards are skipped.
      if (fullCard.database_id !== pullConfig.metabaseDatabaseId) {
        continue;
      }
      if (fullCard.archived) {
        continue;
      }

      // Capture both outcomes as a tagged result so a rejection does not abort the loop.
      const resolvedResult = await client.getResolvedSql(fullCard).then(
        (sql) => ({ ok: true as const, sql }),
        (err: unknown) => ({ ok: false as const, err }),
      );
      if (!resolvedResult.ok || resolvedResult.sql === null) {
        const reason = classifyResolutionFailure(resolvedResult);
        const errorMessage = resolvedResult.ok
          ? undefined
          : resolvedResult.err instanceof Error
            ? resolvedResult.err.message
            : String(resolvedResult.err);
        unresolvedCards.push({
          cardId,
          name: fullCard.name,
          reason,
          errorMessage,
        });
        logger.warn(`[metabase.fetch] card ${cardId} ("${fullCard.name}") dropped; reason=${reason}`);
        continue;
      }
      const resolved = resolvedResult.sql;

      // Only numeric collection ids are looked up; anything else gets an empty path.
      const numericCollectionId = typeof fullCard.collection_id === 'number' ? fullCard.collection_id : null;
      const collectionPath = numericCollectionId !== null ? resolvePath(collectionIndex, numericCollectionId) : [];

      const staged = serializeCard({
        card: fullCard,
        resolvedSql: resolved.resolvedSql,
        templateTags: resolved.templateTags ?? [],
        collectionPath,
        resolutionStatus: resolved.resolutionStatus,
      });
      await writeFile(
        join(params.stagedDir, STAGED_FILES.cardsDir, `${fullCard.id}.json`),
        JSON.stringify(staged, null, 2),
        'utf-8',
      );
      writtenCards += 1;
      if (numericCollectionId !== null) {
        referencedCollectionIds.add(numericCollectionId);
      }

      // In explicit scope, cards referenced by a staged card are pulled in as well.
      if (scope.kind === 'explicit') {
        for (const refId of staged.referencedCardIds) {
          if (!fetched.has(refId)) {
            queue.push(refId);
          }
        }
      }

      // The total can grow while references are discovered, so it is recomputed per card.
      const knownTotal = Math.max(cardIdsToFetch.length, fetched.size + queue.length);
      if (fetched.size === 1 || fetched.size % 10 === 0 || queue.length === 0) {
        emitFetchProgress(
          30,
          `Checked ${fetched.size}/${knownTotal} Metabase cards for database ${pullConfig.metabaseDatabaseId}; wrote ${writtenCards}`,
          true,
        );
      }
    }
    emitFetchProgress(
      32,
      `Fetched Metabase database ${pullConfig.metabaseDatabaseId}: ${writtenCards} cards, ${unresolvedCards.length} unresolved`,
    );

    // Stage only the collections that at least one written card belongs to.
    for (const colId of referencedCollectionIds) {
      const node = collectionIndex.get(colId);
      if (!node) {
        continue;
      }
      const file: StagedCollectionFile = {
        metabaseId: node.id,
        name: node.name,
        parentId: node.parentId ?? 'root',
      };
      await writeFile(
        join(params.stagedDir, STAGED_FILES.collectionsDir, `${colId}.json`),
        JSON.stringify(file, null, 2),
        'utf-8',
      );
    }

    const databaseFile: StagedDatabaseFile = {
      metabaseDatabaseId: mapping.metabaseDatabaseId,
      metabaseDatabaseName: mappingDatabaseName,
      metabaseEngine: mappingEngine,
      targetConnectionId: mapping.targetConnectionId,
    };
    await writeFile(
      join(params.stagedDir, STAGED_FILES.databasesDir, `${mapping.metabaseDatabaseId}.json`),
      JSON.stringify(databaseFile, null, 2),
      'utf-8',
    );

    await writeFile(join(params.stagedDir, STAGED_FILES.syncConfig), JSON.stringify(stagedForScope, null, 2), 'utf-8');

    // The unresolved-card report is written only when there is something to report.
    if (unresolvedCards.length > 0) {
      await writeFile(
        join(params.stagedDir, STAGED_FILES.unresolvedCards),
        JSON.stringify(unresolvedCards, null, 2),
        'utf-8',
      );
    }

    logger.log(
      `wrote ${writtenCards} cards for database ${pullConfig.metabaseDatabaseId} -> ${mapping.targetConnectionId} (scope=${scope.kind}); unresolved=${unresolvedCards.length}`,
    );
  } finally {
    await client.cleanup();
  }
}
|
|
|
|
/**
 * Map a failed SQL resolution to the `reason` stored with an unresolved card.
 *
 * - Resolution completed but produced `null`: `'api_500'`.
 * - Resolution threw and the message contains `Cycle detected`: `'cycle'`.
 * - Resolution threw and the message contains `no native query`: `'missing_native'`.
 * - Anything else, including a successful non-null result: `'unknown'`.
 *
 * @param r Tagged outcome of the resolution attempt.
 * @returns The reason code for the unresolved-card record.
 */
function classifyResolutionFailure(
  r: { ok: true; sql: { resolvedSql: string } | null } | { ok: false; err: unknown },
): UnresolvedCardInfo['reason'] {
  if (r.ok) {
    return r.sql === null ? 'api_500' : 'unknown';
  }

  // Classification relies on substrings of the error message (case-sensitive).
  const msg = r.err instanceof Error ? r.err.message : String(r.err);
  if (msg.includes('Cycle detected')) {
    return 'cycle';
  }
  if (msg.includes('no native query')) {
    return 'missing_native';
  }
  return 'unknown';
}
|
|
|
|
/**
 * Resolve the initial set of card ids to fetch based on the scope.
 *
 * For `all` and `all-except`, this calls `getAllCards()` and keeps the
 * non-archived cards whose `database_id` matches; `all-except` additionally
 * drops cards listed in `excludeCardIds` and cards whose numeric
 * `collection_id` is in `excludeCollectionIds`.
 *
 * For `explicit`, this starts from `includeCardIds` and adds the members of
 * each selected collection (via `getCollectionItems`) whose model is `card`,
 * `dataset` or `metric`. A collection that fails to list is logged as a
 * warning and skipped. The closure over `{{#N}}` references is applied later
 * in the main fetch loop.
 *
 * @param client Metabase client used to list cards and collection items.
 * @param scope Fetch scope computed from the sync configuration.
 * @param metabaseDatabaseId Database whose cards are wanted (used by `all` / `all-except`).
 * @param logger Receives a warning for each collection that cannot be listed.
 * @returns De-duplicated card ids to seed the fetch queue.
 */
async function resolveCardIdsToFetch(
  client: MetabaseRuntimeClient,
  scope: FetchScope,
  metabaseDatabaseId: number,
  logger: { warn(message: string): void },
): Promise<number[]> {
  if (scope.kind === 'all' || scope.kind === 'all-except') {
    const all = await client.getAllCards();
    const matching = all.filter((c) => !c.archived && c.database_id === metabaseDatabaseId);
    if (scope.kind === 'all') {
      return matching.map((c) => c.id);
    }
    return (
      matching
        .filter((c) => !scope.excludeCardIds.has(c.id))
        // Cards without a numeric collection id cannot match a collection exclusion.
        .filter((c) => typeof c.collection_id !== 'number' || !scope.excludeCollectionIds.has(c.collection_id))
        .map((c) => c.id)
    );
  }

  // Explicit scope: a Set removes ids that are both selected directly and via a collection.
  const ids = new Set<number>(scope.includeCardIds);
  for (const colId of scope.includeCollectionIds) {
    let items: Array<{ id: number; model: string }>;
    try {
      items = await client.getCollectionItems(colId);
    } catch (e) {
      logger.warn(`failed to list collection ${colId}: ${e instanceof Error ? e.message : String(e)}`);
      continue;
    }
    for (const item of items) {
      if (item.model === 'card' || item.model === 'dataset' || item.model === 'metric') {
        ids.add(item.id);
      }
    }
  }
  return [...ids];
}
|