diff --git a/surfsense_web/components/notifications/NotificationPopup.tsx b/surfsense_web/components/notifications/NotificationPopup.tsx index 562aee4d1..8bf6cce00 100644 --- a/surfsense_web/components/notifications/NotificationPopup.tsx +++ b/surfsense_web/components/notifications/NotificationPopup.tsx @@ -4,7 +4,6 @@ import { Bell, Check, CheckCheck, Loader2, AlertCircle, CheckCircle2 } from "luc import { Button } from "@/components/ui/button"; import { ScrollArea } from "@/components/ui/scroll-area"; import { Separator } from "@/components/ui/separator"; -import { Badge } from "@/components/ui/badge"; import type { Notification } from "@/hooks/use-notifications"; import { formatDistanceToNow } from "date-fns"; import { cn } from "@/lib/utils"; @@ -40,34 +39,18 @@ export function NotificationPopup({ } }; - const getStatusBadge = (notification: Notification) => { + const getStatusIcon = (notification: Notification) => { const status = notification.metadata?.status as string | undefined; - if (!status) return null; - + switch (status) { case "in_progress": - return ( - - - In Progress - - ); + return ; case "completed": - return ( - - - Completed - - ); + return ; case "failed": - return ( - - - Failed - - ); + return ; default: - return null; + return ; } }; @@ -76,13 +59,7 @@ export function NotificationPopup({ {/* Header */}
-

Notifications

- {unreadCount > 0 && ( - - {unreadCount > 99 ? "99+" : unreadCount} - - )}
{unreadCount > 0 && ( )} @@ -121,6 +98,9 @@ export function NotificationPopup({ )} >
+
+ {getStatusIcon(notification)} +

{notification.title}

-
- {getStatusBadge(notification)} - {!notification.read && ( -
- )} -

{notification.message} @@ -145,20 +119,6 @@ export function NotificationPopup({ {formatTime(notification.created_at)} - {!notification.read && ( - - )}

@@ -172,4 +132,3 @@ export function NotificationPopup({
); } - diff --git a/surfsense_web/hooks/use-notifications.ts b/surfsense_web/hooks/use-notifications.ts index d629503e9..de19a82fe 100644 --- a/surfsense_web/hooks/use-notifications.ts +++ b/surfsense_web/hooks/use-notifications.ts @@ -19,14 +19,18 @@ export interface Notification { export function useNotifications(userId: string | null) { const [electric, setElectric] = useState(null) const [notifications, setNotifications] = useState([]) - const [initialized, setInitialized] = useState(false) + const [loading, setLoading] = useState(true) const [error, setError] = useState(null) const syncHandleRef = useRef(null) const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null) + // Use ref instead of state to track initialization - prevents cleanup from running when set + const initializedRef = useRef(false) // Initialize Electric SQL and start syncing with real-time updates useEffect(() => { - if (!userId || initialized) return + // Use ref to prevent re-initialization without triggering cleanup + if (!userId || initializedRef.current) return + initializedRef.current = true let mounted = true @@ -55,12 +59,19 @@ export function useNotifications(userId: string | null) { hasInitialSyncPromise: !!handle.initialSyncPromise, }) - // Wait for initial sync to complete if the promise is available - if (handle.initialSyncPromise) { + // Optimized: Check if already up-to-date before waiting + if (handle.isUpToDate) { + console.log('Sync already up-to-date, skipping wait') + } else if (handle.initialSyncPromise) { + // Only wait if not already up-to-date console.log('Waiting for initial sync to complete...') try { - await handle.initialSyncPromise - console.log('Initial sync promise resolved, checking status:', { + // Use Promise.race with a shorter timeout to avoid long waits + await Promise.race([ + handle.initialSyncPromise, + new Promise(resolve => setTimeout(resolve, 2000)), // Max 2s wait + ]) + console.log('Initial sync promise resolved or timed out, checking status:', { isUpToDate: handle.isUpToDate, }) } catch (syncErr) { @@ -80,122 +91,74 @@ export function useNotifications(userId: string | null) { } syncHandleRef.current = handle - setInitialized(true) + setLoading(false) setError(null) // Fetch notifications after sync is complete (we already waited above) await fetchNotifications(electricClient.db) // Set up real-time updates using PGlite live queries - // Electric SQL syncs data to PGlite in real-time via WebSocket/HTTP + // Electric SQL syncs data to PGlite in real-time via HTTP streaming // PGlite live queries detect when the synced data changes and trigger callbacks try { // eslint-disable-next-line @typescript-eslint/no-explicit-any const db = electricClient.db as any // Use PGlite's live query API for real-time updates - // Based on latest PGlite docs: db.live.query(query, params, callback) + // CORRECT API: await db.live.query() then use .subscribe() if (db.live?.query && typeof db.live.query === 'function') { - const liveQuery = db.live.query( + // IMPORTANT: db.live.query() returns a Promise - must await it! + const liveQuery = await db.live.query( `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`, - [userId], - (result: { rows: Notification[] }) => { - // This callback fires automatically when Electric SQL syncs changes - if (mounted) { - setNotifications(result.rows) - } - } + [userId] ) - // Set initial results immediately - if (liveQuery.initialResults) { - setNotifications(liveQuery.initialResults.rows) + if (!mounted) { + liveQuery.unsubscribe?.() + return } - if (mounted && liveQuery && typeof liveQuery.unsubscribe === 'function') { - liveQueryRef.current = liveQuery + // Set initial results immediately from the resolved query + if (liveQuery.initialResults?.rows) { + console.log('📋 Initial live query results:', liveQuery.initialResults.rows.length) + setNotifications(liveQuery.initialResults.rows) + } else if (liveQuery.rows) { + // Some versions have rows directly on the result + console.log('📋 Initial live query results (direct):', liveQuery.rows.length) + setNotifications(liveQuery.rows) + } + + // Subscribe to changes - this is the correct API! + // The callback fires automatically when Electric SQL syncs new data to PGlite + if (typeof liveQuery.subscribe === 'function') { + liveQuery.subscribe((result: { rows: Notification[] }) => { + console.log('🔔 Live query update received:', result.rows?.length || 0, 'notifications') + if (mounted && result.rows) { + setNotifications(result.rows) + } + }) console.log('✅ Real-time notifications enabled via PGlite live queries') + } else { + console.warn('⚠️ Live query subscribe method not available') + } + + // Store for cleanup + if (typeof liveQuery.unsubscribe === 'function') { + liveQueryRef.current = liveQuery } } else { - // Fallback: Monitor sync handle for updates - // Electric SQL's syncShape should trigger updates, but we need to detect them - // This is a lightweight approach that only checks when sync indicates changes - console.warn('PGlite live queries not available - using sync-based change detection') - - let lastNotificationIds = new Set() - - const checkForSyncUpdates = async () => { - if (!mounted) return - - try { - const result = await electricClient.db.query( - `SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC`, - [userId] - ) - - // PGlite query returns { rows: [] } format - const rows = result.rows || [] - - // Only update if data actually changed - const currentIds = new Set(rows.map(r => r.id)) - const currentHash = JSON.stringify( - rows.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at })) - ) - - // Check if IDs changed (new/deleted notifications) - const idsChanged = - currentIds.size !== lastNotificationIds.size || - [...currentIds].some(id => !lastNotificationIds.has(id)) || - [...lastNotificationIds].some(id => !currentIds.has(id)) - - if (idsChanged) { - setNotifications(rows) - lastNotificationIds = currentIds - } else { - // Check if any notification properties changed (e.g., read status) - // Compare with current state - setNotifications(prev => { - const prevHash = JSON.stringify( - prev.map(r => ({ id: r.id, read: r.read, updated_at: r.updated_at })) - ) - if (prevHash !== currentHash) { - return rows - } - return prev - }) - } - } catch (err) { - console.error('Failed to check for notification updates:', err) - } - - // Check again after a short delay (Electric SQL syncs are fast) - if (mounted) { - setTimeout(checkForSyncUpdates, 500) // Check every 500ms - Electric SQL syncs are near-instant - } - } - - // Start monitoring - checkForSyncUpdates() - - liveQueryRef.current = { - unsubscribe: () => { - mounted = false - } - } + console.error('❌ PGlite live queries not available - db.live.query is not a function') + console.log('db.live:', db.live) } } catch (liveErr) { - console.warn('Failed to set up real-time updates:', liveErr) - // Minimal fallback - this should rarely be needed - liveQueryRef.current = { - unsubscribe: () => {} - } + console.error('❌ Failed to set up real-time updates:', liveErr) } } catch (err) { if (!mounted) return console.error('Failed to initialize Electric SQL:', err) setError(err instanceof Error ? err : new Error('Failed to initialize Electric SQL')) - // Still mark as initialized so the UI doesn't block - setInitialized(true) + // Still mark as loaded so the UI doesn't block + setLoading(false) } } @@ -231,6 +194,9 @@ export function useNotifications(userId: string | null) { return () => { mounted = false + // Reset initialization state so we can reinitialize with a new userId + initializedRef.current = false + setLoading(true) if (syncHandleRef.current) { syncHandleRef.current.unsubscribe() syncHandleRef.current = null @@ -240,7 +206,9 @@ export function useNotifications(userId: string | null) { liveQueryRef.current = null } } - }, [userId, initialized]) + // Only depend on userId - using ref for initialization tracking to prevent cleanup issues + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [userId]) // Mark notification as read (local only - needs backend sync) const markAsRead = useCallback( @@ -301,7 +269,7 @@ export function useNotifications(userId: string | null) { unreadCount, markAsRead, markAllAsRead, - loading: !initialized, + loading, error, } } diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 97a2c1ca0..509fd0231 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -167,16 +167,45 @@ export async function initElectric(): Promise { // Note: mapColumns is OPTIONAL per pglite-sync types.ts // Create a promise that resolves when initial sync is complete + // Using recommended approach: check isUpToDate immediately, watch stream, shorter timeout + // IMPORTANT: We don't unsubscribe from the stream - it must stay active for real-time updates let resolveInitialSync: () => void let rejectInitialSync: (error: Error) => void + let syncResolved = false + const initialSyncPromise = new Promise((resolve, reject) => { - resolveInitialSync = resolve - rejectInitialSync = reject - // Safety timeout - if sync doesn't complete in 30s, something is wrong - setTimeout(() => { - console.warn(`⚠️ Sync timeout for ${table} - sync did not complete in 30s`) - resolve() // Resolve anyway to not block, but log warning - }, 30000) + resolveInitialSync = () => { + if (!syncResolved) { + syncResolved = true + // DON'T unsubscribe from stream - it needs to stay active for real-time updates + resolve() + } + } + rejectInitialSync = (error: Error) => { + if (!syncResolved) { + syncResolved = true + // DON'T unsubscribe from stream even on error - let Electric handle it + reject(error) + } + } + + // Shorter timeout (5 seconds) as fallback + const timeoutId = setTimeout(() => { + if (!syncResolved) { + console.warn(`⚠️ Sync timeout for ${table} - checking isUpToDate one more time...`) + // Check isUpToDate one more time before resolving + // This will be checked after shape is created + setTimeout(() => { + if (!syncResolved) { + console.warn(`⚠️ Sync timeout for ${table} - resolving anyway after 5s`) + resolveInitialSync() + } + }, 100) + } + }, 5000) + + // Store timeout ID for cleanup if needed + // Note: timeout will be cleared if sync completes early }) const shapeConfig = { @@ -220,35 +249,91 @@ export async function initElectric(): Promise { streamType: typeof shape?.stream, }) - // Debug the stream if available - if (shape?.stream) { - const stream = shape.stream as any - console.log('Shape stream details:', { - shapeHandle: stream?.shapeHandle, - lastOffset: stream?.lastOffset, - isUpToDate: stream?.isUpToDate, - error: stream?.error, - hasSubscribe: typeof stream?.subscribe === 'function', - hasUnsubscribe: typeof stream?.unsubscribe === 'function', - }) - - // Try to subscribe to the stream to see if it's receiving messages - if (typeof stream?.subscribe === 'function') { - console.log('Subscribing to shape stream for debugging...') - stream.subscribe((messages: unknown[]) => { - console.log('🔵 Shape stream received messages:', messages?.length || 0) - if (messages && messages.length > 0) { - console.log('First message:', JSON.stringify(messages[0], null, 2)) - } + // Recommended Approach Step 1: Check isUpToDate immediately + if (shape.isUpToDate) { + console.log(`✅ Sync already up-to-date for ${table} (resuming from previous state)`) + resolveInitialSync() + } else { + // Recommended Approach Step 2: Subscribe to stream and watch for "up-to-date" message + if (shape?.stream) { + const stream = shape.stream as any + console.log('Shape stream details:', { + shapeHandle: stream?.shapeHandle, + lastOffset: stream?.lastOffset, + isUpToDate: stream?.isUpToDate, + error: stream?.error, + hasSubscribe: typeof stream?.subscribe === 'function', + hasUnsubscribe: typeof stream?.unsubscribe === 'function', }) + + // Subscribe to the stream to watch for "up-to-date" control message + // NOTE: We keep this subscription active - don't unsubscribe! + // The stream is what Electric SQL uses for real-time updates + if (typeof stream?.subscribe === 'function') { + console.log('Subscribing to shape stream to watch for up-to-date message...') + // Subscribe but don't store unsubscribe - we want it to stay active + stream.subscribe((messages: unknown[]) => { + // Continue receiving updates even after sync is resolved + if (!syncResolved) { + console.log('🔵 Shape stream received messages:', messages?.length || 0) + } + + // Check if any message indicates sync is complete + if (messages && messages.length > 0) { + for (const message of messages) { + const msg = message as any + // Check for "up-to-date" control message + if (msg?.headers?.control === 'up-to-date' || + msg?.headers?.electric_up_to_date === 'true' || + (typeof msg === 'object' && 'up-to-date' in msg)) { + if (!syncResolved) { + console.log(`✅ Received up-to-date message for ${table}`) + resolveInitialSync() + } + // Continue listening for real-time updates - don't return! + } + } + if (!syncResolved && messages.length > 0) { + console.log('First message:', JSON.stringify(messages[0], null, 2)) + } + } + + // Also check stream's isUpToDate property after receiving messages + if (!syncResolved && stream?.isUpToDate) { + console.log(`✅ Stream isUpToDate is true for ${table}`) + resolveInitialSync() + } + }) + + // Also check stream's isUpToDate property immediately + if (stream?.isUpToDate) { + console.log(`✅ Stream isUpToDate is true immediately for ${table}`) + resolveInitialSync() + } + } + + // Also poll isUpToDate periodically as a backup (every 200ms) + const pollInterval = setInterval(() => { + if (syncResolved) { + clearInterval(pollInterval) + return + } + + if (shape.isUpToDate || stream?.isUpToDate) { + console.log(`✅ Sync completed (detected via polling) for ${table}`) + clearInterval(pollInterval) + resolveInitialSync() + } + }, 200) + + // Clean up polling when promise resolves + initialSyncPromise.finally(() => { + clearInterval(pollInterval) + }) + } else { + console.warn(`⚠️ No stream available for ${table}, relying on callback and timeout`) } } - - // Wait briefly to see if sync starts - await new Promise(resolve => setTimeout(resolve, 100)) - console.log('Shape sync result (after 100ms):', { - isUpToDate: shape?.isUpToDate, - }) // Return the shape handle - isUpToDate is a getter that reflects current state return {