feat: implement source type filtering in notifications API and UI, enhancing user experience by allowing users to filter notifications by connector and document types in the status tab

This commit is contained in:
Anish Sarkar 2026-03-06 17:25:07 +05:30
parent fe0b026315
commit d03f938fcd
5 changed files with 264 additions and 81 deletions

View file

@ -10,7 +10,7 @@ from typing import Literal
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import desc, func, select, update
from sqlalchemy import desc, func, literal, literal_column, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Notification, User, get_async_session
@ -69,6 +69,21 @@ class MarkAllReadResponse(BaseModel):
updated_count: int
class SourceTypeItem(BaseModel):
"""A single source type with its category and count."""
key: str
type: str
category: str # "connector" or "document"
count: int
class SourceTypesResponse(BaseModel):
"""Response for notification source types used in status tab filter."""
sources: list[SourceTypeItem]
class UnreadCountResponse(BaseModel):
"""Response for unread count with split between recent and older items."""
@ -76,6 +91,74 @@ class UnreadCountResponse(BaseModel):
recent_unread: int # Within SYNC_WINDOW_DAYS
@router.get("/source-types", response_model=SourceTypesResponse)
async def get_notification_source_types(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> SourceTypesResponse:
"""
Get all distinct connector types and document types from the user's
status notifications. Used to populate the filter dropdown in the
inbox Status tab so that all types are shown regardless of pagination.
"""
base_filter = [Notification.user_id == user.id]
if search_space_id is not None:
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
connector_type_expr = Notification.notification_metadata["connector_type"].astext
connector_query = (
select(
connector_type_expr.label("source_type"),
literal("connector").label("category"),
func.count(Notification.id).label("cnt"),
)
.where(
*base_filter,
Notification.type.in_(("connector_indexing", "connector_deletion")),
connector_type_expr.isnot(None),
)
.group_by(literal_column("source_type"))
)
document_type_expr = Notification.notification_metadata["document_type"].astext
document_query = (
select(
document_type_expr.label("source_type"),
literal("document").label("category"),
func.count(Notification.id).label("cnt"),
)
.where(
*base_filter,
Notification.type.in_(("document_processing",)),
document_type_expr.isnot(None),
)
.group_by(literal_column("source_type"))
)
connector_result = await session.execute(connector_query)
document_result = await session.execute(document_query)
sources = []
for source_type, category, count in [*connector_result.all(), *document_result.all()]:
if not source_type:
continue
sources.append(
SourceTypeItem(
key=f"{category}:{source_type}",
type=source_type,
category=category,
count=count,
)
)
return SourceTypesResponse(sources=sources)
@router.get("/unread-count", response_model=UnreadCountResponse)
async def get_unread_count(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
@ -141,6 +224,10 @@ async def list_notifications(
type_filter: NotificationType | None = Query(
None, alias="type", description="Filter by notification type"
),
source_type: str | None = Query(
None,
description="Filter by source type, e.g. 'connector:GITHUB_CONNECTOR' or 'doctype:FILE'",
),
before_date: str | None = Query(
None, description="Get notifications before this ISO date (for pagination)"
),
@ -182,6 +269,31 @@ async def list_notifications(
query = query.where(Notification.type == type_filter)
count_query = count_query.where(Notification.type == type_filter)
# Filter by source type (connector or document type from JSONB metadata)
if source_type:
if source_type.startswith("connector:"):
connector_val = source_type[len("connector:"):]
source_filter = (
Notification.type.in_(("connector_indexing", "connector_deletion"))
& (
Notification.notification_metadata["connector_type"].astext
== connector_val
)
)
query = query.where(source_filter)
count_query = count_query.where(source_filter)
elif source_type.startswith("doctype:"):
doctype_val = source_type[len("doctype:"):]
source_filter = (
Notification.type.in_(("document_processing",))
& (
Notification.notification_metadata["document_type"].astext
== doctype_val
)
)
query = query.where(source_filter)
count_query = count_query.where(source_filter)
# Filter by date (for efficient pagination of older items)
if before_date:
try:

View file

@ -291,100 +291,128 @@ export function InboxSidebar({
activeTab === "comments" ? (mentions.hasMore ?? false) : (status.hasMore ?? false);
const loadMore = activeTab === "comments" ? mentions.loadMore : status.loadMore;
// Get unique source types (connectors + document types) from status items for filtering
// Fetch ALL source types from the backend so the filter shows every connector/document
// type the user has notifications for, regardless of how many items are loaded via pagination.
const { data: sourceTypesData } = useQuery({
queryKey: cacheKeys.notifications.sourceTypes(searchSpaceId),
queryFn: () => notificationsApiService.getSourceTypes(searchSpaceId ?? undefined),
staleTime: 60 * 1000,
enabled: open && activeTab === "status",
});
const statusSourceOptions = useMemo(() => {
const sources: Array<{
key: string;
type: string;
category: "connector" | "document";
displayName: string;
}> = [];
const seenConnectors = new Set<string>();
const seenDocTypes = new Set<string>();
if (!sourceTypesData?.sources) return [];
for (const item of statusItems) {
if (item.type === "connector_indexing" && isConnectorIndexingMetadata(item.metadata)) {
const ct = item.metadata.connector_type;
if (!seenConnectors.has(ct)) {
seenConnectors.add(ct);
sources.push({
key: `connector:${ct}`,
type: ct,
category: "connector",
displayName: getConnectorTypeDisplayName(ct),
});
}
} else if (
item.type === "document_processing" &&
isDocumentProcessingMetadata(item.metadata)
) {
const dt = item.metadata.document_type;
if (!seenDocTypes.has(dt)) {
seenDocTypes.add(dt);
sources.push({
key: `doctype:${dt}`,
type: dt,
category: "document",
displayName: getDocumentTypeLabel(dt),
});
}
}
}
return sources;
}, [statusItems]);
return sourceTypesData.sources.map((source) => ({
key: source.key,
type: source.type,
category: source.category,
displayName:
source.category === "connector"
? getConnectorTypeDisplayName(source.type)
: getDocumentTypeLabel(source.type),
}));
}, [sourceTypesData]);
// Get items for current tab
const displayItems = activeTab === "comments" ? commentsItems : statusItems;
// Filter items based on filter type, connector filter, and search mode
// When searching: use server-side API results (searches ALL notifications)
// When not searching: use Electric real-time items (fast, local)
const filteredItems = useMemo(() => {
// In search mode, use API results
let items: InboxItem[] = isSearchMode ? (searchResponse?.items ?? []) : displayItems;
// When a source filter is active, fetch matching items from the API so
// older items (outside the Electric sync window) are included.
const isSourceFilterMode = activeTab === "status" && !!selectedSource;
const { data: sourceFilterResponse, isLoading: isSourceFilterLoading } = useQuery({
queryKey: cacheKeys.notifications.bySourceType(searchSpaceId, selectedSource ?? ""),
queryFn: () =>
notificationsApiService.getNotifications({
queryParams: {
search_space_id: searchSpaceId ?? undefined,
source_type: selectedSource ?? undefined,
limit: 50,
},
}),
staleTime: 30 * 1000,
enabled: isSourceFilterMode && open && !isSearchMode,
});
// For status tab search results, filter to status-specific types
if (isSearchMode && activeTab === "status") {
items = items.filter(
(item) =>
item.type === "connector_indexing" ||
item.type === "document_processing" ||
item.type === "page_limit_exceeded" ||
item.type === "connector_deletion"
// Client-side matcher: checks if an item matches the active source filter
const matchesSourceFilter = useCallback(
(item: InboxItem): boolean => {
if (!selectedSource) return true;
if (selectedSource.startsWith("connector:")) {
const connectorType = selectedSource.slice("connector:".length);
return (
item.type === "connector_indexing" &&
isConnectorIndexingMetadata(item.metadata) &&
item.metadata.connector_type === connectorType
);
}
if (selectedSource.startsWith("doctype:")) {
const docType = selectedSource.slice("doctype:".length);
return (
item.type === "document_processing" &&
isDocumentProcessingMetadata(item.metadata) &&
item.metadata.document_type === docType
);
}
return true;
},
[selectedSource]
);
// Filter items based on filter type, connector filter, and search mode
// Three data paths:
// 1. Search mode → server-side search results
// 2. Source filter mode → API results merged with real-time Electric items
// 3. Default → Electric real-time items (fast, local)
const filteredItems = useMemo(() => {
let items: InboxItem[];
if (isSearchMode) {
items = searchResponse?.items ?? [];
if (activeTab === "status") {
items = items.filter(
(item) =>
item.type === "connector_indexing" ||
item.type === "document_processing" ||
item.type === "page_limit_exceeded" ||
item.type === "connector_deletion"
);
}
} else if (isSourceFilterMode) {
// Merge API results (covers older items) with Electric real-time items
// that match the filter (covers brand-new items arriving in real-time).
const apiItems = sourceFilterResponse?.items ?? [];
const realtimeMatching = statusItems.filter(matchesSourceFilter);
const seen = new Set(apiItems.map((i) => i.id));
const merged = [...apiItems];
for (const item of realtimeMatching) {
if (!seen.has(item.id)) {
merged.push(item);
}
}
items = merged.sort(
(a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
);
} else {
items = displayItems;
}
// Apply read/unread filter
if (activeFilter === "unread") {
items = items.filter((item) => !item.read);
}
// Apply source filter (connector type or document type, for status tab only)
if (activeTab === "status" && selectedSource) {
items = items.filter((item) => {
if (selectedSource.startsWith("connector:")) {
const connectorType = selectedSource.slice("connector:".length);
return (
item.type === "connector_indexing" &&
isConnectorIndexingMetadata(item.metadata) &&
item.metadata.connector_type === connectorType
);
}
if (selectedSource.startsWith("doctype:")) {
const docType = selectedSource.slice("doctype:".length);
return (
item.type === "document_processing" &&
isDocumentProcessingMetadata(item.metadata) &&
item.metadata.document_type === docType
);
}
return true;
});
}
return items;
}, [displayItems, searchResponse, isSearchMode, activeFilter, activeTab, selectedSource]);
}, [
displayItems,
statusItems,
searchResponse,
sourceFilterResponse,
isSearchMode,
isSourceFilterMode,
matchesSourceFilter,
activeFilter,
activeTab,
]);
// Intersection Observer for infinite scroll with prefetching
// Re-runs when active tab changes so each tab gets its own pagination
@ -953,7 +981,7 @@ export function InboxSidebar({
</Tabs>
<div className="flex-1 overflow-y-auto overflow-x-hidden p-2">
{(isSearchMode ? isSearchLoading : loading) ? (
{(isSearchMode ? isSearchLoading : isSourceFilterMode ? isSourceFilterLoading : loading) ? (
<div className="space-y-2">
{activeTab === "comments"
? /* Comments skeleton: avatar + two-line text + time */

View file

@ -204,6 +204,7 @@ export const getNotificationsRequest = z.object({
queryParams: z.object({
search_space_id: z.number().optional(),
type: inboxItemTypeEnum.optional(),
source_type: z.string().optional(),
before_date: z.string().optional(),
search: z.string().optional(),
limit: z.number().min(1).max(100).optional(),
@ -261,6 +262,20 @@ export const getUnreadCountResponse = z.object({
recent_unread: z.number(), // Within SYNC_WINDOW_DAYS (14 days)
});
/**
* Response schema for notification source types (status tab filter)
*/
export const sourceTypeItem = z.object({
key: z.string(),
type: z.string(),
category: z.enum(["connector", "document"]),
count: z.number(),
});
export const getSourceTypesResponse = z.object({
sources: z.array(sourceTypeItem),
});
// =============================================================================
// Type Guards for Metadata
// =============================================================================
@ -387,3 +402,5 @@ export type MarkNotificationReadResponse = z.infer<typeof markNotificationReadRe
export type MarkAllNotificationsReadResponse = z.infer<typeof markAllNotificationsReadResponse>;
export type GetUnreadCountRequest = z.infer<typeof getUnreadCountRequest>;
export type GetUnreadCountResponse = z.infer<typeof getUnreadCountResponse>;
export type SourceTypeItem = z.infer<typeof sourceTypeItem>;
export type GetSourceTypesResponse = z.infer<typeof getSourceTypesResponse>;

View file

@ -1,9 +1,11 @@
import {
type GetNotificationsRequest,
type GetNotificationsResponse,
type GetSourceTypesResponse,
type GetUnreadCountResponse,
getNotificationsRequest,
getNotificationsResponse,
getSourceTypesResponse,
getUnreadCountResponse,
type InboxItemTypeEnum,
type MarkAllNotificationsReadResponse,
@ -42,6 +44,9 @@ class NotificationsApiService {
if (queryParams.type) {
params.append("type", queryParams.type);
}
if (queryParams.source_type) {
params.append("source_type", queryParams.source_type);
}
if (queryParams.before_date) {
params.append("before_date", queryParams.before_date);
}
@ -92,6 +97,23 @@ class NotificationsApiService {
return baseApiService.patch("/api/v1/notifications/read-all", markAllNotificationsReadResponse);
};
/**
* Get distinct source types (connector + document types) across all
* status notifications. Used to populate the inbox Status tab filter.
*/
getSourceTypes = async (searchSpaceId?: number): Promise<GetSourceTypesResponse> => {
const params = new URLSearchParams();
if (searchSpaceId !== undefined) {
params.append("search_space_id", String(searchSpaceId));
}
const queryString = params.toString();
return baseApiService.get(
`/api/v1/notifications/source-types${queryString ? `?${queryString}` : ""}`,
getSourceTypesResponse
);
};
/**
* Get unread notification count with split between total and recent
* - total_unread: All unread notifications

View file

@ -96,5 +96,9 @@ export const cacheKeys = {
notifications: {
search: (searchSpaceId: number | null, search: string, tab: string) =>
["notifications", "search", searchSpaceId, search, tab] as const,
sourceTypes: (searchSpaceId: number | null) =>
["notifications", "source-types", searchSpaceId] as const,
bySourceType: (searchSpaceId: number | null, sourceType: string) =>
["notifications", "by-source-type", searchSpaceId, sourceType] as const,
},
};