diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 04f76a7f2..7ef8f7bbf 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -16,6 +16,17 @@ import { PGlite, type Transaction } from "@electric-sql/pglite"; import { live } from "@electric-sql/pglite/live"; import { electricSync } from "@electric-sql/pglite-sync"; +// Debug logging - only logs in development, silent in production +const IS_DEV = process.env.NODE_ENV === "development"; + +function debugLog(...args: unknown[]) { + if (IS_DEV) console.log(...args); +} + +function debugWarn(...args: unknown[]) { + if (IS_DEV) console.warn(...args); +} + // Types export interface ElectricClient { db: PGlite; @@ -106,23 +117,23 @@ async function cleanupOtherUserDatabases(currentUserId: string): Promise { // Check if this is the current database // PGlite stores with "/pglite/" prefix, so we check if the name ENDS WITH our identifier if (dbName.endsWith(currentDbIdentifier)) { - console.log(`[Electric] Keeping current database: ${dbName}`); + debugLog(`[Electric] Keeping current database: ${dbName}`); continue; } // Delete ALL other databases (other users OR old versions of current user) try { - console.log(`[Electric] Deleting stale database: ${dbName}`); + debugLog(`[Electric] Deleting stale database: ${dbName}`); window.indexedDB.deleteDatabase(dbName); } catch (deleteErr) { - console.warn(`[Electric] Failed to delete database ${dbName}:`, deleteErr); + debugWarn(`[Electric] Failed to delete database ${dbName}:`, deleteErr); } } } } } catch (err) { // indexedDB.databases() not supported - that's okay, login cleanup is best-effort - console.warn("[Electric] Could not enumerate databases for cleanup:", err); + debugWarn("[Electric] Could not enumerate databases for cleanup:", err); } } @@ -148,7 +159,7 @@ export async function initElectric(userId: string): Promise { // If initialized for a different user, close the old client first if (electricClient && currentUserId !== userId) { - console.log(`[Electric] User changed from ${currentUserId} to ${userId}, reinitializing...`); + debugLog(`[Electric] User changed from ${currentUserId} to ${userId}, reinitializing...`); await cleanupElectric(); } @@ -163,12 +174,12 @@ export async function initElectric(userId: string): Promise { initPromise = (async () => { try { // STEP 1: Clean up databases from other users (login-time cleanup) - console.log("[Electric] Cleaning up databases from other users..."); + debugLog("[Electric] Cleaning up databases from other users..."); await cleanupOtherUserDatabases(userId); // STEP 2: Create user-specific PGlite database const dbName = getDbName(userId); - console.log(`[Electric] Initializing database: ${dbName}`); + debugLog(`[Electric] Initializing database: ${dbName}`); const db = await PGlite.create({ dataDir: dbName, @@ -298,14 +309,14 @@ export async function initElectric(userId: string): Promise { // Check if we already have an active sync for this shape (memory optimization) const existingHandle = activeSyncHandles.get(cacheKey); if (existingHandle) { - console.log(`[Electric] Reusing existing sync handle for: ${cacheKey}`); + debugLog(`[Electric] Reusing existing sync handle for: ${cacheKey}`); return existingHandle; } // Check if there's already a pending sync for this shape (prevent race condition) const pendingSync = pendingSyncs.get(cacheKey); if (pendingSync) { - console.log(`[Electric] Waiting for pending sync to complete: ${cacheKey}`); + debugLog(`[Electric] Waiting for pending sync to complete: ${cacheKey}`); return pendingSync; } @@ -331,7 +342,7 @@ export async function initElectric(userId: string): Promise { if (singleQuoteCount % 2 !== 0) { // Odd number of quotes means unterminated string literal - console.warn("Where clause has unmatched quotes, fixing:", where); + debugWarn("Where clause has unmatched quotes, fixing:", where); // Add closing quote at the end validatedWhere = `${where}'`; params.where = validatedWhere; @@ -345,15 +356,15 @@ export async function initElectric(userId: string): Promise { if (columns) params.columns = columns.join(","); - console.log("[Electric] Syncing shape with params:", params); - console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`); - console.log("[Electric] Where clause:", where, "Validated:", validatedWhere); + debugLog("[Electric] Syncing shape with params:", params); + debugLog("[Electric] Electric URL:", `${electricUrl}/v1/shape`); + debugLog("[Electric] Where clause:", where, "Validated:", validatedWhere); try { // Debug: Test Electric SQL connection directly first (DEV ONLY - skipped in production) if (process.env.NODE_ENV === "development") { const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; - console.log("[Electric] Testing Electric SQL directly:", testUrl); + debugLog("[Electric] Testing Electric SQL directly:", testUrl); try { const testResponse = await fetch(testUrl); const testHeaders = { @@ -361,9 +372,9 @@ export async function initElectric(userId: string): Promise { offset: testResponse.headers.get("electric-offset"), upToDate: testResponse.headers.get("electric-up-to-date"), }; - console.log("[Electric] Direct Electric SQL response headers:", testHeaders); + debugLog("[Electric] Direct Electric SQL response headers:", testHeaders); const testData = await testResponse.json(); - console.log( + debugLog( "[Electric] Direct Electric SQL data count:", Array.isArray(testData) ? testData.length : "not array", testData @@ -404,14 +415,14 @@ export async function initElectric(userId: string): Promise { // Shorter timeout (5 seconds) as fallback setTimeout(() => { if (!syncResolved) { - console.warn( + debugWarn( `[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` ); // Check isUpToDate one more time before resolving // This will be checked after shape is created setTimeout(() => { if (!syncResolved) { - console.warn( + debugWarn( `[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s` ); resolveInitialSync(); @@ -450,7 +461,7 @@ export async function initElectric(userId: string): Promise { primaryKey, shapeKey, // Re-enabled for fast incremental sync (root cause in use-inbox.ts is fixed) onInitialSync: () => { - console.log( + debugLog( `[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite` ); resolveInitialSync(); @@ -466,7 +477,7 @@ export async function initElectric(userId: string): Promise { // Handle must-refetch: clear table data before Electric re-inserts from scratch // This prevents "duplicate key" errors when the shape is invalidated onMustRefetch: async (tx: Transaction) => { - console.log( + debugLog( `[Electric] ⚠️ Must refetch triggered for ${table} - clearing existing data` ); try { @@ -476,13 +487,13 @@ export async function initElectric(userId: string): Promise { // Parse the WHERE clause to build a DELETE statement // The WHERE clause is already validated and formatted await tx.exec(`DELETE FROM ${table} WHERE ${validatedWhere}`); - console.log( + debugLog( `[Electric] 🗑️ Cleared ${table} rows matching: ${validatedWhere}` ); } else { // No WHERE clause means we're syncing the entire table await tx.exec(`DELETE FROM ${table}`); - console.log(`[Electric] 🗑️ Cleared all rows from ${table}`); + debugLog(`[Electric] 🗑️ Cleared all rows from ${table}`); } } catch (cleanupError) { console.error( @@ -495,7 +506,7 @@ export async function initElectric(userId: string): Promise { }, }; - console.log( + debugLog( "[Electric] syncShapeToTable config:", JSON.stringify(shapeConfig, null, 2) ); @@ -508,7 +519,7 @@ export async function initElectric(userId: string): Promise { const errorMessage = syncError instanceof Error ? syncError.message : String(syncError); if (errorMessage.includes("Already syncing")) { - console.warn( + debugWarn( `[Electric] Already syncing ${table}, waiting for existing sync to settle...` ); @@ -518,12 +529,12 @@ export async function initElectric(userId: string): Promise { // Check if an active handle now exists (another sync might have completed) const existingHandle = activeSyncHandles.get(cacheKey); if (existingHandle) { - console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`); + debugLog(`[Electric] Found existing handle after waiting: ${cacheKey}`); return existingHandle; } // Retry once after waiting - console.log(`[Electric] Retrying sync for ${table}...`); + debugLog(`[Electric] Retrying sync for ${table}...`); try { shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); } catch (retryError) { @@ -531,12 +542,12 @@ export async function initElectric(userId: string): Promise { retryError instanceof Error ? retryError.message : String(retryError); if (retryMessage.includes("Already syncing")) { // Still syncing - create a placeholder handle that indicates the table is being synced - console.warn( + debugWarn( `[Electric] ${table} still syncing, creating placeholder handle` ); const placeholderHandle: SyncHandle = { unsubscribe: () => { - console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`); + debugLog(`[Electric] Placeholder unsubscribe for: ${cacheKey}`); activeSyncHandles.delete(cacheKey); }, get isUpToDate() { @@ -560,7 +571,7 @@ export async function initElectric(userId: string): Promise { } // Log the actual shape result structure - console.log("[Electric] Shape sync result (initial):", { + debugLog("[Electric] Shape sync result (initial):", { hasUnsubscribe: typeof shape?.unsubscribe === "function", isUpToDate: shape?.isUpToDate, hasStream: !!shape?.stream, @@ -569,7 +580,7 @@ export async function initElectric(userId: string): Promise { // Recommended Approach Step 1: Check isUpToDate immediately if (shape.isUpToDate) { - console.log( + debugLog( `[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)` ); resolveInitialSync(); @@ -577,7 +588,7 @@ export async function initElectric(userId: string): Promise { // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message if (shape?.stream) { const stream = shape.stream as any; - console.log("[Electric] Shape stream details:", { + debugLog("[Electric] Shape stream details:", { shapeHandle: stream?.shapeHandle, lastOffset: stream?.lastOffset, isUpToDate: stream?.isUpToDate, @@ -590,14 +601,14 @@ export async function initElectric(userId: string): Promise { // NOTE: We keep this subscription active - don't unsubscribe! // The stream is what Electric SQL uses for real-time updates if (typeof stream?.subscribe === "function") { - console.log( + debugLog( "[Electric] Subscribing to shape stream to watch for up-to-date message..." ); // Subscribe but don't store unsubscribe - we want it to stay active stream.subscribe((messages: unknown[]) => { // Continue receiving updates even after sync is resolved if (!syncResolved) { - console.log( + debugLog( "[Electric] 🔵 Shape stream received messages:", messages?.length || 0 ); @@ -614,14 +625,14 @@ export async function initElectric(userId: string): Promise { (typeof msg === "object" && "up-to-date" in msg) ) { if (!syncResolved) { - console.log(`[Electric] ✅ Received up-to-date message for ${table}`); + debugLog(`[Electric] ✅ Received up-to-date message for ${table}`); resolveInitialSync(); } // Continue listening for real-time updates - don't return! } } if (!syncResolved && messages.length > 0) { - console.log( + debugLog( "[Electric] First message:", JSON.stringify(messages[0], null, 2) ); @@ -630,14 +641,14 @@ export async function initElectric(userId: string): Promise { // Also check stream's isUpToDate property after receiving messages if (!syncResolved && stream?.isUpToDate) { - console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`); + debugLog(`[Electric] ✅ Stream isUpToDate is true for ${table}`); resolveInitialSync(); } }); // Also check stream's isUpToDate property immediately if (stream?.isUpToDate) { - console.log( + debugLog( `[Electric] ✅ Stream isUpToDate is true immediately for ${table}` ); resolveInitialSync(); @@ -652,7 +663,7 @@ export async function initElectric(userId: string): Promise { } if (shape.isUpToDate || stream?.isUpToDate) { - console.log( + debugLog( `[Electric] ✅ Sync completed (detected via polling) for ${table}` ); clearInterval(pollInterval); @@ -665,7 +676,7 @@ export async function initElectric(userId: string): Promise { clearInterval(pollInterval); }); } else { - console.warn( + debugWarn( `[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout` ); } @@ -674,7 +685,7 @@ export async function initElectric(userId: string): Promise { // Create the sync handle with proper cleanup const syncHandle: SyncHandle = { unsubscribe: () => { - console.log(`[Electric] Unsubscribing from: ${cacheKey}`); + debugLog(`[Electric] Unsubscribing from: ${cacheKey}`); // Remove from cache first activeSyncHandles.delete(cacheKey); // Then unsubscribe from the shape @@ -692,7 +703,7 @@ export async function initElectric(userId: string): Promise { // Cache the sync handle for reuse (memory optimization) activeSyncHandles.set(cacheKey, syncHandle); - console.log( + debugLog( `[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})` ); @@ -704,7 +715,7 @@ export async function initElectric(userId: string): Promise { const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { method: "GET", }); - console.log( + debugLog( "[Electric] Electric SQL server response:", response.status, response.statusText @@ -726,14 +737,14 @@ export async function initElectric(userId: string): Promise { // Clean up the pending sync when done (whether success or failure) syncPromise.finally(() => { pendingSyncs.delete(cacheKey); - console.log(`[Electric] Pending sync removed for: ${cacheKey}`); + debugLog(`[Electric] Pending sync removed for: ${cacheKey}`); }); return syncPromise; }, }; - console.log(`[Electric] ✅ Initialized successfully for user: ${userId}`); + debugLog(`[Electric] ✅ Initialized successfully for user: ${userId}`); return electricClient; } catch (error) { console.error("[Electric] Failed to initialize:", error); @@ -759,10 +770,10 @@ export async function cleanupElectric(): Promise { } const userIdToClean = currentUserId; - console.log(`[Electric] Cleaning up for user: ${userIdToClean}`); + debugLog(`[Electric] Cleaning up for user: ${userIdToClean}`); // Unsubscribe from all active sync handles first (memory cleanup) - console.log(`[Electric] Unsubscribing from ${activeSyncHandles.size} active sync handles`); + debugLog(`[Electric] Unsubscribing from ${activeSyncHandles.size} active sync handles`); // Copy keys to array to avoid mutation during iteration const handleKeys = Array.from(activeSyncHandles.keys()); for (const key of handleKeys) { @@ -771,7 +782,7 @@ export async function cleanupElectric(): Promise { try { handle.unsubscribe(); } catch (err) { - console.warn(`[Electric] Failed to unsubscribe from ${key}:`, err); + debugWarn(`[Electric] Failed to unsubscribe from ${key}:`, err); } } } @@ -782,7 +793,7 @@ export async function cleanupElectric(): Promise { try { // Close the PGlite database connection await electricClient.db.close(); - console.log("[Electric] Database closed"); + debugLog("[Electric] Database closed"); } catch (error) { console.error("[Electric] Error closing database:", error); } @@ -798,13 +809,13 @@ export async function cleanupElectric(): Promise { try { const dbName = `${DB_PREFIX}${userIdToClean}-v${SYNC_VERSION}`; window.indexedDB.deleteDatabase(dbName); - console.log(`[Electric] Deleted database: ${dbName}`); + debugLog(`[Electric] Deleted database: ${dbName}`); } catch (err) { - console.warn("[Electric] Failed to delete database:", err); + debugWarn("[Electric] Failed to delete database:", err); } } - console.log("[Electric] Cleanup complete"); + debugLog("[Electric] Cleanup complete"); } /**