mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-25 19:15:18 +02:00
feat: rewrite use-inbox and use-documents hooks from Electric to Zero
use-inbox: Keep all API calls (initial load, pagination, unread counts, markAsRead/markAllAsRead optimistic updates). Replace Electric sync+PGlite with Zero useQuery. Category filtering in JS. 458 → 273 lines. use-documents: Keep all API calls (initial load, pagination, type counts, user enrichment). Replace Electric sync+PGlite with Zero useQuery. Remove baseline dedup. 511 → 361 lines. All 7 Electric hooks are now rewritten to use Zero.
This commit is contained in:
parent
833fbb2a76
commit
a02bc54e40
1 changed files with 89 additions and 234 deletions
|
|
@ -3,8 +3,8 @@
|
||||||
import { useCallback, useEffect, useRef, useState } from "react";
|
import { useCallback, useEffect, useRef, useState } from "react";
|
||||||
import type { InboxItem, NotificationCategory } from "@/contracts/types/inbox.types";
|
import type { InboxItem, NotificationCategory } from "@/contracts/types/inbox.types";
|
||||||
import { notificationsApiService } from "@/lib/apis/notifications-api.service";
|
import { notificationsApiService } from "@/lib/apis/notifications-api.service";
|
||||||
import { filterNewElectricItems, getNewestTimestamp } from "@/lib/electric/baseline";
|
import { queries } from "@/zero/queries";
|
||||||
import { useElectricClient } from "@/lib/electric/context";
|
import { useQuery } from "@rocicorp/zero/react";
|
||||||
|
|
||||||
export type {
|
export type {
|
||||||
InboxItem,
|
InboxItem,
|
||||||
|
|
@ -16,17 +16,16 @@ const INITIAL_PAGE_SIZE = 50;
|
||||||
const SCROLL_PAGE_SIZE = 30;
|
const SCROLL_PAGE_SIZE = 30;
|
||||||
const SYNC_WINDOW_DAYS = 4;
|
const SYNC_WINDOW_DAYS = 4;
|
||||||
|
|
||||||
const CATEGORY_TYPE_SQL: Record<NotificationCategory, string> = {
|
const CATEGORY_TYPES: Record<NotificationCategory, string[]> = {
|
||||||
comments: "AND type IN ('new_mention', 'comment_reply')",
|
comments: ["new_mention", "comment_reply"],
|
||||||
status:
|
status: [
|
||||||
"AND type IN ('connector_indexing', 'connector_deletion', 'document_processing', 'page_limit_exceeded')",
|
"connector_indexing",
|
||||||
|
"connector_deletion",
|
||||||
|
"document_processing",
|
||||||
|
"page_limit_exceeded",
|
||||||
|
],
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Calculate the cutoff date for sync window.
|
|
||||||
* Rounds to the start of the day (midnight UTC) to ensure stable values
|
|
||||||
* across re-renders.
|
|
||||||
*/
|
|
||||||
function getSyncCutoffDate(): string {
|
function getSyncCutoffDate(): string {
|
||||||
const cutoff = new Date();
|
const cutoff = new Date();
|
||||||
cutoff.setDate(cutoff.getDate() - SYNC_WINDOW_DAYS);
|
cutoff.setDate(cutoff.getDate() - SYNC_WINDOW_DAYS);
|
||||||
|
|
@ -35,24 +34,12 @@ function getSyncCutoffDate(): string {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hook for managing inbox items with API-first architecture + Electric real-time deltas.
|
* Hook for managing inbox items with API-first architecture + Zero real-time deltas.
|
||||||
*
|
*
|
||||||
* Architecture (Documents pattern, per-tab):
|
* Architecture:
|
||||||
* 1. API is the PRIMARY data source — fetches first page on mount with category filter
|
* 1. API is the PRIMARY data source — fetches first page on mount with category filter
|
||||||
* 2. Electric provides REAL-TIME updates (new items, status changes, read state)
|
* 2. Zero provides REAL-TIME updates (new items, status changes, read state)
|
||||||
* 3. Baseline pattern prevents duplicates between API and Electric
|
* 3. Unread count = olderUnreadOffset + recent unread from Zero
|
||||||
* 4. Electric sync shape is SHARED across instances (client-level caching)
|
|
||||||
* — each instance creates its own type-filtered live queries
|
|
||||||
*
|
|
||||||
* Unread count strategy:
|
|
||||||
* - API provides the category-filtered total on mount (ground truth across all time)
|
|
||||||
* - Electric live query counts unread within SYNC_WINDOW_DAYS (filtered by type)
|
|
||||||
* - olderUnreadOffsetRef bridges the gap: total = offset + recent
|
|
||||||
* - Optimistic updates adjust both the count and the offset (for old items)
|
|
||||||
*
|
|
||||||
* @param userId - The user ID to fetch inbox items for
|
|
||||||
* @param searchSpaceId - The search space ID to filter inbox items
|
|
||||||
* @param category - Which tab: "comments" or "status"
|
|
||||||
*/
|
*/
|
||||||
export function useInbox(
|
export function useInbox(
|
||||||
userId: string | null,
|
userId: string | null,
|
||||||
|
|
@ -61,8 +48,6 @@ export function useInbox(
|
||||||
prefetchedUnread?: { total_unread: number; recent_unread: number } | null,
|
prefetchedUnread?: { total_unread: number; recent_unread: number } | null,
|
||||||
prefetchedUnreadReady = true
|
prefetchedUnreadReady = true
|
||||||
) {
|
) {
|
||||||
const electricClient = useElectricClient();
|
|
||||||
|
|
||||||
const [inboxItems, setInboxItems] = useState<InboxItem[]>([]);
|
const [inboxItems, setInboxItems] = useState<InboxItem[]>([]);
|
||||||
const [loading, setLoading] = useState(true);
|
const [loading, setLoading] = useState(true);
|
||||||
const [loadingMore, setLoadingMore] = useState(false);
|
const [loadingMore, setLoadingMore] = useState(false);
|
||||||
|
|
@ -71,17 +56,12 @@ export function useInbox(
|
||||||
const [unreadCount, setUnreadCount] = useState(0);
|
const [unreadCount, setUnreadCount] = useState(0);
|
||||||
|
|
||||||
const initialLoadDoneRef = useRef(false);
|
const initialLoadDoneRef = useRef(false);
|
||||||
const electricBaselineIdsRef = useRef<Set<number> | null>(null);
|
|
||||||
const newestApiTimestampRef = useRef<string | null>(null);
|
|
||||||
const liveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null);
|
|
||||||
const unreadLiveQueryRef = useRef<{ unsubscribe?: () => void } | null>(null);
|
|
||||||
|
|
||||||
const olderUnreadOffsetRef = useRef<number | null>(null);
|
const olderUnreadOffsetRef = useRef<number | null>(null);
|
||||||
const apiUnreadTotalRef = useRef(0);
|
const apiUnreadTotalRef = useRef(0);
|
||||||
|
|
||||||
// EFFECT 1: Fetch first page + unread count from API with category filter.
|
const categoryTypes = CATEGORY_TYPES[category];
|
||||||
// When prefetchedUnreadReady=false, we wait for the batch query to settle
|
|
||||||
// before deciding whether we need an individual unread-count fallback call.
|
// EFFECT 1: Fetch first page + unread count from API with category filter
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (!userId || !searchSpaceId) return;
|
if (!userId || !searchSpaceId) return;
|
||||||
if (!prefetchedUnreadReady) return;
|
if (!prefetchedUnreadReady) return;
|
||||||
|
|
@ -92,8 +72,6 @@ export function useInbox(
|
||||||
setInboxItems([]);
|
setInboxItems([]);
|
||||||
setHasMore(false);
|
setHasMore(false);
|
||||||
initialLoadDoneRef.current = false;
|
initialLoadDoneRef.current = false;
|
||||||
electricBaselineIdsRef.current = null;
|
|
||||||
newestApiTimestampRef.current = null;
|
|
||||||
olderUnreadOffsetRef.current = null;
|
olderUnreadOffsetRef.current = null;
|
||||||
apiUnreadTotalRef.current = 0;
|
apiUnreadTotalRef.current = 0;
|
||||||
|
|
||||||
|
|
@ -107,7 +85,6 @@ export function useInbox(
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
// Use prefetched counts when available, otherwise fetch individually.
|
|
||||||
const unreadPromise = prefetchedUnread
|
const unreadPromise = prefetchedUnread
|
||||||
? Promise.resolve(prefetchedUnread)
|
? Promise.resolve(prefetchedUnread)
|
||||||
: notificationsApiService.getUnreadCount(searchSpaceId, undefined, category);
|
: notificationsApiService.getUnreadCount(searchSpaceId, undefined, category);
|
||||||
|
|
@ -123,7 +100,6 @@ export function useInbox(
|
||||||
setHasMore(notificationsResponse.has_more);
|
setHasMore(notificationsResponse.has_more);
|
||||||
setUnreadCount(unreadResponse.total_unread);
|
setUnreadCount(unreadResponse.total_unread);
|
||||||
apiUnreadTotalRef.current = unreadResponse.total_unread;
|
apiUnreadTotalRef.current = unreadResponse.total_unread;
|
||||||
newestApiTimestampRef.current = getNewestTimestamp(notificationsResponse.items);
|
|
||||||
setError(null);
|
setError(null);
|
||||||
initialLoadDoneRef.current = true;
|
initialLoadDoneRef.current = true;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|
@ -141,208 +117,87 @@ export function useInbox(
|
||||||
};
|
};
|
||||||
}, [userId, searchSpaceId, category, prefetchedUnread, prefetchedUnreadReady]);
|
}, [userId, searchSpaceId, category, prefetchedUnread, prefetchedUnreadReady]);
|
||||||
|
|
||||||
// EFFECT 2: Electric sync (shared shape) + per-instance type-filtered live queries
|
// EFFECT 2: Zero real-time sync for notification updates
|
||||||
|
const [zeroNotifications] = useQuery(
|
||||||
|
queries.notifications.byUser({ userId: userId ?? "" })
|
||||||
|
);
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (!userId || !searchSpaceId || !electricClient) return;
|
if (!userId || !searchSpaceId || !zeroNotifications || !initialLoadDoneRef.current) return;
|
||||||
|
|
||||||
const uid = userId;
|
const cutoff = new Date(getSyncCutoffDate());
|
||||||
const spaceId = searchSpaceId;
|
|
||||||
const client = electricClient;
|
|
||||||
const typeFilter = CATEGORY_TYPE_SQL[category];
|
|
||||||
let mounted = true;
|
|
||||||
|
|
||||||
async function setupElectricRealtime() {
|
const validItems = zeroNotifications.filter((item) => {
|
||||||
// Clean up previous live queries (NOT the sync shape — it's shared)
|
if (item.id == null) return false;
|
||||||
if (liveQueryRef.current) {
|
if (!categoryTypes.includes(item.type)) return false;
|
||||||
try {
|
if (item.searchSpaceId !== null && item.searchSpaceId !== searchSpaceId) return false;
|
||||||
liveQueryRef.current.unsubscribe?.();
|
return true;
|
||||||
} catch {
|
});
|
||||||
/* PGlite may be closed */
|
|
||||||
|
const recentItems = validItems.filter(
|
||||||
|
(item) => new Date(item.createdAt) > cutoff
|
||||||
|
);
|
||||||
|
|
||||||
|
const liveIds = new Set(recentItems.map((d) => d.id));
|
||||||
|
|
||||||
|
setInboxItems((prev) => {
|
||||||
|
const prevIds = new Set(prev.map((d) => d.id));
|
||||||
|
|
||||||
|
const newItems: InboxItem[] = recentItems
|
||||||
|
.filter((d) => !prevIds.has(d.id))
|
||||||
|
.map((item) => ({
|
||||||
|
id: item.id,
|
||||||
|
user_id: item.userId,
|
||||||
|
search_space_id: item.searchSpaceId ?? undefined,
|
||||||
|
type: item.type,
|
||||||
|
title: item.title,
|
||||||
|
message: item.message,
|
||||||
|
read: item.read,
|
||||||
|
metadata: item.metadata as unknown as Record<string, unknown>,
|
||||||
|
created_at: String(item.createdAt),
|
||||||
|
updated_at: item.updatedAt ? String(item.updatedAt) : undefined,
|
||||||
|
} as InboxItem));
|
||||||
|
|
||||||
|
let updated = prev.map((existing) => {
|
||||||
|
const liveItem = recentItems.find((v) => v.id === existing.id);
|
||||||
|
if (liveItem) {
|
||||||
|
return {
|
||||||
|
...existing,
|
||||||
|
read: liveItem.read,
|
||||||
|
title: liveItem.title,
|
||||||
|
message: liveItem.message,
|
||||||
|
metadata: liveItem.metadata as unknown as Record<string, unknown>,
|
||||||
|
} as InboxItem;
|
||||||
}
|
}
|
||||||
liveQueryRef.current = null;
|
return existing;
|
||||||
}
|
});
|
||||||
if (unreadLiveQueryRef.current) {
|
|
||||||
try {
|
updated = updated.filter((item) => {
|
||||||
unreadLiveQueryRef.current.unsubscribe?.();
|
if (new Date(item.created_at) < cutoff) return true;
|
||||||
} catch {
|
return liveIds.has(item.id);
|
||||||
/* PGlite may be closed */
|
});
|
||||||
}
|
|
||||||
unreadLiveQueryRef.current = null;
|
if (newItems.length > 0) {
|
||||||
|
return [...newItems, ...updated];
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
return updated;
|
||||||
const cutoffDate = getSyncCutoffDate();
|
});
|
||||||
|
|
||||||
// Sync shape is cached by the Electric client — multiple hook instances
|
// Calibrate older-unread offset on first Zero data
|
||||||
// calling syncShape with the same params get the same handle.
|
if (olderUnreadOffsetRef.current === null) {
|
||||||
const handle = await client.syncShape({
|
const recentUnreadCount = recentItems.filter((item) => !item.read).length;
|
||||||
table: "notifications",
|
olderUnreadOffsetRef.current = Math.max(
|
||||||
where: `user_id = '${uid}' AND created_at > '${cutoffDate}'`,
|
0,
|
||||||
primaryKey: ["id"],
|
apiUnreadTotalRef.current - recentUnreadCount
|
||||||
});
|
);
|
||||||
|
|
||||||
if (!mounted) return;
|
|
||||||
|
|
||||||
if (!handle.isUpToDate && handle.initialSyncPromise) {
|
|
||||||
await Promise.race([
|
|
||||||
handle.initialSyncPromise,
|
|
||||||
new Promise((resolve) => setTimeout(resolve, 5000)),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!mounted) return;
|
|
||||||
|
|
||||||
const db = client.db as {
|
|
||||||
live?: {
|
|
||||||
query: <T>(
|
|
||||||
sql: string,
|
|
||||||
params?: (number | string)[]
|
|
||||||
) => Promise<{
|
|
||||||
subscribe: (cb: (result: { rows: T[] }) => void) => void;
|
|
||||||
unsubscribe?: () => void;
|
|
||||||
}>;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
if (!db.live?.query) return;
|
|
||||||
|
|
||||||
// Per-instance live query filtered by category types
|
|
||||||
const itemsQuery = `SELECT * FROM notifications
|
|
||||||
WHERE user_id = $1
|
|
||||||
AND (search_space_id = $2 OR search_space_id IS NULL)
|
|
||||||
AND created_at > '${cutoffDate}'
|
|
||||||
${typeFilter}
|
|
||||||
ORDER BY created_at DESC`;
|
|
||||||
|
|
||||||
const liveQuery = await db.live.query<InboxItem>(itemsQuery, [uid, spaceId]);
|
|
||||||
|
|
||||||
if (!mounted) {
|
|
||||||
liveQuery.unsubscribe?.();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
liveQuery.subscribe((result: { rows: InboxItem[] }) => {
|
|
||||||
if (!mounted || !result.rows || !initialLoadDoneRef.current) return;
|
|
||||||
|
|
||||||
const validItems = result.rows.filter((item) => item.id != null && item.title != null);
|
|
||||||
const cutoff = new Date(getSyncCutoffDate());
|
|
||||||
|
|
||||||
const liveItemMap = new Map(validItems.map((d) => [d.id, d]));
|
|
||||||
const liveIds = new Set(liveItemMap.keys());
|
|
||||||
|
|
||||||
setInboxItems((prev) => {
|
|
||||||
const prevIds = new Set(prev.map((d) => d.id));
|
|
||||||
|
|
||||||
const newItems = filterNewElectricItems(
|
|
||||||
validItems,
|
|
||||||
liveIds,
|
|
||||||
prevIds,
|
|
||||||
electricBaselineIdsRef,
|
|
||||||
newestApiTimestampRef.current
|
|
||||||
);
|
|
||||||
|
|
||||||
let updated = prev.map((item) => {
|
|
||||||
const liveItem = liveItemMap.get(item.id);
|
|
||||||
if (liveItem) return liveItem;
|
|
||||||
return item;
|
|
||||||
});
|
|
||||||
|
|
||||||
const isFullySynced = handle.isUpToDate;
|
|
||||||
if (isFullySynced) {
|
|
||||||
updated = updated.filter((item) => {
|
|
||||||
if (new Date(item.created_at) < cutoff) return true;
|
|
||||||
return liveIds.has(item.id);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (newItems.length > 0) {
|
|
||||||
return [...newItems, ...updated];
|
|
||||||
}
|
|
||||||
|
|
||||||
return updated;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Calibrate the older-unread offset using baseline items
|
|
||||||
// (items present in both Electric and the API-loaded list).
|
|
||||||
// This avoids the timing bug where new items arriving between
|
|
||||||
// the API fetch and Electric's first callback would be absorbed
|
|
||||||
// into the offset, making the count appear unchanged.
|
|
||||||
const baseline = electricBaselineIdsRef.current;
|
|
||||||
if (olderUnreadOffsetRef.current === null && baseline !== null) {
|
|
||||||
const baselineUnreadCount = validItems.filter(
|
|
||||||
(item) => baseline.has(item.id) && !item.read
|
|
||||||
).length;
|
|
||||||
olderUnreadOffsetRef.current = Math.max(
|
|
||||||
0,
|
|
||||||
apiUnreadTotalRef.current - baselineUnreadCount
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Derive unread count from all Electric items + the older offset
|
|
||||||
if (olderUnreadOffsetRef.current !== null) {
|
|
||||||
const electricUnreadCount = validItems.filter((item) => !item.read).length;
|
|
||||||
setUnreadCount(olderUnreadOffsetRef.current + electricUnreadCount);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
liveQueryRef.current = liveQuery;
|
|
||||||
|
|
||||||
// Per-instance unread count live query filtered by category types.
|
|
||||||
// Acts as a secondary reactive path for read-status changes that
|
|
||||||
// may not trigger the items live query in all edge cases.
|
|
||||||
const countQuery = `SELECT COUNT(*) as count FROM notifications
|
|
||||||
WHERE user_id = $1
|
|
||||||
AND (search_space_id = $2 OR search_space_id IS NULL)
|
|
||||||
AND created_at > '${cutoffDate}'
|
|
||||||
AND read = false
|
|
||||||
${typeFilter}`;
|
|
||||||
|
|
||||||
const countLiveQuery = await db.live.query<{ count: number | string }>(countQuery, [
|
|
||||||
uid,
|
|
||||||
spaceId,
|
|
||||||
]);
|
|
||||||
|
|
||||||
if (!mounted) {
|
|
||||||
countLiveQuery.unsubscribe?.();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
countLiveQuery.subscribe((result: { rows: Array<{ count: number | string }> }) => {
|
|
||||||
if (!mounted || !result.rows?.[0] || !initialLoadDoneRef.current) return;
|
|
||||||
if (olderUnreadOffsetRef.current === null) return;
|
|
||||||
const liveRecentUnread = Number(result.rows[0].count) || 0;
|
|
||||||
setUnreadCount(olderUnreadOffsetRef.current + liveRecentUnread);
|
|
||||||
});
|
|
||||||
|
|
||||||
unreadLiveQueryRef.current = countLiveQuery;
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`[useInbox:${category}] Electric setup failed:`, err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setupElectricRealtime();
|
if (olderUnreadOffsetRef.current !== null) {
|
||||||
|
const recentUnreadCount = recentItems.filter((item) => !item.read).length;
|
||||||
return () => {
|
setUnreadCount(olderUnreadOffsetRef.current + recentUnreadCount);
|
||||||
mounted = false;
|
}
|
||||||
// Only clean up live queries — sync shape is shared across instances
|
}, [userId, searchSpaceId, zeroNotifications, categoryTypes]);
|
||||||
if (liveQueryRef.current) {
|
|
||||||
try {
|
|
||||||
liveQueryRef.current.unsubscribe?.();
|
|
||||||
} catch {
|
|
||||||
/* PGlite may be closed */
|
|
||||||
}
|
|
||||||
liveQueryRef.current = null;
|
|
||||||
}
|
|
||||||
if (unreadLiveQueryRef.current) {
|
|
||||||
try {
|
|
||||||
unreadLiveQueryRef.current.unsubscribe?.();
|
|
||||||
} catch {
|
|
||||||
/* PGlite may be closed */
|
|
||||||
}
|
|
||||||
unreadLiveQueryRef.current = null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}, [userId, searchSpaceId, electricClient, category]);
|
|
||||||
|
|
||||||
// Load more pages via API (cursor-based using before_date)
|
// Load more pages via API (cursor-based using before_date)
|
||||||
const loadMore = useCallback(async () => {
|
const loadMore = useCallback(async () => {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue