mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
fix(cli): derive ingest outcomes from saved artifacts
This commit is contained in:
parent
8ebc4ce107
commit
b313948db4
4 changed files with 110 additions and 5 deletions
|
|
@ -13,6 +13,7 @@ import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } fro
|
|||
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
|
||||
import type { MemoryFlowEventSink } from './memory-flow/types.js';
|
||||
import { buildSyncId } from './raw-sources-paths.js';
|
||||
import { ingestReportOutcome } from './reports.js';
|
||||
import type { IngestReportBody, IngestReportSnapshot } from './reports.js';
|
||||
import { SqliteBundleIngestStore } from './sqlite-bundle-ingest-store.js';
|
||||
import type { IngestBundleResult, IngestJobContext, IngestJobPhase, IngestTrigger, SourceAdapter } from './types.js';
|
||||
|
|
@ -79,7 +80,7 @@ export interface LocalMetabaseFanoutProgress {
|
|||
metabaseDatabaseId: number;
|
||||
targetConnectionId: string;
|
||||
jobId: string;
|
||||
status: 'done' | 'failed';
|
||||
status: 'done' | 'partial' | 'failed';
|
||||
}): void;
|
||||
}
|
||||
|
||||
|
|
@ -232,11 +233,11 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
|
|||
}
|
||||
|
||||
function metabaseFanoutStatus(children: LocalMetabaseFanoutChild[]): LocalMetabaseFanoutResult['status'] {
|
||||
const succeeded = children.filter((child) => child.report.body.failedWorkUnits.length === 0).length;
|
||||
if (succeeded === children.length) {
|
||||
const outcomes = children.map((child) => ingestReportOutcome(child.report));
|
||||
if (outcomes.every((outcome) => outcome === 'done')) {
|
||||
return 'all_succeeded';
|
||||
}
|
||||
if (succeeded === 0) {
|
||||
if (outcomes.every((outcome) => outcome === 'error')) {
|
||||
return 'all_failed';
|
||||
}
|
||||
return 'partial_failure';
|
||||
|
|
@ -406,7 +407,7 @@ export async function runLocalMetabaseIngest(
|
|||
metabaseDatabaseId: childPlan.metabaseDatabaseId,
|
||||
targetConnectionId,
|
||||
jobId: child.report.jobId,
|
||||
status: child.report.body.failedWorkUnits.length > 0 ? 'failed' : 'done',
|
||||
status: ingestReportOutcome(child.report) === 'error' ? 'failed' : ingestReportOutcome(child.report),
|
||||
});
|
||||
children.push({
|
||||
jobId: child.report.jobId,
|
||||
|
|
|
|||
|
|
@ -146,6 +146,20 @@ export function savedMemoryCountsForReport(report: IngestReportSnapshot): Ingest
|
|||
};
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export type IngestReportOutcome = 'done' | 'partial' | 'error';
|
||||
|
||||
export function ingestReportOutcome(report: IngestReportSnapshot): IngestReportOutcome {
|
||||
if (report.body.status === 'failed') {
|
||||
return 'error';
|
||||
}
|
||||
if (report.body.failedWorkUnits.length === 0) {
|
||||
return 'done';
|
||||
}
|
||||
const { wikiCount, slCount } = savedMemoryCountsForReport(report);
|
||||
return wikiCount + slCount > 0 ? 'partial' : 'error';
|
||||
}
|
||||
|
||||
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {
|
||||
return {
|
||||
jobId,
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
|||
import { initKtxProject, type KtxLocalProject } from '../../../src/context/project/project.js';
|
||||
import { LocalMetabaseDiscoveryCache } from '../../../src/context/ingest/adapters/metabase/local-source-state-store.js';
|
||||
import { getLocalIngestStatus, runLocalMetabaseIngest } from '../../../src/context/ingest/local-ingest.js';
|
||||
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
|
||||
import type { ChunkResult, FetchContext, SourceAdapter } from '../../../src/context/ingest/types.js';
|
||||
|
||||
class TestAgentRunner implements AgentRunnerPort {
|
||||
|
|
@ -202,6 +203,24 @@ describe('runLocalMetabaseIngest', () => {
|
|||
expect(result.children[1]?.report.body.failedWorkUnits).toEqual(['metabase-db-2']);
|
||||
});
|
||||
|
||||
it('keeps a child that saved memory out of all_failed when another child fails', async () => {
|
||||
await seedMetabaseState();
|
||||
const agentRunner = new TestAgentRunner();
|
||||
const ids = ['metabase-child-1', 'metabase-child-2'];
|
||||
|
||||
const result = await runLocalMetabaseIngest({
|
||||
project,
|
||||
adapters: [new FakeMetabaseSourceAdapter()],
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
agentRunner,
|
||||
jobIdFactory: () => ids.shift() ?? 'metabase-child-extra',
|
||||
});
|
||||
|
||||
expect(result.status).toBe('partial_failure');
|
||||
expect(ingestReportOutcome(result.children[0].report)).toBe('done');
|
||||
expect(ingestReportOutcome(result.children[1].report)).toBe('error');
|
||||
});
|
||||
|
||||
it('captures fetch-time child failures and continues later mappings', async () => {
|
||||
await seedMetabaseState();
|
||||
project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' };
|
||||
|
|
|
|||
71
packages/cli/test/context/ingest/reports.test.ts
Normal file
71
packages/cli/test/context/ingest/reports.test.ts
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
|
||||
import type { IngestReportSnapshot } from '../../../src/context/ingest/reports.js';
|
||||
|
||||
function report(body: Partial<IngestReportSnapshot['body']>): IngestReportSnapshot {
|
||||
return {
|
||||
id: 'r',
|
||||
runId: 'run',
|
||||
jobId: 'job',
|
||||
connectionId: 'warehouse',
|
||||
sourceKey: 'metabase',
|
||||
createdAt: '2026-05-29T00:00:00.000Z',
|
||||
body: {
|
||||
syncId: 'sync',
|
||||
diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
|
||||
commitSha: null,
|
||||
workUnits: [],
|
||||
failedWorkUnits: [],
|
||||
reconciliationSkipped: false,
|
||||
conflictsResolved: [],
|
||||
evictionsApplied: [],
|
||||
unmappedFallbacks: [],
|
||||
evictionInputs: [],
|
||||
unresolvedCards: [],
|
||||
supersededBy: null,
|
||||
overrideOf: null,
|
||||
provenanceRows: [],
|
||||
toolTranscripts: [],
|
||||
...body,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const savingWorkUnit = {
|
||||
unitKey: 'ok',
|
||||
rawFiles: ['cards/1.json'],
|
||||
status: 'success' as const,
|
||||
actions: [{ target: 'sl' as const, type: 'updated' as const, key: 'warehouse.orders', detail: 'measure' }],
|
||||
touchedSlSources: [],
|
||||
};
|
||||
|
||||
const failedWorkUnit = {
|
||||
unitKey: 'bad',
|
||||
rawFiles: ['cards/2.json'],
|
||||
status: 'failed' as const,
|
||||
reason: 'tool write failed',
|
||||
actions: [],
|
||||
touchedSlSources: [],
|
||||
};
|
||||
|
||||
describe('ingestReportOutcome', () => {
|
||||
it('returns done when there are no failed work units', () => {
|
||||
expect(ingestReportOutcome(report({ workUnits: [savingWorkUnit] }))).toBe('done');
|
||||
});
|
||||
|
||||
it('returns partial when failed work units coexist with saved memory', () => {
|
||||
expect(
|
||||
ingestReportOutcome(report({ workUnits: [savingWorkUnit, failedWorkUnit], failedWorkUnits: ['bad'] })),
|
||||
).toBe('partial');
|
||||
});
|
||||
|
||||
it('returns error when failed work units produced no saved memory', () => {
|
||||
expect(ingestReportOutcome(report({ workUnits: [failedWorkUnit], failedWorkUnits: ['bad'] }))).toBe('error');
|
||||
});
|
||||
|
||||
it('returns error for a stage-level failure even if artifacts were recorded', () => {
|
||||
expect(ingestReportOutcome(report({ status: 'failed', workUnits: [savingWorkUnit], failedWorkUnits: [] }))).toBe(
|
||||
'error',
|
||||
);
|
||||
});
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue