From 2839501503f5baec78b5ab14f4a80e208a2c701d Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Mon, 19 Jan 2026 17:28:44 +0200 Subject: [PATCH] feat: add Electric SQL replication for chat_comment_mentions --- .../71_add_comments_electric_replication.py | 28 ++- surfsense_web/hooks/use-mentions-electric.ts | 167 ++++++++++++++++++ surfsense_web/lib/electric/client.ts | 13 ++ 3 files changed, 191 insertions(+), 17 deletions(-) create mode 100644 surfsense_web/hooks/use-mentions-electric.ts diff --git a/surfsense_backend/alembic/versions/71_add_comments_electric_replication.py b/surfsense_backend/alembic/versions/71_add_comments_electric_replication.py index 0f6348525..d99843584 100644 --- a/surfsense_backend/alembic/versions/71_add_comments_electric_replication.py +++ b/surfsense_backend/alembic/versions/71_add_comments_electric_replication.py @@ -1,17 +1,16 @@ -"""Add Electric SQL replication for chat_comments table +"""Add Electric SQL replication for chat_comment_mentions table Revision ID: 71 Revises: 70 -Enables Electric SQL replication for the chat_comments table to support -real-time live updates for comments in chat threads. +Enables Electric SQL replication for the chat_comment_mentions table to support +real-time live updates for mentions. """ from collections.abc import Sequence from alembic import op -# revision identifiers, used by Alembic. revision: str = "71" down_revision: str | None = "70" branch_labels: str | Sequence[str] | None = None @@ -19,22 +18,19 @@ depends_on: str | Sequence[str] | None = None def upgrade() -> None: - """Enable Electric SQL replication for chat_comments table.""" - # Set REPLICA IDENTITY FULL (required by Electric SQL for replication) - op.execute("ALTER TABLE chat_comments REPLICA IDENTITY FULL;") + """Enable Electric SQL replication for chat_comment_mentions table.""" + op.execute("ALTER TABLE chat_comment_mentions REPLICA IDENTITY FULL;") - # Add chat_comments to Electric SQL publication for replication op.execute( """ DO $$ BEGIN - -- Add chat_comments if not already added IF NOT EXISTS ( SELECT 1 FROM pg_publication_tables WHERE pubname = 'electric_publication_default' - AND tablename = 'chat_comments' + AND tablename = 'chat_comment_mentions' ) THEN - ALTER PUBLICATION electric_publication_default ADD TABLE chat_comments; + ALTER PUBLICATION electric_publication_default ADD TABLE chat_comment_mentions; END IF; END $$; @@ -43,8 +39,7 @@ def upgrade() -> None: def downgrade() -> None: - """Remove chat_comments from Electric SQL replication.""" - # Remove chat_comments from publication + """Remove chat_comment_mentions from Electric SQL replication.""" op.execute( """ DO $$ @@ -52,14 +47,13 @@ def downgrade() -> None: IF EXISTS ( SELECT 1 FROM pg_publication_tables WHERE pubname = 'electric_publication_default' - AND tablename = 'chat_comments' + AND tablename = 'chat_comment_mentions' ) THEN - ALTER PUBLICATION electric_publication_default DROP TABLE chat_comments; + ALTER PUBLICATION electric_publication_default DROP TABLE chat_comment_mentions; END IF; END $$; """ ) - # Reset REPLICA IDENTITY to default - op.execute("ALTER TABLE chat_comments REPLICA IDENTITY DEFAULT;") + op.execute("ALTER TABLE chat_comment_mentions REPLICA IDENTITY DEFAULT;") diff --git a/surfsense_web/hooks/use-mentions-electric.ts b/surfsense_web/hooks/use-mentions-electric.ts new file mode 100644 index 000000000..d56891018 --- /dev/null +++ b/surfsense_web/hooks/use-mentions-electric.ts @@ -0,0 +1,167 @@ +"use client"; + +import { useEffect, useState, useRef } from "react"; +import { useElectricClient } from "@/lib/electric/context"; +import type { SyncHandle } from "@/lib/electric/client"; + +export interface ElectricMention { + id: number; + comment_id: number; + mentioned_user_id: string; + created_at: string; +} + +/** + * Hook for syncing mentions with Electric SQL for real-time updates. + * Syncs all mentions for the current user. + * @param userId - The user ID to sync mentions for + */ +export function useMentionsElectric(userId: string | null) { + const electricClient = useElectricClient(); + + const [mentions, setMentions] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const syncHandleRef = useRef(null); + const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); + const syncKeyRef = useRef(null); + + useEffect(() => { + if (!electricClient) { + setLoading(false); + setError(new Error("Electric SQL not configured")); + return; + } + + if (!userId) { + setMentions([]); + setLoading(false); + return; + } + + const syncKey = `mentions_${userId}`; + if (syncKeyRef.current === syncKey) { + return; + } + + let mounted = true; + syncKeyRef.current = syncKey; + + const client = electricClient; + + async function startSync() { + try { + const handle = await client.syncShape({ + table: "chat_comment_mentions", + where: `mentioned_user_id = '${userId}'`, + primaryKey: ["id"], + }); + + if (!handle.isUpToDate && handle.initialSyncPromise) { + try { + await Promise.race([ + handle.initialSyncPromise, + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + } catch (syncErr) { + console.error("[useMentionsElectric] Initial sync failed:", syncErr); + } + } + + if (!mounted) { + handle.unsubscribe(); + return; + } + + syncHandleRef.current = handle; + setLoading(false); + setError(null); + + await fetchMentions(); + await setupLiveQuery(); + } catch (err) { + if (!mounted) return; + console.error("[useMentionsElectric] Failed to start sync:", err); + setError(err instanceof Error ? err : new Error("Failed to sync mentions")); + setLoading(false); + } + } + + async function fetchMentions() { + try { + const result = await client.db.query( + `SELECT id, comment_id, mentioned_user_id, created_at + FROM chat_comment_mentions + WHERE mentioned_user_id = $1 + ORDER BY created_at DESC`, + [userId] + ); + if (mounted) { + setMentions(result.rows || []); + } + } catch (err) { + console.error("[useMentionsElectric] Failed to fetch:", err); + } + } + + async function setupLiveQuery() { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const db = client.db as any; + + if (db.live?.query && typeof db.live.query === "function") { + const liveQuery = await db.live.query( + `SELECT id, comment_id, mentioned_user_id, created_at + FROM chat_comment_mentions + WHERE mentioned_user_id = $1 + ORDER BY created_at DESC`, + [userId] + ); + + if (!mounted) { + liveQuery.unsubscribe?.(); + return; + } + + if (liveQuery.initialResults?.rows) { + setMentions(liveQuery.initialResults.rows); + } else if (liveQuery.rows) { + setMentions(liveQuery.rows); + } + + if (typeof liveQuery.subscribe === "function") { + liveQuery.subscribe((result: { rows: ElectricMention[] }) => { + if (mounted && result.rows) { + setMentions(result.rows); + } + }); + } + + if (typeof liveQuery.unsubscribe === "function") { + liveQueryRef.current = liveQuery; + } + } + } catch (liveErr) { + console.error("[useMentionsElectric] Failed to set up live query:", liveErr); + } + } + + startSync(); + + return () => { + mounted = false; + syncKeyRef.current = null; + + if (syncHandleRef.current) { + syncHandleRef.current.unsubscribe(); + syncHandleRef.current = null; + } + if (liveQueryRef.current) { + liveQueryRef.current.unsubscribe(); + liveQueryRef.current = null; + } + }; + }, [userId, electricClient]); + + return { mentions, loading, error }; +} diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts index 0a881c235..68d8e8e7b 100644 --- a/surfsense_web/lib/electric/client.ts +++ b/surfsense_web/lib/electric/client.ts @@ -224,6 +224,19 @@ export async function initElectric(userId: string): Promise { CREATE INDEX IF NOT EXISTS idx_documents_search_space_type ON documents(search_space_id, document_type); `); + // Create the chat_comment_mentions table schema in PGlite + await db.exec(` + CREATE TABLE IF NOT EXISTS chat_comment_mentions ( + id INTEGER PRIMARY KEY, + comment_id INTEGER NOT NULL, + mentioned_user_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_user_id ON chat_comment_mentions(mentioned_user_id); + CREATE INDEX IF NOT EXISTS idx_chat_comment_mentions_comment_id ON chat_comment_mentions(comment_id); + `); + const electricUrl = getElectricUrl(); // STEP 4: Create the client wrapper