diff --git a/surfsense_web/contracts/types/chat-messages.types.ts b/surfsense_web/contracts/types/chat-messages.types.ts index faba71bff..78bf7b043 100644 --- a/surfsense_web/contracts/types/chat-messages.types.ts +++ b/surfsense_web/contracts/types/chat-messages.types.ts @@ -10,7 +10,6 @@ export const rawMessage = z.object({ content: z.unknown(), author_id: z.string().nullable(), created_at: z.string(), - updated_at: z.string(), }); export type RawMessage = z.infer; diff --git a/surfsense_web/hooks/use-chat-messages-live.ts b/surfsense_web/hooks/use-chat-messages-live.ts deleted file mode 100644 index 4a8ae97e6..000000000 --- a/surfsense_web/hooks/use-chat-messages-live.ts +++ /dev/null @@ -1,82 +0,0 @@ -"use client"; - -import { useShape } from "@electric-sql/react"; -import { useAtomValue } from "jotai"; -import { useMemo } from "react"; -import { membersAtom } from "@/atoms/members/members-query.atoms"; -import type { RawMessage } from "@/contracts/types/chat-messages.types"; -import type { Membership } from "@/contracts/types/members.types"; -import type { MessageRecord } from "@/lib/chat/thread-persistence"; - -const ELECTRIC_URL = process.env.NEXT_PUBLIC_ELECTRIC_URL || "http://localhost:5133"; - -/** - * Member info for building author data - derived from Membership - */ -type MemberInfo = Pick; - -/** - * Hook to get live chat messages for real-time sync. - * Uses Electric SQL for messages + membersAtom (API) for author info. - */ -export function useChatMessagesLive(threadId: number | null) { - - const { - data: messagesData, - isLoading: messagesLoading, - isError: messagesError, - error: messagesErrorDetails, - } = useShape({ - url: `${ELECTRIC_URL}/v1/shape`, - params: { - table: "new_chat_messages", - where: `thread_id = ${threadId}`, - }, - }); - - - const { data: membersData, isLoading: membersLoading } = useAtomValue(membersAtom); - - - const messages = useMemo(() => { - if (!messagesData) return []; - - // Build member lookup map - const memberMap = new Map(); - if (membersData) { - for (const member of membersData) { - memberMap.set(member.user_id, { - user_display_name: member.user_display_name, - user_avatar_url: member.user_avatar_url, - }); - } - } - - // Transform raw messages to MessageRecord with author info - return [...messagesData].map((msg): MessageRecord => { - const author = msg.author_id ? memberMap.get(msg.author_id) : null; - - const role = (typeof msg.role === "string" ? msg.role.toLowerCase() : msg.role) as - | "user" - | "assistant" - | "system"; - return { - id: msg.id, - thread_id: msg.thread_id, - role, - content: msg.content, - created_at: msg.created_at, - author_id: msg.author_id, - author_display_name: author?.user_display_name ?? null, - author_avatar_url: author?.user_avatar_url ?? null, - }; - }); - }, [messagesData, membersData]); - - return { - messages, - isLoading: messagesLoading || membersLoading, - isError: messagesError, - error: messagesError ? messagesErrorDetails : null, - }; -} diff --git a/surfsense_web/hooks/use-messages-electric.ts b/surfsense_web/hooks/use-messages-electric.ts new file mode 100644 index 000000000..e8c82e92b --- /dev/null +++ b/surfsense_web/hooks/use-messages-electric.ts @@ -0,0 +1,154 @@ +"use client"; + +import { useCallback, useEffect, useRef } from "react"; +import type { RawMessage } from "@/contracts/types/chat-messages.types"; +import type { SyncHandle } from "@/lib/electric/client"; +import { useElectricClient } from "@/lib/electric/context"; + +/** + * Syncs chat messages for a thread via Electric SQL. + * Calls onMessagesUpdate when messages change. + */ +export function useMessagesElectric( + threadId: number | null, + onMessagesUpdate: (messages: RawMessage[]) => void +) { + const electricClient = useElectricClient(); + + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); + const onMessagesUpdateRef = useRef(onMessagesUpdate); + + useEffect(() => { + onMessagesUpdateRef.current = onMessagesUpdate; + }, [onMessagesUpdate]); + + const handleMessagesUpdate = useCallback((rows: RawMessage[]) => { + onMessagesUpdateRef.current(rows); + }, []); + + useEffect(() => { + if (!threadId || !electricClient) { + return; + } + + const syncKey = `messages_${threadId}`; + if (syncKeyRef.current === syncKey) { + return; + } + + const client = electricClient; + let mounted = true; + syncKeyRef.current = syncKey; + + async function startSync() { + try { + const handle = await client.syncShape({ + table: "new_chat_messages", + where: `thread_id = ${threadId}`, + columns: ["id", "thread_id", "role", "content", "author_id", "created_at"], + primaryKey: ["id"], + }); + + if (!handle.isUpToDate && handle.initialSyncPromise) { + try { + await Promise.race([ + handle.initialSyncPromise, + new Promise((resolve) => setTimeout(resolve, 3000)), + ]); + } catch { + // Timeout + } + } + + if (!mounted) { + handle.unsubscribe(); + return; + } + + syncHandleRef.current = handle; + await fetchMessages(); + await setupLiveQuery(); + } catch { + // Sync failed + } + } + + async function fetchMessages() { + try { + const result = await client.db.query( + `SELECT id, thread_id, role, content, author_id, created_at + FROM new_chat_messages + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (mounted && result.rows) { + handleMessagesUpdate(result.rows); + } + } catch { + // Query failed + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = client.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT id, thread_id, role, content, author_id, created_at + FROM new_chat_messages + WHERE thread_id = $1 + ORDER BY created_at ASC`, + [threadId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + if (liveQuery.initialResults?.rows) { + handleMessagesUpdate(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + handleMessagesUpdate(liveQuery.rows); + } + + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: RawMessage[] }) => { + if (mounted && result.rows) { + handleMessagesUpdate(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch { + // Live query failed + } + } + + startSync(); + + return () => { + mounted = false; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } + }; + }, [threadId, electricClient, handleMessagesUpdate]); +} diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 84df6c905..479026120 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -258,6 +258,21 @@ export async function initElectric(userId: string): Promise { CREATE INDEX IF NOT EXISTS idx_chat_comments_parent_id ON chat_comments(parent_id); `); + // Create new_chat_messages table for live message sync + await db.exec(` + CREATE TABLE IF NOT EXISTS new_chat_messages ( + id INTEGER PRIMARY KEY, + thread_id INTEGER NOT NULL, + role TEXT NOT NULL, + content JSONB NOT NULL, + author_id TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_new_chat_messages_thread_id ON new_chat_messages(thread_id); + CREATE INDEX IF NOT EXISTS idx_new_chat_messages_created_at ON new_chat_messages(created_at); + `); + const electricUrl = getElectricUrl(); // STEP 4: Create the client wrapper