diff --git a/surfsense_web/hooks/use-documents-processing.ts b/surfsense_web/hooks/use-documents-processing.ts index bb9901e64..7075e9dae 100644 --- a/surfsense_web/hooks/use-documents-processing.ts +++ b/surfsense_web/hooks/use-documents-processing.ts @@ -1,7 +1,8 @@ "use client"; import { useEffect, useRef, useState } from "react"; -import { useElectricClient } from "@/lib/electric/context"; +import { queries } from "@/zero/queries"; +import { useQuery } from "@rocicorp/zero/react"; export type DocumentsProcessingStatus = "idle" | "processing" | "success" | "error"; @@ -15,152 +16,66 @@ const SUCCESS_LINGER_MS = 5000; * - "idle" — nothing noteworthy (show normal icon) */ export function useDocumentsProcessing(searchSpaceId: number | null): DocumentsProcessingStatus { - const electricClient = useElectricClient(); const [status, setStatus] = useState("idle"); - const liveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null); const wasProcessingRef = useRef(false); const successTimerRef = useRef | null>(null); + const [documents] = useQuery( + queries.documents.bySpace({ searchSpaceId: searchSpaceId ?? -1 }) + ); + useEffect(() => { - if (!searchSpaceId || !electricClient) return; + if (!searchSpaceId || !documents) return; - const spaceId = searchSpaceId; - const client = electricClient; - let mounted = true; + let processingCount = 0; + let failedCount = 0; - async function setup() { - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ - } - liveQueryRef.current = null; - } - - try { - const handle = await client.syncShape({ - table: "documents", - where: `search_space_id = ${spaceId}`, - columns: [ - "id", - "document_type", - "search_space_id", - "title", - "created_by_id", - "created_at", - "status", - ], - primaryKey: ["id"], - }); - - if (!mounted) return; - - if (!handle.isUpToDate && handle.initialSyncPromise) { - await Promise.race([ - handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 5000)), - ]); - } - - if (!mounted) return; - - const db = client.db as { - live?: { - query: ( - sql: string, - params?: (number | string)[] - ) => Promise<{ - subscribe: (cb: (result: { rows: T[] }) => void) => void; - unsubscribe?: () => void; - }>; - }; - }; - - if (!db.live?.query) return; - - const liveQuery = await db.live.query<{ - processing_count: number | string; - failed_count: number | string; - }>( - `SELECT - SUM(CASE WHEN status->>'state' IN ('pending', 'processing') THEN 1 ELSE 0 END) AS processing_count, - SUM(CASE WHEN status->>'state' = 'failed' THEN 1 ELSE 0 END) AS failed_count - FROM documents - WHERE search_space_id = $1`, - [spaceId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - liveQuery.subscribe( - (result: { - rows: Array<{ processing_count: number | string; failed_count: number | string }>; - }) => { - if (!mounted || !result.rows?.[0]) return; - - const processingCount = Number(result.rows[0].processing_count) || 0; - const failedCount = Number(result.rows[0].failed_count) || 0; - - if (processingCount > 0) { - wasProcessingRef.current = true; - if (successTimerRef.current) { - clearTimeout(successTimerRef.current); - successTimerRef.current = null; - } - setStatus("processing"); - } else if (failedCount > 0) { - wasProcessingRef.current = false; - if (successTimerRef.current) { - clearTimeout(successTimerRef.current); - successTimerRef.current = null; - } - setStatus("error"); - } else if (wasProcessingRef.current) { - wasProcessingRef.current = false; - setStatus("success"); - if (successTimerRef.current) { - clearTimeout(successTimerRef.current); - } - successTimerRef.current = setTimeout(() => { - if (mounted) { - setStatus("idle"); - successTimerRef.current = null; - } - }, SUCCESS_LINGER_MS); - } else { - setStatus("idle"); - } - } - ); - - liveQueryRef.current = liveQuery; - } catch (err) { - console.error("[useDocumentsProcessing] Electric setup failed:", err); + for (const doc of documents) { + const state = (doc.status as { state?: string } | null)?.state; + if (state === "pending" || state === "processing") { + processingCount++; + } else if (state === "failed") { + failedCount++; } } - setup(); - - return () => { - mounted = false; + if (processingCount > 0) { + wasProcessingRef.current = true; if (successTimerRef.current) { clearTimeout(successTimerRef.current); successTimerRef.current = null; } - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe?.(); - } catch { - /* PGlite may be closed */ - } - liveQueryRef.current = null; + setStatus("processing"); + } else if (failedCount > 0) { + wasProcessingRef.current = false; + if (successTimerRef.current) { + clearTimeout(successTimerRef.current); + successTimerRef.current = null; + } + setStatus("error"); + } else if (wasProcessingRef.current) { + wasProcessingRef.current = false; + setStatus("success"); + if (successTimerRef.current) { + clearTimeout(successTimerRef.current); + } + successTimerRef.current = setTimeout(() => { + setStatus("idle"); + successTimerRef.current = null; + }, SUCCESS_LINGER_MS); + } else { + setStatus("idle"); + } + }, [searchSpaceId, documents]); + + useEffect(() => { + return () => { + if (successTimerRef.current) { + clearTimeout(successTimerRef.current); + successTimerRef.current = null; } }; - }, [searchSpaceId, electricClient]); + }, []); return status; }