diff --git a/surfsense_backend/app/routes/notifications_routes.py b/surfsense_backend/app/routes/notifications_routes.py index 4f80c6529..72b6da9a6 100644 --- a/surfsense_backend/app/routes/notifications_routes.py +++ b/surfsense_backend/app/routes/notifications_routes.py @@ -10,7 +10,7 @@ from typing import Literal from fastapi import APIRouter, Depends, HTTPException, Query, status from pydantic import BaseModel -from sqlalchemy import desc, func, select, update +from sqlalchemy import desc, func, literal, literal_column, select, update from sqlalchemy.ext.asyncio import AsyncSession from app.db import Notification, User, get_async_session @@ -69,6 +69,21 @@ class MarkAllReadResponse(BaseModel): updated_count: int +class SourceTypeItem(BaseModel): + """A single source type with its category and count.""" + + key: str + type: str + category: str # "connector" or "document" + count: int + + +class SourceTypesResponse(BaseModel): + """Response for notification source types used in status tab filter.""" + + sources: list[SourceTypeItem] + + class UnreadCountResponse(BaseModel): """Response for unread count with split between recent and older items.""" @@ -76,6 +91,74 @@ class UnreadCountResponse(BaseModel): recent_unread: int # Within SYNC_WINDOW_DAYS +@router.get("/source-types", response_model=SourceTypesResponse) +async def get_notification_source_types( + search_space_id: int | None = Query(None, description="Filter by search space ID"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> SourceTypesResponse: + """ + Get all distinct connector types and document types from the user's + status notifications. Used to populate the filter dropdown in the + inbox Status tab so that all types are shown regardless of pagination. + """ + base_filter = [Notification.user_id == user.id] + + if search_space_id is not None: + base_filter.append( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + + connector_type_expr = Notification.notification_metadata["connector_type"].astext + connector_query = ( + select( + connector_type_expr.label("source_type"), + literal("connector").label("category"), + func.count(Notification.id).label("cnt"), + ) + .where( + *base_filter, + Notification.type.in_(("connector_indexing", "connector_deletion")), + connector_type_expr.isnot(None), + ) + .group_by(literal_column("source_type")) + ) + + document_type_expr = Notification.notification_metadata["document_type"].astext + document_query = ( + select( + document_type_expr.label("source_type"), + literal("document").label("category"), + func.count(Notification.id).label("cnt"), + ) + .where( + *base_filter, + Notification.type.in_(("document_processing",)), + document_type_expr.isnot(None), + ) + .group_by(literal_column("source_type")) + ) + + connector_result = await session.execute(connector_query) + document_result = await session.execute(document_query) + + sources = [] + for source_type, category, count in [*connector_result.all(), *document_result.all()]: + if not source_type: + continue + sources.append( + SourceTypeItem( + key=f"{category}:{source_type}", + type=source_type, + category=category, + count=count, + ) + ) + + return SourceTypesResponse(sources=sources) + + @router.get("/unread-count", response_model=UnreadCountResponse) async def get_unread_count( search_space_id: int | None = Query(None, description="Filter by search space ID"), @@ -141,6 +224,10 @@ async def list_notifications( type_filter: NotificationType | None = Query( None, alias="type", description="Filter by notification type" ), + source_type: str | None = Query( + None, + description="Filter by source type, e.g. 'connector:GITHUB_CONNECTOR' or 'doctype:FILE'", + ), before_date: str | None = Query( None, description="Get notifications before this ISO date (for pagination)" ), @@ -182,6 +269,31 @@ async def list_notifications( query = query.where(Notification.type == type_filter) count_query = count_query.where(Notification.type == type_filter) + # Filter by source type (connector or document type from JSONB metadata) + if source_type: + if source_type.startswith("connector:"): + connector_val = source_type[len("connector:"):] + source_filter = ( + Notification.type.in_(("connector_indexing", "connector_deletion")) + & ( + Notification.notification_metadata["connector_type"].astext + == connector_val + ) + ) + query = query.where(source_filter) + count_query = count_query.where(source_filter) + elif source_type.startswith("doctype:"): + doctype_val = source_type[len("doctype:"):] + source_filter = ( + Notification.type.in_(("document_processing",)) + & ( + Notification.notification_metadata["document_type"].astext + == doctype_val + ) + ) + query = query.where(source_filter) + count_query = count_query.where(source_filter) + # Filter by date (for efficient pagination of older items) if before_date: try: diff --git a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx index 684bf6b85..12b2b7457 100644 --- a/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx +++ b/surfsense_web/components/layout/ui/sidebar/InboxSidebar.tsx @@ -291,100 +291,128 @@ export function InboxSidebar({ activeTab === "comments" ? (mentions.hasMore ?? false) : (status.hasMore ?? false); const loadMore = activeTab === "comments" ? mentions.loadMore : status.loadMore; - // Get unique source types (connectors + document types) from status items for filtering + // Fetch ALL source types from the backend so the filter shows every connector/document + // type the user has notifications for, regardless of how many items are loaded via pagination. + const { data: sourceTypesData } = useQuery({ + queryKey: cacheKeys.notifications.sourceTypes(searchSpaceId), + queryFn: () => notificationsApiService.getSourceTypes(searchSpaceId ?? undefined), + staleTime: 60 * 1000, + enabled: open && activeTab === "status", + }); + const statusSourceOptions = useMemo(() => { - const sources: Array<{ - key: string; - type: string; - category: "connector" | "document"; - displayName: string; - }> = []; - const seenConnectors = new Set(); - const seenDocTypes = new Set(); + if (!sourceTypesData?.sources) return []; - for (const item of statusItems) { - if (item.type === "connector_indexing" && isConnectorIndexingMetadata(item.metadata)) { - const ct = item.metadata.connector_type; - if (!seenConnectors.has(ct)) { - seenConnectors.add(ct); - sources.push({ - key: `connector:${ct}`, - type: ct, - category: "connector", - displayName: getConnectorTypeDisplayName(ct), - }); - } - } else if ( - item.type === "document_processing" && - isDocumentProcessingMetadata(item.metadata) - ) { - const dt = item.metadata.document_type; - if (!seenDocTypes.has(dt)) { - seenDocTypes.add(dt); - sources.push({ - key: `doctype:${dt}`, - type: dt, - category: "document", - displayName: getDocumentTypeLabel(dt), - }); - } - } - } - - return sources; - }, [statusItems]); + return sourceTypesData.sources.map((source) => ({ + key: source.key, + type: source.type, + category: source.category, + displayName: + source.category === "connector" + ? getConnectorTypeDisplayName(source.type) + : getDocumentTypeLabel(source.type), + })); + }, [sourceTypesData]); // Get items for current tab const displayItems = activeTab === "comments" ? commentsItems : statusItems; - // Filter items based on filter type, connector filter, and search mode - // When searching: use server-side API results (searches ALL notifications) - // When not searching: use Electric real-time items (fast, local) - const filteredItems = useMemo(() => { - // In search mode, use API results - let items: InboxItem[] = isSearchMode ? (searchResponse?.items ?? []) : displayItems; + // When a source filter is active, fetch matching items from the API so + // older items (outside the Electric sync window) are included. + const isSourceFilterMode = activeTab === "status" && !!selectedSource; + const { data: sourceFilterResponse, isLoading: isSourceFilterLoading } = useQuery({ + queryKey: cacheKeys.notifications.bySourceType(searchSpaceId, selectedSource ?? ""), + queryFn: () => + notificationsApiService.getNotifications({ + queryParams: { + search_space_id: searchSpaceId ?? undefined, + source_type: selectedSource ?? undefined, + limit: 50, + }, + }), + staleTime: 30 * 1000, + enabled: isSourceFilterMode && open && !isSearchMode, + }); - // For status tab search results, filter to status-specific types - if (isSearchMode && activeTab === "status") { - items = items.filter( - (item) => - item.type === "connector_indexing" || - item.type === "document_processing" || - item.type === "page_limit_exceeded" || - item.type === "connector_deletion" + // Client-side matcher: checks if an item matches the active source filter + const matchesSourceFilter = useCallback( + (item: InboxItem): boolean => { + if (!selectedSource) return true; + if (selectedSource.startsWith("connector:")) { + const connectorType = selectedSource.slice("connector:".length); + return ( + item.type === "connector_indexing" && + isConnectorIndexingMetadata(item.metadata) && + item.metadata.connector_type === connectorType + ); + } + if (selectedSource.startsWith("doctype:")) { + const docType = selectedSource.slice("doctype:".length); + return ( + item.type === "document_processing" && + isDocumentProcessingMetadata(item.metadata) && + item.metadata.document_type === docType + ); + } + return true; + }, + [selectedSource] + ); + + // Filter items based on filter type, connector filter, and search mode + // Three data paths: + // 1. Search mode → server-side search results + // 2. Source filter mode → API results merged with real-time Electric items + // 3. Default → Electric real-time items (fast, local) + const filteredItems = useMemo(() => { + let items: InboxItem[]; + + if (isSearchMode) { + items = searchResponse?.items ?? []; + if (activeTab === "status") { + items = items.filter( + (item) => + item.type === "connector_indexing" || + item.type === "document_processing" || + item.type === "page_limit_exceeded" || + item.type === "connector_deletion" + ); + } + } else if (isSourceFilterMode) { + // Merge API results (covers older items) with Electric real-time items + // that match the filter (covers brand-new items arriving in real-time). + const apiItems = sourceFilterResponse?.items ?? []; + const realtimeMatching = statusItems.filter(matchesSourceFilter); + const seen = new Set(apiItems.map((i) => i.id)); + const merged = [...apiItems]; + for (const item of realtimeMatching) { + if (!seen.has(item.id)) { + merged.push(item); + } + } + items = merged.sort( + (a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime() ); + } else { + items = displayItems; } - // Apply read/unread filter if (activeFilter === "unread") { items = items.filter((item) => !item.read); } - // Apply source filter (connector type or document type, for status tab only) - if (activeTab === "status" && selectedSource) { - items = items.filter((item) => { - if (selectedSource.startsWith("connector:")) { - const connectorType = selectedSource.slice("connector:".length); - return ( - item.type === "connector_indexing" && - isConnectorIndexingMetadata(item.metadata) && - item.metadata.connector_type === connectorType - ); - } - if (selectedSource.startsWith("doctype:")) { - const docType = selectedSource.slice("doctype:".length); - return ( - item.type === "document_processing" && - isDocumentProcessingMetadata(item.metadata) && - item.metadata.document_type === docType - ); - } - return true; - }); - } - return items; - }, [displayItems, searchResponse, isSearchMode, activeFilter, activeTab, selectedSource]); + }, [ + displayItems, + statusItems, + searchResponse, + sourceFilterResponse, + isSearchMode, + isSourceFilterMode, + matchesSourceFilter, + activeFilter, + activeTab, + ]); // Intersection Observer for infinite scroll with prefetching // Re-runs when active tab changes so each tab gets its own pagination @@ -953,7 +981,7 @@ export function InboxSidebar({
- {(isSearchMode ? isSearchLoading : loading) ? ( + {(isSearchMode ? isSearchLoading : isSourceFilterMode ? isSourceFilterLoading : loading) ? (
{activeTab === "comments" ? /* Comments skeleton: avatar + two-line text + time */ diff --git a/surfsense_web/contracts/types/inbox.types.ts b/surfsense_web/contracts/types/inbox.types.ts index bd04eefd1..dc0bfb6a1 100644 --- a/surfsense_web/contracts/types/inbox.types.ts +++ b/surfsense_web/contracts/types/inbox.types.ts @@ -204,6 +204,7 @@ export const getNotificationsRequest = z.object({ queryParams: z.object({ search_space_id: z.number().optional(), type: inboxItemTypeEnum.optional(), + source_type: z.string().optional(), before_date: z.string().optional(), search: z.string().optional(), limit: z.number().min(1).max(100).optional(), @@ -261,6 +262,20 @@ export const getUnreadCountResponse = z.object({ recent_unread: z.number(), // Within SYNC_WINDOW_DAYS (14 days) }); +/** + * Response schema for notification source types (status tab filter) + */ +export const sourceTypeItem = z.object({ + key: z.string(), + type: z.string(), + category: z.enum(["connector", "document"]), + count: z.number(), +}); + +export const getSourceTypesResponse = z.object({ + sources: z.array(sourceTypeItem), +}); + // ============================================================================= // Type Guards for Metadata // ============================================================================= @@ -387,3 +402,5 @@ export type MarkNotificationReadResponse = z.infer; export type GetUnreadCountRequest = z.infer; export type GetUnreadCountResponse = z.infer; +export type SourceTypeItem = z.infer; +export type GetSourceTypesResponse = z.infer; diff --git a/surfsense_web/lib/apis/notifications-api.service.ts b/surfsense_web/lib/apis/notifications-api.service.ts index 086633d81..7c865f343 100644 --- a/surfsense_web/lib/apis/notifications-api.service.ts +++ b/surfsense_web/lib/apis/notifications-api.service.ts @@ -1,9 +1,11 @@ import { type GetNotificationsRequest, type GetNotificationsResponse, + type GetSourceTypesResponse, type GetUnreadCountResponse, getNotificationsRequest, getNotificationsResponse, + getSourceTypesResponse, getUnreadCountResponse, type InboxItemTypeEnum, type MarkAllNotificationsReadResponse, @@ -42,6 +44,9 @@ class NotificationsApiService { if (queryParams.type) { params.append("type", queryParams.type); } + if (queryParams.source_type) { + params.append("source_type", queryParams.source_type); + } if (queryParams.before_date) { params.append("before_date", queryParams.before_date); } @@ -92,6 +97,23 @@ class NotificationsApiService { return baseApiService.patch("/api/v1/notifications/read-all", markAllNotificationsReadResponse); }; + /** + * Get distinct source types (connector + document types) across all + * status notifications. Used to populate the inbox Status tab filter. + */ + getSourceTypes = async (searchSpaceId?: number): Promise => { + const params = new URLSearchParams(); + if (searchSpaceId !== undefined) { + params.append("search_space_id", String(searchSpaceId)); + } + const queryString = params.toString(); + + return baseApiService.get( + `/api/v1/notifications/source-types${queryString ? `?${queryString}` : ""}`, + getSourceTypesResponse + ); + }; + /** * Get unread notification count with split between total and recent * - total_unread: All unread notifications diff --git a/surfsense_web/lib/query-client/cache-keys.ts b/surfsense_web/lib/query-client/cache-keys.ts index b08d8b510..717633206 100644 --- a/surfsense_web/lib/query-client/cache-keys.ts +++ b/surfsense_web/lib/query-client/cache-keys.ts @@ -96,5 +96,9 @@ export const cacheKeys = { notifications: { search: (searchSpaceId: number | null, search: string, tab: string) => ["notifications", "search", searchSpaceId, search, tab] as const, + sourceTypes: (searchSpaceId: number | null) => + ["notifications", "source-types", searchSpaceId] as const, + bySourceType: (searchSpaceId: number | null, sourceType: string) => + ["notifications", "by-source-type", searchSpaceId, sourceType] as const, }, };