mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-27 01:36:30 +02:00
refactor: consolidate inbox data handling in LayoutDataProvider and related components, streamlining state management and improving performance by using a single data source for inbox items
This commit is contained in:
parent
bd783cc2d0
commit
eb775fea11
5 changed files with 350 additions and 677 deletions
|
|
@ -1,497 +1,367 @@
|
|||
"use client";
|
||||
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import type { InboxItem, InboxItemTypeEnum } from "@/contracts/types/inbox.types";
|
||||
import type { InboxItem } from "@/contracts/types/inbox.types";
|
||||
import { notificationsApiService } from "@/lib/apis/notifications-api.service";
|
||||
import type { SyncHandle } from "@/lib/electric/client";
|
||||
import { useElectricClient } from "@/lib/electric/context";
|
||||
|
||||
export type { InboxItem, InboxItemTypeEnum } from "@/contracts/types/inbox.types";
|
||||
|
||||
const PAGE_SIZE = 50;
|
||||
const SYNC_WINDOW_DAYS = 14;
|
||||
const INITIAL_PAGE_SIZE = 50;
|
||||
const SCROLL_PAGE_SIZE = 30;
|
||||
const SYNC_WINDOW_DAYS = 4;
|
||||
|
||||
/**
|
||||
* Check if an item is older than the sync window
|
||||
*/
|
||||
function isOlderThanSyncWindow(createdAt: string): boolean {
|
||||
const cutoffDate = new Date();
|
||||
cutoffDate.setDate(cutoffDate.getDate() - SYNC_WINDOW_DAYS);
|
||||
return new Date(createdAt) < cutoffDate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deduplicate by ID and sort by created_at descending.
|
||||
* This is the SINGLE source of truth for deduplication - prevents race conditions.
|
||||
*/
|
||||
function deduplicateAndSort(items: InboxItem[]): InboxItem[] {
|
||||
const seen = new Map<number, InboxItem>();
|
||||
for (const item of items) {
|
||||
if (!seen.has(item.id)) {
|
||||
seen.set(item.id, item);
|
||||
}
|
||||
}
|
||||
return Array.from(seen.values()).sort(
|
||||
(a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the cutoff date for sync window
|
||||
* IMPORTANT: Rounds to the start of the day (midnight UTC) to ensure stable values
|
||||
* across re-renders. Without this, millisecond differences cause multiple syncs!
|
||||
* Calculate the cutoff date for sync window.
|
||||
* Rounds to the start of the day (midnight UTC) to ensure stable values
|
||||
* across re-renders.
|
||||
*/
|
||||
function getSyncCutoffDate(): string {
|
||||
const cutoff = new Date();
|
||||
cutoff.setDate(cutoff.getDate() - SYNC_WINDOW_DAYS);
|
||||
// Round to start of day to prevent millisecond differences causing duplicate syncs
|
||||
cutoff.setUTCHours(0, 0, 0, 0);
|
||||
return cutoff.toISOString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a date value to ISO string format
|
||||
*/
|
||||
function toISOString(date: string | Date | null | undefined): string | null {
|
||||
if (!date) return null;
|
||||
if (date instanceof Date) return date.toISOString();
|
||||
if (typeof date === "string") {
|
||||
if (date.includes("T")) return date;
|
||||
try {
|
||||
return new Date(date).toISOString();
|
||||
} catch {
|
||||
return date;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook for managing inbox items with Electric SQL real-time sync + API fallback
|
||||
* Hook for managing inbox items with API-first architecture + Electric real-time deltas.
|
||||
*
|
||||
* Architecture (Simplified & Race-Condition Free):
|
||||
* - Electric SQL: Syncs recent items (within SYNC_WINDOW_DAYS) for real-time updates
|
||||
* - Live Query: Provides reactive first page from PGLite
|
||||
* - API: Handles all pagination (more reliable than mixing with Electric)
|
||||
* Architecture (Documents pattern):
|
||||
* 1. API is the PRIMARY data source — fetches first page on mount
|
||||
* 2. Electric provides REAL-TIME updates (new items, status changes, read state)
|
||||
* 3. Baseline pattern prevents duplicates between API and Electric
|
||||
* 4. Single instance serves both Comments and Status tabs
|
||||
*
|
||||
* Key Design Decisions:
|
||||
* 1. No mutable refs for cursor - cursor computed from current state
|
||||
* 2. Single deduplicateAndSort function - prevents inconsistencies
|
||||
* 3. Filter-based preservation in live query - prevents data loss
|
||||
* 4. Auto-fetch from API when Electric returns 0 items
|
||||
* Unread count strategy:
|
||||
* - API provides the total on mount (ground truth across all time)
|
||||
* - Electric live query counts unread within SYNC_WINDOW_DAYS
|
||||
* - olderUnreadOffsetRef bridges the gap: total = offset + recent
|
||||
* - Optimistic updates adjust both the count and the offset (for old items)
|
||||
*
|
||||
* @param userId - The user ID to fetch inbox items for
|
||||
* @param searchSpaceId - The search space ID to filter inbox items
|
||||
* @param typeFilter - Optional inbox item type to filter by
|
||||
*/
|
||||
export function useInbox(
|
||||
userId: string | null,
|
||||
searchSpaceId: number | null,
|
||||
typeFilter: InboxItemTypeEnum | null = null
|
||||
) {
|
||||
const electricClient = useElectricClient();
|
||||
|
||||
const [inboxItems, setInboxItems] = useState<InboxItem[]>([]);
|
||||
const [loading, setLoading] = useState(true);
|
||||
const [loadingMore, setLoadingMore] = useState(false);
|
||||
const [hasMore, setHasMore] = useState(true);
|
||||
const [hasMore, setHasMore] = useState(false);
|
||||
const [error, setError] = useState<Error | null>(null);
|
||||
const [unreadCount, setUnreadCount] = useState(0);
|
||||
|
||||
// Split unread count tracking for accurate counts with 14-day sync window
|
||||
// olderUnreadCount = unread items OLDER than sync window (from server, static until reconciliation)
|
||||
// recentUnreadCount = unread items within sync window (from live query, real-time)
|
||||
const [olderUnreadCount, setOlderUnreadCount] = useState(0);
|
||||
const [recentUnreadCount, setRecentUnreadCount] = useState(0);
|
||||
|
||||
const initialLoadDoneRef = useRef(false);
|
||||
const electricBaselineIdsRef = useRef<Set<number> | null>(null);
|
||||
const syncHandleRef = useRef<SyncHandle | null>(null);
|
||||
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||
const userSyncKeyRef = useRef<string | null>(null);
|
||||
const unreadCountLiveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
|
||||
const liveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null);
|
||||
const unreadLiveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null);
|
||||
|
||||
// Total unread = older (static from server) + recent (live from Electric)
|
||||
const totalUnreadCount = olderUnreadCount + recentUnreadCount;
|
||||
// Unread count offset: number of unread items OLDER than the sync window.
|
||||
// Computed once from (API total - first Electric recent count), then adjusted
|
||||
// when the user marks old items as read.
|
||||
const olderUnreadOffsetRef = useRef<number | null>(null);
|
||||
const apiUnreadTotalRef = useRef(0);
|
||||
|
||||
// EFFECT 1: Electric SQL sync for real-time updates
|
||||
// EFFECT 1: Fetch first page + unread count from API when params change
|
||||
useEffect(() => {
|
||||
if (!userId || !electricClient) {
|
||||
setLoading(!electricClient);
|
||||
return;
|
||||
}
|
||||
if (!userId || !searchSpaceId) return;
|
||||
|
||||
let cancelled = false;
|
||||
|
||||
setLoading(true);
|
||||
setInboxItems([]);
|
||||
setHasMore(false);
|
||||
initialLoadDoneRef.current = false;
|
||||
electricBaselineIdsRef.current = null;
|
||||
olderUnreadOffsetRef.current = null;
|
||||
apiUnreadTotalRef.current = 0;
|
||||
|
||||
const fetchInitialData = async () => {
|
||||
try {
|
||||
const [notificationsResponse, unreadResponse] = await Promise.all([
|
||||
notificationsApiService.getNotifications({
|
||||
queryParams: {
|
||||
search_space_id: searchSpaceId,
|
||||
limit: INITIAL_PAGE_SIZE,
|
||||
},
|
||||
}),
|
||||
notificationsApiService.getUnreadCount(searchSpaceId),
|
||||
]);
|
||||
|
||||
if (cancelled) return;
|
||||
|
||||
setInboxItems(notificationsResponse.items);
|
||||
setHasMore(notificationsResponse.has_more);
|
||||
setUnreadCount(unreadResponse.total_unread);
|
||||
apiUnreadTotalRef.current = unreadResponse.total_unread;
|
||||
setError(null);
|
||||
initialLoadDoneRef.current = true;
|
||||
} catch (err) {
|
||||
if (cancelled) return;
|
||||
console.error("[useInbox] Initial load failed:", err);
|
||||
setError(err instanceof Error ? err : new Error("Failed to load notifications"));
|
||||
} finally {
|
||||
if (!cancelled) setLoading(false);
|
||||
}
|
||||
};
|
||||
|
||||
fetchInitialData();
|
||||
return () => { cancelled = true; };
|
||||
}, [userId, searchSpaceId]);
|
||||
|
||||
// EFFECT 2: Electric sync + live query for real-time updates
|
||||
useEffect(() => {
|
||||
if (!userId || !searchSpaceId || !electricClient) return;
|
||||
|
||||
const uid = userId;
|
||||
const spaceId = searchSpaceId;
|
||||
const client = electricClient;
|
||||
let mounted = true;
|
||||
|
||||
async function startSync() {
|
||||
async function setupElectricRealtime() {
|
||||
if (syncHandleRef.current) {
|
||||
try { syncHandleRef.current.unsubscribe(); } catch { /* PGlite may be closed */ }
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
if (liveQueryRef.current) {
|
||||
try { liveQueryRef.current.unsubscribe?.(); } catch { /* PGlite may be closed */ }
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
if (unreadLiveQueryRef.current) {
|
||||
try { unreadLiveQueryRef.current.unsubscribe?.(); } catch { /* PGlite may be closed */ }
|
||||
unreadLiveQueryRef.current = null;
|
||||
}
|
||||
|
||||
try {
|
||||
const cutoffDate = getSyncCutoffDate();
|
||||
const userSyncKey = `inbox_${userId}_${cutoffDate}`;
|
||||
|
||||
// Skip if already syncing with this key
|
||||
if (userSyncKeyRef.current === userSyncKey) return;
|
||||
|
||||
// Clean up previous sync
|
||||
if (syncHandleRef.current) {
|
||||
try {
|
||||
syncHandleRef.current.unsubscribe();
|
||||
} catch {
|
||||
// PGlite may already be closed during cleanup
|
||||
}
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
|
||||
console.log("[useInbox] Starting sync for:", userId);
|
||||
userSyncKeyRef.current = userSyncKey;
|
||||
|
||||
const handle = await client.syncShape({
|
||||
table: "notifications",
|
||||
where: `user_id = '${userId}' AND created_at > '${cutoffDate}'`,
|
||||
where: `user_id = '${uid}' AND created_at > '${cutoffDate}'`,
|
||||
primaryKey: ["id"],
|
||||
});
|
||||
|
||||
// Wait for initial sync with timeout
|
||||
if (!handle.isUpToDate && handle.initialSyncPromise) {
|
||||
await Promise.race([
|
||||
handle.initialSyncPromise,
|
||||
new Promise((resolve) => setTimeout(resolve, 3000)),
|
||||
]);
|
||||
}
|
||||
|
||||
if (!mounted) {
|
||||
handle.unsubscribe();
|
||||
return;
|
||||
}
|
||||
|
||||
syncHandleRef.current = handle;
|
||||
setLoading(false);
|
||||
setError(null);
|
||||
} catch (err) {
|
||||
|
||||
if (!handle.isUpToDate && handle.initialSyncPromise) {
|
||||
await Promise.race([
|
||||
handle.initialSyncPromise,
|
||||
new Promise((resolve) => setTimeout(resolve, 5000)),
|
||||
]);
|
||||
}
|
||||
|
||||
if (!mounted) return;
|
||||
console.error("[useInbox] Sync failed:", err);
|
||||
setError(err instanceof Error ? err : new Error("Sync failed"));
|
||||
setLoading(false);
|
||||
}
|
||||
}
|
||||
|
||||
startSync();
|
||||
const db = client.db as {
|
||||
live?: {
|
||||
query: <T>(
|
||||
sql: string,
|
||||
params?: (number | string)[]
|
||||
) => Promise<{
|
||||
subscribe: (cb: (result: { rows: T[] }) => void) => void;
|
||||
unsubscribe?: () => void;
|
||||
}>;
|
||||
};
|
||||
};
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
userSyncKeyRef.current = null;
|
||||
if (syncHandleRef.current) {
|
||||
try {
|
||||
syncHandleRef.current.unsubscribe();
|
||||
} catch {
|
||||
// PGlite may already be closed during cleanup
|
||||
}
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [userId, electricClient]);
|
||||
if (!db.live?.query) return;
|
||||
|
||||
// Reset when filters change
|
||||
useEffect(() => {
|
||||
setHasMore(true);
|
||||
setInboxItems([]);
|
||||
// Reset count states - will be refetched by the unread count effect
|
||||
setOlderUnreadCount(0);
|
||||
setRecentUnreadCount(0);
|
||||
}, [userId, searchSpaceId, typeFilter]);
|
||||
|
||||
// EFFECT 2: Live query for real-time updates + auto-fetch from API if empty
|
||||
useEffect(() => {
|
||||
if (!userId || !electricClient) return;
|
||||
|
||||
const client = electricClient;
|
||||
let mounted = true;
|
||||
|
||||
async function setupLiveQuery() {
|
||||
// Clean up previous live query
|
||||
if (liveQueryRef.current) {
|
||||
try {
|
||||
liveQueryRef.current.unsubscribe();
|
||||
} catch {
|
||||
// PGlite may already be closed during cleanup
|
||||
}
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
|
||||
try {
|
||||
const cutoff = getSyncCutoffDate();
|
||||
|
||||
const query = `SELECT * FROM notifications
|
||||
const itemsQuery = `SELECT * FROM notifications
|
||||
WHERE user_id = $1
|
||||
AND (search_space_id = $2 OR search_space_id IS NULL)
|
||||
AND created_at > '${cutoff}'
|
||||
${typeFilter ? "AND type = $3" : ""}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ${PAGE_SIZE}`;
|
||||
AND created_at > '${cutoffDate}'
|
||||
ORDER BY created_at DESC`;
|
||||
|
||||
const params = typeFilter ? [userId, searchSpaceId, typeFilter] : [userId, searchSpaceId];
|
||||
const liveQuery = await db.live.query<InboxItem>(itemsQuery, [uid, spaceId]);
|
||||
|
||||
const db = client.db as any;
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
// Initial fetch from PGLite - no validation needed, schema is enforced by Electric SQL sync
|
||||
const result = await client.db.query<InboxItem>(query, params);
|
||||
liveQuery.subscribe((result: { rows: InboxItem[] }) => {
|
||||
if (!mounted || !result.rows || !initialLoadDoneRef.current) return;
|
||||
|
||||
if (mounted && result.rows) {
|
||||
const items = deduplicateAndSort(result.rows);
|
||||
setInboxItems(items);
|
||||
const validItems = result.rows.filter((item) => item.id != null && item.title != null);
|
||||
const isFullySynced = syncHandleRef.current?.isUpToDate ?? false;
|
||||
const cutoff = new Date(getSyncCutoffDate());
|
||||
|
||||
// AUTO-FETCH: If Electric returned 0 items, check API for older items
|
||||
// This handles the edge case where user has no recent notifications
|
||||
// but has older ones outside the sync window
|
||||
if (items.length === 0) {
|
||||
console.log(
|
||||
"[useInbox] Electric returned 0 items, checking API for older notifications"
|
||||
);
|
||||
try {
|
||||
// Use the API service with proper Zod validation for API responses
|
||||
const data = await notificationsApiService.getNotifications({
|
||||
queryParams: {
|
||||
search_space_id: searchSpaceId ?? undefined,
|
||||
type: typeFilter ?? undefined,
|
||||
limit: PAGE_SIZE,
|
||||
},
|
||||
// Build a Map for O(1) lookups instead of .find() inside .map()
|
||||
const liveItemMap = new Map(validItems.map((d) => [d.id, d]));
|
||||
const liveIds = new Set(liveItemMap.keys());
|
||||
|
||||
setInboxItems((prev) => {
|
||||
const prevIds = new Set(prev.map((d) => d.id));
|
||||
|
||||
if (electricBaselineIdsRef.current === null) {
|
||||
electricBaselineIdsRef.current = new Set(liveIds);
|
||||
}
|
||||
|
||||
const baseline = electricBaselineIdsRef.current;
|
||||
const newItems = validItems
|
||||
.filter((item) => {
|
||||
if (prevIds.has(item.id)) return false;
|
||||
if (baseline.has(item.id)) return false;
|
||||
return true;
|
||||
});
|
||||
|
||||
if (mounted) {
|
||||
if (data.items.length > 0) {
|
||||
setInboxItems(data.items);
|
||||
}
|
||||
setHasMore(data.has_more);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("[useInbox] API fallback failed:", err);
|
||||
for (const item of newItems) {
|
||||
baseline.add(item.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set up live query for real-time updates
|
||||
if (db.live?.query) {
|
||||
const liveQuery = await db.live.query(query, params);
|
||||
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
if (liveQuery.subscribe) {
|
||||
// Live query data comes from PGlite - no validation needed
|
||||
liveQuery.subscribe((result: { rows: InboxItem[] }) => {
|
||||
if (mounted && result.rows) {
|
||||
const liveItems = result.rows;
|
||||
|
||||
setInboxItems((prev) => {
|
||||
const liveItemIds = new Set(liveItems.map((item) => item.id));
|
||||
|
||||
// FIXED: Keep ALL items not in live result (not just slice)
|
||||
// This prevents data loss when new notifications push items
|
||||
// out of the LIMIT window
|
||||
const itemsToKeep = prev.filter((item) => !liveItemIds.has(item.id));
|
||||
|
||||
return deduplicateAndSort([...liveItems, ...itemsToKeep]);
|
||||
});
|
||||
}
|
||||
let updated = prev.map((item) => {
|
||||
const liveItem = liveItemMap.get(item.id);
|
||||
if (liveItem) return liveItem;
|
||||
return item;
|
||||
});
|
||||
}
|
||||
|
||||
if (liveQuery.unsubscribe) {
|
||||
liveQueryRef.current = liveQuery;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("[useInbox] Live query error:", err);
|
||||
}
|
||||
}
|
||||
if (isFullySynced) {
|
||||
updated = updated.filter((item) => {
|
||||
if (new Date(item.created_at) < cutoff) return true;
|
||||
return liveIds.has(item.id);
|
||||
});
|
||||
}
|
||||
|
||||
setupLiveQuery();
|
||||
if (newItems.length > 0) {
|
||||
return [...newItems, ...updated];
|
||||
}
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
if (liveQueryRef.current) {
|
||||
try {
|
||||
liveQueryRef.current.unsubscribe();
|
||||
} catch {
|
||||
// PGlite may already be closed during cleanup
|
||||
}
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [userId, searchSpaceId, typeFilter, electricClient]);
|
||||
return updated;
|
||||
});
|
||||
});
|
||||
|
||||
// EFFECT 3: Dedicated unread count sync with split tracking
|
||||
// - Fetches server count on mount (accurate total)
|
||||
// - Sets up live query for recent count (real-time updates)
|
||||
// - Handles items older than sync window separately
|
||||
useEffect(() => {
|
||||
if (!userId || !electricClient) return;
|
||||
liveQueryRef.current = liveQuery;
|
||||
|
||||
const client = electricClient;
|
||||
let mounted = true;
|
||||
|
||||
async function setupUnreadCountSync() {
|
||||
// Cleanup previous live query
|
||||
if (unreadCountLiveQueryRef.current) {
|
||||
unreadCountLiveQueryRef.current.unsubscribe();
|
||||
unreadCountLiveQueryRef.current = null;
|
||||
}
|
||||
|
||||
try {
|
||||
// STEP 1: Fetch server counts (total and recent) - guaranteed accurate
|
||||
console.log(
|
||||
"[useInbox] Fetching unread count from server",
|
||||
typeFilter ? `for type: ${typeFilter}` : "for all types"
|
||||
);
|
||||
const serverCounts = await notificationsApiService.getUnreadCount(
|
||||
searchSpaceId ?? undefined,
|
||||
typeFilter ?? undefined
|
||||
);
|
||||
|
||||
if (mounted) {
|
||||
// Calculate older count = total - recent
|
||||
const olderCount = serverCounts.total_unread - serverCounts.recent_unread;
|
||||
setOlderUnreadCount(olderCount);
|
||||
setRecentUnreadCount(serverCounts.recent_unread);
|
||||
console.log(
|
||||
`[useInbox] Server counts: total=${serverCounts.total_unread}, recent=${serverCounts.recent_unread}, older=${olderCount}`
|
||||
);
|
||||
}
|
||||
|
||||
// STEP 2: Set up PGLite live query for RECENT unread count only
|
||||
// This provides real-time updates for notifications within sync window
|
||||
const db = client.db as any;
|
||||
const cutoff = getSyncCutoffDate();
|
||||
|
||||
// Count query - NO LIMIT, counts all unread in synced window
|
||||
const countQuery = `
|
||||
SELECT COUNT(*) as count FROM notifications
|
||||
// Unread count live query — only covers the sync window.
|
||||
// Combined with olderUnreadOffsetRef to produce the full count.
|
||||
const countQuery = `SELECT COUNT(*) as count FROM notifications
|
||||
WHERE user_id = $1
|
||||
AND (search_space_id = $2 OR search_space_id IS NULL)
|
||||
AND created_at > '${cutoff}'
|
||||
AND read = false
|
||||
${typeFilter ? "AND type = $3" : ""}
|
||||
`;
|
||||
const params = typeFilter ? [userId, searchSpaceId, typeFilter] : [userId, searchSpaceId];
|
||||
AND created_at > '${cutoffDate}'
|
||||
AND read = false`;
|
||||
|
||||
if (db.live?.query) {
|
||||
const liveQuery = await db.live.query(countQuery, params);
|
||||
const countLiveQuery = await db.live.query<{ count: number | string }>(countQuery, [uid, spaceId]);
|
||||
|
||||
if (!mounted) {
|
||||
liveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
if (liveQuery.subscribe) {
|
||||
liveQuery.subscribe((result: { rows: Array<{ count: number | string }> }) => {
|
||||
if (mounted && result.rows?.[0]) {
|
||||
const liveCount = Number(result.rows[0].count) || 0;
|
||||
// Update recent count from live query
|
||||
// This fires in real-time when Electric syncs new/updated notifications
|
||||
setRecentUnreadCount(liveCount);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (liveQuery.unsubscribe) {
|
||||
unreadCountLiveQueryRef.current = liveQuery;
|
||||
}
|
||||
if (!mounted) {
|
||||
countLiveQuery.unsubscribe?.();
|
||||
return;
|
||||
}
|
||||
|
||||
countLiveQuery.subscribe((result: { rows: Array<{ count: number | string }> }) => {
|
||||
if (!mounted || !result.rows?.[0] || !initialLoadDoneRef.current) return;
|
||||
const liveRecentUnread = Number(result.rows[0].count) || 0;
|
||||
|
||||
// First callback: compute how many unread are outside the sync window
|
||||
if (olderUnreadOffsetRef.current === null) {
|
||||
olderUnreadOffsetRef.current = Math.max(
|
||||
0,
|
||||
apiUnreadTotalRef.current - liveRecentUnread
|
||||
);
|
||||
}
|
||||
|
||||
setUnreadCount(olderUnreadOffsetRef.current + liveRecentUnread);
|
||||
});
|
||||
|
||||
unreadLiveQueryRef.current = countLiveQuery;
|
||||
} catch (err) {
|
||||
console.error("[useInbox] Unread count sync error:", err);
|
||||
// On error, counts will remain at 0 or previous values
|
||||
// The items-based count will be the fallback
|
||||
console.error("[useInbox] Electric setup failed:", err);
|
||||
}
|
||||
}
|
||||
|
||||
setupUnreadCountSync();
|
||||
setupElectricRealtime();
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
if (unreadCountLiveQueryRef.current) {
|
||||
unreadCountLiveQueryRef.current.unsubscribe();
|
||||
unreadCountLiveQueryRef.current = null;
|
||||
if (syncHandleRef.current) {
|
||||
try { syncHandleRef.current.unsubscribe(); } catch { /* PGlite may be closed */ }
|
||||
syncHandleRef.current = null;
|
||||
}
|
||||
if (liveQueryRef.current) {
|
||||
try { liveQueryRef.current.unsubscribe?.(); } catch { /* PGlite may be closed */ }
|
||||
liveQueryRef.current = null;
|
||||
}
|
||||
if (unreadLiveQueryRef.current) {
|
||||
try { unreadLiveQueryRef.current.unsubscribe?.(); } catch { /* PGlite may be closed */ }
|
||||
unreadLiveQueryRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [userId, searchSpaceId, typeFilter, electricClient]);
|
||||
}, [userId, searchSpaceId, electricClient]);
|
||||
|
||||
// loadMore - Pure cursor-based pagination, no race conditions
|
||||
// Cursor is computed from current state, not stored in refs
|
||||
// Load more pages via API (cursor-based using before_date)
|
||||
const loadMore = useCallback(async () => {
|
||||
// Removed inboxItems.length === 0 check to allow loading older items
|
||||
// when Electric returns 0 items
|
||||
if (!userId || loadingMore || !hasMore) return;
|
||||
if (loadingMore || !hasMore || !userId || !searchSpaceId) return;
|
||||
|
||||
setLoadingMore(true);
|
||||
|
||||
try {
|
||||
// Cursor is computed from current state - no stale refs possible
|
||||
const oldestItem = inboxItems.length > 0 ? inboxItems[inboxItems.length - 1] : null;
|
||||
const beforeDate = oldestItem ? toISOString(oldestItem.created_at) : null;
|
||||
const beforeDate = oldestItem?.created_at ?? undefined;
|
||||
|
||||
console.log("[useInbox] Loading more, before:", beforeDate ?? "none (initial)");
|
||||
|
||||
// Use the API service with proper Zod validation
|
||||
const data = await notificationsApiService.getNotifications({
|
||||
const response = await notificationsApiService.getNotifications({
|
||||
queryParams: {
|
||||
search_space_id: searchSpaceId ?? undefined,
|
||||
type: typeFilter ?? undefined,
|
||||
before_date: beforeDate ?? undefined,
|
||||
limit: PAGE_SIZE,
|
||||
search_space_id: searchSpaceId,
|
||||
before_date: beforeDate,
|
||||
limit: SCROLL_PAGE_SIZE,
|
||||
},
|
||||
});
|
||||
|
||||
if (data.items.length > 0) {
|
||||
// Functional update ensures we always merge with latest state
|
||||
// Items are already validated by the API service
|
||||
setInboxItems((prev) => deduplicateAndSort([...prev, ...data.items]));
|
||||
}
|
||||
const newItems = response.items;
|
||||
|
||||
// Use API's has_more flag
|
||||
setHasMore(data.has_more);
|
||||
setInboxItems((prev) => {
|
||||
const existingIds = new Set(prev.map((d) => d.id));
|
||||
const deduped = newItems.filter((d) => !existingIds.has(d.id));
|
||||
return [...prev, ...deduped];
|
||||
});
|
||||
setHasMore(response.has_more);
|
||||
} catch (err) {
|
||||
console.error("[useInbox] Load more failed:", err);
|
||||
} finally {
|
||||
setLoadingMore(false);
|
||||
}
|
||||
}, [userId, searchSpaceId, typeFilter, loadingMore, hasMore, inboxItems]);
|
||||
}, [loadingMore, hasMore, userId, searchSpaceId, inboxItems]);
|
||||
|
||||
// Mark inbox item as read with optimistic update
|
||||
// Handles both recent items (live query updates count) and older items (manual count decrement)
|
||||
// Mark single item as read with optimistic update
|
||||
const markAsRead = useCallback(
|
||||
async (itemId: number) => {
|
||||
// Find the item to check if it's older than sync window
|
||||
const item = inboxItems.find((i) => i.id === itemId);
|
||||
const isOlderItem = item && !item.read && isOlderThanSyncWindow(item.created_at);
|
||||
if (!item || item.read) return true;
|
||||
|
||||
const cutoff = new Date(getSyncCutoffDate());
|
||||
const isOlderItem = new Date(item.created_at) < cutoff;
|
||||
|
||||
// Optimistic update: mark as read immediately for instant UI feedback
|
||||
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: true } : i)));
|
||||
setUnreadCount((prev) => Math.max(0, prev - 1));
|
||||
|
||||
// If older item, manually decrement older count
|
||||
// (live query won't see items outside sync window)
|
||||
if (isOlderItem) {
|
||||
setOlderUnreadCount((prev) => Math.max(0, prev - 1));
|
||||
// Adjust older offset so the next live query callback stays consistent
|
||||
if (isOlderItem && olderUnreadOffsetRef.current !== null) {
|
||||
olderUnreadOffsetRef.current = Math.max(0, olderUnreadOffsetRef.current - 1);
|
||||
}
|
||||
|
||||
try {
|
||||
// Use the API service with proper Zod validation
|
||||
const result = await notificationsApiService.markAsRead({ notificationId: itemId });
|
||||
|
||||
if (!result.success) {
|
||||
// Rollback on error
|
||||
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: false } : i)));
|
||||
if (isOlderItem) {
|
||||
setOlderUnreadCount((prev) => prev + 1);
|
||||
setUnreadCount((prev) => prev + 1);
|
||||
if (isOlderItem && olderUnreadOffsetRef.current !== null) {
|
||||
olderUnreadOffsetRef.current += 1;
|
||||
}
|
||||
}
|
||||
// If successful, Electric SQL will sync the change and live query will update
|
||||
// This ensures eventual consistency even if optimistic update was wrong
|
||||
return result.success;
|
||||
} catch (err) {
|
||||
console.error("Failed to mark as read:", err);
|
||||
// Rollback on error
|
||||
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: false } : i)));
|
||||
if (isOlderItem) {
|
||||
setOlderUnreadCount((prev) => prev + 1);
|
||||
setUnreadCount((prev) => prev + 1);
|
||||
if (isOlderItem && olderUnreadOffsetRef.current !== null) {
|
||||
olderUnreadOffsetRef.current += 1;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
@ -499,49 +369,39 @@ export function useInbox(
|
|||
[inboxItems]
|
||||
);
|
||||
|
||||
// Mark all inbox items as read with optimistic update
|
||||
// Resets both older and recent counts to 0
|
||||
// Mark all as read with optimistic update
|
||||
const markAllAsRead = useCallback(async () => {
|
||||
// Store previous counts for potential rollback
|
||||
const prevOlderCount = olderUnreadCount;
|
||||
const prevRecentCount = recentUnreadCount;
|
||||
const prevCount = unreadCount;
|
||||
const prevOffset = olderUnreadOffsetRef.current;
|
||||
|
||||
// Optimistic update: mark all as read immediately for instant UI feedback
|
||||
setInboxItems((prev) => prev.map((item) => ({ ...item, read: true })));
|
||||
setOlderUnreadCount(0);
|
||||
setRecentUnreadCount(0);
|
||||
setUnreadCount(0);
|
||||
olderUnreadOffsetRef.current = 0;
|
||||
|
||||
try {
|
||||
// Use the API service with proper Zod validation
|
||||
const result = await notificationsApiService.markAllAsRead();
|
||||
|
||||
if (!result.success) {
|
||||
console.error("Failed to mark all as read");
|
||||
// Rollback counts on error
|
||||
setOlderUnreadCount(prevOlderCount);
|
||||
setRecentUnreadCount(prevRecentCount);
|
||||
setUnreadCount(prevCount);
|
||||
olderUnreadOffsetRef.current = prevOffset;
|
||||
}
|
||||
// Electric SQL will sync and live query will ensure consistency
|
||||
return result.success;
|
||||
} catch (err) {
|
||||
console.error("Failed to mark all as read:", err);
|
||||
// Rollback counts on error
|
||||
setOlderUnreadCount(prevOlderCount);
|
||||
setRecentUnreadCount(prevRecentCount);
|
||||
setUnreadCount(prevCount);
|
||||
olderUnreadOffsetRef.current = prevOffset;
|
||||
return false;
|
||||
}
|
||||
}, [olderUnreadCount, recentUnreadCount]);
|
||||
}, [unreadCount]);
|
||||
|
||||
return {
|
||||
inboxItems,
|
||||
unreadCount: totalUnreadCount,
|
||||
unreadCount,
|
||||
markAsRead,
|
||||
markAllAsRead,
|
||||
loading,
|
||||
loadingMore,
|
||||
hasMore,
|
||||
loadMore,
|
||||
isUsingApiFallback: true, // Always use API for pagination
|
||||
error,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue