mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
fix(cli): treat artifact-producing ingests with failures as partial (#238)
* fix(cli): derive ingest outcomes from saved artifacts * fix(cli): treat artifact-producing ingests with failures as partial * fix(cli): route memory-flow run status through shared ingest outcome * fix(cli): treat partial ingest as saved context in setup status * test(cli): align memory-flow replay expectations with partial ingests
This commit is contained in:
parent
3f0d11e07d
commit
53a6f8d111
10 changed files with 312 additions and 25 deletions
|
|
@ -13,6 +13,7 @@ import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } fro
|
||||||
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
|
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
|
||||||
import type { MemoryFlowEventSink } from './memory-flow/types.js';
|
import type { MemoryFlowEventSink } from './memory-flow/types.js';
|
||||||
import { buildSyncId } from './raw-sources-paths.js';
|
import { buildSyncId } from './raw-sources-paths.js';
|
||||||
|
import { ingestReportOutcome } from './reports.js';
|
||||||
import type { IngestReportBody, IngestReportSnapshot } from './reports.js';
|
import type { IngestReportBody, IngestReportSnapshot } from './reports.js';
|
||||||
import { SqliteBundleIngestStore } from './sqlite-bundle-ingest-store.js';
|
import { SqliteBundleIngestStore } from './sqlite-bundle-ingest-store.js';
|
||||||
import type { IngestBundleResult, IngestJobContext, IngestJobPhase, IngestTrigger, SourceAdapter } from './types.js';
|
import type { IngestBundleResult, IngestJobContext, IngestJobPhase, IngestTrigger, SourceAdapter } from './types.js';
|
||||||
|
|
@ -79,7 +80,7 @@ export interface LocalMetabaseFanoutProgress {
|
||||||
metabaseDatabaseId: number;
|
metabaseDatabaseId: number;
|
||||||
targetConnectionId: string;
|
targetConnectionId: string;
|
||||||
jobId: string;
|
jobId: string;
|
||||||
status: 'done' | 'failed';
|
status: 'done' | 'partial' | 'failed';
|
||||||
}): void;
|
}): void;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -232,11 +233,11 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
|
||||||
}
|
}
|
||||||
|
|
||||||
function metabaseFanoutStatus(children: LocalMetabaseFanoutChild[]): LocalMetabaseFanoutResult['status'] {
|
function metabaseFanoutStatus(children: LocalMetabaseFanoutChild[]): LocalMetabaseFanoutResult['status'] {
|
||||||
const succeeded = children.filter((child) => child.report.body.failedWorkUnits.length === 0).length;
|
const outcomes = children.map((child) => ingestReportOutcome(child.report));
|
||||||
if (succeeded === children.length) {
|
if (outcomes.every((outcome) => outcome === 'done')) {
|
||||||
return 'all_succeeded';
|
return 'all_succeeded';
|
||||||
}
|
}
|
||||||
if (succeeded === 0) {
|
if (outcomes.every((outcome) => outcome === 'error')) {
|
||||||
return 'all_failed';
|
return 'all_failed';
|
||||||
}
|
}
|
||||||
return 'partial_failure';
|
return 'partial_failure';
|
||||||
|
|
@ -401,12 +402,13 @@ export async function runLocalMetabaseIngest(
|
||||||
error,
|
error,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
const childOutcome = ingestReportOutcome(child.report);
|
||||||
options.progress?.onMetabaseChildCompleted?.({
|
options.progress?.onMetabaseChildCompleted?.({
|
||||||
metabaseConnectionId,
|
metabaseConnectionId,
|
||||||
metabaseDatabaseId: childPlan.metabaseDatabaseId,
|
metabaseDatabaseId: childPlan.metabaseDatabaseId,
|
||||||
targetConnectionId,
|
targetConnectionId,
|
||||||
jobId: child.report.jobId,
|
jobId: child.report.jobId,
|
||||||
status: child.report.body.failedWorkUnits.length > 0 ? 'failed' : 'done',
|
status: childOutcome === 'error' ? 'failed' : childOutcome,
|
||||||
});
|
});
|
||||||
children.push({
|
children.push({
|
||||||
jobId: child.report.jobId,
|
jobId: child.report.jobId,
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import type { MemoryAction } from '../../../context/memory/types.js';
|
import type { MemoryAction } from '../../../context/memory/types.js';
|
||||||
import type { LocalIngestRunRecord } from '../local-stage-ingest.js';
|
import type { LocalIngestRunRecord } from '../local-stage-ingest.js';
|
||||||
|
import { ingestReportOutcome } from '../reports.js';
|
||||||
import type { IngestReportSnapshot } from '../reports.js';
|
import type { IngestReportSnapshot } from '../reports.js';
|
||||||
import type {
|
import type {
|
||||||
MemoryFlowActionDetail,
|
MemoryFlowActionDetail,
|
||||||
|
|
@ -72,7 +73,7 @@ function fullModeMetadata(input: {
|
||||||
}
|
}
|
||||||
|
|
||||||
function reportStatus(report: IngestReportSnapshot): MemoryFlowReplayInput['status'] {
|
function reportStatus(report: IngestReportSnapshot): MemoryFlowReplayInput['status'] {
|
||||||
return report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
|
return ingestReportOutcome(report) === 'error' ? 'error' : 'done';
|
||||||
}
|
}
|
||||||
|
|
||||||
function reportCreatedEvent(report: IngestReportSnapshot): MemoryFlowEvent {
|
function reportCreatedEvent(report: IngestReportSnapshot): MemoryFlowEvent {
|
||||||
|
|
|
||||||
|
|
@ -146,6 +146,20 @@ export function savedMemoryCountsForReport(report: IngestReportSnapshot): Ingest
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @internal */
|
||||||
|
export type IngestReportOutcome = 'done' | 'partial' | 'error';
|
||||||
|
|
||||||
|
export function ingestReportOutcome(report: IngestReportSnapshot): IngestReportOutcome {
|
||||||
|
if (report.body.status === 'failed') {
|
||||||
|
return 'error';
|
||||||
|
}
|
||||||
|
if (report.body.failedWorkUnits.length === 0) {
|
||||||
|
return 'done';
|
||||||
|
}
|
||||||
|
const { wikiCount, slCount } = savedMemoryCountsForReport(report);
|
||||||
|
return wikiCount + slCount > 0 ? 'partial' : 'error';
|
||||||
|
}
|
||||||
|
|
||||||
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {
|
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {
|
||||||
return {
|
return {
|
||||||
jobId,
|
jobId,
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ import { buildMemoryFlowViewModel } from './context/ingest/memory-flow/view-mode
|
||||||
import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js';
|
import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js';
|
||||||
import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js';
|
import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js';
|
||||||
import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js';
|
import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js';
|
||||||
import { type IngestReportSnapshot, savedMemoryCountsForReport } from './context/ingest/reports.js';
|
import { type IngestReportSnapshot, ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js';
|
||||||
import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js';
|
import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js';
|
||||||
import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js';
|
import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js';
|
||||||
import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js';
|
import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js';
|
||||||
|
|
@ -93,10 +93,6 @@ export interface KtxIngestDeps {
|
||||||
runtimeIo?: KtxIngestIo;
|
runtimeIo?: KtxIngestIo;
|
||||||
}
|
}
|
||||||
|
|
||||||
function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
|
|
||||||
return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
|
|
||||||
}
|
|
||||||
|
|
||||||
const REPORT_SOURCE_LABELS = new Map<string, string>([
|
const REPORT_SOURCE_LABELS = new Map<string, string>([
|
||||||
['live-database', 'Database schema'],
|
['live-database', 'Database schema'],
|
||||||
['historic-sql', 'Query history'],
|
['historic-sql', 'Query history'],
|
||||||
|
|
@ -193,7 +189,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
|
||||||
if (report.body.tracePath) {
|
if (report.body.tracePath) {
|
||||||
io.stdout.write(`Trace: ${report.body.tracePath}\n`);
|
io.stdout.write(`Trace: ${report.body.tracePath}\n`);
|
||||||
}
|
}
|
||||||
io.stdout.write(`Status: ${reportStatus(report)}\n`);
|
io.stdout.write(`Status: ${ingestReportOutcome(report)}\n`);
|
||||||
io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`);
|
io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`);
|
||||||
io.stdout.write(`Connection: ${report.connectionId}\n`);
|
io.stdout.write(`Connection: ${report.connectionId}\n`);
|
||||||
io.stdout.write(`Sync: ${report.body.syncId}\n`);
|
io.stdout.write(`Sync: ${report.body.syncId}\n`);
|
||||||
|
|
@ -231,7 +227,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
|
||||||
}
|
}
|
||||||
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
|
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
|
||||||
for (const child of result.children) {
|
for (const child of result.children) {
|
||||||
const status = reportStatus(child.report);
|
const status = ingestReportOutcome(child.report);
|
||||||
io.stdout.write(
|
io.stdout.write(
|
||||||
`- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`,
|
`- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`,
|
||||||
);
|
);
|
||||||
|
|
@ -595,7 +591,7 @@ function initialRunMemoryFlowInput(
|
||||||
}
|
}
|
||||||
|
|
||||||
function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput {
|
function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput {
|
||||||
const status = reportStatus(report);
|
const status = ingestReportOutcome(report) === 'error' ? 'error' : 'done';
|
||||||
return {
|
return {
|
||||||
...snapshot,
|
...snapshot,
|
||||||
runId: report.runId,
|
runId: report.runId,
|
||||||
|
|
@ -777,7 +773,7 @@ export async function runKtxIngest(
|
||||||
} finally {
|
} finally {
|
||||||
plainProgress?.flush();
|
plainProgress?.flush();
|
||||||
}
|
}
|
||||||
return result.status === 'all_succeeded' ? 0 : 1;
|
return result.status === 'all_failed' ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobId = deps.jobIdFactory?.();
|
const jobId = deps.jobIdFactory?.();
|
||||||
|
|
@ -846,7 +842,7 @@ export async function runKtxIngest(
|
||||||
liveTui?.close();
|
liveTui?.close();
|
||||||
liveTui = null;
|
liveTui = null;
|
||||||
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
|
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
|
||||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
|
||||||
}
|
}
|
||||||
plainProgress?.flush();
|
plainProgress?.flush();
|
||||||
await writeReportRecord(result.report, runOutputMode, io, {
|
await writeReportRecord(result.report, runOutputMode, io, {
|
||||||
|
|
@ -854,7 +850,7 @@ export async function runKtxIngest(
|
||||||
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
|
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
|
||||||
env,
|
env,
|
||||||
});
|
});
|
||||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
|
||||||
} finally {
|
} finally {
|
||||||
plainProgress?.flush();
|
plainProgress?.flush();
|
||||||
liveTui?.close();
|
liveTui?.close();
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
import { existsSync } from 'node:fs';
|
import { existsSync } from 'node:fs';
|
||||||
import { basename, join, resolve } from 'node:path';
|
import { basename, join, resolve } from 'node:path';
|
||||||
import { getLatestLocalIngestStatus } from './context/ingest/local-ingest.js';
|
import { getLatestLocalIngestStatus } from './context/ingest/local-ingest.js';
|
||||||
import { savedMemoryCountsForReport } from './context/ingest/reports.js';
|
import { ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js';
|
||||||
import { ktxLocalStateDbPath } from './context/project/local-state-db.js';
|
import { ktxLocalStateDbPath } from './context/project/local-state-db.js';
|
||||||
import { loadKtxProject, type KtxLocalProject } from './context/project/project.js';
|
import { loadKtxProject, type KtxLocalProject } from './context/project/project.js';
|
||||||
import { readKtxSetupState } from './context/project/setup-config.js';
|
import { readKtxSetupState } from './context/project/setup-config.js';
|
||||||
|
|
@ -306,7 +306,7 @@ function sourceConnections(config: Awaited<ReturnType<typeof loadKtxProject>>['c
|
||||||
type LocalIngestStatusReport = NonNullable<Awaited<ReturnType<typeof getLatestLocalIngestStatus>>>;
|
type LocalIngestStatusReport = NonNullable<Awaited<ReturnType<typeof getLatestLocalIngestStatus>>>;
|
||||||
|
|
||||||
function reportHasSavedContext(report: LocalIngestStatusReport): boolean {
|
function reportHasSavedContext(report: LocalIngestStatusReport): boolean {
|
||||||
if (report.body.failedWorkUnits.length > 0) {
|
if (ingestReportOutcome(report) === 'error') {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
const counts = savedMemoryCountsForReport(report);
|
const counts = savedMemoryCountsForReport(report);
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||||
import { initKtxProject, type KtxLocalProject } from '../../../src/context/project/project.js';
|
import { initKtxProject, type KtxLocalProject } from '../../../src/context/project/project.js';
|
||||||
import { LocalMetabaseDiscoveryCache } from '../../../src/context/ingest/adapters/metabase/local-source-state-store.js';
|
import { LocalMetabaseDiscoveryCache } from '../../../src/context/ingest/adapters/metabase/local-source-state-store.js';
|
||||||
import { getLocalIngestStatus, runLocalMetabaseIngest } from '../../../src/context/ingest/local-ingest.js';
|
import { getLocalIngestStatus, runLocalMetabaseIngest } from '../../../src/context/ingest/local-ingest.js';
|
||||||
|
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
|
||||||
import type { ChunkResult, FetchContext, SourceAdapter } from '../../../src/context/ingest/types.js';
|
import type { ChunkResult, FetchContext, SourceAdapter } from '../../../src/context/ingest/types.js';
|
||||||
|
|
||||||
class TestAgentRunner implements AgentRunnerPort {
|
class TestAgentRunner implements AgentRunnerPort {
|
||||||
|
|
@ -202,6 +203,24 @@ describe('runLocalMetabaseIngest', () => {
|
||||||
expect(result.children[1]?.report.body.failedWorkUnits).toEqual(['metabase-db-2']);
|
expect(result.children[1]?.report.body.failedWorkUnits).toEqual(['metabase-db-2']);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('keeps a child that saved memory out of all_failed when another child fails', async () => {
|
||||||
|
await seedMetabaseState();
|
||||||
|
const agentRunner = new TestAgentRunner();
|
||||||
|
const ids = ['metabase-child-1', 'metabase-child-2'];
|
||||||
|
|
||||||
|
const result = await runLocalMetabaseIngest({
|
||||||
|
project,
|
||||||
|
adapters: [new FakeMetabaseSourceAdapter()],
|
||||||
|
metabaseConnectionId: 'prod-metabase',
|
||||||
|
agentRunner,
|
||||||
|
jobIdFactory: () => ids.shift() ?? 'metabase-child-extra',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result.status).toBe('partial_failure');
|
||||||
|
expect(ingestReportOutcome(result.children[0].report)).toBe('done');
|
||||||
|
expect(ingestReportOutcome(result.children[1].report)).toBe('error');
|
||||||
|
});
|
||||||
|
|
||||||
it('captures fetch-time child failures and continues later mappings', async () => {
|
it('captures fetch-time child failures and continues later mappings', async () => {
|
||||||
await seedMetabaseState();
|
await seedMetabaseState();
|
||||||
project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' };
|
project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' };
|
||||||
|
|
|
||||||
|
|
@ -166,7 +166,7 @@ describe('memory-flow event mapping', () => {
|
||||||
runId: 'run-1',
|
runId: 'run-1',
|
||||||
connectionId: 'warehouse',
|
connectionId: 'warehouse',
|
||||||
adapter: 'lookml',
|
adapter: 'lookml',
|
||||||
status: 'error',
|
status: 'done',
|
||||||
sourceDir: null,
|
sourceDir: null,
|
||||||
syncId: 'sync-2',
|
syncId: 'sync-2',
|
||||||
reportId: 'report-1',
|
reportId: 'report-1',
|
||||||
|
|
@ -308,7 +308,7 @@ describe('memory-flow event mapping', () => {
|
||||||
sourceReportPath: 'report-1',
|
sourceReportPath: 'report-1',
|
||||||
fallbackReason: null,
|
fallbackReason: null,
|
||||||
});
|
});
|
||||||
expect(replay.status).toBe('error');
|
expect(replay.status).toBe('done');
|
||||||
expect(replay.reportId).toBe('report-1');
|
expect(replay.reportId).toBe('report-1');
|
||||||
expect(replay.reportPath).toBe('report-1');
|
expect(replay.reportPath).toBe('report-1');
|
||||||
expect(replay.events[0]).toMatchObject({ type: 'source_acquired', emittedAt: '2026-05-01T10:00:00.000Z' });
|
expect(replay.events[0]).toMatchObject({ type: 'source_acquired', emittedAt: '2026-05-01T10:00:00.000Z' });
|
||||||
|
|
|
||||||
71
packages/cli/test/context/ingest/reports.test.ts
Normal file
71
packages/cli/test/context/ingest/reports.test.ts
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
import { describe, expect, it } from 'vitest';
|
||||||
|
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
|
||||||
|
import type { IngestReportSnapshot } from '../../../src/context/ingest/reports.js';
|
||||||
|
|
||||||
|
function report(body: Partial<IngestReportSnapshot['body']>): IngestReportSnapshot {
|
||||||
|
return {
|
||||||
|
id: 'r',
|
||||||
|
runId: 'run',
|
||||||
|
jobId: 'job',
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
sourceKey: 'metabase',
|
||||||
|
createdAt: '2026-05-29T00:00:00.000Z',
|
||||||
|
body: {
|
||||||
|
syncId: 'sync',
|
||||||
|
diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
|
||||||
|
commitSha: null,
|
||||||
|
workUnits: [],
|
||||||
|
failedWorkUnits: [],
|
||||||
|
reconciliationSkipped: false,
|
||||||
|
conflictsResolved: [],
|
||||||
|
evictionsApplied: [],
|
||||||
|
unmappedFallbacks: [],
|
||||||
|
evictionInputs: [],
|
||||||
|
unresolvedCards: [],
|
||||||
|
supersededBy: null,
|
||||||
|
overrideOf: null,
|
||||||
|
provenanceRows: [],
|
||||||
|
toolTranscripts: [],
|
||||||
|
...body,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const savingWorkUnit = {
|
||||||
|
unitKey: 'ok',
|
||||||
|
rawFiles: ['cards/1.json'],
|
||||||
|
status: 'success' as const,
|
||||||
|
actions: [{ target: 'sl' as const, type: 'updated' as const, key: 'warehouse.orders', detail: 'measure' }],
|
||||||
|
touchedSlSources: [],
|
||||||
|
};
|
||||||
|
|
||||||
|
const failedWorkUnit = {
|
||||||
|
unitKey: 'bad',
|
||||||
|
rawFiles: ['cards/2.json'],
|
||||||
|
status: 'failed' as const,
|
||||||
|
reason: 'tool write failed',
|
||||||
|
actions: [],
|
||||||
|
touchedSlSources: [],
|
||||||
|
};
|
||||||
|
|
||||||
|
describe('ingestReportOutcome', () => {
|
||||||
|
it('returns done when there are no failed work units', () => {
|
||||||
|
expect(ingestReportOutcome(report({ workUnits: [savingWorkUnit] }))).toBe('done');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns partial when failed work units coexist with saved memory', () => {
|
||||||
|
expect(
|
||||||
|
ingestReportOutcome(report({ workUnits: [savingWorkUnit, failedWorkUnit], failedWorkUnits: ['bad'] })),
|
||||||
|
).toBe('partial');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns error when failed work units produced no saved memory', () => {
|
||||||
|
expect(ingestReportOutcome(report({ workUnits: [failedWorkUnit], failedWorkUnits: ['bad'] }))).toBe('error');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns error for a stage-level failure even if artifacts were recorded', () => {
|
||||||
|
expect(ingestReportOutcome(report({ status: 'failed', workUnits: [savingWorkUnit], failedWorkUnits: [] }))).toBe(
|
||||||
|
'error',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -403,7 +403,7 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('returns a non-zero code when Metabase fanout has failed children', async () => {
|
it('returns a non-zero code when a Metabase fanout child fully fails', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeMetabaseConfig(projectDir);
|
await writeMetabaseConfig(projectDir);
|
||||||
const io = makeIo();
|
const io = makeIo();
|
||||||
|
|
@ -441,7 +441,7 @@ describe('runKtxIngest', () => {
|
||||||
{
|
{
|
||||||
runLocalMetabaseIngest: async () => ({
|
runLocalMetabaseIngest: async () => ({
|
||||||
metabaseConnectionId: 'prod-metabase',
|
metabaseConnectionId: 'prod-metabase',
|
||||||
status: 'partial_failure',
|
status: 'all_failed',
|
||||||
totals: { workUnits: 1, failedWorkUnits: 1 },
|
totals: { workUnits: 1, failedWorkUnits: 1 },
|
||||||
children: [
|
children: [
|
||||||
{
|
{
|
||||||
|
|
@ -467,9 +467,83 @@ describe('runKtxIngest', () => {
|
||||||
),
|
),
|
||||||
).resolves.toBe(1);
|
).resolves.toBe(1);
|
||||||
|
|
||||||
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
expect(io.stdout()).toContain('Metabase fanout: all_failed');
|
||||||
expect(io.stdout()).toContain('Failed tasks: 1');
|
|
||||||
expect(io.stdout()).toContain('status=error');
|
expect(io.stdout()).toContain('status=error');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => {
|
||||||
|
const projectDir = join(tempDir, 'project');
|
||||||
|
await writeMetabaseConfig(projectDir);
|
||||||
|
const io = makeIo();
|
||||||
|
const report = localFakeBundleReport('metabase-child-1', {
|
||||||
|
id: 'report-metabase-child-1',
|
||||||
|
runId: 'run-a',
|
||||||
|
jobId: 'metabase-child-1',
|
||||||
|
connectionId: 'warehouse_a',
|
||||||
|
sourceKey: 'metabase',
|
||||||
|
body: {
|
||||||
|
failedWorkUnits: ['metabase-db-2'],
|
||||||
|
workUnits: [
|
||||||
|
{
|
||||||
|
unitKey: 'metabase-db-1',
|
||||||
|
rawFiles: ['cards/1.json'],
|
||||||
|
status: 'success',
|
||||||
|
actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
unitKey: 'metabase-db-2',
|
||||||
|
rawFiles: ['cards/2.json'],
|
||||||
|
status: 'failed',
|
||||||
|
reason: 'bad SQL',
|
||||||
|
actions: [],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
runKtxIngest(
|
||||||
|
{
|
||||||
|
command: 'run',
|
||||||
|
projectDir,
|
||||||
|
connectionId: 'prod-metabase',
|
||||||
|
adapter: 'metabase',
|
||||||
|
outputMode: 'plain',
|
||||||
|
},
|
||||||
|
io.io,
|
||||||
|
{
|
||||||
|
runLocalMetabaseIngest: async () => ({
|
||||||
|
metabaseConnectionId: 'prod-metabase',
|
||||||
|
status: 'partial_failure',
|
||||||
|
totals: { workUnits: 2, failedWorkUnits: 1 },
|
||||||
|
children: [
|
||||||
|
{
|
||||||
|
jobId: 'metabase-child-1',
|
||||||
|
metabaseConnectionId: 'prod-metabase',
|
||||||
|
metabaseDatabaseId: 1,
|
||||||
|
targetConnectionId: 'warehouse_a',
|
||||||
|
result: {
|
||||||
|
jobId: 'metabase-child-1',
|
||||||
|
runId: 'run-a',
|
||||||
|
syncId: 'sync-a',
|
||||||
|
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
|
||||||
|
workUnitCount: 2,
|
||||||
|
failedWorkUnits: ['metabase-db-2'],
|
||||||
|
artifactsWritten: 1,
|
||||||
|
commitSha: 'abc',
|
||||||
|
},
|
||||||
|
report,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
).resolves.toBe(0);
|
||||||
|
|
||||||
|
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
||||||
|
expect(io.stdout()).toContain('status=partial');
|
||||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -1140,6 +1214,63 @@ describe('runKtxIngest', () => {
|
||||||
expect(io.stdout()).toContain('Status: error\n');
|
expect(io.stdout()).toContain('Status: error\n');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => {
|
||||||
|
const projectDir = join(tempDir, 'project');
|
||||||
|
await writeWarehouseConfig(projectDir);
|
||||||
|
const sourceDir = join(tempDir, 'source');
|
||||||
|
await mkdir(join(sourceDir, 'orders'), { recursive: true });
|
||||||
|
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
|
||||||
|
|
||||||
|
const partialReport = localFakeBundleReport('local-job-partial', {
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
sourceKey: 'fake',
|
||||||
|
body: {
|
||||||
|
failedWorkUnits: ['orders-bad'],
|
||||||
|
workUnits: [
|
||||||
|
{
|
||||||
|
unitKey: 'orders-ok',
|
||||||
|
rawFiles: ['orders/orders.json'],
|
||||||
|
status: 'success',
|
||||||
|
actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
unitKey: 'orders-bad',
|
||||||
|
rawFiles: ['orders/bad.json'],
|
||||||
|
status: 'failed',
|
||||||
|
reason: 'writer tool failed',
|
||||||
|
actions: [],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({
|
||||||
|
result: {
|
||||||
|
jobId: 'local-job-partial',
|
||||||
|
runId: partialReport.runId,
|
||||||
|
syncId: partialReport.body.syncId,
|
||||||
|
diffSummary: partialReport.body.diffSummary,
|
||||||
|
workUnitCount: partialReport.body.workUnits.length,
|
||||||
|
failedWorkUnits: partialReport.body.failedWorkUnits,
|
||||||
|
artifactsWritten: 1,
|
||||||
|
commitSha: partialReport.body.commitSha,
|
||||||
|
},
|
||||||
|
report: partialReport,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const io = makeIo();
|
||||||
|
await expect(
|
||||||
|
runKtxIngest(
|
||||||
|
{ command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
|
||||||
|
io.io,
|
||||||
|
{ runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' },
|
||||||
|
),
|
||||||
|
).resolves.toBe(0);
|
||||||
|
|
||||||
|
expect(io.stdout()).toContain('Status: partial\n');
|
||||||
|
});
|
||||||
|
|
||||||
it('prints trace path and error status for stored failed ingest reports', async () => {
|
it('prints trace path and error status for stored failed ingest reports', async () => {
|
||||||
const projectDir = join(tempDir, 'project');
|
const projectDir = join(tempDir, 'project');
|
||||||
await writeWarehouseConfig(projectDir);
|
await writeWarehouseConfig(projectDir);
|
||||||
|
|
|
||||||
|
|
@ -398,6 +398,59 @@ describe('setup status', () => {
|
||||||
expect(rendered).toContain('KTX context built: yes');
|
expect(rendered).toContain('KTX context built: yes');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('reports context ready after a partial ingest report saved memory', async () => {
|
||||||
|
await writeFile(
|
||||||
|
join(tempDir, 'ktx.yaml'),
|
||||||
|
[
|
||||||
|
'setup:',
|
||||||
|
' database_connection_ids:',
|
||||||
|
' - warehouse',
|
||||||
|
'connections:',
|
||||||
|
' warehouse:',
|
||||||
|
' driver: postgres',
|
||||||
|
' url: env:DATABASE_URL',
|
||||||
|
'ingest:',
|
||||||
|
' embeddings:',
|
||||||
|
' backend: none',
|
||||||
|
' dimensions: 8',
|
||||||
|
'',
|
||||||
|
].join('\n'),
|
||||||
|
'utf-8',
|
||||||
|
);
|
||||||
|
await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] });
|
||||||
|
await persistLocalBundleReport(
|
||||||
|
tempDir,
|
||||||
|
localFakeBundleReport('warehouse-job-partial', {
|
||||||
|
connectionId: 'warehouse',
|
||||||
|
sourceKey: 'fake',
|
||||||
|
body: {
|
||||||
|
failedWorkUnits: ['orders-bad'],
|
||||||
|
workUnits: [
|
||||||
|
{
|
||||||
|
unitKey: 'orders-ok',
|
||||||
|
rawFiles: ['orders/orders.json'],
|
||||||
|
status: 'success',
|
||||||
|
actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
unitKey: 'orders-bad',
|
||||||
|
rawFiles: ['orders/bad.json'],
|
||||||
|
status: 'failed',
|
||||||
|
reason: 'writer tool failed',
|
||||||
|
actions: [],
|
||||||
|
touchedSlSources: [],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const status = await readKtxSetupStatus(tempDir);
|
||||||
|
|
||||||
|
expect(status.context).toMatchObject({ ready: true, status: 'completed' });
|
||||||
|
});
|
||||||
|
|
||||||
it('formats plain and JSON setup status payloads', async () => {
|
it('formats plain and JSON setup status payloads', async () => {
|
||||||
const status = await readKtxSetupStatus(tempDir);
|
const status = await readKtxSetupStatus(tempDir);
|
||||||
const rendered = formatKtxSetupStatus(status);
|
const rendered = formatKtxSetupStatus(status);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue