From 5ad25d10ea065dcd8cb856a074741801ada5ec7c Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Mon, 23 Mar 2026 19:00:20 +0200 Subject: [PATCH] feat: rewrite use-messages-electric hook from Electric to Zero MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace PGlite sync+live query with Zero useQuery. Maps camelCase output to snake_case to preserve consumer contract. 162 → 38 lines. --- surfsense_web/hooks/use-messages-electric.ts | 156 ++----------------- 1 file changed, 16 insertions(+), 140 deletions(-) diff --git a/surfsense_web/hooks/use-messages-electric.ts b/surfsense_web/hooks/use-messages-electric.ts index 728503de9..151c3ae2a 100644 --- a/surfsense_web/hooks/use-messages-electric.ts +++ b/surfsense_web/hooks/use-messages-electric.ts @@ -1,162 +1,38 @@ "use client"; -import { useCallback, useEffect, useRef } from "react"; +import { useEffect, useRef } from "react"; import type { RawMessage } from "@/contracts/types/chat-messages.types"; -import type { SyncHandle } from "@/lib/electric/client"; -import { useElectricClient } from "@/lib/electric/context"; +import { queries } from "@/zero/queries"; +import { useQuery } from "@rocicorp/zero/react"; /** - * Syncs chat messages for a thread via Electric SQL. + * Syncs chat messages for a thread via Zero. * Calls onMessagesUpdate when messages change. */ export function useMessagesElectric( threadId: number | null, onMessagesUpdate: (messages: RawMessage[]) => void ) { - const electricClient = useElectricClient(); - - const syncHandleRef = useRef(null); - const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); - const syncKeyRef = useRef(null); const onMessagesUpdateRef = useRef(onMessagesUpdate); useEffect(() => { onMessagesUpdateRef.current = onMessagesUpdate; }, [onMessagesUpdate]); - const handleMessagesUpdate = useCallback((rows: RawMessage[]) => { - onMessagesUpdateRef.current(rows); - }, []); + const [messages] = useQuery(queries.messages.byThread({ threadId: threadId ?? -1 })); useEffect(() => { - if (!threadId || !electricClient) { - return; - } + if (!threadId || !messages) return; - const syncKey = `messages_${threadId}`; - if (syncKeyRef.current === syncKey) { - return; - } + const mapped: RawMessage[] = messages.map((msg) => ({ + id: msg.id, + thread_id: msg.threadId, + role: msg.role, + content: msg.content, + author_id: msg.authorId ?? null, + created_at: String(msg.createdAt), + })); - const client = electricClient; - let mounted = true; - syncKeyRef.current = syncKey; - - async function startSync() { - try { - const handle = await client.syncShape({ - table: "new_chat_messages", - where: `thread_id = ${threadId}`, - columns: ["id", "thread_id", "role", "content", "author_id", "created_at"], - primaryKey: ["id"], - }); - - if (!handle.isUpToDate && handle.initialSyncPromise) { - try { - await Promise.race([ - handle.initialSyncPromise, - new Promise((resolve) => setTimeout(resolve, 3000)), - ]); - } catch { - // Timeout - } - } - - if (!mounted) { - handle.unsubscribe(); - return; - } - - syncHandleRef.current = handle; - await fetchMessages(); - await setupLiveQuery(); - } catch { - // Sync failed - } - } - - async function fetchMessages() { - try { - const result = await client.db.query( - `SELECT id, thread_id, role, content, author_id, created_at - FROM new_chat_messages - WHERE thread_id = $1 - ORDER BY created_at ASC`, - [threadId] - ); - - if (mounted && result.rows) { - handleMessagesUpdate(result.rows); - } - } catch { - // Query failed - } - } - - async function setupLiveQuery() { - try { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const db = client.db as any; - - if (db.live?.query && typeof db.live.query === "function") { - const liveQuery = await db.live.query( - `SELECT id, thread_id, role, content, author_id, created_at - FROM new_chat_messages - WHERE thread_id = $1 - ORDER BY created_at ASC`, - [threadId] - ); - - if (!mounted) { - liveQuery.unsubscribe?.(); - return; - } - - if (liveQuery.initialResults?.rows) { - handleMessagesUpdate(liveQuery.initialResults.rows); - } else if (liveQuery.rows) { - handleMessagesUpdate(liveQuery.rows); - } - - if (typeof liveQuery.subscribe === "function") { - liveQuery.subscribe((result: { rows: RawMessage[] }) => { - if (mounted && result.rows) { - handleMessagesUpdate(result.rows); - } - }); - } - - if (typeof liveQuery.unsubscribe === "function") { - liveQueryRef.current = liveQuery; - } - } - } catch { - // Live query failed - } - } - - startSync(); - - return () => { - mounted = false; - syncKeyRef.current = null; - - if (syncHandleRef.current) { - try { - syncHandleRef.current.unsubscribe(); - } catch { - // PGlite may already be closed during cleanup - } - syncHandleRef.current = null; - } - if (liveQueryRef.current) { - try { - liveQueryRef.current.unsubscribe(); - } catch { - // PGlite may already be closed during cleanup - } - liveQueryRef.current = null; - } - }; - }, [threadId, electricClient, handleMessagesUpdate]); + onMessagesUpdateRef.current(mapped); + }, [threadId, messages]); }