diff --git a/surfsense_web/hooks/use-inbox.ts b/surfsense_web/hooks/use-inbox.ts index f301dc90e..d15a47a39 100644 --- a/surfsense_web/hooks/use-inbox.ts +++ b/surfsense_web/hooks/use-inbox.ts @@ -3,8 +3,8 @@ import { useCallback, useEffect, useRef, useState } from "react"; import type { InboxItem, NotificationCategory } from "@/contracts/types/inbox.types"; import { notificationsApiService } from "@/lib/apis/notifications-api.service"; -import { filterNewElectricItems, getNewestTimestamp } from "@/lib/electric/baseline"; -import { useElectricClient } from "@/lib/electric/context"; +import { queries } from "@/zero/queries"; +import { useQuery } from "@rocicorp/zero/react"; export type { InboxItem, @@ -16,17 +16,16 @@ const INITIAL_PAGE_SIZE = 50; const SCROLL_PAGE_SIZE = 30; const SYNC_WINDOW_DAYS = 4; -const CATEGORY_TYPE_SQL: Record = { - comments: "AND type IN ('new_mention', 'comment_reply')", - status: - "AND type IN ('connector_indexing', 'connector_deletion', 'document_processing', 'page_limit_exceeded')", +const CATEGORY_TYPES: Record = { + comments: ["new_mention", "comment_reply"], + status: [ + "connector_indexing", + "connector_deletion", + "document_processing", + "page_limit_exceeded", + ], }; -/** - * Calculate the cutoff date for sync window. - * Rounds to the start of the day (midnight UTC) to ensure stable values - * across re-renders. - */ function getSyncCutoffDate(): string { const cutoff = new Date(); cutoff.setDate(cutoff.getDate() - SYNC_WINDOW_DAYS); @@ -35,24 +34,12 @@ function getSyncCutoffDate(): string { } /** - * Hook for managing inbox items with API-first architecture + Electric real-time deltas. + * Hook for managing inbox items with API-first architecture + Zero real-time deltas. * - * Architecture (Documents pattern, per-tab): + * Architecture: * 1. API is the PRIMARY data source — fetches first page on mount with category filter - * 2. Electric provides REAL-TIME updates (new items, status changes, read state) - * 3. Baseline pattern prevents duplicates between API and Electric - * 4. Electric sync shape is SHARED across instances (client-level caching) - * — each instance creates its own type-filtered live queries - * - * Unread count strategy: - * - API provides the category-filtered total on mount (ground truth across all time) - * - Electric live query counts unread within SYNC_WINDOW_DAYS (filtered by type) - * - olderUnreadOffsetRef bridges the gap: total = offset + recent - * - Optimistic updates adjust both the count and the offset (for old items) - * - * @param userId - The user ID to fetch inbox items for - * @param searchSpaceId - The search space ID to filter inbox items - * @param category - Which tab: "comments" or "status" + * 2. Zero provides REAL-TIME updates (new items, status changes, read state) + * 3. Unread count = olderUnreadOffset + recent unread from Zero */ export function useInbox( userId: string | null, @@ -61,8 +48,6 @@ export function useInbox( prefetchedUnread?: { total_unread: number; recent_unread: number } | null, prefetchedUnreadReady = true ) { - const electricClient = useElectricClient(); - const [inboxItems, setInboxItems] = useState([]); const [loading, setLoading] = useState(true); const [loadingMore, setLoadingMore] = useState(false); @@ -71,17 +56,12 @@ export function useInbox( const [unreadCount, setUnreadCount] = useState(0); const initialLoadDoneRef = useRef(false); - const electricBaselineIdsRef = useRef | null>(null); - const newestApiTimestampRef = useRef(null); - const liveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null); - const unreadLiveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null); - const olderUnreadOffsetRef = useRef(null); const apiUnreadTotalRef = useRef(0); - // EFFECT 1: Fetch first page + unread count from API with category filter. - // When prefetchedUnreadReady=false, we wait for the batch query to settle - // before deciding whether we need an individual unread-count fallback call. + const categoryTypes = CATEGORY_TYPES[category]; + + // EFFECT 1: Fetch first page + unread count from API with category filter useEffect(() => { if (!userId || !searchSpaceId) return; if (!prefetchedUnreadReady) return; @@ -92,8 +72,6 @@ export function useInbox( setInboxItems([]); setHasMore(false); initialLoadDoneRef.current = false; - electricBaselineIdsRef.current = null; - newestApiTimestampRef.current = null; olderUnreadOffsetRef.current = null; apiUnreadTotalRef.current = 0; @@ -107,7 +85,6 @@ export function useInbox( }, }); - // Use prefetched counts when available, otherwise fetch individually. const unreadPromise = prefetchedUnread ? Promise.resolve(prefetchedUnread) : notificationsApiService.getUnreadCount(searchSpaceId, undefined, category); @@ -123,7 +100,6 @@ export function useInbox( setHasMore(notificationsResponse.has_more); setUnreadCount(unreadResponse.total_unread); apiUnreadTotalRef.current = unreadResponse.total_unread; - newestApiTimestampRef.current = getNewestTimestamp(notificationsResponse.items); setError(null); initialLoadDoneRef.current = true; } catch (err) { @@ -141,208 +117,87 @@ export function useInbox( }; }, [userId, searchSpaceId, category, prefetchedUnread, prefetchedUnreadReady]); - // EFFECT 2: Electric sync (shared shape) + per-instance type-filtered live queries + // EFFECT 2: Zero real-time sync for notification updates + const [zeroNotifications] = useQuery( + queries.notifications.byUser({ userId: userId ?? "" }) + ); + useEffect(() => { - if (!userId || !searchSpaceId || !electricClient) return; + if (!userId || !searchSpaceId || !zeroNotifications || !initialLoadDoneRef.current) return; - const uid = userId; - const spaceId = searchSpaceId; - const client = electricClient; - const typeFilter = CATEGORY_TYPE_SQL[category]; - let mounted = true; + const cutoff = new Date(getSyncCutoffDate()); - async function setupElectricRealtime() { - // Clean up previous live queries (NOT the sync shape — it's shared) - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ + const validItems = zeroNotifications.filter((item) => { + if (item.id == null) return false; + if (!categoryTypes.includes(item.type)) return false; + if (item.searchSpaceId !== null && item.searchSpaceId !== searchSpaceId) return false; + return true; + }); + + const recentItems = validItems.filter( + (item) => new Date(item.createdAt) > cutoff + ); + + const liveIds = new Set(recentItems.map((d) => d.id)); + + setInboxItems((prev) => { + const prevIds = new Set(prev.map((d) => d.id)); + + const newItems: InboxItem[] = recentItems + .filter((d) => !prevIds.has(d.id)) + .map((item) => ({ + id: item.id, + user_id: item.userId, + search_space_id: item.searchSpaceId ?? undefined, + type: item.type, + title: item.title, + message: item.message, + read: item.read, + metadata: item.metadata as unknown as Record, + created_at: String(item.createdAt), + updated_at: item.updatedAt ? String(item.updatedAt) : undefined, + } as InboxItem)); + + let updated = prev.map((existing) => { + const liveItem = recentItems.find((v) => v.id === existing.id); + if (liveItem) { + return { + ...existing, + read: liveItem.read, + title: liveItem.title, + message: liveItem.message, + metadata: liveItem.metadata as unknown as Record, + } as InboxItem; } - liveQueryRef.current = null; - } - if (unreadLiveQueryRef.current) { - try { - unreadLiveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ - } - unreadLiveQueryRef.current = null; + return existing; + }); + + updated = updated.filter((item) => { + if (new Date(item.created_at) < cutoff) return true; + return liveIds.has(item.id); + }); + + if (newItems.length > 0) { + return [...newItems, ...updated]; } - try { - const cutoffDate = getSyncCutoffDate(); + return updated; + }); - // Sync shape is cached by the Electric client — multiple hook instances - // calling syncShape with the same params get the same handle. - const handle = await client.syncShape({ - table: "notifications", - where: `user_id = '${uid}' AND created_at > '${cutoffDate}'`, - primaryKey: ["id"], - }); - - if (!mounted) return; - - if (!handle.isUpToDate && handle.initialSyncPromise) { - await Promise.race([ - handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 5000)), - ]); - } - - if (!mounted) return; - - const db = client.db as { - live?: { - query: ( - sql: string, - params?: (number | string)[] - ) => Promise<{ - subscribe: (cb: (result: { rows: T[] }) => void) => void; - unsubscribe?: () => void; - }>; - }; - }; - - if (!db.live?.query) return; - - // Per-instance live query filtered by category types - const itemsQuery = `SELECT * FROM notifications - WHERE user_id = $1 - AND (search_space_id = $2 OR search_space_id IS NULL) - AND created_at > '${cutoffDate}' - ${typeFilter} - ORDER BY created_at DESC`; - - const liveQuery = await db.live.query(itemsQuery, [uid, spaceId]); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - liveQuery.subscribe((result: { rows: InboxItem[] }) => { - if (!mounted || !result.rows || !initialLoadDoneRef.current) return; - - const validItems = result.rows.filter((item) => item.id != null && item.title != null); - const cutoff = new Date(getSyncCutoffDate()); - - const liveItemMap = new Map(validItems.map((d) => [d.id, d])); - const liveIds = new Set(liveItemMap.keys()); - - setInboxItems((prev) => { - const prevIds = new Set(prev.map((d) => d.id)); - - const newItems = filterNewElectricItems( - validItems, - liveIds, - prevIds, - electricBaselineIdsRef, - newestApiTimestampRef.current - ); - - let updated = prev.map((item) => { - const liveItem = liveItemMap.get(item.id); - if (liveItem) return liveItem; - return item; - }); - - const isFullySynced = handle.isUpToDate; - if (isFullySynced) { - updated = updated.filter((item) => { - if (new Date(item.created_at) < cutoff) return true; - return liveIds.has(item.id); - }); - } - - if (newItems.length > 0) { - return [...newItems, ...updated]; - } - - return updated; - }); - - // Calibrate the older-unread offset using baseline items - // (items present in both Electric and the API-loaded list). - // This avoids the timing bug where new items arriving between - // the API fetch and Electric's first callback would be absorbed - // into the offset, making the count appear unchanged. - const baseline = electricBaselineIdsRef.current; - if (olderUnreadOffsetRef.current === null && baseline !== null) { - const baselineUnreadCount = validItems.filter( - (item) => baseline.has(item.id) && !item.read - ).length; - olderUnreadOffsetRef.current = Math.max( - 0, - apiUnreadTotalRef.current - baselineUnreadCount - ); - } - - // Derive unread count from all Electric items + the older offset - if (olderUnreadOffsetRef.current !== null) { - const electricUnreadCount = validItems.filter((item) => !item.read).length; - setUnreadCount(olderUnreadOffsetRef.current + electricUnreadCount); - } - }); - - liveQueryRef.current = liveQuery; - - // Per-instance unread count live query filtered by category types. - // Acts as a secondary reactive path for read-status changes that - // may not trigger the items live query in all edge cases. - const countQuery = `SELECT COUNT(*) as count FROM notifications - WHERE user_id = $1 - AND (search_space_id = $2 OR search_space_id IS NULL) - AND created_at > '${cutoffDate}' - AND read = false - ${typeFilter}`; - - const countLiveQuery = await db.live.query<{ count: number | string }>(countQuery, [ - uid, - spaceId, - ]); - - if (!mounted) { - countLiveQuery.unsubscribe?.(); - return; - } - - countLiveQuery.subscribe((result: { rows: Array<{ count: number | string }> }) => { - if (!mounted || !result.rows?.[0] || !initialLoadDoneRef.current) return; - if (olderUnreadOffsetRef.current === null) return; - const liveRecentUnread = Number(result.rows[0].count) || 0; - setUnreadCount(olderUnreadOffsetRef.current + liveRecentUnread); - }); - - unreadLiveQueryRef.current = countLiveQuery; - } catch (err) { - console.error(`[useInbox:${category}] Electric setup failed:`, err); - } + // Calibrate older-unread offset on first Zero data + if (olderUnreadOffsetRef.current === null) { + const recentUnreadCount = recentItems.filter((item) => !item.read).length; + olderUnreadOffsetRef.current = Math.max( + 0, + apiUnreadTotalRef.current - recentUnreadCount + ); } - setupElectricRealtime(); - - return () => { - mounted = false; - // Only clean up live queries — sync shape is shared across instances - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ - } - liveQueryRef.current = null; - } - if (unreadLiveQueryRef.current) { - try { - unreadLiveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ - } - unreadLiveQueryRef.current = null; - } - }; - }, [userId, searchSpaceId, electricClient, category]); + if (olderUnreadOffsetRef.current !== null) { + const recentUnreadCount = recentItems.filter((item) => !item.read).length; + setUnreadCount(olderUnreadOffsetRef.current + recentUnreadCount); + } + }, [userId, searchSpaceId, zeroNotifications, categoryTypes]); // Load more pages via API (cursor-based using before_date) const loadMore = useCallback(async () => {