diff --git a/surfsense_web/hooks/use-connectors-electric.ts b/surfsense_web/hooks/use-connectors-electric.ts index 951f1d15a..3714e4af0 100644 --- a/surfsense_web/hooks/use-connectors-electric.ts +++ b/surfsense_web/hooks/use-connectors-electric.ts @@ -1,216 +1,45 @@ "use client"; -import { useCallback, useEffect, useRef, useState } from "react"; +import { useMemo } from "react"; import type { SearchSourceConnector } from "@/contracts/types/connector.types"; -import type { SyncHandle } from "@/lib/electric/client"; -import { useElectricClient } from "@/lib/electric/context"; +import { queries } from "@/zero/queries"; +import { useQuery } from "@rocicorp/zero/react"; /** - * Hook for managing connectors with Electric SQL real-time sync - * - * Uses the Electric client from context (provided by ElectricProvider) - * instead of initializing its own - prevents race conditions and memory leaks + * Syncs connectors for a search space via Zero. + * Returns connectors, loading state, error, and a refresh function. */ export function useConnectorsElectric(searchSpaceId: number | string | null) { - // Get Electric client from context - ElectricProvider handles initialization - const electricClient = useElectricClient(); + const spaceId = searchSpaceId ? Number(searchSpaceId) : -1; - const [connectors, setConnectors] = useState([]); - const [loading, setLoading] = useState(true); - const [error, setError] = useState(null); - const syncHandleRef = useRef(null); - const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); - const syncKeyRef = useRef(null); + const [data, result] = useQuery(queries.connectors.bySpace({ searchSpaceId: spaceId })); - // Transform connector data from Electric SQL/PGlite to match expected format - function transformConnector(connector: any): SearchSourceConnector { - return { - ...connector, - last_indexed_at: connector.last_indexed_at - ? typeof connector.last_indexed_at === "string" - ? connector.last_indexed_at - : new Date(connector.last_indexed_at).toISOString() - : null, - next_scheduled_at: connector.next_scheduled_at - ? typeof connector.next_scheduled_at === "string" - ? connector.next_scheduled_at - : new Date(connector.next_scheduled_at).toISOString() - : null, - created_at: connector.created_at - ? typeof connector.created_at === "string" - ? connector.created_at - : new Date(connector.created_at).toISOString() - : new Date().toISOString(), - }; - } + const connectors: SearchSourceConnector[] = useMemo(() => { + if (!searchSpaceId || !data) return []; + return data.map((c) => ({ + id: c.id, + name: c.name, + connector_type: c.connectorType as SearchSourceConnector["connector_type"], + is_indexable: c.isIndexable, + is_active: true, + last_indexed_at: c.lastIndexedAt ? new Date(c.lastIndexedAt).toISOString() : null, + config: (c.config as Record) ?? {}, + enable_summary: c.enableSummary, + periodic_indexing_enabled: c.periodicIndexingEnabled, + indexing_frequency_minutes: c.indexingFrequencyMinutes ?? null, + next_scheduled_at: c.nextScheduledAt ? new Date(c.nextScheduledAt).toISOString() : null, + search_space_id: c.searchSpaceId, + user_id: c.userId, + created_at: c.createdAt ? new Date(c.createdAt).toISOString() : new Date().toISOString(), + })); + }, [searchSpaceId, data]); - // Start syncing when Electric client is available - useEffect(() => { - // If no Electric client available, immediately mark as not loading (disabled) - if (!electricClient) { - setLoading(false); - setError(new Error("Electric SQL not configured")); - return; - } + const loading = !searchSpaceId ? false : result.type !== "complete"; + const error = !searchSpaceId ? null : null; - // Wait for searchSpaceId to be available - if (!searchSpaceId) { - setConnectors([]); - setLoading(false); - return; - } - - // Create a unique key for this sync to prevent duplicate subscriptions - const syncKey = `connectors_${searchSpaceId}`; - if (syncKeyRef.current === syncKey) { - // Already syncing for this search space - return; - } - - let mounted = true; - syncKeyRef.current = syncKey; - - async function startSync() { - try { - console.log("[useConnectorsElectric] Starting sync for search space:", searchSpaceId); - - const handle = await electricClient.syncShape({ - table: "search_source_connectors", - where: `search_space_id = ${searchSpaceId}`, - primaryKey: ["id"], - }); - - console.log("[useConnectorsElectric] Sync started:", { - isUpToDate: handle.isUpToDate, - }); - - // Wait for initial sync with timeout - if (!handle.isUpToDate && handle.initialSyncPromise) { - try { - await Promise.race([ - handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 2000)), - ]); - } catch (syncErr) { - console.error("[useConnectorsElectric] Initial sync failed:", syncErr); - } - } - - if (!mounted) { - handle.unsubscribe(); - return; - } - - syncHandleRef.current = handle; - setLoading(false); - setError(null); - - // Fetch initial connectors - await fetchConnectors(); - - // Set up live query for real-time updates - await setupLiveQuery(); - } catch (err) { - if (!mounted) return; - console.error("[useConnectorsElectric] Failed to start sync:", err); - setError(err instanceof Error ? err : new Error("Failed to sync connectors")); - setLoading(false); - } - } - - async function fetchConnectors() { - try { - const result = await electricClient.db.query( - `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - if (mounted) { - setConnectors((result.rows || []).map(transformConnector)); - } - } catch (err) { - console.error("[useConnectorsElectric] Failed to fetch:", err); - } - } - - async function setupLiveQuery() { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = electricClient.db as any; - - if (db.live?.query && typeof db.live.query === "function") { - const liveQuery = await db.live.query( - `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - // Set initial results - if (liveQuery.initialResults?.rows) { - setConnectors(liveQuery.initialResults.rows.map(transformConnector)); - } else if (liveQuery.rows) { - setConnectors(liveQuery.rows.map(transformConnector)); - } - - // Subscribe to changes - if (typeof liveQuery.subscribe === "function") { - liveQuery.subscribe((result: { rows: any[] }) => { - if (mounted && result.rows) { - setConnectors(result.rows.map(transformConnector)); - } - }); - } - - if (typeof liveQuery.unsubscribe === "function") { - liveQueryRef.current = liveQuery; - } - } - } catch (liveErr) { - console.error("[useConnectorsElectric] Failed to set up live query:", liveErr); - } - } - - startSync(); - - return () => { - mounted = false; - syncKeyRef.current = null; - - if (syncHandleRef.current) { - try { - syncHandleRef.current.unsubscribe(); - } catch { - // PGlite may already be closed during cleanup - } - syncHandleRef.current = null; - } - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe(); - } catch { - // PGlite may already be closed during cleanup - } - liveQueryRef.current = null; - } - }; - }, [searchSpaceId, electricClient]); - - // Manual refresh function (optional, for fallback) - const refreshConnectors = useCallback(async () => { - if (!electricClient) return; - try { - const result = await electricClient.db.query( - `SELECT * FROM search_source_connectors WHERE search_space_id = $1 ORDER BY created_at DESC`, - [searchSpaceId] - ); - setConnectors((result.rows || []).map(transformConnector)); - } catch (err) { - console.error("[useConnectorsElectric] Failed to refresh:", err); - } - }, [electricClient, searchSpaceId]); + const refreshConnectors = async () => { + // Zero handles reactivity automatically — no manual refresh needed + }; return { connectors, loading, error, refreshConnectors }; }