Merge branch 'feat/inbox' into feat/composio

This commit is contained in:
Anish Sarkar 2026-01-22 23:02:00 +05:30
commit 4bff1e8c10
4 changed files with 268 additions and 38 deletions

View file

@ -5,7 +5,7 @@ Electric SQL automatically syncs the changes to all connected clients for recent
For older items (beyond the sync window), use the list endpoint.
"""
from datetime import datetime
from datetime import UTC, datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
@ -17,6 +17,9 @@ from app.users import current_active_user
router = APIRouter(prefix="/notifications", tags=["notifications"])
# Must match frontend SYNC_WINDOW_DAYS in use-inbox.ts
SYNC_WINDOW_DAYS = 14
class NotificationResponse(BaseModel):
"""Response model for a single notification."""
@ -60,11 +63,68 @@ class MarkAllReadResponse(BaseModel):
updated_count: int
class UnreadCountResponse(BaseModel):
"""Response for unread count with split between recent and older items."""
total_unread: int
recent_unread: int # Within SYNC_WINDOW_DAYS
@router.get("/unread-count", response_model=UnreadCountResponse)
async def get_unread_count(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> UnreadCountResponse:
"""
Get the total unread notification count for the current user.
Returns both:
- total_unread: All unread notifications (for accurate badge count)
- recent_unread: Unread notifications within the sync window (last 14 days)
This allows the frontend to calculate:
- older_unread = total_unread - recent_unread (static until reconciliation)
- Display count = older_unread + live_recent_count (from Electric SQL)
"""
# Calculate cutoff date for sync window
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
# Base filter for user's unread notifications
base_filter = [
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
]
# Add search space filter if provided (include null for global notifications)
if search_space_id is not None:
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
# Total unread count (all time)
total_query = select(func.count(Notification.id)).where(*base_filter)
total_result = await session.execute(total_query)
total_unread = total_result.scalar() or 0
# Recent unread count (within sync window)
recent_query = select(func.count(Notification.id)).where(
*base_filter,
Notification.created_at > cutoff_date,
)
recent_result = await session.execute(recent_query)
recent_unread = recent_result.scalar() or 0
return UnreadCountResponse(
total_unread=total_unread,
recent_unread=recent_unread,
)
@router.get("", response_model=NotificationListResponse)
async def list_notifications(
search_space_id: int | None = Query(
None, description="Filter by search space ID"
),
search_space_id: int | None = Query(None, description="Filter by search space ID"),
type_filter: str | None = Query(
None, alias="type", description="Filter by notification type"
),

View file

@ -184,6 +184,22 @@ export const markAllNotificationsReadResponse = z.object({
updated_count: z.number(),
});
/**
* Request schema for getting unread count
*/
export const getUnreadCountRequest = z.object({
search_space_id: z.number().optional(),
});
/**
* Response schema for unread count
* Returns both total and recent counts for split tracking
*/
export const getUnreadCountResponse = z.object({
total_unread: z.number(),
recent_unread: z.number(), // Within SYNC_WINDOW_DAYS (14 days)
});
// =============================================================================
// Type Guards for Metadata
// =============================================================================
@ -261,3 +277,5 @@ export type GetNotificationsResponse = z.infer<typeof getNotificationsResponse>;
export type MarkNotificationReadRequest = z.infer<typeof markNotificationReadRequest>;
export type MarkNotificationReadResponse = z.infer<typeof markNotificationReadResponse>;
export type MarkAllNotificationsReadResponse = z.infer<typeof markAllNotificationsReadResponse>;
export type GetUnreadCountRequest = z.infer<typeof getUnreadCountRequest>;
export type GetUnreadCountResponse = z.infer<typeof getUnreadCountResponse>;

View file

@ -1,6 +1,6 @@
"use client";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import type { InboxItem, InboxItemTypeEnum } from "@/contracts/types/inbox.types";
import { notificationsApiService } from "@/lib/apis/notifications-api.service";
import type { SyncHandle } from "@/lib/electric/client";
@ -11,6 +11,15 @@ export type { InboxItem, InboxItemTypeEnum } from "@/contracts/types/inbox.types
const PAGE_SIZE = 50;
const SYNC_WINDOW_DAYS = 14;
/**
* Check if an item is older than the sync window
*/
function isOlderThanSyncWindow(createdAt: string): boolean {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - SYNC_WINDOW_DAYS);
return new Date(createdAt) < cutoffDate;
}
/**
* Deduplicate by ID and sort by created_at descending.
* This is the SINGLE source of truth for deduplication - prevents race conditions.
@ -84,15 +93,19 @@ export function useInbox(
const [hasMore, setHasMore] = useState(true);
const [error, setError] = useState<Error | null>(null);
// Split unread count tracking for accurate counts with 14-day sync window
// olderUnreadCount = unread items OLDER than sync window (from server, static until reconciliation)
// recentUnreadCount = unread items within sync window (from live query, real-time)
const [olderUnreadCount, setOlderUnreadCount] = useState(0);
const [recentUnreadCount, setRecentUnreadCount] = useState(0);
const syncHandleRef = useRef<SyncHandle | null>(null);
const liveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
const userSyncKeyRef = useRef<string | null>(null);
const unreadCountLiveQueryRef = useRef<{ unsubscribe: () => void } | null>(null);
// Calculate unread count from inboxItems (includes both recent and older when loaded)
// This ensures the count is always in sync with what's displayed
const totalUnreadCount = useMemo(() => {
return inboxItems.filter((item) => !item.read).length;
}, [inboxItems]);
// Total unread = older (static from server) + recent (live from Electric)
const totalUnreadCount = olderUnreadCount + recentUnreadCount;
// EFFECT 1: Electric SQL sync for real-time updates
useEffect(() => {
@ -167,6 +180,9 @@ export function useInbox(
useEffect(() => {
setHasMore(true);
setInboxItems([]);
// Reset count states - will be refetched by the unread count effect
setOlderUnreadCount(0);
setRecentUnreadCount(0);
}, [userId, searchSpaceId, typeFilter]);
// EFFECT 2: Live query for real-time updates + auto-fetch from API if empty
@ -283,6 +299,97 @@ export function useInbox(
};
}, [userId, searchSpaceId, typeFilter, electricClient]);
// EFFECT 3: Dedicated unread count sync with split tracking
// - Fetches server count on mount (accurate total)
// - Sets up live query for recent count (real-time updates)
// - Handles items older than sync window separately
useEffect(() => {
if (!userId || !electricClient) return;
const client = electricClient;
let mounted = true;
async function setupUnreadCountSync() {
// Cleanup previous live query
if (unreadCountLiveQueryRef.current) {
unreadCountLiveQueryRef.current.unsubscribe();
unreadCountLiveQueryRef.current = null;
}
try {
// STEP 1: Fetch server counts (total and recent) - guaranteed accurate
console.log("[useInbox] Fetching unread count from server");
const serverCounts = await notificationsApiService.getUnreadCount(
searchSpaceId ?? undefined
);
if (mounted) {
// Calculate older count = total - recent
const olderCount = serverCounts.total_unread - serverCounts.recent_unread;
setOlderUnreadCount(olderCount);
setRecentUnreadCount(serverCounts.recent_unread);
console.log(
`[useInbox] Server counts: total=${serverCounts.total_unread}, recent=${serverCounts.recent_unread}, older=${olderCount}`
);
}
// STEP 2: Set up PGLite live query for RECENT unread count only
// This provides real-time updates for notifications within sync window
const db = client.db as any;
const cutoff = getSyncCutoffDate();
// Count query - NO LIMIT, counts all unread in synced window
const countQuery = `
SELECT COUNT(*) as count FROM notifications
WHERE user_id = $1
AND (search_space_id = $2 OR search_space_id IS NULL)
AND created_at > '${cutoff}'
AND read = false
${typeFilter ? "AND type = $3" : ""}
`;
const params = typeFilter ? [userId, searchSpaceId, typeFilter] : [userId, searchSpaceId];
if (db.live?.query) {
const liveQuery = await db.live.query(countQuery, params);
if (!mounted) {
liveQuery.unsubscribe?.();
return;
}
if (liveQuery.subscribe) {
liveQuery.subscribe((result: { rows: Array<{ count: number | string }> }) => {
if (mounted && result.rows?.[0]) {
const liveCount = Number(result.rows[0].count) || 0;
// Update recent count from live query
// This fires in real-time when Electric syncs new/updated notifications
setRecentUnreadCount(liveCount);
}
});
}
if (liveQuery.unsubscribe) {
unreadCountLiveQueryRef.current = liveQuery;
}
}
} catch (err) {
console.error("[useInbox] Unread count sync error:", err);
// On error, counts will remain at 0 or previous values
// The items-based count will be the fallback
}
}
setupUnreadCountSync();
return () => {
mounted = false;
if (unreadCountLiveQueryRef.current) {
unreadCountLiveQueryRef.current.unsubscribe();
unreadCountLiveQueryRef.current = null;
}
};
}, [userId, searchSpaceId, typeFilter, electricClient]);
// loadMore - Pure cursor-based pagination, no race conditions
// Cursor is computed from current state, not stored in refs
const loadMore = useCallback(async () => {
@ -325,39 +432,60 @@ export function useInbox(
}, [userId, searchSpaceId, typeFilter, loadingMore, hasMore, inboxItems]);
// Mark inbox item as read with optimistic update
const markAsRead = useCallback(async (itemId: number) => {
// Optimistic update: mark as read immediately for instant UI feedback
setInboxItems((prev) =>
prev.map((item) => (item.id === itemId ? { ...item, read: true } : item))
);
// Handles both recent items (live query updates count) and older items (manual count decrement)
const markAsRead = useCallback(
async (itemId: number) => {
// Find the item to check if it's older than sync window
const item = inboxItems.find((i) => i.id === itemId);
const isOlderItem = item && !item.read && isOlderThanSyncWindow(item.created_at);
try {
// Use the API service with proper Zod validation
const result = await notificationsApiService.markAsRead({ notificationId: itemId });
// Optimistic update: mark as read immediately for instant UI feedback
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: true } : i)));
if (!result.success) {
// Rollback on error
setInboxItems((prev) =>
prev.map((item) => (item.id === itemId ? { ...item, read: false } : item))
);
// If older item, manually decrement older count
// (live query won't see items outside sync window)
if (isOlderItem) {
setOlderUnreadCount((prev) => Math.max(0, prev - 1));
}
// If successful, Electric SQL will sync the change and live query will update
// This ensures eventual consistency even if optimistic update was wrong
return result.success;
} catch (err) {
console.error("Failed to mark as read:", err);
// Rollback on error
setInboxItems((prev) =>
prev.map((item) => (item.id === itemId ? { ...item, read: false } : item))
);
return false;
}
}, []);
try {
// Use the API service with proper Zod validation
const result = await notificationsApiService.markAsRead({ notificationId: itemId });
if (!result.success) {
// Rollback on error
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: false } : i)));
if (isOlderItem) {
setOlderUnreadCount((prev) => prev + 1);
}
}
// If successful, Electric SQL will sync the change and live query will update
// This ensures eventual consistency even if optimistic update was wrong
return result.success;
} catch (err) {
console.error("Failed to mark as read:", err);
// Rollback on error
setInboxItems((prev) => prev.map((i) => (i.id === itemId ? { ...i, read: false } : i)));
if (isOlderItem) {
setOlderUnreadCount((prev) => prev + 1);
}
return false;
}
},
[inboxItems]
);
// Mark all inbox items as read with optimistic update
// Resets both older and recent counts to 0
const markAllAsRead = useCallback(async () => {
// Store previous counts for potential rollback
const prevOlderCount = olderUnreadCount;
const prevRecentCount = recentUnreadCount;
// Optimistic update: mark all as read immediately for instant UI feedback
setInboxItems((prev) => prev.map((item) => ({ ...item, read: true })));
setOlderUnreadCount(0);
setRecentUnreadCount(0);
try {
// Use the API service with proper Zod validation
@ -365,16 +493,20 @@ export function useInbox(
if (!result.success) {
console.error("Failed to mark all as read");
// On error, let Electric SQL sync correct the state
// Rollback counts on error
setOlderUnreadCount(prevOlderCount);
setRecentUnreadCount(prevRecentCount);
}
// Electric SQL will sync and live query will ensure consistency
return result.success;
} catch (err) {
console.error("Failed to mark all as read:", err);
// On error, let Electric SQL sync correct the state
// Rollback counts on error
setOlderUnreadCount(prevOlderCount);
setRecentUnreadCount(prevRecentCount);
return false;
}
}, []);
}, [olderUnreadCount, recentUnreadCount]);
return {
inboxItems,

View file

@ -1,11 +1,13 @@
import {
type GetNotificationsRequest,
type GetNotificationsResponse,
type GetUnreadCountResponse,
type MarkAllNotificationsReadResponse,
type MarkNotificationReadRequest,
type MarkNotificationReadResponse,
getNotificationsRequest,
getNotificationsResponse,
getUnreadCountResponse,
markAllNotificationsReadResponse,
markNotificationReadRequest,
markNotificationReadResponse,
@ -85,6 +87,24 @@ class NotificationsApiService {
markAllAsRead = async (): Promise<MarkAllNotificationsReadResponse> => {
return baseApiService.patch("/api/v1/notifications/read-all", markAllNotificationsReadResponse);
};
/**
* Get unread notification count with split between total and recent
* - total_unread: All unread notifications
* - recent_unread: Unread within sync window (last 14 days)
*/
getUnreadCount = async (searchSpaceId?: number): Promise<GetUnreadCountResponse> => {
const params = new URLSearchParams();
if (searchSpaceId !== undefined) {
params.append("search_space_id", String(searchSpaceId));
}
const queryString = params.toString();
return baseApiService.get(
`/api/v1/notifications/unread-count${queryString ? `?${queryString}` : ""}`,
getUnreadCountResponse
);
};
}
export const notificationsApiService = new NotificationsApiService();