mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
fix(cli): treat artifact-producing ingests with failures as partial
This commit is contained in:
parent
b313948db4
commit
830c372ca9
2 changed files with 142 additions and 15 deletions
|
|
@ -2,7 +2,7 @@ import { buildMemoryFlowViewModel } from './context/ingest/memory-flow/view-mode
|
|||
import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js';
|
||||
import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js';
|
||||
import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js';
|
||||
import { type IngestReportSnapshot, savedMemoryCountsForReport } from './context/ingest/reports.js';
|
||||
import { type IngestReportSnapshot, ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js';
|
||||
import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js';
|
||||
import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js';
|
||||
import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js';
|
||||
|
|
@ -93,10 +93,6 @@ export interface KtxIngestDeps {
|
|||
runtimeIo?: KtxIngestIo;
|
||||
}
|
||||
|
||||
function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
|
||||
return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
|
||||
}
|
||||
|
||||
const REPORT_SOURCE_LABELS = new Map<string, string>([
|
||||
['live-database', 'Database schema'],
|
||||
['historic-sql', 'Query history'],
|
||||
|
|
@ -193,7 +189,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
|
|||
if (report.body.tracePath) {
|
||||
io.stdout.write(`Trace: ${report.body.tracePath}\n`);
|
||||
}
|
||||
io.stdout.write(`Status: ${reportStatus(report)}\n`);
|
||||
io.stdout.write(`Status: ${ingestReportOutcome(report)}\n`);
|
||||
io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`);
|
||||
io.stdout.write(`Connection: ${report.connectionId}\n`);
|
||||
io.stdout.write(`Sync: ${report.body.syncId}\n`);
|
||||
|
|
@ -231,7 +227,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
|
|||
}
|
||||
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
|
||||
for (const child of result.children) {
|
||||
const status = reportStatus(child.report);
|
||||
const status = ingestReportOutcome(child.report);
|
||||
io.stdout.write(
|
||||
`- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`,
|
||||
);
|
||||
|
|
@ -595,7 +591,7 @@ function initialRunMemoryFlowInput(
|
|||
}
|
||||
|
||||
function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput {
|
||||
const status = reportStatus(report);
|
||||
const status = ingestReportOutcome(report) === 'error' ? 'error' : 'done';
|
||||
return {
|
||||
...snapshot,
|
||||
runId: report.runId,
|
||||
|
|
@ -777,7 +773,7 @@ export async function runKtxIngest(
|
|||
} finally {
|
||||
plainProgress?.flush();
|
||||
}
|
||||
return result.status === 'all_succeeded' ? 0 : 1;
|
||||
return result.status === 'all_failed' ? 1 : 0;
|
||||
}
|
||||
|
||||
const jobId = deps.jobIdFactory?.();
|
||||
|
|
@ -846,7 +842,7 @@ export async function runKtxIngest(
|
|||
liveTui?.close();
|
||||
liveTui = null;
|
||||
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
|
||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
||||
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
|
||||
}
|
||||
plainProgress?.flush();
|
||||
await writeReportRecord(result.report, runOutputMode, io, {
|
||||
|
|
@ -854,7 +850,7 @@ export async function runKtxIngest(
|
|||
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
|
||||
env,
|
||||
});
|
||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
||||
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
|
||||
} finally {
|
||||
plainProgress?.flush();
|
||||
liveTui?.close();
|
||||
|
|
|
|||
|
|
@ -403,7 +403,7 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('returns a non-zero code when Metabase fanout has failed children', async () => {
|
||||
it('returns a non-zero code when a Metabase fanout child fully fails', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
|
|
@ -441,7 +441,7 @@ describe('runKtxIngest', () => {
|
|||
{
|
||||
runLocalMetabaseIngest: async () => ({
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
status: 'partial_failure',
|
||||
status: 'all_failed',
|
||||
totals: { workUnits: 1, failedWorkUnits: 1 },
|
||||
children: [
|
||||
{
|
||||
|
|
@ -467,9 +467,83 @@ describe('runKtxIngest', () => {
|
|||
),
|
||||
).resolves.toBe(1);
|
||||
|
||||
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
||||
expect(io.stdout()).toContain('Failed tasks: 1');
|
||||
expect(io.stdout()).toContain('Metabase fanout: all_failed');
|
||||
expect(io.stdout()).toContain('status=error');
|
||||
});
|
||||
|
||||
it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
const report = localFakeBundleReport('metabase-child-1', {
|
||||
id: 'report-metabase-child-1',
|
||||
runId: 'run-a',
|
||||
jobId: 'metabase-child-1',
|
||||
connectionId: 'warehouse_a',
|
||||
sourceKey: 'metabase',
|
||||
body: {
|
||||
failedWorkUnits: ['metabase-db-2'],
|
||||
workUnits: [
|
||||
{
|
||||
unitKey: 'metabase-db-1',
|
||||
rawFiles: ['cards/1.json'],
|
||||
status: 'success',
|
||||
actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }],
|
||||
touchedSlSources: [],
|
||||
},
|
||||
{
|
||||
unitKey: 'metabase-db-2',
|
||||
rawFiles: ['cards/2.json'],
|
||||
status: 'failed',
|
||||
reason: 'bad SQL',
|
||||
actions: [],
|
||||
touchedSlSources: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
await expect(
|
||||
runKtxIngest(
|
||||
{
|
||||
command: 'run',
|
||||
projectDir,
|
||||
connectionId: 'prod-metabase',
|
||||
adapter: 'metabase',
|
||||
outputMode: 'plain',
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
runLocalMetabaseIngest: async () => ({
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
status: 'partial_failure',
|
||||
totals: { workUnits: 2, failedWorkUnits: 1 },
|
||||
children: [
|
||||
{
|
||||
jobId: 'metabase-child-1',
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
metabaseDatabaseId: 1,
|
||||
targetConnectionId: 'warehouse_a',
|
||||
result: {
|
||||
jobId: 'metabase-child-1',
|
||||
runId: 'run-a',
|
||||
syncId: 'sync-a',
|
||||
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
|
||||
workUnitCount: 2,
|
||||
failedWorkUnits: ['metabase-db-2'],
|
||||
artifactsWritten: 1,
|
||||
commitSha: 'abc',
|
||||
},
|
||||
report,
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
),
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
|
||||
expect(io.stdout()).toContain('status=partial');
|
||||
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
|
|
@ -1140,6 +1214,63 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stdout()).toContain('Status: error\n');
|
||||
});
|
||||
|
||||
it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeWarehouseConfig(projectDir);
|
||||
const sourceDir = join(tempDir, 'source');
|
||||
await mkdir(join(sourceDir, 'orders'), { recursive: true });
|
||||
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
|
||||
|
||||
const partialReport = localFakeBundleReport('local-job-partial', {
|
||||
connectionId: 'warehouse',
|
||||
sourceKey: 'fake',
|
||||
body: {
|
||||
failedWorkUnits: ['orders-bad'],
|
||||
workUnits: [
|
||||
{
|
||||
unitKey: 'orders-ok',
|
||||
rawFiles: ['orders/orders.json'],
|
||||
status: 'success',
|
||||
actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
|
||||
touchedSlSources: [],
|
||||
},
|
||||
{
|
||||
unitKey: 'orders-bad',
|
||||
rawFiles: ['orders/bad.json'],
|
||||
status: 'failed',
|
||||
reason: 'writer tool failed',
|
||||
actions: [],
|
||||
touchedSlSources: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({
|
||||
result: {
|
||||
jobId: 'local-job-partial',
|
||||
runId: partialReport.runId,
|
||||
syncId: partialReport.body.syncId,
|
||||
diffSummary: partialReport.body.diffSummary,
|
||||
workUnitCount: partialReport.body.workUnits.length,
|
||||
failedWorkUnits: partialReport.body.failedWorkUnits,
|
||||
artifactsWritten: 1,
|
||||
commitSha: partialReport.body.commitSha,
|
||||
},
|
||||
report: partialReport,
|
||||
}));
|
||||
|
||||
const io = makeIo();
|
||||
await expect(
|
||||
runKtxIngest(
|
||||
{ command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
|
||||
io.io,
|
||||
{ runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' },
|
||||
),
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stdout()).toContain('Status: partial\n');
|
||||
});
|
||||
|
||||
it('prints trace path and error status for stored failed ingest reports', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeWarehouseConfig(projectDir);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue