diff --git a/surfsense_web/hooks/use-documents.ts b/surfsense_web/hooks/use-documents.ts index 5fee85d01..41461a3b2 100644 --- a/surfsense_web/hooks/use-documents.ts +++ b/surfsense_web/hooks/use-documents.ts @@ -3,25 +3,14 @@ import { useCallback, useEffect, useRef, useState } from "react"; import type { DocumentSortBy, DocumentTypeEnum, SortOrder } from "@/contracts/types/document.types"; import { documentsApiService } from "@/lib/apis/documents-api.service"; -import { filterNewElectricItems, getNewestTimestamp } from "@/lib/electric/baseline"; -import type { SyncHandle } from "@/lib/electric/client"; -import { useElectricClient } from "@/lib/electric/context"; +import { queries } from "@/zero/queries"; +import { useQuery } from "@rocicorp/zero/react"; export interface DocumentStatusType { state: "ready" | "pending" | "processing" | "failed"; reason?: string; } -interface DocumentElectric { - id: number; - search_space_id: number; - document_type: string; - title: string; - created_by_id: string | null; - created_at: string; - status: DocumentStatusType | null; -} - export interface DocumentDisplay { id: number; search_space_id: number; @@ -64,23 +53,14 @@ const EMPTY_TYPE_FILTER: DocumentTypeEnum[] = []; const INITIAL_PAGE_SIZE = 50; const SCROLL_PAGE_SIZE = 5; -function isValidDocument(doc: DocumentElectric): boolean { - return doc.id != null && doc.title != null && doc.title !== ""; -} - /** - * Paginated documents hook with Electric SQL real-time updates. + * Paginated documents hook with Zero real-time updates. * * Architecture: * 1. API is the PRIMARY data source — fetches pages on demand * 2. Type counts come from a dedicated lightweight API endpoint - * 3. Electric provides REAL-TIME updates (new docs, deletions, status changes) + * 3. Zero provides REAL-TIME updates (new docs, deletions, status changes) * 4. Server-side sorting via sort_by + sort_order params - * - * @param searchSpaceId - The search space to load documents for - * @param typeFilter - Document types to filter by (server-side) - * @param sortBy - Column to sort by (server-side) - * @param sortOrder - Sort direction (server-side) */ export function useDocuments( searchSpaceId: number | null, @@ -88,8 +68,6 @@ export function useDocuments( sortBy: DocumentSortBy = "created_at", sortOrder: SortOrder = "desc" ) { - const electricClient = useElectricClient(); - const [documents, setDocuments] = useState([]); const [typeCounts, setTypeCounts] = useState>({}); const [total, setTotal] = useState(0); @@ -103,14 +81,8 @@ export function useDocuments( const prevParamsRef = useRef<{ sortBy: string; sortOrder: string; typeFilterKey: string } | null>( null ); - // Snapshot of all doc IDs from Electric's first callback after initial load. - // Anything appearing in subsequent callbacks NOT in this set is genuinely new. - const electricBaselineIdsRef = useRef | null>(null); - const newestApiTimestampRef = useRef(null); const userCacheRef = useRef>(new Map()); const emailCacheRef = useRef>(new Map()); - const syncHandleRef = useRef(null); - const liveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null); const typeFilterKey = typeFilter.join(","); @@ -141,20 +113,6 @@ export function useDocuments( [] ); - const electricToDisplayDoc = useCallback( - (doc: DocumentElectric): DocumentDisplay => ({ - ...doc, - created_by_name: doc.created_by_id - ? (userCacheRef.current.get(doc.created_by_id) ?? null) - : null, - created_by_email: doc.created_by_id - ? (emailCacheRef.current.get(doc.created_by_id) ?? null) - : null, - status: doc.status ?? { state: "ready" }, - }), - [] - ); - // EFFECT 1: Fetch first page + type counts when params change // biome-ignore lint/correctness/useExhaustiveDependencies: typeFilterKey serializes typeFilter useEffect(() => { @@ -178,8 +136,6 @@ export function useDocuments( } apiLoadedCountRef.current = 0; initialLoadDoneRef.current = false; - electricBaselineIdsRef.current = null; - newestApiTimestampRef.current = null; const fetchInitialData = async () => { try { @@ -209,7 +165,6 @@ export function useDocuments( setTypeCounts(countsResponse); setError(null); apiLoadedCountRef.current = docsResponse.items.length; - newestApiTimestampRef.current = getNewestTimestamp(docsResponse.items); initialLoadDoneRef.current = true; } catch (err) { if (cancelled) return; @@ -226,207 +181,104 @@ export function useDocuments( }; }, [searchSpaceId, typeFilterKey, sortBy, sortOrder, populateUserCache, apiToDisplayDoc]); - // EFFECT 2: Electric sync + live query for real-time updates + // EFFECT 2: Zero real-time sync for document updates + const [zeroDocuments] = useQuery( + queries.documents.bySpace({ searchSpaceId: searchSpaceId ?? -1 }) + ); + useEffect(() => { - if (!searchSpaceId || !electricClient) return; + if (!searchSpaceId || !zeroDocuments || !initialLoadDoneRef.current) return; - const spaceId = searchSpaceId; - const client = electricClient; - let mounted = true; + const validItems = zeroDocuments.filter( + (doc) => doc.id != null && doc.title != null && doc.title !== "" + ); - async function setupElectricRealtime() { - if (syncHandleRef.current) { - try { - syncHandleRef.current.unsubscribe(); - } catch { - /* PGlite may already be closed */ - } - syncHandleRef.current = null; - } - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may already be closed */ - } - liveQueryRef.current = null; - } + const unknownUserIds = validItems.filter( + (doc) => doc.createdById !== null && !userCacheRef.current.has(doc.createdById!) + ); - try { - const handle = await client.syncShape({ - table: "documents", - where: `search_space_id = ${spaceId}`, - columns: [ - "id", - "document_type", - "search_space_id", - "title", - "created_by_id", - "created_at", - "status", - ], - primaryKey: ["id"], - }); - - if (!mounted) { - handle.unsubscribe(); - return; - } - - syncHandleRef.current = handle; - - if (!handle.isUpToDate && handle.initialSyncPromise) { - await Promise.race([ - handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 5000)), - ]); - } - - if (!mounted) return; - - const db = client.db as { - live?: { - query: ( - sql: string, - params?: (number | string)[] - ) => Promise<{ - subscribe: (cb: (result: { rows: T[] }) => void) => void; - unsubscribe?: () => void; - }>; - }; - }; - - if (!db.live?.query) return; - - const query = `SELECT id, document_type, search_space_id, title, created_by_id, created_at, status - FROM documents - WHERE search_space_id = $1 - ORDER BY created_at DESC`; - - const liveQuery = await db.live.query(query, [spaceId]); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - liveQuery.subscribe((result: { rows: DocumentElectric[] }) => { - if (!mounted || !result.rows || !initialLoadDoneRef.current) return; - - const validItems = result.rows.filter(isValidDocument); - const isFullySynced = syncHandleRef.current?.isUpToDate ?? false; - - const unknownUserIds = validItems - .filter( - (doc): doc is DocumentElectric & { created_by_id: string } => - doc.created_by_id !== null && !userCacheRef.current.has(doc.created_by_id) - ) - .map((doc) => doc.created_by_id); - - if (unknownUserIds.length > 0) { - documentsApiService - .getDocuments({ - queryParams: { - search_space_id: spaceId, - page: 0, - page_size: 20, - }, - }) - .then((response) => { - populateUserCache(response.items); - if (mounted) { - setDocuments((prev) => - prev.map((doc) => ({ - ...doc, - created_by_name: doc.created_by_id - ? (userCacheRef.current.get(doc.created_by_id) ?? null) - : null, - created_by_email: doc.created_by_id - ? (emailCacheRef.current.get(doc.created_by_id) ?? null) - : null, - })) - ); - } - }) - .catch(() => {}); - } - - setDocuments((prev) => { - const liveIds = new Set(validItems.map((d) => d.id)); - const prevIds = new Set(prev.map((d) => d.id)); - - const newItems = filterNewElectricItems( - validItems, - liveIds, - prevIds, - electricBaselineIdsRef, - newestApiTimestampRef.current - ).map(electricToDisplayDoc); - - // Update existing docs (status changes, title edits) - let updated = prev.map((doc) => { - if (liveIds.has(doc.id)) { - const liveItem = validItems.find((v) => v.id === doc.id); - if (liveItem) { - return electricToDisplayDoc(liveItem); - } - } - return doc; - }); - - // Remove deleted docs (only when fully synced) - if (isFullySynced) { - updated = updated.filter((doc) => liveIds.has(doc.id)); - } - - if (newItems.length > 0) { - return [...newItems, ...updated]; - } - - return updated; - }); - - // Update type counts when Electric detects changes - if (isFullySynced && validItems.length > 0) { - const counts: Record = {}; - for (const item of validItems) { - counts[item.document_type] = (counts[item.document_type] || 0) + 1; - } - setTypeCounts(counts); - setTotal(validItems.length); - } - }); - - liveQueryRef.current = liveQuery; - } catch (err) { - console.error("[useDocuments] Electric setup failed:", err); - } + if (unknownUserIds.length > 0) { + documentsApiService + .getDocuments({ + queryParams: { + search_space_id: searchSpaceId, + page: 0, + page_size: 20, + }, + }) + .then((response) => { + populateUserCache(response.items); + setDocuments((prev) => + prev.map((doc) => ({ + ...doc, + created_by_name: doc.created_by_id + ? (userCacheRef.current.get(doc.created_by_id) ?? null) + : null, + created_by_email: doc.created_by_id + ? (emailCacheRef.current.get(doc.created_by_id) ?? null) + : null, + })) + ); + }) + .catch(() => {}); } - setupElectricRealtime(); + const liveIds = new Set(validItems.map((d) => d.id)); - return () => { - mounted = false; - if (syncHandleRef.current) { - try { - syncHandleRef.current.unsubscribe(); - } catch { - /* PGlite may already be closed */ - } - syncHandleRef.current = null; - } - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may already be closed */ - } - liveQueryRef.current = null; - } - }; - }, [searchSpaceId, electricClient, electricToDisplayDoc, populateUserCache]); + setDocuments((prev) => { + const prevIds = new Set(prev.map((d) => d.id)); - // Reset on search space change + const newItems: DocumentDisplay[] = validItems + .filter((d) => !prevIds.has(d.id)) + .map((doc) => ({ + id: doc.id, + search_space_id: doc.searchSpaceId, + document_type: doc.documentType, + title: doc.title, + created_by_id: doc.createdById ?? null, + created_by_name: doc.createdById + ? (userCacheRef.current.get(doc.createdById) ?? null) + : null, + created_by_email: doc.createdById + ? (emailCacheRef.current.get(doc.createdById) ?? null) + : null, + created_at: String(doc.createdAt), + status: (doc.status as unknown as DocumentStatusType) ?? { state: "ready" }, + })); + + let updated = prev.map((existing) => { + if (liveIds.has(existing.id)) { + const liveItem = validItems.find((v) => v.id === existing.id); + if (liveItem) { + return { + ...existing, + title: liveItem.title, + document_type: liveItem.documentType, + status: (liveItem.status as unknown as DocumentStatusType) ?? { state: "ready" as const }, + }; + } + } + return existing; + }); + + updated = updated.filter((doc) => liveIds.has(doc.id)); + + if (newItems.length > 0) { + return [...newItems, ...updated]; + } + + return updated; + }); + + const counts: Record = {}; + for (const item of validItems) { + counts[item.documentType] = (counts[item.documentType] || 0) + 1; + } + setTypeCounts(counts); + setTotal(validItems.length); + }, [searchSpaceId, zeroDocuments, populateUserCache]); + + // EFFECT 3: Reset on search space change const prevSearchSpaceIdRef = useRef(null); useEffect(() => { @@ -437,8 +289,6 @@ export function useDocuments( setHasMore(false); apiLoadedCountRef.current = 0; initialLoadDoneRef.current = false; - electricBaselineIdsRef.current = null; - newestApiTimestampRef.current = null; userCacheRef.current.clear(); emailCacheRef.current.clear(); }