feat: implement debug logging for development environment in Electric client

This commit is contained in:
Anish Sarkar 2026-02-04 20:08:13 +05:30
parent dec85b6417
commit d0a490fa50

View file

@ -16,6 +16,17 @@ import { PGlite, type Transaction } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live"; import { live } from "@electric-sql/pglite/live";
import { electricSync } from "@electric-sql/pglite-sync"; import { electricSync } from "@electric-sql/pglite-sync";
// Debug logging - only logs in development, silent in production
const IS_DEV = process.env.NODE_ENV === "development";
function debugLog(...args: unknown[]) {
if (IS_DEV) console.log(...args);
}
function debugWarn(...args: unknown[]) {
if (IS_DEV) console.warn(...args);
}
// Types // Types
export interface ElectricClient { export interface ElectricClient {
db: PGlite; db: PGlite;
@ -106,23 +117,23 @@ async function cleanupOtherUserDatabases(currentUserId: string): Promise<void> {
// Check if this is the current database // Check if this is the current database
// PGlite stores with "/pglite/" prefix, so we check if the name ENDS WITH our identifier // PGlite stores with "/pglite/" prefix, so we check if the name ENDS WITH our identifier
if (dbName.endsWith(currentDbIdentifier)) { if (dbName.endsWith(currentDbIdentifier)) {
console.log(`[Electric] Keeping current database: ${dbName}`); debugLog(`[Electric] Keeping current database: ${dbName}`);
continue; continue;
} }
// Delete ALL other databases (other users OR old versions of current user) // Delete ALL other databases (other users OR old versions of current user)
try { try {
console.log(`[Electric] Deleting stale database: ${dbName}`); debugLog(`[Electric] Deleting stale database: ${dbName}`);
window.indexedDB.deleteDatabase(dbName); window.indexedDB.deleteDatabase(dbName);
} catch (deleteErr) { } catch (deleteErr) {
console.warn(`[Electric] Failed to delete database ${dbName}:`, deleteErr); debugWarn(`[Electric] Failed to delete database ${dbName}:`, deleteErr);
} }
} }
} }
} }
} catch (err) { } catch (err) {
// indexedDB.databases() not supported - that's okay, login cleanup is best-effort // indexedDB.databases() not supported - that's okay, login cleanup is best-effort
console.warn("[Electric] Could not enumerate databases for cleanup:", err); debugWarn("[Electric] Could not enumerate databases for cleanup:", err);
} }
} }
@ -148,7 +159,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// If initialized for a different user, close the old client first // If initialized for a different user, close the old client first
if (electricClient && currentUserId !== userId) { if (electricClient && currentUserId !== userId) {
console.log(`[Electric] User changed from ${currentUserId} to ${userId}, reinitializing...`); debugLog(`[Electric] User changed from ${currentUserId} to ${userId}, reinitializing...`);
await cleanupElectric(); await cleanupElectric();
} }
@ -163,12 +174,12 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
initPromise = (async () => { initPromise = (async () => {
try { try {
// STEP 1: Clean up databases from other users (login-time cleanup) // STEP 1: Clean up databases from other users (login-time cleanup)
console.log("[Electric] Cleaning up databases from other users..."); debugLog("[Electric] Cleaning up databases from other users...");
await cleanupOtherUserDatabases(userId); await cleanupOtherUserDatabases(userId);
// STEP 2: Create user-specific PGlite database // STEP 2: Create user-specific PGlite database
const dbName = getDbName(userId); const dbName = getDbName(userId);
console.log(`[Electric] Initializing database: ${dbName}`); debugLog(`[Electric] Initializing database: ${dbName}`);
const db = await PGlite.create({ const db = await PGlite.create({
dataDir: dbName, dataDir: dbName,
@ -298,14 +309,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Check if we already have an active sync for this shape (memory optimization) // Check if we already have an active sync for this shape (memory optimization)
const existingHandle = activeSyncHandles.get(cacheKey); const existingHandle = activeSyncHandles.get(cacheKey);
if (existingHandle) { if (existingHandle) {
console.log(`[Electric] Reusing existing sync handle for: ${cacheKey}`); debugLog(`[Electric] Reusing existing sync handle for: ${cacheKey}`);
return existingHandle; return existingHandle;
} }
// Check if there's already a pending sync for this shape (prevent race condition) // Check if there's already a pending sync for this shape (prevent race condition)
const pendingSync = pendingSyncs.get(cacheKey); const pendingSync = pendingSyncs.get(cacheKey);
if (pendingSync) { if (pendingSync) {
console.log(`[Electric] Waiting for pending sync to complete: ${cacheKey}`); debugLog(`[Electric] Waiting for pending sync to complete: ${cacheKey}`);
return pendingSync; return pendingSync;
} }
@ -331,7 +342,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
if (singleQuoteCount % 2 !== 0) { if (singleQuoteCount % 2 !== 0) {
// Odd number of quotes means unterminated string literal // Odd number of quotes means unterminated string literal
console.warn("Where clause has unmatched quotes, fixing:", where); debugWarn("Where clause has unmatched quotes, fixing:", where);
// Add closing quote at the end // Add closing quote at the end
validatedWhere = `${where}'`; validatedWhere = `${where}'`;
params.where = validatedWhere; params.where = validatedWhere;
@ -345,15 +356,15 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
if (columns) params.columns = columns.join(","); if (columns) params.columns = columns.join(",");
console.log("[Electric] Syncing shape with params:", params); debugLog("[Electric] Syncing shape with params:", params);
console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`); debugLog("[Electric] Electric URL:", `${electricUrl}/v1/shape`);
console.log("[Electric] Where clause:", where, "Validated:", validatedWhere); debugLog("[Electric] Where clause:", where, "Validated:", validatedWhere);
try { try {
// Debug: Test Electric SQL connection directly first (DEV ONLY - skipped in production) // Debug: Test Electric SQL connection directly first (DEV ONLY - skipped in production)
if (process.env.NODE_ENV === "development") { if (process.env.NODE_ENV === "development") {
const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`;
console.log("[Electric] Testing Electric SQL directly:", testUrl); debugLog("[Electric] Testing Electric SQL directly:", testUrl);
try { try {
const testResponse = await fetch(testUrl); const testResponse = await fetch(testUrl);
const testHeaders = { const testHeaders = {
@ -361,9 +372,9 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
offset: testResponse.headers.get("electric-offset"), offset: testResponse.headers.get("electric-offset"),
upToDate: testResponse.headers.get("electric-up-to-date"), upToDate: testResponse.headers.get("electric-up-to-date"),
}; };
console.log("[Electric] Direct Electric SQL response headers:", testHeaders); debugLog("[Electric] Direct Electric SQL response headers:", testHeaders);
const testData = await testResponse.json(); const testData = await testResponse.json();
console.log( debugLog(
"[Electric] Direct Electric SQL data count:", "[Electric] Direct Electric SQL data count:",
Array.isArray(testData) ? testData.length : "not array", Array.isArray(testData) ? testData.length : "not array",
testData testData
@ -404,14 +415,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Shorter timeout (5 seconds) as fallback // Shorter timeout (5 seconds) as fallback
setTimeout(() => { setTimeout(() => {
if (!syncResolved) { if (!syncResolved) {
console.warn( debugWarn(
`[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` `[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`
); );
// Check isUpToDate one more time before resolving // Check isUpToDate one more time before resolving
// This will be checked after shape is created // This will be checked after shape is created
setTimeout(() => { setTimeout(() => {
if (!syncResolved) { if (!syncResolved) {
console.warn( debugWarn(
`[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s` `[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s`
); );
resolveInitialSync(); resolveInitialSync();
@ -450,7 +461,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
primaryKey, primaryKey,
shapeKey, // Re-enabled for fast incremental sync (root cause in use-inbox.ts is fixed) shapeKey, // Re-enabled for fast incremental sync (root cause in use-inbox.ts is fixed)
onInitialSync: () => { onInitialSync: () => {
console.log( debugLog(
`[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite` `[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite`
); );
resolveInitialSync(); resolveInitialSync();
@ -466,7 +477,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Handle must-refetch: clear table data before Electric re-inserts from scratch // Handle must-refetch: clear table data before Electric re-inserts from scratch
// This prevents "duplicate key" errors when the shape is invalidated // This prevents "duplicate key" errors when the shape is invalidated
onMustRefetch: async (tx: Transaction) => { onMustRefetch: async (tx: Transaction) => {
console.log( debugLog(
`[Electric] ⚠️ Must refetch triggered for ${table} - clearing existing data` `[Electric] ⚠️ Must refetch triggered for ${table} - clearing existing data`
); );
try { try {
@ -476,13 +487,13 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Parse the WHERE clause to build a DELETE statement // Parse the WHERE clause to build a DELETE statement
// The WHERE clause is already validated and formatted // The WHERE clause is already validated and formatted
await tx.exec(`DELETE FROM ${table} WHERE ${validatedWhere}`); await tx.exec(`DELETE FROM ${table} WHERE ${validatedWhere}`);
console.log( debugLog(
`[Electric] 🗑️ Cleared ${table} rows matching: ${validatedWhere}` `[Electric] 🗑️ Cleared ${table} rows matching: ${validatedWhere}`
); );
} else { } else {
// No WHERE clause means we're syncing the entire table // No WHERE clause means we're syncing the entire table
await tx.exec(`DELETE FROM ${table}`); await tx.exec(`DELETE FROM ${table}`);
console.log(`[Electric] 🗑️ Cleared all rows from ${table}`); debugLog(`[Electric] 🗑️ Cleared all rows from ${table}`);
} }
} catch (cleanupError) { } catch (cleanupError) {
console.error( console.error(
@ -495,7 +506,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
}, },
}; };
console.log( debugLog(
"[Electric] syncShapeToTable config:", "[Electric] syncShapeToTable config:",
JSON.stringify(shapeConfig, null, 2) JSON.stringify(shapeConfig, null, 2)
); );
@ -508,7 +519,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
const errorMessage = const errorMessage =
syncError instanceof Error ? syncError.message : String(syncError); syncError instanceof Error ? syncError.message : String(syncError);
if (errorMessage.includes("Already syncing")) { if (errorMessage.includes("Already syncing")) {
console.warn( debugWarn(
`[Electric] Already syncing ${table}, waiting for existing sync to settle...` `[Electric] Already syncing ${table}, waiting for existing sync to settle...`
); );
@ -518,12 +529,12 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Check if an active handle now exists (another sync might have completed) // Check if an active handle now exists (another sync might have completed)
const existingHandle = activeSyncHandles.get(cacheKey); const existingHandle = activeSyncHandles.get(cacheKey);
if (existingHandle) { if (existingHandle) {
console.log(`[Electric] Found existing handle after waiting: ${cacheKey}`); debugLog(`[Electric] Found existing handle after waiting: ${cacheKey}`);
return existingHandle; return existingHandle;
} }
// Retry once after waiting // Retry once after waiting
console.log(`[Electric] Retrying sync for ${table}...`); debugLog(`[Electric] Retrying sync for ${table}...`);
try { try {
shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig); shape = await pgWithElectric.electric.syncShapeToTable(shapeConfig);
} catch (retryError) { } catch (retryError) {
@ -531,12 +542,12 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
retryError instanceof Error ? retryError.message : String(retryError); retryError instanceof Error ? retryError.message : String(retryError);
if (retryMessage.includes("Already syncing")) { if (retryMessage.includes("Already syncing")) {
// Still syncing - create a placeholder handle that indicates the table is being synced // Still syncing - create a placeholder handle that indicates the table is being synced
console.warn( debugWarn(
`[Electric] ${table} still syncing, creating placeholder handle` `[Electric] ${table} still syncing, creating placeholder handle`
); );
const placeholderHandle: SyncHandle = { const placeholderHandle: SyncHandle = {
unsubscribe: () => { unsubscribe: () => {
console.log(`[Electric] Placeholder unsubscribe for: ${cacheKey}`); debugLog(`[Electric] Placeholder unsubscribe for: ${cacheKey}`);
activeSyncHandles.delete(cacheKey); activeSyncHandles.delete(cacheKey);
}, },
get isUpToDate() { get isUpToDate() {
@ -560,7 +571,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
} }
// Log the actual shape result structure // Log the actual shape result structure
console.log("[Electric] Shape sync result (initial):", { debugLog("[Electric] Shape sync result (initial):", {
hasUnsubscribe: typeof shape?.unsubscribe === "function", hasUnsubscribe: typeof shape?.unsubscribe === "function",
isUpToDate: shape?.isUpToDate, isUpToDate: shape?.isUpToDate,
hasStream: !!shape?.stream, hasStream: !!shape?.stream,
@ -569,7 +580,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Recommended Approach Step 1: Check isUpToDate immediately // Recommended Approach Step 1: Check isUpToDate immediately
if (shape.isUpToDate) { if (shape.isUpToDate) {
console.log( debugLog(
`[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)` `[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)`
); );
resolveInitialSync(); resolveInitialSync();
@ -577,7 +588,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message
if (shape?.stream) { if (shape?.stream) {
const stream = shape.stream as any; const stream = shape.stream as any;
console.log("[Electric] Shape stream details:", { debugLog("[Electric] Shape stream details:", {
shapeHandle: stream?.shapeHandle, shapeHandle: stream?.shapeHandle,
lastOffset: stream?.lastOffset, lastOffset: stream?.lastOffset,
isUpToDate: stream?.isUpToDate, isUpToDate: stream?.isUpToDate,
@ -590,14 +601,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// NOTE: We keep this subscription active - don't unsubscribe! // NOTE: We keep this subscription active - don't unsubscribe!
// The stream is what Electric SQL uses for real-time updates // The stream is what Electric SQL uses for real-time updates
if (typeof stream?.subscribe === "function") { if (typeof stream?.subscribe === "function") {
console.log( debugLog(
"[Electric] Subscribing to shape stream to watch for up-to-date message..." "[Electric] Subscribing to shape stream to watch for up-to-date message..."
); );
// Subscribe but don't store unsubscribe - we want it to stay active // Subscribe but don't store unsubscribe - we want it to stay active
stream.subscribe((messages: unknown[]) => { stream.subscribe((messages: unknown[]) => {
// Continue receiving updates even after sync is resolved // Continue receiving updates even after sync is resolved
if (!syncResolved) { if (!syncResolved) {
console.log( debugLog(
"[Electric] 🔵 Shape stream received messages:", "[Electric] 🔵 Shape stream received messages:",
messages?.length || 0 messages?.length || 0
); );
@ -614,14 +625,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
(typeof msg === "object" && "up-to-date" in msg) (typeof msg === "object" && "up-to-date" in msg)
) { ) {
if (!syncResolved) { if (!syncResolved) {
console.log(`[Electric] ✅ Received up-to-date message for ${table}`); debugLog(`[Electric] ✅ Received up-to-date message for ${table}`);
resolveInitialSync(); resolveInitialSync();
} }
// Continue listening for real-time updates - don't return! // Continue listening for real-time updates - don't return!
} }
} }
if (!syncResolved && messages.length > 0) { if (!syncResolved && messages.length > 0) {
console.log( debugLog(
"[Electric] First message:", "[Electric] First message:",
JSON.stringify(messages[0], null, 2) JSON.stringify(messages[0], null, 2)
); );
@ -630,14 +641,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Also check stream's isUpToDate property after receiving messages // Also check stream's isUpToDate property after receiving messages
if (!syncResolved && stream?.isUpToDate) { if (!syncResolved && stream?.isUpToDate) {
console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`); debugLog(`[Electric] ✅ Stream isUpToDate is true for ${table}`);
resolveInitialSync(); resolveInitialSync();
} }
}); });
// Also check stream's isUpToDate property immediately // Also check stream's isUpToDate property immediately
if (stream?.isUpToDate) { if (stream?.isUpToDate) {
console.log( debugLog(
`[Electric] ✅ Stream isUpToDate is true immediately for ${table}` `[Electric] ✅ Stream isUpToDate is true immediately for ${table}`
); );
resolveInitialSync(); resolveInitialSync();
@ -652,7 +663,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
} }
if (shape.isUpToDate || stream?.isUpToDate) { if (shape.isUpToDate || stream?.isUpToDate) {
console.log( debugLog(
`[Electric] ✅ Sync completed (detected via polling) for ${table}` `[Electric] ✅ Sync completed (detected via polling) for ${table}`
); );
clearInterval(pollInterval); clearInterval(pollInterval);
@ -665,7 +676,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
clearInterval(pollInterval); clearInterval(pollInterval);
}); });
} else { } else {
console.warn( debugWarn(
`[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout` `[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout`
); );
} }
@ -674,7 +685,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Create the sync handle with proper cleanup // Create the sync handle with proper cleanup
const syncHandle: SyncHandle = { const syncHandle: SyncHandle = {
unsubscribe: () => { unsubscribe: () => {
console.log(`[Electric] Unsubscribing from: ${cacheKey}`); debugLog(`[Electric] Unsubscribing from: ${cacheKey}`);
// Remove from cache first // Remove from cache first
activeSyncHandles.delete(cacheKey); activeSyncHandles.delete(cacheKey);
// Then unsubscribe from the shape // Then unsubscribe from the shape
@ -692,7 +703,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Cache the sync handle for reuse (memory optimization) // Cache the sync handle for reuse (memory optimization)
activeSyncHandles.set(cacheKey, syncHandle); activeSyncHandles.set(cacheKey, syncHandle);
console.log( debugLog(
`[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})` `[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})`
); );
@ -704,7 +715,7 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, {
method: "GET", method: "GET",
}); });
console.log( debugLog(
"[Electric] Electric SQL server response:", "[Electric] Electric SQL server response:",
response.status, response.status,
response.statusText response.statusText
@ -726,14 +737,14 @@ export async function initElectric(userId: string): Promise<ElectricClient> {
// Clean up the pending sync when done (whether success or failure) // Clean up the pending sync when done (whether success or failure)
syncPromise.finally(() => { syncPromise.finally(() => {
pendingSyncs.delete(cacheKey); pendingSyncs.delete(cacheKey);
console.log(`[Electric] Pending sync removed for: ${cacheKey}`); debugLog(`[Electric] Pending sync removed for: ${cacheKey}`);
}); });
return syncPromise; return syncPromise;
}, },
}; };
console.log(`[Electric] ✅ Initialized successfully for user: ${userId}`); debugLog(`[Electric] ✅ Initialized successfully for user: ${userId}`);
return electricClient; return electricClient;
} catch (error) { } catch (error) {
console.error("[Electric] Failed to initialize:", error); console.error("[Electric] Failed to initialize:", error);
@ -759,10 +770,10 @@ export async function cleanupElectric(): Promise<void> {
} }
const userIdToClean = currentUserId; const userIdToClean = currentUserId;
console.log(`[Electric] Cleaning up for user: ${userIdToClean}`); debugLog(`[Electric] Cleaning up for user: ${userIdToClean}`);
// Unsubscribe from all active sync handles first (memory cleanup) // Unsubscribe from all active sync handles first (memory cleanup)
console.log(`[Electric] Unsubscribing from ${activeSyncHandles.size} active sync handles`); debugLog(`[Electric] Unsubscribing from ${activeSyncHandles.size} active sync handles`);
// Copy keys to array to avoid mutation during iteration // Copy keys to array to avoid mutation during iteration
const handleKeys = Array.from(activeSyncHandles.keys()); const handleKeys = Array.from(activeSyncHandles.keys());
for (const key of handleKeys) { for (const key of handleKeys) {
@ -771,7 +782,7 @@ export async function cleanupElectric(): Promise<void> {
try { try {
handle.unsubscribe(); handle.unsubscribe();
} catch (err) { } catch (err) {
console.warn(`[Electric] Failed to unsubscribe from ${key}:`, err); debugWarn(`[Electric] Failed to unsubscribe from ${key}:`, err);
} }
} }
} }
@ -782,7 +793,7 @@ export async function cleanupElectric(): Promise<void> {
try { try {
// Close the PGlite database connection // Close the PGlite database connection
await electricClient.db.close(); await electricClient.db.close();
console.log("[Electric] Database closed"); debugLog("[Electric] Database closed");
} catch (error) { } catch (error) {
console.error("[Electric] Error closing database:", error); console.error("[Electric] Error closing database:", error);
} }
@ -798,13 +809,13 @@ export async function cleanupElectric(): Promise<void> {
try { try {
const dbName = `${DB_PREFIX}${userIdToClean}-v${SYNC_VERSION}`; const dbName = `${DB_PREFIX}${userIdToClean}-v${SYNC_VERSION}`;
window.indexedDB.deleteDatabase(dbName); window.indexedDB.deleteDatabase(dbName);
console.log(`[Electric] Deleted database: ${dbName}`); debugLog(`[Electric] Deleted database: ${dbName}`);
} catch (err) { } catch (err) {
console.warn("[Electric] Failed to delete database:", err); debugWarn("[Electric] Failed to delete database:", err);
} }
} }
console.log("[Electric] Cleanup complete"); debugLog("[Electric] Cleanup complete");
} }
/** /**