fix: recover snapshots and branch rc tags (#185)

This commit is contained in:
Andrey Avtomonov 2026-05-20 15:22:01 +02:00 committed by GitHub
parent c24e07a115
commit 2667952aa9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 110 additions and 10 deletions

View file

@ -351,6 +351,43 @@ describe('local ingest', () => {
).rejects.toThrow();
});
it('writes a new raw snapshot when an unchanged latest snapshot is missing from disk', async () => {
const sourceDir = join(tempDir, 'missing-snapshot-source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders","version":1}\n', 'utf-8');
const first = await runLocalStageOnlyIngest({
project,
adapters: [new FakeSourceAdapter()],
adapter: 'fake',
connectionId: 'warehouse',
sourceDir,
jobId: 'local-missing-snapshot-1',
now: () => new Date('2026-04-27T12:20:00.000Z'),
});
await rm(join(project.projectDir, 'raw-sources/warehouse/fake', first.syncId), { recursive: true, force: true });
const rerun = await runLocalStageOnlyIngest({
project,
adapters: [new FakeSourceAdapter()],
adapter: 'fake',
connectionId: 'warehouse',
sourceDir,
jobId: 'local-missing-snapshot-2',
now: () => new Date('2026-04-27T12:25:00.000Z'),
});
expect(rerun.previousRunId).toBe(first.runId);
expect(rerun.syncId).toBe('2026-04-27-122500-local-missing-snapshot-2');
expect(rerun.diffSummary).toEqual({ added: 0, modified: 0, deleted: 0, unchanged: 1 });
expect(rerun.workUnitCount).toBe(0);
await expect(
readFile(join(project.projectDir, 'raw-sources/warehouse/fake', rerun.syncId, 'orders/orders.json'), 'utf-8'),
).resolves.toBe('{"name":"orders","version":1}\n');
});
it('reuses the existing sync id when the same local run id is retried', async () => {
const sourceDir = join(tempDir, 'idempotent-source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });

View file

@ -209,6 +209,16 @@ async function pruneStaleRawFiles(input: {
return staleRawPaths;
}
async function rawSnapshotContainsFiles(
project: KtxLocalProject,
rawPrefix: string,
relativeFiles: string[],
): Promise<boolean> {
const existing = await project.fileStore.listFiles(rawPrefix);
const existingFiles = new Set(existing.files);
return relativeFiles.every((file) => existingFiles.has(`${rawPrefix}/${file}`));
}
async function prepareLocalStagedDir(
project: KtxLocalProject,
adapter: SourceAdapter,
@ -292,14 +302,20 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
priorHashes,
scopeDescriptor ? scopeDescriptor.isPathInScope.bind(scopeDescriptor) : undefined,
);
const unchangedFromLatestCompletedRun =
const matchesLatestCompletedRun =
!existingRun &&
!!latestReport &&
diffSet.added.length === 0 &&
diffSet.modified.length === 0 &&
diffSet.deleted.length === 0;
const reusableLatestSyncId = matchesLatestCompletedRun ? latestReport.syncId : null;
const latestRawPrefix = reusableLatestSyncId
? `raw-sources/${connectionId}/${adapter.source}/${reusableLatestSyncId}`
: null;
const canReuseLatestCompletedRun =
latestRawPrefix !== null && (await rawSnapshotContainsFiles(options.project, latestRawPrefix, relativeFiles));
const syncId =
existingRun?.syncId ?? (unchangedFromLatestCompletedRun ? latestReport.syncId : buildSyncId(started, jobId));
existingRun?.syncId ?? (canReuseLatestCompletedRun && reusableLatestSyncId ? reusableLatestSyncId : buildSyncId(started, jobId));
options.memoryFlow?.update({ syncId });
options.memoryFlow?.emit({ type: 'raw_snapshot_written', syncId, rawFileCount: relativeFiles.length });
options.memoryFlow?.emit({
@ -319,7 +335,7 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
});
const rawPrefix = `raw-sources/${connectionId}/${adapter.source}/${syncId}`;
const rawPaths = relativeFiles.map((file) => `${rawPrefix}/${file}`);
const staleRawPaths = options.dryRun || unchangedFromLatestCompletedRun
const staleRawPaths = options.dryRun || canReuseLatestCompletedRun
? []
: await pruneStaleRawFiles({
project: options.project,
@ -331,7 +347,7 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
for (const file of relativeFiles) {
const absolutePath = assertInside(stagedDir, join(stagedDir, file));
const rawPath = `${rawPrefix}/${file}`;
if (!options.dryRun && !unchangedFromLatestCompletedRun) {
if (!options.dryRun && !canReuseLatestCompletedRun) {
await options.project.fileStore.writeFile(
rawPath,
await readFile(absolutePath, 'utf-8'),
@ -387,7 +403,7 @@ async function runLocalStageOnlyIngestInner(options: RunLocalStageOnlyIngestOpti
rawContentHashes: Object.fromEntries(hashes),
});
const commitPaths = unchangedFromLatestCompletedRun ? [] : [...rawPaths, ...staleRawPaths].sort();
const commitPaths = canReuseLatestCompletedRun ? [] : [...rawPaths, ...staleRawPaths].sort();
if (commitPaths.length > 0) {
await options.project.git.commitFiles(
commitPaths,