diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 5f7f568f6..28807e783 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -623,6 +623,28 @@ class MentionNotificationHandler(BaseNotificationHandler): def __init__(self): super().__init__("new_mention") + async def find_notification_by_mention( + self, + session: AsyncSession, + mention_id: int, + ) -> Notification | None: + """ + Find an existing notification by mention ID. + + Args: + session: Database session + mention_id: The mention ID to search for + + Returns: + Notification if found, None otherwise + """ + query = select(Notification).where( + Notification.type == self.notification_type, + Notification.notification_metadata["mention_id"].astext == str(mention_id), + ) + result = await session.execute(query) + return result.scalar_one_or_none() + async def notify_new_mention( self, session: AsyncSession, @@ -641,11 +663,12 @@ class MentionNotificationHandler(BaseNotificationHandler): ) -> Notification: """ Create notification when a user is @mentioned in a comment. + Uses mention_id for idempotency to prevent duplicate notifications. Args: session: Database session mentioned_user_id: User who was mentioned - mention_id: ID of the mention record + mention_id: ID of the mention record (used for idempotency) comment_id: ID of the comment containing the mention message_id: ID of the message being commented on thread_id: ID of the chat thread @@ -658,8 +681,16 @@ class MentionNotificationHandler(BaseNotificationHandler): search_space_id: Search space ID Returns: - Notification: The created notification + Notification: The created or existing notification """ + # Check if notification already exists for this mention (idempotency) + existing = await self.find_notification_by_mention(session, mention_id) + if existing: + logger.info( + f"Notification already exists for mention {mention_id}, returning existing" + ) + return existing + title = f"{author_name} mentioned you" message = content_preview[:100] + ("..." if len(content_preview) > 100 else "") @@ -676,21 +707,34 @@ class MentionNotificationHandler(BaseNotificationHandler): "content_preview": content_preview[:200], } - notification = Notification( - user_id=mentioned_user_id, - search_space_id=search_space_id, - type=self.notification_type, - title=title, - message=message, - notification_metadata=metadata, - ) - session.add(notification) - await session.commit() - await session.refresh(notification) - logger.info( - f"Created new_mention notification {notification.id} for user {mentioned_user_id}" - ) - return notification + try: + notification = Notification( + user_id=mentioned_user_id, + search_space_id=search_space_id, + type=self.notification_type, + title=title, + message=message, + notification_metadata=metadata, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info( + f"Created new_mention notification {notification.id} for user {mentioned_user_id}" + ) + return notification + except Exception as e: + # Handle race condition - if duplicate key error, try to fetch existing + await session.rollback() + if "duplicate key" in str(e).lower() or "unique constraint" in str(e).lower(): + logger.warning( + f"Duplicate notification detected for mention {mention_id}, fetching existing" + ) + existing = await self.find_notification_by_mention(session, mention_id) + if existing: + return existing + # Re-raise if not a duplicate key error or couldn't find existing + raise class NotificationService: diff --git a/surfsense_web/hooks/use-comments-electric.ts b/surfsense_web/hooks/use-comments-electric.ts index 83a019ef3..e4eb27d4a 100644 --- a/surfsense_web/hooks/use-comments-electric.ts +++ b/surfsense_web/hooks/use-comments-electric.ts @@ -15,6 +15,9 @@ import type { SyncHandle } from "@/lib/electric/client"; import { useElectricClient } from "@/lib/electric/context"; import { cacheKeys } from "@/lib/query-client/cache-keys"; +// Debounce delay for stream updates (ms) +const STREAM_UPDATE_DEBOUNCE_MS = 100; + // Raw comment from PGlite local database interface RawCommentRow { id: number; @@ -208,6 +211,7 @@ export function useCommentsElectric(threadId: number | null) { const syncHandleRef = useRef(null); const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null); const syncKeyRef = useRef(null); + const streamUpdateDebounceRef = useRef | null>(null); // Stable callback that uses refs for fresh values const updateReactQueryCache = useCallback((rows: RawCommentRow[]) => { @@ -275,6 +279,30 @@ export function useCommentsElectric(threadId: number | null) { // Set up live query for real-time updates await setupLiveQuery(); + + // Subscribe to the sync stream for real-time updates from Electric SQL + // This ensures we catch updates even if PGlite live query misses them + if (handle.stream) { + const stream = handle.stream as { subscribe?: (callback: (messages: unknown[]) => void) => void }; + if (typeof stream.subscribe === "function") { + stream.subscribe((messages: unknown[]) => { + if (!mounted) return; + // When Electric sync receives new data, refresh from PGlite + // This handles cases where live query might miss the update + if (messages && messages.length > 0) { + // Debounce the refresh to avoid excessive queries + if (streamUpdateDebounceRef.current) { + clearTimeout(streamUpdateDebounceRef.current); + } + streamUpdateDebounceRef.current = setTimeout(() => { + if (mounted) { + fetchAndUpdateCache(); + } + }, STREAM_UPDATE_DEBOUNCE_MS); + } + }); + } + } } catch { // Sync failed - will retry on next mount } @@ -348,6 +376,12 @@ export function useCommentsElectric(threadId: number | null) { mounted = false; syncKeyRef.current = null; + // Clear debounce timeout + if (streamUpdateDebounceRef.current) { + clearTimeout(streamUpdateDebounceRef.current); + streamUpdateDebounceRef.current = null; + } + if (syncHandleRef.current) { syncHandleRef.current.unsubscribe(); syncHandleRef.current = null;