fix: stabilize sync process by rounding cutoff date to midnight UTC and update cleanup logic for user databases

This commit is contained in:
Anish Sarkar 2026-02-04 19:58:47 +05:30
parent 6989059e94
commit dec85b6417
2 changed files with 67 additions and 19 deletions

View file

@ -12,7 +12,7 @@
* 3. Works even if logout cleanup fails
*/
import { PGlite } from "@electric-sql/pglite";
import { PGlite, type Transaction } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";
import { electricSync } from "@electric-sql/pglite-sync";
@ -56,7 +56,10 @@ const pendingSyncs = new Map<string, Promise<SyncHandle>>();
// v2: user-specific database architecture
// v3: consistent cutoff date for sync+queries, visibility refresh support
// v4: heartbeat-based stale notification detection with updated_at tracking
const SYNC_VERSION = 4;
// v5: fixed duplicate key errors (root cause: unstable cutoff dates in use-inbox.ts)
// - added onMustRefetch handler for server-side refetch scenarios
// - fixed getSyncCutoffDate to use stable midnight UTC timestamps
const SYNC_VERSION = 5;
// Database name prefix for identifying SurfSense databases
const DB_PREFIX = "surfsense-";
@ -77,7 +80,7 @@ function getDbName(userId: string): string {
}
/**
* Clean up databases from OTHER users (not the current user)
* Clean up databases from OTHER users AND old versions
* This is called on login to ensure clean state
*/
async function cleanupOtherUserDatabases(currentUserId: string): Promise<void> {
@ -85,6 +88,10 @@ async function cleanupOtherUserDatabases(currentUserId: string): Promise<void> {
return;
}
// The exact database identifier we want to keep (current user + current version)
// Format: "surfsense-{userId}-v{version}"
const currentDbIdentifier = `${DB_PREFIX}${currentUserId}-v${SYNC_VERSION}`;
try {
// Try to list all databases (not supported in all browsers)
if (typeof window.indexedDB.databases === "function") {
@ -95,14 +102,15 @@ async function cleanupOtherUserDatabases(currentUserId: string): Promise<void> {
if (!dbName) continue;
// Check if this is a SurfSense database
if (dbName.startsWith(DB_PREFIX) || dbName.includes("surfsense")) {
// Don't delete current user's database
if (dbName.includes(currentUserId)) {
console.log(`[Electric] Keeping current user's database: ${dbName}`);
if (dbName.includes("surfsense")) {
// Check if this is the current database
// PGlite stores with "/pglite/" prefix, so we check if the name ENDS WITH our identifier
if (dbName.endsWith(currentDbIdentifier)) {
console.log(`[Electric] Keeping current database: ${dbName}`);
continue;
}
// Delete databases from other users
// Delete ALL other databases (other users OR old versions of current user)
try {
console.log(`[Electric] Deleting stale database: ${dbName}`);
window.indexedDB.deleteDatabase(dbName);
@ -413,7 +421,22 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
}, 5000);
});
// Include userId in shapeKey for user-specific sync state
// ROOT CAUSE FIX: The duplicate key errors were caused by unstable cutoff dates
// in use-inbox.ts generating different sync keys on each render.
// That's now fixed (rounded to midnight UTC in getSyncCutoffDate).
// We can safely use shapeKey for fast incremental sync.
const shapeKey = `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`;
// Type assertion to PGlite with electric extension
const pgWithElectric = db as unknown as {
electric: {
syncShapeToTable: (
config: Record<string, unknown>
) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>;
};
};
const shapeConfig = {
shape: {
url: `${electricUrl}/v1/shape`,
@ -425,7 +448,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
},
table,
primaryKey,
shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key
shapeKey, // Re-enabled for fast incremental sync (root cause in use-inbox.ts is fixed)
onInitialSync: () => {
console.log(
`[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite`
@ -440,6 +463,36 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
);
rejectInitialSync(error);
},
// Handle must-refetch: clear table data before Electric re-inserts from scratch
// This prevents "duplicate key" errors when the shape is invalidated
onMustRefetch: async (tx: Transaction) => {
console.log(
`[Electric] ⚠️ Must refetch triggered for ${table} - clearing existing data`
);
try {
// Delete rows matching the shape's WHERE clause
// If no WHERE clause, delete all rows from the table
if (validatedWhere) {
// Parse the WHERE clause to build a DELETE statement
// The WHERE clause is already validated and formatted
await tx.exec(`DELETE FROM ${table} WHERE ${validatedWhere}`);
console.log(
`[Electric] 🗑️ Cleared ${table} rows matching: ${validatedWhere}`
);
} else {
// No WHERE clause means we're syncing the entire table
await tx.exec(`DELETE FROM ${table}`);
console.log(`[Electric] 🗑️ Cleared all rows from ${table}`);
}
} catch (cleanupError) {
console.error(
`[Electric] ❌ Failed to clear ${table} during must-refetch:`,
cleanupError
);
// Re-throw to let Electric handle the error
throw cleanupError;
}
},
};
console.log(
@ -447,15 +500,6 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
JSON.stringify(shapeConfig, null, 2)
);
// Type assertion to PGlite with electric extension
const pgWithElectric = db as PGlite & {
electric: {
syncShapeToTable: (
config: typeof shapeConfig
) => Promise<{ unsubscribe: () => void; isUpToDate: boolean; stream: unknown }>;
};
};
let shape: { unsubscribe: () => void; isUpToDate: boolean; stream: unknown };
try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);