mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-26 21:39:43 +02:00
fix: live comment sync in shared chats
- Added a method to find existing notifications by mention ID to prevent duplicate notifications. - Updated notify_new_mention to check for existing notifications before creating a new one, ensuring idempotency. - Implemented error handling for race conditions during notification creation, allowing retrieval of existing notifications in case of duplicate key errors. - Improved documentation for clarity on the use of mention_id for idempotency.
This commit is contained in:
parent
99b8a6c970
commit
ed931bb404
2 changed files with 95 additions and 17 deletions
|
|
@ -623,6 +623,28 @@ class MentionNotificationHandler(BaseNotificationHandler):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("new_mention")
|
super().__init__("new_mention")
|
||||||
|
|
||||||
|
async def find_notification_by_mention(
|
||||||
|
self,
|
||||||
|
session: AsyncSession,
|
||||||
|
mention_id: int,
|
||||||
|
) -> Notification | None:
|
||||||
|
"""
|
||||||
|
Find an existing notification by mention ID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: Database session
|
||||||
|
mention_id: The mention ID to search for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Notification if found, None otherwise
|
||||||
|
"""
|
||||||
|
query = select(Notification).where(
|
||||||
|
Notification.type == self.notification_type,
|
||||||
|
Notification.notification_metadata["mention_id"].astext == str(mention_id),
|
||||||
|
)
|
||||||
|
result = await session.execute(query)
|
||||||
|
return result.scalar_one_or_none()
|
||||||
|
|
||||||
async def notify_new_mention(
|
async def notify_new_mention(
|
||||||
self,
|
self,
|
||||||
session: AsyncSession,
|
session: AsyncSession,
|
||||||
|
|
@ -641,11 +663,12 @@ class MentionNotificationHandler(BaseNotificationHandler):
|
||||||
) -> Notification:
|
) -> Notification:
|
||||||
"""
|
"""
|
||||||
Create notification when a user is @mentioned in a comment.
|
Create notification when a user is @mentioned in a comment.
|
||||||
|
Uses mention_id for idempotency to prevent duplicate notifications.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
session: Database session
|
session: Database session
|
||||||
mentioned_user_id: User who was mentioned
|
mentioned_user_id: User who was mentioned
|
||||||
mention_id: ID of the mention record
|
mention_id: ID of the mention record (used for idempotency)
|
||||||
comment_id: ID of the comment containing the mention
|
comment_id: ID of the comment containing the mention
|
||||||
message_id: ID of the message being commented on
|
message_id: ID of the message being commented on
|
||||||
thread_id: ID of the chat thread
|
thread_id: ID of the chat thread
|
||||||
|
|
@ -658,8 +681,16 @@ class MentionNotificationHandler(BaseNotificationHandler):
|
||||||
search_space_id: Search space ID
|
search_space_id: Search space ID
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Notification: The created notification
|
Notification: The created or existing notification
|
||||||
"""
|
"""
|
||||||
|
# Check if notification already exists for this mention (idempotency)
|
||||||
|
existing = await self.find_notification_by_mention(session, mention_id)
|
||||||
|
if existing:
|
||||||
|
logger.info(
|
||||||
|
f"Notification already exists for mention {mention_id}, returning existing"
|
||||||
|
)
|
||||||
|
return existing
|
||||||
|
|
||||||
title = f"{author_name} mentioned you"
|
title = f"{author_name} mentioned you"
|
||||||
message = content_preview[:100] + ("..." if len(content_preview) > 100 else "")
|
message = content_preview[:100] + ("..." if len(content_preview) > 100 else "")
|
||||||
|
|
||||||
|
|
@ -676,21 +707,34 @@ class MentionNotificationHandler(BaseNotificationHandler):
|
||||||
"content_preview": content_preview[:200],
|
"content_preview": content_preview[:200],
|
||||||
}
|
}
|
||||||
|
|
||||||
notification = Notification(
|
try:
|
||||||
user_id=mentioned_user_id,
|
notification = Notification(
|
||||||
search_space_id=search_space_id,
|
user_id=mentioned_user_id,
|
||||||
type=self.notification_type,
|
search_space_id=search_space_id,
|
||||||
title=title,
|
type=self.notification_type,
|
||||||
message=message,
|
title=title,
|
||||||
notification_metadata=metadata,
|
message=message,
|
||||||
)
|
notification_metadata=metadata,
|
||||||
session.add(notification)
|
)
|
||||||
await session.commit()
|
session.add(notification)
|
||||||
await session.refresh(notification)
|
await session.commit()
|
||||||
logger.info(
|
await session.refresh(notification)
|
||||||
f"Created new_mention notification {notification.id} for user {mentioned_user_id}"
|
logger.info(
|
||||||
)
|
f"Created new_mention notification {notification.id} for user {mentioned_user_id}"
|
||||||
return notification
|
)
|
||||||
|
return notification
|
||||||
|
except Exception as e:
|
||||||
|
# Handle race condition - if duplicate key error, try to fetch existing
|
||||||
|
await session.rollback()
|
||||||
|
if "duplicate key" in str(e).lower() or "unique constraint" in str(e).lower():
|
||||||
|
logger.warning(
|
||||||
|
f"Duplicate notification detected for mention {mention_id}, fetching existing"
|
||||||
|
)
|
||||||
|
existing = await self.find_notification_by_mention(session, mention_id)
|
||||||
|
if existing:
|
||||||
|
return existing
|
||||||
|
# Re-raise if not a duplicate key error or couldn't find existing
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
class NotificationService:
|
class NotificationService:
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,9 @@ import type { SyncHandle } from "@/lib/electric/client";
|
||||||
import { useElectricClient } from "@/lib/electric/context";
|
import { useElectricClient } from "@/lib/electric/context";
|
||||||
import { cacheKeys } from "@/lib/query-client/cache-keys";
|
import { cacheKeys } from "@/lib/query-client/cache-keys";
|
||||||
|
|
||||||
|
// Debounce delay for stream updates (ms)
|
||||||
|
const STREAM_UPDATE_DEBOUNCE_MS = 100;
|
||||||
|
|
||||||
// Raw comment from PGlite local database
|
// Raw comment from PGlite local database
|
||||||
interface RawCommentRow {
|
interface RawCommentRow {
|
||||||
id: number;
|
id: number;
|
||||||
|
|
@ -208,6 +211,7 @@ export function useCommentsElectric(threadId: number | null) {
|
||||||
const syncHandleRef = useRef<SyncHandle | null>(null);
|
const syncHandleRef = useRef<SyncHandle | null>(null);
|
||||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||||
const syncKeyRef = useRef<string | null>(null);
|
const syncKeyRef = useRef<string | null>(null);
|
||||||
|
const streamUpdateDebounceRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||||
|
|
||||||
// Stable callback that uses refs for fresh values
|
// Stable callback that uses refs for fresh values
|
||||||
const updateReactQueryCache = useCallback((rows: RawCommentRow[]) => {
|
const updateReactQueryCache = useCallback((rows: RawCommentRow[]) => {
|
||||||
|
|
@ -275,6 +279,30 @@ export function useCommentsElectric(threadId: number | null) {
|
||||||
|
|
||||||
// Set up live query for real-time updates
|
// Set up live query for real-time updates
|
||||||
await setupLiveQuery();
|
await setupLiveQuery();
|
||||||
|
|
||||||
|
// Subscribe to the sync stream for real-time updates from Electric SQL
|
||||||
|
// This ensures we catch updates even if PGlite live query misses them
|
||||||
|
if (handle.stream) {
|
||||||
|
const stream = handle.stream as { subscribe?: (callback: (messages: unknown[]) => void) => void };
|
||||||
|
if (typeof stream.subscribe === "function") {
|
||||||
|
stream.subscribe((messages: unknown[]) => {
|
||||||
|
if (!mounted) return;
|
||||||
|
// When Electric sync receives new data, refresh from PGlite
|
||||||
|
// This handles cases where live query might miss the update
|
||||||
|
if (messages && messages.length > 0) {
|
||||||
|
// Debounce the refresh to avoid excessive queries
|
||||||
|
if (streamUpdateDebounceRef.current) {
|
||||||
|
clearTimeout(streamUpdateDebounceRef.current);
|
||||||
|
}
|
||||||
|
streamUpdateDebounceRef.current = setTimeout(() => {
|
||||||
|
if (mounted) {
|
||||||
|
fetchAndUpdateCache();
|
||||||
|
}
|
||||||
|
}, STREAM_UPDATE_DEBOUNCE_MS);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// Sync failed - will retry on next mount
|
// Sync failed - will retry on next mount
|
||||||
}
|
}
|
||||||
|
|
@ -348,6 +376,12 @@ export function useCommentsElectric(threadId: number | null) {
|
||||||
mounted = false;
|
mounted = false;
|
||||||
syncKeyRef.current = null;
|
syncKeyRef.current = null;
|
||||||
|
|
||||||
|
// Clear debounce timeout
|
||||||
|
if (streamUpdateDebounceRef.current) {
|
||||||
|
clearTimeout(streamUpdateDebounceRef.current);
|
||||||
|
streamUpdateDebounceRef.current = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (syncHandleRef.current) {
|
if (syncHandleRef.current) {
|
||||||
syncHandleRef.current.unsubscribe();
|
syncHandleRef.current.unsubscribe();
|
||||||
syncHandleRef.current = null;
|
syncHandleRef.current = null;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue