diff --git a/surfsense_web/components/UserDropdown.tsx b/surfsense_web/components/UserDropdown.tsx index a7f9c89ac..3dac745cf 100644 --- a/surfsense_web/components/UserDropdown.tsx +++ b/surfsense_web/components/UserDropdown.tsx @@ -13,6 +13,7 @@ import { DropdownMenuSeparator, DropdownMenuTrigger, } from "@/components/ui/dropdown-menu"; +import { cleanupElectric } from "@/lib/electric/client"; import { resetUser, trackLogout } from "@/lib/posthog/events"; export function UserDropdown({ @@ -26,12 +27,20 @@ export function UserDropdown({ }) { const router = useRouter(); - const handleLogout = () => { + const handleLogout = async () => { try { // Track logout event and reset PostHog identity trackLogout(); resetUser(); + // Best-effort cleanup of Electric SQL / PGlite + // Even if this fails, login-time cleanup will handle it + try { + await cleanupElectric(); + } catch (err) { + console.warn("[Logout] Electric cleanup failed (will be handled on next login):", err); + } + if (typeof window !== "undefined") { localStorage.removeItem("surfsense_bearer_token"); window.location.href = "/"; @@ -40,7 +49,7 @@ export function UserDropdown({ console.error("Error during logout:", error); // Optionally, provide user feedback if (typeof window !== "undefined") { - alert("Logout failed. Please try again."); + localStorage.removeItem("surfsense_bearer_token"); window.location.href = "/"; } } diff --git a/surfsense_web/components/layout/providers/LayoutDataProvider.tsx b/surfsense_web/components/layout/providers/LayoutDataProvider.tsx index 95ff5d782..7f55e295f 100644 --- a/surfsense_web/components/layout/providers/LayoutDataProvider.tsx +++ b/surfsense_web/components/layout/providers/LayoutDataProvider.tsx @@ -21,6 +21,7 @@ import { } from "@/components/ui/dialog"; import { searchSpacesApiService } from "@/lib/apis/search-spaces-api.service"; import { deleteThread, fetchThreads } from "@/lib/chat/thread-persistence"; +import { cleanupElectric } from "@/lib/electric/client"; import { resetUser, trackLogout } from "@/lib/posthog/events"; import { cacheKeys } from "@/lib/query-client/cache-keys"; import type { ChatItem, NavItem, SearchSpace } from "../types/layout.types"; @@ -278,10 +279,19 @@ export function LayoutDataProvider({ router.push(`/dashboard/${searchSpaceId}/team`); }, [router, searchSpaceId]); - const handleLogout = useCallback(() => { + const handleLogout = useCallback(async () => { try { trackLogout(); resetUser(); + + // Best-effort cleanup of Electric SQL / PGlite + // Even if this fails, login-time cleanup will handle it + try { + await cleanupElectric(); + } catch (err) { + console.warn("[Logout] Electric cleanup failed (will be handled on next login):", err); + } + if (typeof window !== "undefined") { localStorage.removeItem("surfsense_bearer_token"); router.push("/"); diff --git a/surfsense_web/components/notifications/NotificationButton.tsx b/surfsense_web/components/notifications/NotificationButton.tsx index 0b81844ca..b1edd2254 100644 --- a/surfsense_web/components/notifications/NotificationButton.tsx +++ b/surfsense_web/components/notifications/NotificationButton.tsx @@ -9,12 +9,20 @@ import { useAtomValue } from "jotai"; import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { NotificationPopup } from "./NotificationPopup"; import { cn } from "@/lib/utils"; +import { useParams } from "next/navigation"; export function NotificationButton() { const { data: user } = useAtomValue(currentUserAtom); + const params = useParams(); + const userId = user?.id ? String(user.id) : null; + // Get searchSpaceId from URL params - the component is rendered within /dashboard/[search_space_id]/ + const searchSpaceId = params?.search_space_id + ? Number(params.search_space_id) + : null; + const { notifications, unreadCount, loading, markAsRead, markAllAsRead } = - useNotifications(userId); + useNotifications(userId, searchSpaceId); return ( diff --git a/surfsense_web/components/providers/ElectricProvider.tsx b/surfsense_web/components/providers/ElectricProvider.tsx index bff05d209..9a296f314 100644 --- a/surfsense_web/components/providers/ElectricProvider.tsx +++ b/surfsense_web/components/providers/ElectricProvider.tsx @@ -1,44 +1,95 @@ "use client"; -import { useEffect, useState } from "react"; -import { initElectric, isElectricInitialized } from "@/lib/electric/client"; +import { useEffect, useState, useRef } from "react"; +import { useAtomValue } from "jotai"; +import { currentUserAtom } from "@/atoms/user/user-query.atoms"; +import { + initElectric, + cleanupElectric, + isElectricInitialized, + type ElectricClient, +} from "@/lib/electric/client"; +import { ElectricContext } from "@/lib/electric/context"; interface ElectricProviderProps { children: React.ReactNode; } /** - * ElectricProvider initializes the Electric SQL client with PGlite + * ElectricProvider initializes the Electric SQL client with user-specific PGlite database + * and provides it to children via context. * - * This provider ensures Electric is initialized before rendering children, - * but doesn't block if initialization fails (app can still work without real-time sync) + * KEY BEHAVIORS: + * 1. Single initialization point - only this provider creates the Electric client + * 2. Creates user-specific database (isolated per user) + * 3. Cleans up other users' databases on login + * 4. Re-initializes when user changes + * 5. Provides client via context - hooks should use useElectricClient() */ export function ElectricProvider({ children }: ElectricProviderProps) { - const [initialized, setInitialized] = useState(false); + const [electricClient, setElectricClient] = useState(null); const [error, setError] = useState(null); + const { data: user, isSuccess: isUserLoaded, isError: isUserError } = useAtomValue(currentUserAtom); + const previousUserIdRef = useRef(null); + const initializingRef = useRef(false); useEffect(() => { - // Skip if already initialized - if (isElectricInitialized()) { - setInitialized(true); + // Skip on server side + if (typeof window === "undefined") return; + + // If no user is logged in, don't initialize Electric + // The app can still function without real-time sync for non-authenticated pages + if (!isUserLoaded || !user?.id) { + // If we had a previous user and now logged out, cleanup + if (previousUserIdRef.current && isElectricInitialized()) { + console.log("[ElectricProvider] User logged out, cleaning up..."); + cleanupElectric().then(() => { + previousUserIdRef.current = null; + setElectricClient(null); + }); + } return; } + const userId = String(user.id); + + // If already initialized for THIS user, skip + if (electricClient && previousUserIdRef.current === userId) { + return; + } + + // Prevent concurrent initialization attempts + if (initializingRef.current) { + return; + } + + // User changed or first initialization + initializingRef.current = true; let mounted = true; async function init() { try { - await initElectric(); + console.log(`[ElectricProvider] Initializing for user: ${userId}`); + + // If different user was previously initialized, cleanup will happen inside initElectric + const client = await initElectric(userId); + if (mounted) { - setInitialized(true); + previousUserIdRef.current = userId; + setElectricClient(client); setError(null); + console.log(`[ElectricProvider] ✅ Ready for user: ${userId}`); } } catch (err) { - console.error("Failed to initialize Electric SQL:", err); + console.error("[ElectricProvider] Failed to initialize:", err); if (mounted) { setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL")); - // Don't block rendering if Electric SQL fails - app can still work - setInitialized(true); + // Set client to null so hooks know initialization failed + setElectricClient(null); + } + } finally { + if (mounted) { + initializingRef.current = false; } } } @@ -48,22 +99,38 @@ export function ElectricProvider({ children }: ElectricProviderProps) { return () => { mounted = false; }; - }, []); + }, [user?.id, isUserLoaded, electricClient]); - // Show loading state only briefly, then render children - // Electric SQL will sync in the background - if (!initialized) { + // For non-authenticated pages (like landing page), render immediately with null context + // Also render immediately if user query failed (e.g., token expired) + if (!isUserLoaded || !user?.id || isUserError) { return ( -
-
Initializing...
-
+ + {children} + ); } - // If there's an error, still render children but log the error - if (error) { - console.warn("Electric SQL initialization failed, notifications may not sync:", error.message); + // Show loading state while initializing for authenticated users + if (!electricClient && !error) { + return ( + +
+
Initializing...
+
+
+ ); } - return <>{children}; + // If there's an error, still render but warn + if (error) { + console.warn("[ElectricProvider] Initialization failed, sync may not work:", error.message); + } + + // Provide the Electric client to children + return ( + + {children} + + ); } diff --git a/surfsense_web/hooks/use-connectors-electric.ts b/surfsense_web/hooks/use-connectors-electric.ts index d750cfdf3..77c18f0a5 100644 --- a/surfsense_web/hooks/use-connectors-electric.ts +++ b/surfsense_web/hooks/use-connectors-electric.ts @@ -1,24 +1,28 @@ "use client"; import { useEffect, useState, useCallback, useRef } from "react"; -import { - initElectric, - isElectricInitialized, - type ElectricClient, - type SyncHandle, -} from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; +import type { SyncHandle } from "@/lib/electric/client"; import type { SearchSourceConnector } from "@/contracts/types/connector.types"; +/** + * Hook for managing connectors with Electric SQL real-time sync + * + * Uses the Electric client from context (provided by ElectricProvider) + * instead of initializing its own - prevents race conditions and memory leaks + */ export function useConnectorsElectric(searchSpaceId: number | string | null) { - const [electric, setElectric] = useState(null); + // Get Electric client from context - ElectricProvider handles initialization + const electricClient = useElectricClient(); + const [connectors, setConnectors] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const syncHandleRef = useRef(null); const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); // Transform connector data from Electric SQL/PGlite to match expected format - // Converts Date objects to ISO strings as expected by Zod schema function transformConnector(connector: any): SearchSourceConnector { return { ...connector, @@ -36,69 +40,57 @@ export function useConnectorsElectric(searchSpaceId: number | string | null) { ? typeof connector.created_at === "string" ? connector.created_at : new Date(connector.created_at).toISOString() - : new Date().toISOString(), // fallback + : new Date().toISOString(), }; } - // Initialize Electric SQL and start syncing with real-time updates + // Start syncing when Electric client is available useEffect(() => { - if (!searchSpaceId) { - setLoading(false); - setConnectors([]); + // Wait for both searchSpaceId and Electric client to be available + if (!searchSpaceId || !electricClient) { + setLoading(!electricClient); // Still loading if waiting for Electric + if (!searchSpaceId) { + setConnectors([]); + } + return; + } + + // Create a unique key for this sync to prevent duplicate subscriptions + const syncKey = `connectors_${searchSpaceId}`; + if (syncKeyRef.current === syncKey) { + // Already syncing for this search space return; } let mounted = true; + syncKeyRef.current = syncKey; - async function init() { + async function startSync() { try { - const electricClient = await initElectric(); - if (!mounted) return; + console.log("[useConnectorsElectric] Starting sync for search space:", searchSpaceId); - setElectric(electricClient); - - // Start syncing connectors for this search space via Electric SQL - console.log("Starting Electric SQL sync for connectors, search_space_id:", searchSpaceId); - - // Use numeric format for WHERE clause (PGlite sync plugin expects this format) const handle = await electricClient.syncShape({ table: "search_source_connectors", where: `search_space_id = ${searchSpaceId}`, primaryKey: ["id"], }); - console.log("Electric SQL sync started for connectors:", { + console.log("[useConnectorsElectric] Sync started:", { isUpToDate: handle.isUpToDate, - hasStream: !!handle.stream, - hasInitialSyncPromise: !!handle.initialSyncPromise, }); - // Optimized: Check if already up-to-date before waiting - if (handle.isUpToDate) { - console.log("Connectors sync already up-to-date, skipping wait"); - } else if (handle.initialSyncPromise) { - // Only wait if not already up-to-date - console.log("Waiting for initial connectors sync to complete..."); + // Wait for initial sync with timeout + if (!handle.isUpToDate && handle.initialSyncPromise) { try { - // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + new Promise((resolve) => setTimeout(resolve, 2000)), ]); - console.log("Initial connectors sync promise resolved or timed out, checking status:", { - isUpToDate: handle.isUpToDate, - }); } catch (syncErr) { - console.error("Initial connectors sync failed:", syncErr); + console.error("[useConnectorsElectric] Initial sync failed:", syncErr); } } - // Check status after waiting - console.log("Connectors sync status after waiting:", { - isUpToDate: handle.isUpToDate, - hasStream: !!handle.stream, - }); - if (!mounted) { handle.unsubscribe(); return; @@ -108,108 +100,104 @@ export function useConnectorsElectric(searchSpaceId: number | string | null) { setLoading(false); setError(null); - // Fetch connectors after sync is complete (we already waited above) - await fetchConnectors(electricClient.db); + // Fetch initial connectors + await fetchConnectors(); - // Set up real-time updates using PGlite live queries - // Electric SQL syncs data to PGlite in real-time via HTTP streaming - // PGlite live queries detect when the synced data changes and trigger callbacks - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any; - - // Use PGlite's live query API for real-time updates - // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === "function") { - // IMPORTANT: db.live.query() returns a Promise - must await it! - const liveQuery = await db.live.query( - `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - // Set initial results immediately from the resolved query - if (liveQuery.initialResults?.rows) { - console.log( - "📋 Initial live query results for connectors:", - liveQuery.initialResults.rows.length - ); - setConnectors(liveQuery.initialResults.rows.map(transformConnector)); - } else if (liveQuery.rows) { - // Some versions have rows directly on the result - console.log( - "📋 Initial live query results for connectors (direct):", - liveQuery.rows.length - ); - setConnectors(liveQuery.rows.map(transformConnector)); - } - - // Subscribe to changes - this is the correct API! - // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === "function") { - liveQuery.subscribe((result: { rows: any[] }) => { - if (mounted && result.rows) { - console.log("🔄 Connectors updated via live query:", result.rows.length); - setConnectors(result.rows.map(transformConnector)); - } - }); - - // Store unsubscribe function for cleanup - liveQueryRef.current = liveQuery; - } - } else { - console.warn("PGlite live query API not available, falling back to polling"); - } - } catch (liveQueryErr) { - console.error("Failed to set up live query for connectors:", liveQueryErr); - // Don't fail completely - we still have the initial fetch - } + // Set up live query for real-time updates + await setupLiveQuery(); } catch (err) { - console.error("Failed to initialize Electric SQL for connectors:", err); - if (mounted) { - setError( - err instanceof Error - ? err - : new Error("Failed to initialize Electric SQL for connectors") - ); - setLoading(false); - } + if (!mounted) return; + console.error("[useConnectorsElectric] Failed to start sync:", err); + setError(err instanceof Error ? err : new Error("Failed to sync connectors")); + setLoading(false); } } - init(); + async function fetchConnectors() { + try { + const result = await electricClient.db.query( + `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ); + if (mounted) { + setConnectors((result.rows || []).map(transformConnector)); + } + } catch (err) { + console.error("[useConnectorsElectric] Failed to fetch:", err); + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = electricClient.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + // Set initial results + if (liveQuery.initialResults?.rows) { + setConnectors(liveQuery.initialResults.rows.map(transformConnector)); + } else if (liveQuery.rows) { + setConnectors(liveQuery.rows.map(transformConnector)); + } + + // Subscribe to changes + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: any[] }) => { + if (mounted && result.rows) { + setConnectors(result.rows.map(transformConnector)); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch (liveErr) { + console.error("[useConnectorsElectric] Failed to set up live query:", liveErr); + } + } + + startSync(); return () => { mounted = false; - syncHandleRef.current?.unsubscribe?.(); - liveQueryRef.current?.unsubscribe?.(); - syncHandleRef.current = null; - liveQueryRef.current = null; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } }; - }, [searchSpaceId]); - - async function fetchConnectors(db: any) { - try { - const result = await db.query( - `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - console.log("📋 Fetched connectors from PGlite:", result.rows?.length || 0); - setConnectors((result.rows || []).map(transformConnector)); - } catch (err) { - console.error("Failed to fetch connectors from PGlite:", err); - } - } + }, [searchSpaceId, electricClient]); // Manual refresh function (optional, for fallback) const refreshConnectors = useCallback(async () => { - if (!electric) return; - await fetchConnectors(electric.db); - }, [electric]); + if (!electricClient) return; + try { + const result = await electricClient.db.query( + `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ); + setConnectors((result.rows || []).map(transformConnector)); + } catch (err) { + console.error("[useConnectorsElectric] Failed to refresh:", err); + } + }, [electricClient, searchSpaceId]); return { connectors, loading, error, refreshConnectors }; } diff --git a/surfsense_web/hooks/use-documents-electric.ts b/surfsense_web/hooks/use-documents-electric.ts index 985b8c6c6..6e50c775c 100644 --- a/surfsense_web/hooks/use-documents-electric.ts +++ b/surfsense_web/hooks/use-documents-electric.ts @@ -1,7 +1,8 @@ "use client"; import { useEffect, useState, useRef, useMemo } from "react"; -import { initElectric, type ElectricClient, type SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; +import type { SyncHandle } from "@/lib/electric/client"; interface Document { id: number; @@ -10,13 +11,22 @@ interface Document { created_at: string; } +/** + * Hook for managing documents with Electric SQL real-time sync + * + * Uses the Electric client from context (provided by ElectricProvider) + * instead of initializing its own - prevents race conditions and memory leaks + */ export function useDocumentsElectric(searchSpaceId: number | string | null) { - const [electric, setElectric] = useState(null); + // Get Electric client from context - ElectricProvider handles initialization + const electricClient = useElectricClient(); + const [documents, setDocuments] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const syncHandleRef = useRef(null); const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); // Calculate document type counts from synced documents const documentTypeCounts = useMemo(() => { @@ -29,26 +39,30 @@ export function useDocumentsElectric(searchSpaceId: number | string | null) { return counts; }, [documents]); - // Initialize Electric SQL and start syncing with real-time updates + // Start syncing when Electric client is available useEffect(() => { - if (!searchSpaceId) { - setLoading(false); - setDocuments([]); + // Wait for both searchSpaceId and Electric client to be available + if (!searchSpaceId || !electricClient) { + setLoading(!electricClient); // Still loading if waiting for Electric + if (!searchSpaceId) { + setDocuments([]); + } + return; + } + + // Create a unique key for this sync to prevent duplicate subscriptions + const syncKey = `documents_${searchSpaceId}`; + if (syncKeyRef.current === syncKey) { + // Already syncing for this search space return; } let mounted = true; + syncKeyRef.current = syncKey; - async function init() { + async function startSync() { try { - const electricClient = await initElectric(); - if (!mounted) return; - - setElectric(electricClient); - - // Start syncing documents for this search space via Electric SQL - // Only sync id, document_type, search_space_id columns for efficiency - console.log("Starting Electric SQL sync for documents, search_space_id:", searchSpaceId); + console.log("[useDocumentsElectric] Starting sync for search space:", searchSpaceId); const handle = await electricClient.syncShape({ table: "documents", @@ -57,38 +71,22 @@ export function useDocumentsElectric(searchSpaceId: number | string | null) { primaryKey: ["id"], }); - console.log("Electric SQL sync started for documents:", { + console.log("[useDocumentsElectric] Sync started:", { isUpToDate: handle.isUpToDate, - hasStream: !!handle.stream, - hasInitialSyncPromise: !!handle.initialSyncPromise, }); - // Optimized: Check if already up-to-date before waiting - if (handle.isUpToDate) { - console.log("Documents sync already up-to-date, skipping wait"); - } else if (handle.initialSyncPromise) { - // Only wait if not already up-to-date - console.log("Waiting for initial documents sync to complete..."); + // Wait for initial sync with timeout + if (!handle.isUpToDate && handle.initialSyncPromise) { try { - // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + new Promise((resolve) => setTimeout(resolve, 2000)), ]); - console.log("Initial documents sync promise resolved or timed out, checking status:", { - isUpToDate: handle.isUpToDate, - }); } catch (syncErr) { - console.error("Initial documents sync failed:", syncErr); + console.error("[useDocumentsElectric] Initial sync failed:", syncErr); } } - // Check status after waiting - console.log("Documents sync status after waiting:", { - isUpToDate: handle.isUpToDate, - hasStream: !!handle.stream, - }); - if (!mounted) { handle.unsubscribe(); return; @@ -98,104 +96,90 @@ export function useDocumentsElectric(searchSpaceId: number | string | null) { setLoading(false); setError(null); - // Fetch documents after sync is complete (we already waited above) - await fetchDocuments(electricClient.db); + // Fetch initial documents + await fetchDocuments(); - // Set up real-time updates using PGlite live queries - // Electric SQL syncs data to PGlite in real-time via HTTP streaming - // PGlite live queries detect when the synced data changes and trigger callbacks - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any; - - // Use PGlite's live query API for real-time updates - // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === "function") { - // IMPORTANT: db.live.query() returns a Promise - must await it! - const liveQuery = await db.live.query( - `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - // Set initial results immediately from the resolved query - if (liveQuery.initialResults?.rows) { - console.log( - "📋 Initial live query results for documents:", - liveQuery.initialResults.rows.length - ); - setDocuments(liveQuery.initialResults.rows); - } else if (liveQuery.rows) { - // Some versions have rows directly on the result - console.log( - "📋 Initial live query results for documents (direct):", - liveQuery.rows.length - ); - setDocuments(liveQuery.rows); - } - - // Subscribe to changes - this is the correct API! - // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === "function") { - liveQuery.subscribe((result: { rows: Document[] }) => { - if (mounted && result.rows) { - console.log("🔄 Documents updated via live query:", result.rows.length); - setDocuments(result.rows); - } - }); - - // Store unsubscribe function for cleanup - liveQueryRef.current = liveQuery; - } - } else { - console.warn( - "PGlite live query API not available for documents, falling back to polling" - ); - } - } catch (liveQueryErr) { - console.error("Failed to set up live query for documents:", liveQueryErr); - // Don't fail completely - we still have the initial fetch - } + // Set up live query for real-time updates + await setupLiveQuery(); } catch (err) { - console.error("Failed to initialize Electric SQL for documents:", err); - if (mounted) { - setError( - err instanceof Error - ? err - : new Error("Failed to initialize Electric SQL for documents") - ); - setLoading(false); - } + if (!mounted) return; + console.error("[useDocumentsElectric] Failed to start sync:", err); + setError(err instanceof Error ? err : new Error("Failed to sync documents")); + setLoading(false); } } - init(); + async function fetchDocuments() { + try { + const result = await electricClient.db.query( + `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ); + if (mounted) { + setDocuments(result.rows || []); + } + } catch (err) { + console.error("[useDocumentsElectric] Failed to fetch:", err); + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = electricClient.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, + [searchSpaceId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + // Set initial results + if (liveQuery.initialResults?.rows) { + setDocuments(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + setDocuments(liveQuery.rows); + } + + // Subscribe to changes + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: Document[] }) => { + if (mounted && result.rows) { + setDocuments(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch (liveErr) { + console.error("[useDocumentsElectric] Failed to set up live query:", liveErr); + } + } + + startSync(); return () => { mounted = false; - syncHandleRef.current?.unsubscribe?.(); - liveQueryRef.current?.unsubscribe?.(); - syncHandleRef.current = null; - liveQueryRef.current = null; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } }; - }, [searchSpaceId]); - - async function fetchDocuments(db: any) { - try { - const result = await db.query( - `SELECT id, document_type, search_space_id, created_at FROM documents WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - console.log("📋 Fetched documents from PGlite:", result.rows?.length || 0); - setDocuments(result.rows || []); - } catch (err) { - console.error("Failed to fetch documents from PGlite:", err); - } - } + }, [searchSpaceId, electricClient]); return { documentTypeCounts, loading, error }; } diff --git a/surfsense_web/hooks/use-notifications.ts b/surfsense_web/hooks/use-notifications.ts index 7906fc500..eaf646e76 100644 --- a/surfsense_web/hooks/use-notifications.ts +++ b/surfsense_web/hooks/use-notifications.ts @@ -1,81 +1,81 @@ "use client"; import { useEffect, useState, useCallback, useRef } from "react"; -import { initElectric, type ElectricClient, type SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; +import type { SyncHandle } from "@/lib/electric/client"; import type { Notification } from "@/contracts/types/notification.types"; import { authenticatedFetch } from "@/lib/auth-utils"; export type { Notification } from "@/contracts/types/notification.types"; -export function useNotifications(userId: string | null) { - const [electric, setElectric] = useState(null); +/** + * Hook for managing notifications with Electric SQL real-time sync + * + * Uses the Electric client from context (provided by ElectricProvider) + * instead of initializing its own - prevents race conditions and memory leaks + * + * @param userId - The user ID to fetch notifications for + * @param searchSpaceId - The search space ID to filter notifications (null shows global notifications only) + */ +export function useNotifications(userId: string | null, searchSpaceId: number | null) { + // Get Electric client from context - ElectricProvider handles initialization + const electricClient = useElectricClient(); + const [notifications, setNotifications] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const syncHandleRef = useRef(null); const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); - // Use ref instead of state to track initialization - prevents cleanup from running when set - const initializedRef = useRef(false); + const syncKeyRef = useRef(null); - // Initialize Electric SQL and start syncing with real-time updates + // Start syncing when Electric client is available useEffect(() => { - // Use ref to prevent re-initialization without triggering cleanup - if (!userId || initializedRef.current) return; - initializedRef.current = true; + // Wait for both userId and Electric client to be available + if (!userId || !electricClient) { + setLoading(!electricClient); // Still loading if waiting for Electric + return; + } + + // Create a unique key for this sync - includes searchSpaceId for proper tracking + // Note: We sync ALL user notifications but filter by searchSpaceId in queries (memory efficient) + const syncKey = `notifications_${userId}_space_${searchSpaceId ?? "global"}`; + if (syncKeyRef.current === syncKey) { + // Already syncing for this user/searchSpace combo + return; + } let mounted = true; + syncKeyRef.current = syncKey; - async function init() { + async function startSync() { try { - const electricClient = await initElectric(); - if (!mounted) return; + console.log("[useNotifications] Starting sync for user:", userId, "searchSpace:", searchSpaceId); - setElectric(electricClient); - - // Start syncing notifications for this user via Electric SQL - // Note: user_id is stored as TEXT in PGlite (UUID from backend is converted) - console.log("Starting Electric SQL sync for user:", userId); - - // Use string format for WHERE clause (PGlite sync plugin expects this format) - // The user_id is a UUID string, so we need to quote it properly + // Sync ALL notifications for this user (one subscription for all search spaces) + // This is memory efficient - we filter by searchSpaceId in queries only const handle = await electricClient.syncShape({ table: "notifications", where: `user_id = '${userId}'`, primaryKey: ["id"], }); - console.log("Electric SQL sync started:", { + console.log("[useNotifications] Sync started:", { isUpToDate: handle.isUpToDate, hasStream: !!handle.stream, - hasInitialSyncPromise: !!handle.initialSyncPromise, }); - // Optimized: Check if already up-to-date before waiting - if (handle.isUpToDate) { - console.log("Sync already up-to-date, skipping wait"); - } else if (handle.initialSyncPromise) { - // Only wait if not already up-to-date - console.log("Waiting for initial sync to complete..."); + // Wait for initial sync with timeout + if (!handle.isUpToDate && handle.initialSyncPromise) { try { - // Use Promise.race with a shorter timeout to avoid long waits await Promise.race([ handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 2000)), // Max 2s wait + new Promise((resolve) => setTimeout(resolve, 2000)), ]); - console.log("Initial sync promise resolved or timed out, checking status:", { - isUpToDate: handle.isUpToDate, - }); } catch (syncErr) { - console.error("Initial sync failed:", syncErr); + console.error("[useNotifications] Initial sync failed:", syncErr); } } - // Check status after waiting - console.log("Sync status after waiting:", { - isUpToDate: handle.isUpToDate, - hasStream: !!handle.stream, - }); - if (!mounted) { handle.unsubscribe(); return; @@ -85,119 +85,88 @@ export function useNotifications(userId: string | null) { setLoading(false); setError(null); - // Fetch notifications after sync is complete (we already waited above) - await fetchNotifications(electricClient.db); + // Fetch initial notifications + await fetchNotifications(); - // Set up real-time updates using PGlite live queries - // Electric SQL syncs data to PGlite in real-time via HTTP streaming - // PGlite live queries detect when the synced data changes and trigger callbacks - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any; - - // Use PGlite's live query API for real-time updates - // CORRECT API: await db.live.query() then use .subscribe() - if (db.live?.query && typeof db.live.query === "function") { - // IMPORTANT: db.live.query() returns a Promise - must await it! - const liveQuery = await db.live.query( - `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`, - [userId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - // Set initial results immediately from the resolved query - if (liveQuery.initialResults?.rows) { - console.log("📋 Initial live query results:", liveQuery.initialResults.rows.length); - setNotifications(liveQuery.initialResults.rows); - } else if (liveQuery.rows) { - // Some versions have rows directly on the result - console.log("📋 Initial live query results (direct):", liveQuery.rows.length); - setNotifications(liveQuery.rows); - } - - // Subscribe to changes - this is the correct API! - // The callback fires automatically when Electric SQL syncs new data to PGlite - if (typeof liveQuery.subscribe === "function") { - liveQuery.subscribe((result: { rows: Notification[] }) => { - console.log( - "🔔 Live query update received:", - result.rows?.length || 0, - "notifications" - ); - if (mounted && result.rows) { - setNotifications(result.rows); - } - }); - console.log("✅ Real-time notifications enabled via PGlite live queries"); - } else { - console.warn("⚠️ Live query subscribe method not available"); - } - - // Store for cleanup - if (typeof liveQuery.unsubscribe === "function") { - liveQueryRef.current = liveQuery; - } - } else { - console.error("❌ PGlite live queries not available - db.live.query is not a function"); - console.log("db.live:", db.live); - } - } catch (liveErr) { - console.error("❌ Failed to set up real-time updates:", liveErr); - } + // Set up live query for real-time updates + await setupLiveQuery(); } catch (err) { if (!mounted) return; - console.error("Failed to initialize Electric SQL:", err); - setError(err instanceof Error ? err : new Error("Failed to initialize Electric SQL")); - // Still mark as loaded so the UI doesn't block + console.error("[useNotifications] Failed to start sync:", err); + setError(err instanceof Error ? err : new Error("Failed to sync notifications")); setLoading(false); } } - async function fetchNotifications( - db: InstanceType - ) { + async function fetchNotifications() { try { - // Debug: Check all notifications first - const allNotifications = await db.query( - `SELECT * FROM notifications ORDER BY created_at DESC` - ); - console.log( - "All notifications in PGlite:", - allNotifications.rows?.length || 0, - allNotifications.rows - ); - - // Use PGlite's query method (not exec for SELECT queries) - const result = await db.query( + // Filter by user_id AND searchSpaceId (or global notifications where search_space_id IS NULL) + const result = await electricClient.db.query( `SELECT * FROM notifications WHERE user_id = $1 + AND (search_space_id = $2 OR search_space_id IS NULL) ORDER BY created_at DESC`, - [userId] + [userId, searchSpaceId] ); - console.log(`Notifications for user ${userId}:`, result.rows?.length || 0, result.rows); - if (mounted) { - // PGlite query returns { rows: [] } format setNotifications(result.rows || []); } } catch (err) { - console.error("Failed to fetch notifications:", err); - // Log more details for debugging - console.error("Error details:", err); + console.error("[useNotifications] Failed to fetch:", err); } } - init(); + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = electricClient.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + // Filter by user_id AND searchSpaceId (or global notifications) + const liveQuery = await db.live.query( + `SELECT * FROM notifications + WHERE user_id = $1 + AND (search_space_id = $2 OR search_space_id IS NULL) + ORDER BY created_at DESC`, + [userId, searchSpaceId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + // Set initial results + if (liveQuery.initialResults?.rows) { + setNotifications(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + setNotifications(liveQuery.rows); + } + + // Subscribe to changes + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: Notification[] }) => { + if (mounted && result.rows) { + setNotifications(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch (liveErr) { + console.error("[useNotifications] Failed to set up live query:", liveErr); + } + } + + startSync(); return () => { mounted = false; - // Reset initialization state so we can reinitialize with a new userId - initializedRef.current = false; - setLoading(true); + syncKeyRef.current = null; + if (syncHandleRef.current) { syncHandleRef.current.unsubscribe(); syncHandleRef.current = null; @@ -207,15 +176,11 @@ export function useNotifications(userId: string | null) { liveQueryRef.current = null; } }; - // Only depend on userId - using ref for initialization tracking to prevent cleanup issues - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [userId]); + }, [userId, searchSpaceId, electricClient]); // Mark notification as read via backend API - // Electric SQL will automatically sync the change to all clients const markAsRead = useCallback(async (notificationId: number) => { try { - // Call backend API - Electric SQL will sync the change automatically const response = await authenticatedFetch( `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/notifications/${notificationId}/read`, { method: "PATCH" } @@ -226,8 +191,6 @@ export function useNotifications(userId: string | null) { throw new Error(error.detail || "Failed to mark notification as read"); } - // Electric SQL will sync the change from PostgreSQL to PGlite automatically - // The live query subscription will update the UI return true; } catch (err) { console.error("Failed to mark notification as read:", err); @@ -236,10 +199,8 @@ export function useNotifications(userId: string | null) { }, []); // Mark all notifications as read via backend API - // Electric SQL will automatically sync the changes to all clients const markAllAsRead = useCallback(async () => { try { - // Call backend API - Electric SQL will sync all changes automatically const response = await authenticatedFetch( `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/notifications/read-all`, { method: "PATCH" } @@ -250,8 +211,6 @@ export function useNotifications(userId: string | null) { throw new Error(error.detail || "Failed to mark all notifications as read"); } - // Electric SQL will sync the changes from PostgreSQL to PGlite automatically - // The live query subscription will update the UI return true; } catch (err) { console.error("Failed to mark all notifications as read:", err); diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 4b888f12f..801b7e963 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -1,10 +1,15 @@ /** * Electric SQL client setup for ElectricSQL 1.x with PGlite * - * This uses the new ElectricSQL 1.x architecture: - * - PGlite: In-browser PostgreSQL database (local storage) - * - @electric-sql/pglite-sync: Sync plugin to sync Electric shapes into PGlite - * - @electric-sql/client: HTTP client for subscribing to shapes + * USER-SPECIFIC DATABASE ARCHITECTURE: + * - Each user gets their own IndexedDB database: idb://surfsense-{userId}-v{version} + * - On login: cleanup databases from other users, then initialize current user's DB + * - On logout: best-effort cleanup (not relied upon) + * + * This ensures: + * 1. Complete user isolation (data can never leak between users) + * 2. Self-healing on login (stale databases are cleaned up) + * 3. Works even if logout cleanup fails */ import { PGlite } from "@electric-sql/pglite"; @@ -14,6 +19,7 @@ import { live } from "@electric-sql/pglite/live"; // Types export interface ElectricClient { db: PGlite; + userId: string; syncShape: (options: SyncShapeOptions) => Promise; } @@ -33,14 +39,21 @@ export interface SyncHandle { initialSyncPromise?: Promise; } -// Singleton instance +// Singleton state - now tracks the user ID let electricClient: ElectricClient | null = null; +let currentUserId: string | null = null; let isInitializing = false; let initPromise: Promise | null = null; +// Cache for sync handles to prevent duplicate subscriptions (memory optimization) +const activeSyncHandles = new Map(); + // Version for sync state - increment this to force fresh sync when Electric config changes -// Incremented to v4 to fix sync completion issues -const SYNC_VERSION = 4; +// Incremented to v5 for user-specific database architecture +const SYNC_VERSION = 5; + +// Database name prefix for identifying SurfSense databases +const DB_PREFIX = "surfsense-"; // Get Electric URL from environment function getElectricUrl(): string { @@ -51,24 +64,100 @@ function getElectricUrl(): string { } /** - * Initialize the Electric SQL client with PGlite and sync plugin + * Get the database name for a specific user */ -export async function initElectric(): Promise { - if (electricClient) { +function getDbName(userId: string): string { + return `idb://${DB_PREFIX}${userId}-v${SYNC_VERSION}`; +} + +/** + * Clean up databases from OTHER users (not the current user) + * This is called on login to ensure clean state + */ +async function cleanupOtherUserDatabases(currentUserId: string): Promise { + if (typeof window === "undefined" || !window.indexedDB) { + return; + } + + try { + // Try to list all databases (not supported in all browsers) + if (typeof window.indexedDB.databases === "function") { + const databases = await window.indexedDB.databases(); + + for (const dbInfo of databases) { + const dbName = dbInfo.name; + if (!dbName) continue; + + // Check if this is a SurfSense database + if (dbName.startsWith(DB_PREFIX) || dbName.includes("surfsense")) { + // Don't delete current user's database + if (dbName.includes(currentUserId)) { + console.log(`[Electric] Keeping current user's database: ${dbName}`); + continue; + } + + // Delete databases from other users + try { + console.log(`[Electric] Deleting stale database: ${dbName}`); + window.indexedDB.deleteDatabase(dbName); + } catch (deleteErr) { + console.warn(`[Electric] Failed to delete database ${dbName}:`, deleteErr); + } + } + } + } + } catch (err) { + // indexedDB.databases() not supported - that's okay, login cleanup is best-effort + console.warn("[Electric] Could not enumerate databases for cleanup:", err); + } +} + +/** + * Initialize the Electric SQL client for a specific user + * + * KEY BEHAVIORS: + * 1. If already initialized for the SAME user, returns existing client + * 2. If initialized for a DIFFERENT user, closes old client and creates new one + * 3. On first init, cleans up databases from other users + * + * @param userId - The current user's ID (required) + */ +export async function initElectric(userId: string): Promise { + if (!userId) { + throw new Error("userId is required for Electric initialization"); + } + + // If already initialized for this user, return existing client + if (electricClient && currentUserId === userId) { return electricClient; } + // If initialized for a different user, close the old client first + if (electricClient && currentUserId !== userId) { + console.log(`[Electric] User changed from ${currentUserId} to ${userId}, reinitializing...`); + await cleanupElectric(); + } + + // If already initializing, wait for it if (isInitializing && initPromise) { return initPromise; } isInitializing = true; + currentUserId = userId; + initPromise = (async () => { try { - // Create PGlite instance with Electric sync plugin and live queries - // Include version in database name to force fresh sync when Electric config changes + // STEP 1: Clean up databases from other users (login-time cleanup) + console.log("[Electric] Cleaning up databases from other users..."); + await cleanupOtherUserDatabases(userId); + + // STEP 2: Create user-specific PGlite database + const dbName = getDbName(userId); + console.log(`[Electric] Initializing database: ${dbName}`); + const db = await PGlite.create({ - dataDir: `idb://surfsense-notifications-v${SYNC_VERSION}`, + dataDir: dbName, relaxedDurability: true, extensions: { // Enable debug mode in electricSync to see detailed sync logs @@ -77,7 +166,7 @@ export async function initElectric(): Promise { }, }); - // Create the notifications table schema in PGlite + // STEP 3: Create the notifications table schema in PGlite // This matches the backend schema await db.exec(` CREATE TABLE IF NOT EXISTS notifications ( @@ -137,12 +226,23 @@ export async function initElectric(): Promise { const electricUrl = getElectricUrl(); - // Create the client wrapper + // STEP 4: Create the client wrapper electricClient = { db, + userId, syncShape: async (options: SyncShapeOptions): Promise => { const { table, where, columns, primaryKey = ["id"] } = options; + // Create cache key for this sync shape + const cacheKey = `${table}_${where || "all"}_${columns?.join(",") || "all"}`; + + // Check if we already have an active sync for this shape (memory optimization) + const existingHandle = activeSyncHandles.get(cacheKey); + if (existingHandle) { + console.log(`[Electric] Reusing existing sync handle for: ${cacheKey}`); + return existingHandle; + } + // Build params for the shape request // Electric SQL expects params as URL query parameters const params: Record = { table }; @@ -177,15 +277,15 @@ export async function initElectric(): Promise { if (columns) params.columns = columns.join(","); - console.log("Syncing shape with params:", params); - console.log("Electric URL:", `${electricUrl}/v1/shape`); - console.log("Where clause:", where, "Validated:", validatedWhere); + console.log("[Electric] Syncing shape with params:", params); + console.log("[Electric] Electric URL:", `${electricUrl}/v1/shape`); + console.log("[Electric] Where clause:", where, "Validated:", validatedWhere); try { // Debug: Test Electric SQL connection directly first // Use validatedWhere to ensure proper URL encoding const testUrl = `${electricUrl}/v1/shape?table=${table}&offset=-1${validatedWhere ? `&where=${encodeURIComponent(validatedWhere)}` : ""}`; - console.log("Testing Electric SQL directly:", testUrl); + console.log("[Electric] Testing Electric SQL directly:", testUrl); try { const testResponse = await fetch(testUrl); const testHeaders = { @@ -193,15 +293,15 @@ export async function initElectric(): Promise { offset: testResponse.headers.get("electric-offset"), upToDate: testResponse.headers.get("electric-up-to-date"), }; - console.log("Direct Electric SQL response headers:", testHeaders); + console.log("[Electric] Direct Electric SQL response headers:", testHeaders); const testData = await testResponse.json(); console.log( - "Direct Electric SQL data count:", + "[Electric] Direct Electric SQL data count:", Array.isArray(testData) ? testData.length : "not array", testData ); } catch (testErr) { - console.error("Direct Electric SQL test failed:", testErr); + console.error("[Electric] Direct Electric SQL test failed:", testErr); } // Use PGlite's electric sync plugin to sync the shape @@ -211,9 +311,10 @@ export async function initElectric(): Promise { // Create a promise that resolves when initial sync is complete // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates - let resolveInitialSync: () => void; - let rejectInitialSync: (error: Error) => void; let syncResolved = false; + // Initialize with no-op functions to satisfy TypeScript + let resolveInitialSync: () => void = () => {}; + let rejectInitialSync: (error: Error) => void = () => {}; const initialSyncPromise = new Promise((resolve, reject) => { resolveInitialSync = () => { @@ -232,26 +333,26 @@ export async function initElectric(): Promise { }; // Shorter timeout (5 seconds) as fallback - const timeoutId = setTimeout(() => { + setTimeout(() => { if (!syncResolved) { console.warn( - `⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` + `[Electric] ⚠️ Sync timeout for ${table} - checking isUpToDate one more time...` ); // Check isUpToDate one more time before resolving // This will be checked after shape is created setTimeout(() => { if (!syncResolved) { - console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`); + console.warn( + `[Electric] ⚠️ Sync timeout for ${table} - resolving anyway after 5s` + ); resolveInitialSync(); } }, 100); } }, 5000); - - // Store timeout ID for cleanup if needed - // Note: timeout will be cleared if sync completes early }); + // Include userId in shapeKey for user-specific sync state const shapeConfig = { shape: { url: `${electricUrl}/v1/shape`, @@ -263,22 +364,24 @@ export async function initElectric(): Promise { }, table, primaryKey, - shapeKey: `v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // Versioned key to force fresh sync when needed + shapeKey: `${userId}_v${SYNC_VERSION}_${table}_${where?.replace(/[^a-zA-Z0-9]/g, "_") || "all"}`, // User-specific versioned key onInitialSync: () => { - console.log(`✅ Initial sync complete for ${table} - data should now be in PGlite`); + console.log( + `[Electric] ✅ Initial sync complete for ${table} - data should now be in PGlite` + ); resolveInitialSync(); }, onError: (error: Error) => { - console.error(`❌ Shape sync error for ${table}:`, error); + console.error(`[Electric] ❌ Shape sync error for ${table}:`, error); console.error( - "Error details:", + "[Electric] Error details:", JSON.stringify(error, Object.getOwnPropertyNames(error)) ); rejectInitialSync(error); }, }; - console.log("syncShapeToTable config:", JSON.stringify(shapeConfig, null, 2)); + console.log("[Electric] syncShapeToTable config:", JSON.stringify(shapeConfig, null, 2)); // Type assertion to PGlite with electric extension const pgWithElectric = db as PGlite & { @@ -295,7 +398,7 @@ export async function initElectric(): Promise { } // Log the actual shape result structure - console.log("Shape sync result (initial):", { + console.log("[Electric] Shape sync result (initial):", { hasUnsubscribe: typeof shape?.unsubscribe === "function", isUpToDate: shape?.isUpToDate, hasStream: !!shape?.stream, @@ -304,13 +407,15 @@ export async function initElectric(): Promise { // Recommended Approach Step 1: Check isUpToDate immediately if (shape.isUpToDate) { - console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`); + console.log( + `[Electric] ✅ Sync already up-to-date for ${table} (resuming from previous state)` + ); resolveInitialSync(); } else { // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message if (shape?.stream) { const stream = shape.stream as any; - console.log("Shape stream details:", { + console.log("[Electric] Shape stream details:", { shapeHandle: stream?.shapeHandle, lastOffset: stream?.lastOffset, isUpToDate: stream?.isUpToDate, @@ -323,12 +428,17 @@ export async function initElectric(): Promise { // NOTE: We keep this subscription active - don't unsubscribe! // The stream is what Electric SQL uses for real-time updates if (typeof stream?.subscribe === "function") { - console.log("Subscribing to shape stream to watch for up-to-date message..."); + console.log( + "[Electric] Subscribing to shape stream to watch for up-to-date message..." + ); // Subscribe but don't store unsubscribe - we want it to stay active stream.subscribe((messages: unknown[]) => { // Continue receiving updates even after sync is resolved if (!syncResolved) { - console.log("🔵 Shape stream received messages:", messages?.length || 0); + console.log( + "[Electric] 🔵 Shape stream received messages:", + messages?.length || 0 + ); } // Check if any message indicates sync is complete @@ -342,27 +452,34 @@ export async function initElectric(): Promise { (typeof msg === "object" && "up-to-date" in msg) ) { if (!syncResolved) { - console.log(`✅ Received up-to-date message for ${table}`); + console.log( + `[Electric] ✅ Received up-to-date message for ${table}` + ); resolveInitialSync(); } // Continue listening for real-time updates - don't return! } } if (!syncResolved && messages.length > 0) { - console.log("First message:", JSON.stringify(messages[0], null, 2)); + console.log( + "[Electric] First message:", + JSON.stringify(messages[0], null, 2) + ); } } // Also check stream's isUpToDate property after receiving messages if (!syncResolved && stream?.isUpToDate) { - console.log(`✅ Stream isUpToDate is true for ${table}`); + console.log(`[Electric] ✅ Stream isUpToDate is true for ${table}`); resolveInitialSync(); } }); // Also check stream's isUpToDate property immediately if (stream?.isUpToDate) { - console.log(`✅ Stream isUpToDate is true immediately for ${table}`); + console.log( + `[Electric] ✅ Stream isUpToDate is true immediately for ${table}` + ); resolveInitialSync(); } } @@ -375,7 +492,7 @@ export async function initElectric(): Promise { } if (shape.isUpToDate || stream?.isUpToDate) { - console.log(`✅ Sync completed (detected via polling) for ${table}`); + console.log(`[Electric] ✅ Sync completed (detected via polling) for ${table}`); clearInterval(pollInterval); resolveInitialSync(); } @@ -386,14 +503,19 @@ export async function initElectric(): Promise { clearInterval(pollInterval); }); } else { - console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`); + console.warn( + `[Electric] ⚠️ No stream available for ${table}, relying on callback and timeout` + ); } } - // Return the shape handle - isUpToDate is a getter that reflects current state - return { + // Create the sync handle with proper cleanup + const syncHandle: SyncHandle = { unsubscribe: () => { - console.log("unsubscribing"); + console.log(`[Electric] Unsubscribing from: ${cacheKey}`); + // Remove from cache first + activeSyncHandles.delete(cacheKey); + // Then unsubscribe from the shape if (shape && typeof shape.unsubscribe === "function") { shape.unsubscribe(); } @@ -405,30 +527,43 @@ export async function initElectric(): Promise { stream: shape?.stream, initialSyncPromise, // Expose promise so callers can wait for sync }; + + // Cache the sync handle for reuse (memory optimization) + activeSyncHandles.set(cacheKey, syncHandle); + console.log(`[Electric] Cached sync handle for: ${cacheKey} (total cached: ${activeSyncHandles.size})`); + + return syncHandle; } catch (error) { - console.error("Failed to sync shape:", error); + console.error("[Electric] Failed to sync shape:", error); // Check if Electric SQL server is reachable try { const response = await fetch(`${electricUrl}/v1/shape?table=${table}&offset=-1`, { method: "GET", }); - console.log("Electric SQL server response:", response.status, response.statusText); + console.log( + "[Electric] Electric SQL server response:", + response.status, + response.statusText + ); if (!response.ok) { - console.error("Electric SQL server error:", await response.text()); + console.error("[Electric] Electric SQL server error:", await response.text()); } } catch (fetchError) { - console.error("Cannot reach Electric SQL server:", fetchError); - console.error("Make sure Electric SQL is running at:", electricUrl); + console.error("[Electric] Cannot reach Electric SQL server:", fetchError); + console.error("[Electric] Make sure Electric SQL is running at:", electricUrl); } throw error; } }, }; - console.log("Electric SQL initialized successfully with PGlite"); + console.log(`[Electric] ✅ Initialized successfully for user: ${userId}`); return electricClient; } catch (error) { - console.error("Failed to initialize Electric SQL:", error); + console.error("[Electric] Failed to initialize:", error); + // Reset state on failure + electricClient = null; + currentUserId = null; throw error; } finally { isInitializing = false; @@ -438,21 +573,87 @@ export async function initElectric(): Promise { return initPromise; } +/** + * Cleanup Electric SQL - close database and reset singleton + * Called on logout (best-effort) and when switching users + */ +export async function cleanupElectric(): Promise { + if (!electricClient) { + return; + } + + const userIdToClean = currentUserId; + console.log(`[Electric] Cleaning up for user: ${userIdToClean}`); + + // Unsubscribe from all active sync handles first (memory cleanup) + console.log(`[Electric] Unsubscribing from ${activeSyncHandles.size} active sync handles`); + // Copy keys to array to avoid mutation during iteration + const handleKeys = Array.from(activeSyncHandles.keys()); + for (const key of handleKeys) { + const handle = activeSyncHandles.get(key); + if (handle) { + try { + handle.unsubscribe(); + } catch (err) { + console.warn(`[Electric] Failed to unsubscribe from ${key}:`, err); + } + } + } + // Ensure cache is empty + activeSyncHandles.clear(); + + try { + // Close the PGlite database connection + await electricClient.db.close(); + console.log("[Electric] Database closed"); + } catch (error) { + console.error("[Electric] Error closing database:", error); + } + + // Reset singleton state + electricClient = null; + currentUserId = null; + isInitializing = false; + initPromise = null; + + // Delete the user's IndexedDB database (best-effort cleanup on logout) + if (typeof window !== "undefined" && window.indexedDB && userIdToClean) { + try { + const dbName = `${DB_PREFIX}${userIdToClean}-v${SYNC_VERSION}`; + window.indexedDB.deleteDatabase(dbName); + console.log(`[Electric] Deleted database: ${dbName}`); + } catch (err) { + console.warn("[Electric] Failed to delete database:", err); + } + } + + console.log("[Electric] Cleanup complete"); +} + /** * Get the Electric client (throws if not initialized) */ export function getElectric(): ElectricClient { if (!electricClient) { - throw new Error("Electric not initialized. Call initElectric() first."); + throw new Error("Electric not initialized. Call initElectric(userId) first."); } return electricClient; } /** - * Check if Electric is initialized + * Check if Electric is initialized for a specific user */ -export function isElectricInitialized(): boolean { - return electricClient !== null; +export function isElectricInitialized(userId?: string): boolean { + if (!electricClient) return false; + if (userId && currentUserId !== userId) return false; + return true; +} + +/** + * Get the current user ID that Electric is initialized for + */ +export function getCurrentElectricUserId(): string | null { + return currentUserId; } /** diff --git a/surfsense_web/lib/electric/context.ts b/surfsense_web/lib/electric/context.ts new file mode 100644 index 000000000..010959ed2 --- /dev/null +++ b/surfsense_web/lib/electric/context.ts @@ -0,0 +1,37 @@ +"use client"; + +import { createContext, useContext } from "react"; +import type { ElectricClient } from "./client"; + +/** + * Context for sharing the Electric SQL client across the app + * + * This ensures: + * 1. Single initialization point (ElectricProvider only) + * 2. No race conditions (hooks wait for context) + * 3. Clean cleanup (ElectricProvider manages lifecycle) + */ +export const ElectricContext = createContext(null); + +/** + * Hook to get the Electric client from context + * Returns null if Electric is not initialized yet + */ +export function useElectricClient(): ElectricClient | null { + return useContext(ElectricContext); +} + +/** + * Hook to get the Electric client, throwing if not available + * Use this when you're sure Electric should be initialized + */ +export function useElectricClientOrThrow(): ElectricClient { + const client = useContext(ElectricContext); + if (!client) { + throw new Error( + "Electric client not available. Make sure you're inside ElectricProvider and user is authenticated." + ); + } + return client; +} +