From 9575b9d4dbb1082cb925eb3e931d792adef6dd47 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 3 Jun 2026 18:04:47 +0200 Subject: [PATCH] refactor: add notifications module --- .../app/notifications/__init__.py | 15 + .../app/notifications/api/__init__.py | 7 + .../app/notifications/api/api.py | 412 ++++++++++++++++++ .../app/notifications/api/schemas.py | 81 ++++ .../app/notifications/constants.py | 17 + .../app/notifications/persistence/__init__.py | 7 + .../app/notifications/persistence/models.py | 72 +++ .../app/notifications/service/__init__.py | 7 + .../app/notifications/service/base.py | 132 ++++++ .../app/notifications/service/facade.py | 55 +++ .../service/handlers/__init__.py | 17 + .../service/handlers/comment_reply.py | 107 +++++ .../service/handlers/connector_indexing.py | 288 ++++++++++++ .../service/handlers/document_processing.py | 137 ++++++ .../notifications/service/handlers/mention.py | 105 +++++ .../service/handlers/page_limit.py | 80 ++++ surfsense_backend/app/notifications/types.py | 16 + 17 files changed, 1555 insertions(+) create mode 100644 surfsense_backend/app/notifications/__init__.py create mode 100644 surfsense_backend/app/notifications/api/__init__.py create mode 100644 surfsense_backend/app/notifications/api/api.py create mode 100644 surfsense_backend/app/notifications/api/schemas.py create mode 100644 surfsense_backend/app/notifications/constants.py create mode 100644 surfsense_backend/app/notifications/persistence/__init__.py create mode 100644 surfsense_backend/app/notifications/persistence/models.py create mode 100644 surfsense_backend/app/notifications/service/__init__.py create mode 100644 surfsense_backend/app/notifications/service/base.py create mode 100644 surfsense_backend/app/notifications/service/facade.py create mode 100644 surfsense_backend/app/notifications/service/handlers/__init__.py create mode 100644 surfsense_backend/app/notifications/service/handlers/comment_reply.py create mode 100644 surfsense_backend/app/notifications/service/handlers/connector_indexing.py create mode 100644 surfsense_backend/app/notifications/service/handlers/document_processing.py create mode 100644 surfsense_backend/app/notifications/service/handlers/mention.py create mode 100644 surfsense_backend/app/notifications/service/handlers/page_limit.py create mode 100644 surfsense_backend/app/notifications/types.py diff --git a/surfsense_backend/app/notifications/__init__.py b/surfsense_backend/app/notifications/__init__.py new file mode 100644 index 000000000..e29c58c0c --- /dev/null +++ b/surfsense_backend/app/notifications/__init__.py @@ -0,0 +1,15 @@ +"""User notifications: persistence, service, and HTTP API. + +Emit notifications via :class:`~app.notifications.service.NotificationService`; +the router in :mod:`app.notifications.api` exposes the inbox endpoints. +""" + +from __future__ import annotations + +from app.notifications.persistence import Notification +from app.notifications.service import NotificationService + +__all__ = [ + "Notification", + "NotificationService", +] diff --git a/surfsense_backend/app/notifications/api/__init__.py b/surfsense_backend/app/notifications/api/__init__.py new file mode 100644 index 000000000..2708c8805 --- /dev/null +++ b/surfsense_backend/app/notifications/api/__init__.py @@ -0,0 +1,7 @@ +"""Notifications HTTP API.""" + +from __future__ import annotations + +from app.notifications.api.api import router + +__all__ = ["router"] diff --git a/surfsense_backend/app/notifications/api/api.py b/surfsense_backend/app/notifications/api/api.py new file mode 100644 index 000000000..60bd25f5d --- /dev/null +++ b/surfsense_backend/app/notifications/api/api.py @@ -0,0 +1,412 @@ +"""HTTP routes for the notifications inbox (list, counts, mark-read).""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import case, desc, func, literal, literal_column, select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import User, get_async_session +from app.notifications.api.schemas import ( + BatchUnreadCountResponse, + CategoryUnreadCount, + MarkAllReadResponse, + MarkReadResponse, + NotificationListResponse, + NotificationResponse, + SourceTypeItem, + SourceTypesResponse, + UnreadCountResponse, +) +from app.notifications.constants import CATEGORY_TYPES, SYNC_WINDOW_DAYS +from app.notifications.persistence import Notification +from app.notifications.types import NotificationCategory, NotificationType +from app.users import current_active_user + +router = APIRouter(prefix="/notifications", tags=["notifications"]) + + +@router.get("/unread-counts-batch", response_model=BatchUnreadCountResponse) +async def get_unread_counts_batch( + search_space_id: int | None = Query(None, description="Filter by search space ID"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> BatchUnreadCountResponse: + """Unread counts for every category in a single query.""" + cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS) + + base_filter = [ + Notification.user_id == user.id, + Notification.read == False, # noqa: E712 + ] + + if search_space_id is not None: + # Include global (null search-space) notifications. + base_filter.append( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + + is_comments = Notification.type.in_(CATEGORY_TYPES["comments"]) + is_status = Notification.type.in_(CATEGORY_TYPES["status"]) + is_recent = Notification.created_at > cutoff_date + + query = select( + func.count(case((is_comments, Notification.id))).label("comments_total"), + func.count(case((is_comments & is_recent, Notification.id))).label( + "comments_recent" + ), + func.count(case((is_status, Notification.id))).label("status_total"), + func.count(case((is_status & is_recent, Notification.id))).label( + "status_recent" + ), + ).where(*base_filter) + + result = await session.execute(query) + row = result.one() + + return BatchUnreadCountResponse( + comments=CategoryUnreadCount( + total_unread=row.comments_total, + recent_unread=row.comments_recent, + ), + status=CategoryUnreadCount( + total_unread=row.status_total, + recent_unread=row.status_recent, + ), + ) + + +@router.get("/source-types", response_model=SourceTypesResponse) +async def get_notification_source_types( + search_space_id: int | None = Query(None, description="Filter by search space ID"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> SourceTypesResponse: + """Distinct connector/document source types for the Status tab filter.""" + base_filter = [Notification.user_id == user.id] + + if search_space_id is not None: + # Include global (null search-space) notifications. + base_filter.append( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + + connector_type_expr = Notification.notification_metadata["connector_type"].astext + connector_query = ( + select( + connector_type_expr.label("source_type"), + literal("connector").label("category"), + func.count(Notification.id).label("cnt"), + ) + .where( + *base_filter, + Notification.type.in_(("connector_indexing", "connector_deletion")), + connector_type_expr.isnot(None), + ) + .group_by(literal_column("source_type")) + ) + + document_type_expr = Notification.notification_metadata["document_type"].astext + document_query = ( + select( + document_type_expr.label("source_type"), + literal("document").label("category"), + func.count(Notification.id).label("cnt"), + ) + .where( + *base_filter, + Notification.type.in_(("document_processing",)), + document_type_expr.isnot(None), + ) + .group_by(literal_column("source_type")) + ) + + connector_result = await session.execute(connector_query) + document_result = await session.execute(document_query) + + sources = [] + for source_type, category, count in [ + *connector_result.all(), + *document_result.all(), + ]: + if not source_type: + continue + sources.append( + SourceTypeItem( + key=f"{category}:{source_type}", + type=source_type, + category=category, + count=count, + ) + ) + + return SourceTypesResponse(sources=sources) + + +@router.get("/unread-count", response_model=UnreadCountResponse) +async def get_unread_count( + search_space_id: int | None = Query(None, description="Filter by search space ID"), + type_filter: NotificationType | None = Query( + None, alias="type", description="Filter by notification type" + ), + category: NotificationCategory | None = Query( + None, description="Filter by category: 'comments' or 'status'" + ), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> UnreadCountResponse: + """Total and recent (within sync window) unread counts for the user. + + Returning both lets a client hold the older count static while + live-syncing the recent ones. + """ + cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS) + + base_filter = [ + Notification.user_id == user.id, + Notification.read == False, # noqa: E712 + ] + + if search_space_id is not None: + # Include global (null search-space) notifications. + base_filter.append( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + + if type_filter: + base_filter.append(Notification.type == type_filter) + + if category: + base_filter.append(Notification.type.in_(CATEGORY_TYPES[category])) + + total_query = select(func.count(Notification.id)).where(*base_filter) + total_result = await session.execute(total_query) + total_unread = total_result.scalar() or 0 + + recent_query = select(func.count(Notification.id)).where( + *base_filter, + Notification.created_at > cutoff_date, + ) + recent_result = await session.execute(recent_query) + recent_unread = recent_result.scalar() or 0 + + return UnreadCountResponse( + total_unread=total_unread, + recent_unread=recent_unread, + ) + + +@router.get("", response_model=NotificationListResponse) +async def list_notifications( + search_space_id: int | None = Query(None, description="Filter by search space ID"), + type_filter: NotificationType | None = Query( + None, alias="type", description="Filter by notification type" + ), + category: NotificationCategory | None = Query( + None, description="Filter by category: 'comments' or 'status'" + ), + source_type: str | None = Query( + None, + description="Filter by source type, e.g. 'connector:GITHUB_CONNECTOR' or 'doctype:FILE'", + ), + filter: str | None = Query( + None, + description="Filter preset: 'unread' for unread only, 'errors' for failed/error items only", + ), + before_date: str | None = Query( + None, description="Get notifications before this ISO date (for pagination)" + ), + search: str | None = Query( + None, description="Search notifications by title or message (case-insensitive)" + ), + limit: int = Query(50, ge=1, le=100, description="Number of items to return"), + offset: int = Query(0, ge=0, description="Number of items to skip"), + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> NotificationListResponse: + """Paginated inbox fallback for items outside the Zero sync window.""" + query = select(Notification).where(Notification.user_id == user.id) + count_query = select(func.count(Notification.id)).where( + Notification.user_id == user.id + ) + + if search_space_id is not None: + # Include global (null search-space) notifications. + query = query.where( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + count_query = count_query.where( + (Notification.search_space_id == search_space_id) + | (Notification.search_space_id.is_(None)) + ) + + if type_filter: + query = query.where(Notification.type == type_filter) + count_query = count_query.where(Notification.type == type_filter) + + if category: + cat_types = CATEGORY_TYPES[category] + query = query.where(Notification.type.in_(cat_types)) + count_query = count_query.where(Notification.type.in_(cat_types)) + + # source_type encodes the JSONB facet to match: 'connector:' or 'doctype:'. + if source_type: + if source_type.startswith("connector:"): + connector_val = source_type[len("connector:") :] + source_filter = Notification.type.in_( + ("connector_indexing", "connector_deletion") + ) & ( + Notification.notification_metadata["connector_type"].astext + == connector_val + ) + query = query.where(source_filter) + count_query = count_query.where(source_filter) + elif source_type.startswith("doctype:"): + doctype_val = source_type[len("doctype:") :] + source_filter = Notification.type.in_(("document_processing",)) & ( + Notification.notification_metadata["document_type"].astext + == doctype_val + ) + query = query.where(source_filter) + count_query = count_query.where(source_filter) + + if filter == "unread": + unread_filter = Notification.read == False # noqa: E712 + query = query.where(unread_filter) + count_query = count_query.where(unread_filter) + elif filter == "errors": + error_filter = (Notification.type == "page_limit_exceeded") | ( + Notification.notification_metadata["status"].astext == "failed" + ) + query = query.where(error_filter) + count_query = count_query.where(error_filter) + + if before_date: + try: + before_datetime = datetime.fromisoformat(before_date.replace("Z", "+00:00")) + query = query.where(Notification.created_at < before_datetime) + count_query = count_query.where(Notification.created_at < before_datetime) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid date format. Use ISO format (e.g., 2024-01-15T00:00:00Z)", + ) from None + + if search: + search_term = f"%{search}%" + search_filter = Notification.title.ilike( + search_term + ) | Notification.message.ilike(search_term) + query = query.where(search_filter) + count_query = count_query.where(search_filter) + + total_result = await session.execute(count_query) + total = total_result.scalar() or 0 + + # Over-fetch by one to tell whether another page exists. + query = ( + query.order_by(desc(Notification.created_at)).offset(offset).limit(limit + 1) + ) + + result = await session.execute(query) + notifications = result.scalars().all() + + has_more = len(notifications) > limit + if has_more: + notifications = notifications[:limit] + + items = [] + for notification in notifications: + items.append( + NotificationResponse( + id=notification.id, + user_id=str(notification.user_id), + search_space_id=notification.search_space_id, + type=notification.type, + title=notification.title, + message=notification.message, + read=notification.read, + metadata=notification.notification_metadata or {}, + created_at=notification.created_at.isoformat() + if notification.created_at + else "", + updated_at=notification.updated_at.isoformat() + if notification.updated_at + else None, + ) + ) + + return NotificationListResponse( + items=items, + total=total, + has_more=has_more, + next_offset=offset + limit if has_more else None, + ) + + +@router.patch("/{notification_id}/read", response_model=MarkReadResponse) +async def mark_notification_as_read( + notification_id: int, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> MarkReadResponse: + """Mark one of the user's notifications read; Zero syncs the change.""" + # Scope to the caller's own notifications. + result = await session.execute( + select(Notification).where( + Notification.id == notification_id, + Notification.user_id == user.id, + ) + ) + notification = result.scalar_one_or_none() + + if not notification: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Notification not found", + ) + + if notification.read: + return MarkReadResponse( + success=True, + message="Notification already marked as read", + ) + + notification.read = True + await session.commit() + + return MarkReadResponse( + success=True, + message="Notification marked as read", + ) + + +@router.patch("/read-all", response_model=MarkAllReadResponse) +async def mark_all_notifications_as_read( + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> MarkAllReadResponse: + """Mark all of the user's notifications read; Zero syncs the changes.""" + result = await session.execute( + update(Notification) + .where( + Notification.user_id == user.id, + Notification.read == False, # noqa: E712 + ) + .values(read=True) + ) + await session.commit() + + updated_count = result.rowcount + + return MarkAllReadResponse( + success=True, + message=f"Marked {updated_count} notification(s) as read", + updated_count=updated_count, + ) diff --git a/surfsense_backend/app/notifications/api/schemas.py b/surfsense_backend/app/notifications/api/schemas.py new file mode 100644 index 000000000..727e5485a --- /dev/null +++ b/surfsense_backend/app/notifications/api/schemas.py @@ -0,0 +1,81 @@ +"""Response shapes for the notifications API.""" + +from __future__ import annotations + +from pydantic import BaseModel + + +class NotificationResponse(BaseModel): + """A single notification.""" + + id: int + user_id: str + search_space_id: int | None + type: str + title: str + message: str + read: bool + metadata: dict + created_at: str + updated_at: str | None + + class Config: + from_attributes = True + + +class NotificationListResponse(BaseModel): + """A page of notifications.""" + + items: list[NotificationResponse] + total: int + has_more: bool + next_offset: int | None + + +class MarkReadResponse(BaseModel): + """Outcome of marking one notification read.""" + + success: bool + message: str + + +class MarkAllReadResponse(BaseModel): + """Outcome of marking every notification read.""" + + success: bool + message: str + updated_count: int + + +class SourceTypeItem(BaseModel): + """A source type with its category and count.""" + + key: str + type: str + category: str # "connector" or "document" + count: int + + +class SourceTypesResponse(BaseModel): + """Source types available for the Status tab filter.""" + + sources: list[SourceTypeItem] + + +class UnreadCountResponse(BaseModel): + """Unread totals, split by sync-window recency.""" + + total_unread: int + recent_unread: int + + +class CategoryUnreadCount(BaseModel): + total_unread: int + recent_unread: int + + +class BatchUnreadCountResponse(BaseModel): + """Per-category unread counts in one response.""" + + comments: CategoryUnreadCount + status: CategoryUnreadCount diff --git a/surfsense_backend/app/notifications/constants.py b/surfsense_backend/app/notifications/constants.py new file mode 100644 index 000000000..e8bd8391d --- /dev/null +++ b/surfsense_backend/app/notifications/constants.py @@ -0,0 +1,17 @@ +"""Notification policy constants.""" + +from __future__ import annotations + +# Notifications newer than this are live-synced; older ones load via the list endpoint. +SYNC_WINDOW_DAYS = 14 + +# Maps an inbox tab to the notification types it shows. +CATEGORY_TYPES: dict[str, tuple[str, ...]] = { + "comments": ("new_mention", "comment_reply"), + "status": ( + "connector_indexing", + "connector_deletion", + "document_processing", + "page_limit_exceeded", + ), +} diff --git a/surfsense_backend/app/notifications/persistence/__init__.py b/surfsense_backend/app/notifications/persistence/__init__.py new file mode 100644 index 000000000..82f9e6f01 --- /dev/null +++ b/surfsense_backend/app/notifications/persistence/__init__.py @@ -0,0 +1,7 @@ +"""Notification persistence models.""" + +from __future__ import annotations + +from .models import Notification + +__all__ = ["Notification"] diff --git a/surfsense_backend/app/notifications/persistence/models.py b/surfsense_backend/app/notifications/persistence/models.py new file mode 100644 index 000000000..557c4bf17 --- /dev/null +++ b/surfsense_backend/app/notifications/persistence/models.py @@ -0,0 +1,72 @@ +"""Per-user inbox notifications, synced to clients via Zero.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from sqlalchemy import ( + TIMESTAMP, + Boolean, + Column, + ForeignKey, + Index, + Integer, + String, + Text, + text, +) +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import relationship + +from app.db import BaseModel, TimestampMixin + + +class Notification(BaseModel, TimestampMixin): + __tablename__ = "notifications" + __table_args__ = ( + # Serves unread-count queries. + Index( + "ix_notifications_user_read_type_created", + "user_id", + "read", + "type", + "created_at", + ), + # Serves the paginated inbox list query. + Index( + "ix_notifications_user_space_created", + "user_id", + "search_space_id", + "created_at", + ), + ) + + user_id = Column( + UUID(as_uuid=True), + ForeignKey("user.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + search_space_id = Column( + Integer, + ForeignKey("searchspaces.id", ondelete="CASCADE"), + nullable=True, + index=True, + ) + type = Column(String(50), nullable=False, index=True) + title = Column(String(200), nullable=False) + message = Column(Text, nullable=False) + read = Column( + Boolean, nullable=False, default=False, server_default=text("false"), index=True + ) + notification_metadata = Column("metadata", JSONB, nullable=True, default={}) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=True, + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + index=True, + ) + + user = relationship("User", back_populates="notifications") + search_space = relationship("SearchSpace", back_populates="notifications") diff --git a/surfsense_backend/app/notifications/service/__init__.py b/surfsense_backend/app/notifications/service/__init__.py new file mode 100644 index 000000000..8cb8491ac --- /dev/null +++ b/surfsense_backend/app/notifications/service/__init__.py @@ -0,0 +1,7 @@ +"""Notification creation/update service.""" + +from __future__ import annotations + +from app.notifications.service.facade import NotificationService + +__all__ = ["NotificationService"] diff --git a/surfsense_backend/app/notifications/service/base.py b/surfsense_backend/app/notifications/service/base.py new file mode 100644 index 000000000..7d239ee3a --- /dev/null +++ b/surfsense_backend/app/notifications/service/base.py @@ -0,0 +1,132 @@ +"""Shared find/upsert/update logic for a single notification type.""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime +from typing import Any +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.notifications.persistence import Notification + +logger = logging.getLogger(__name__) + + +class BaseNotificationHandler: + """Find, upsert, and update notifications of one ``type``.""" + + def __init__(self, notification_type: str): + self.notification_type = notification_type + + async def find_notification_by_operation( + self, + session: AsyncSession, + user_id: UUID, + operation_id: str, + search_space_id: int | None = None, + ) -> Notification | None: + """Return the notification for ``operation_id``, if one exists.""" + query = select(Notification).where( + Notification.user_id == user_id, + Notification.type == self.notification_type, + Notification.notification_metadata["operation_id"].astext == operation_id, + ) + if search_space_id is not None: + query = query.where(Notification.search_space_id == search_space_id) + + result = await session.execute(query) + return result.scalar_one_or_none() + + async def find_or_create_notification( + self, + session: AsyncSession, + user_id: UUID, + operation_id: str, + title: str, + message: str, + search_space_id: int | None = None, + initial_metadata: dict[str, Any] | None = None, + ) -> Notification: + """Upsert a notification keyed by ``operation_id``.""" + notification = await self.find_notification_by_operation( + session, user_id, operation_id, search_space_id + ) + + if notification: + notification.title = title + notification.message = message + if initial_metadata: + notification.notification_metadata = { + **notification.notification_metadata, + **initial_metadata, + } + # Tell SQLAlchemy the JSONB dict changed in place. + flag_modified(notification, "notification_metadata") + await session.commit() + await session.refresh(notification) + logger.info( + f"Updated notification {notification.id} for operation {operation_id}" + ) + return notification + + metadata = initial_metadata or {} + metadata["operation_id"] = operation_id + metadata["status"] = "in_progress" + metadata["started_at"] = datetime.now(UTC).isoformat() + + notification = Notification( + user_id=user_id, + search_space_id=search_space_id, + type=self.notification_type, + title=title, + message=message, + notification_metadata=metadata, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info( + f"Created notification {notification.id} for operation {operation_id}" + ) + return notification + + async def update_notification( + self, + session: AsyncSession, + notification: Notification, + title: str | None = None, + message: str | None = None, + status: str | None = None, + metadata_updates: dict[str, Any] | None = None, + ) -> Notification: + """Apply field/status/metadata changes and persist.""" + if title is not None: + notification.title = title + if message is not None: + notification.message = message + + if status is not None: + notification.notification_metadata["status"] = status + if status in ("completed", "failed"): + notification.notification_metadata["completed_at"] = datetime.now( + UTC + ).isoformat() + # Tell SQLAlchemy the JSONB dict changed in place. + flag_modified(notification, "notification_metadata") + + if metadata_updates: + notification.notification_metadata = { + **notification.notification_metadata, + **metadata_updates, + } + # Tell SQLAlchemy the JSONB dict changed in place. + flag_modified(notification, "notification_metadata") + + await session.commit() + await session.refresh(notification) + logger.info(f"Updated notification {notification.id}") + return notification diff --git a/surfsense_backend/app/notifications/service/facade.py b/surfsense_backend/app/notifications/service/facade.py new file mode 100644 index 000000000..63154301c --- /dev/null +++ b/surfsense_backend/app/notifications/service/facade.py @@ -0,0 +1,55 @@ +"""Single entry point that composes the per-type notification handlers.""" + +from __future__ import annotations + +import logging +from typing import Any +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.handlers import ( + CommentReplyNotificationHandler, + ConnectorIndexingNotificationHandler, + DocumentProcessingNotificationHandler, + MentionNotificationHandler, + PageLimitNotificationHandler, +) + +logger = logging.getLogger(__name__) + + +class NotificationService: + """Facade over the per-type handlers; mutations sync via Zero.""" + + connector_indexing = ConnectorIndexingNotificationHandler() + document_processing = DocumentProcessingNotificationHandler() + mention = MentionNotificationHandler() + comment_reply = CommentReplyNotificationHandler() + page_limit = PageLimitNotificationHandler() + + @staticmethod + async def create_notification( + session: AsyncSession, + user_id: UUID, + notification_type: str, + title: str, + message: str, + search_space_id: int | None = None, + notification_metadata: dict[str, Any] | None = None, + ) -> Notification: + """Create a generic notification of any ``notification_type``.""" + notification = Notification( + user_id=user_id, + search_space_id=search_space_id, + type=notification_type, + title=title, + message=message, + notification_metadata=notification_metadata or {}, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info(f"Created notification {notification.id} for user {user_id}") + return notification diff --git a/surfsense_backend/app/notifications/service/handlers/__init__.py b/surfsense_backend/app/notifications/service/handlers/__init__.py new file mode 100644 index 000000000..8c32dea3b --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/__init__.py @@ -0,0 +1,17 @@ +"""Per-type notification handlers.""" + +from __future__ import annotations + +from .comment_reply import CommentReplyNotificationHandler +from .connector_indexing import ConnectorIndexingNotificationHandler +from .document_processing import DocumentProcessingNotificationHandler +from .mention import MentionNotificationHandler +from .page_limit import PageLimitNotificationHandler + +__all__ = [ + "CommentReplyNotificationHandler", + "ConnectorIndexingNotificationHandler", + "DocumentProcessingNotificationHandler", + "MentionNotificationHandler", + "PageLimitNotificationHandler", +] diff --git a/surfsense_backend/app/notifications/service/handlers/comment_reply.py b/surfsense_backend/app/notifications/service/handlers/comment_reply.py new file mode 100644 index 000000000..9457125b1 --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/comment_reply.py @@ -0,0 +1,107 @@ +"""Notifications for replies to a user's comments.""" + +from __future__ import annotations + +import logging +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.base import BaseNotificationHandler + +logger = logging.getLogger(__name__) + + +class CommentReplyNotificationHandler(BaseNotificationHandler): + """Notifications for replies to a user's comments.""" + + def __init__(self): + super().__init__("comment_reply") + + async def find_notification_by_reply( + self, + session: AsyncSession, + reply_id: int, + user_id: UUID, + ) -> Notification | None: + query = select(Notification).where( + Notification.type == self.notification_type, + Notification.user_id == user_id, + Notification.notification_metadata["reply_id"].astext == str(reply_id), + ) + result = await session.execute(query) + return result.scalar_one_or_none() + + async def notify_comment_reply( + self, + session: AsyncSession, + user_id: UUID, + reply_id: int, + parent_comment_id: int, + message_id: int, + thread_id: int, + thread_title: str, + author_id: str, + author_name: str, + author_avatar_url: str | None, + author_email: str, + content_preview: str, + search_space_id: int, + ) -> Notification: + """Notify of a reply; idempotent on ``reply_id`` per user.""" + existing = await self.find_notification_by_reply(session, reply_id, user_id) + if existing: + logger.info( + f"Notification already exists for reply {reply_id} to user {user_id}" + ) + return existing + + title = f"{author_name} replied in a thread" + message = content_preview[:100] + ("..." if len(content_preview) > 100 else "") + + metadata = { + "reply_id": reply_id, + "parent_comment_id": parent_comment_id, + "message_id": message_id, + "thread_id": thread_id, + "thread_title": thread_title, + "author_id": author_id, + "author_name": author_name, + "author_avatar_url": author_avatar_url, + "author_email": author_email, + "content_preview": content_preview[:200], + } + + try: + notification = Notification( + user_id=user_id, + search_space_id=search_space_id, + type=self.notification_type, + title=title, + message=message, + notification_metadata=metadata, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info( + f"Created comment_reply notification {notification.id} for user {user_id}" + ) + return notification + except Exception as e: + await session.rollback() + if ( + "duplicate key" in str(e).lower() + or "unique constraint" in str(e).lower() + ): + logger.warning( + f"Duplicate notification for reply {reply_id} to user {user_id}" + ) + existing = await self.find_notification_by_reply( + session, reply_id, user_id + ) + if existing: + return existing + raise diff --git a/surfsense_backend/app/notifications/service/handlers/connector_indexing.py b/surfsense_backend/app/notifications/service/handlers/connector_indexing.py new file mode 100644 index 000000000..d75e7e6fc --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/connector_indexing.py @@ -0,0 +1,288 @@ +"""Notifications for connector indexing runs.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.base import BaseNotificationHandler + + +class ConnectorIndexingNotificationHandler(BaseNotificationHandler): + """Notifications for connector indexing runs.""" + + def __init__(self): + super().__init__("connector_indexing") + + def _generate_operation_id( + self, + connector_id: int, + start_date: str | None = None, + end_date: str | None = None, + ) -> str: + """Build a unique id for a connector indexing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + date_range = "" + if start_date or end_date: + date_range = f"_{start_date or 'none'}_{end_date or 'none'}" + return f"connector_{connector_id}_{timestamp}{date_range}" + + def _generate_google_drive_operation_id( + self, connector_id: int, folder_count: int, file_count: int + ) -> str: + """Build a unique id for a Google Drive indexing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + items_info = f"_{folder_count}f_{file_count}files" + return f"drive_{connector_id}_{timestamp}{items_info}" + + async def notify_indexing_started( + self, + session: AsyncSession, + user_id: UUID, + connector_id: int, + connector_name: str, + connector_type: str, + search_space_id: int, + start_date: str | None = None, + end_date: str | None = None, + ) -> Notification: + """Open (or refresh) the notification when indexing starts.""" + operation_id = self._generate_operation_id(connector_id, start_date, end_date) + title = f"Syncing: {connector_name}" + message = "Connecting to your account" + + metadata = { + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "start_date": start_date, + "end_date": end_date, + "indexed_count": 0, + "sync_stage": "connecting", + } + + return await self.find_or_create_notification( + session=session, + user_id=user_id, + operation_id=operation_id, + title=title, + message=message, + search_space_id=search_space_id, + initial_metadata=metadata, + ) + + async def notify_indexing_progress( + self, + session: AsyncSession, + notification: Notification, + indexed_count: int, + total_count: int | None = None, + stage: str | None = None, + stage_message: str | None = None, + ) -> Notification: + """Update the notification with indexing progress.""" + stage_messages = { + "connecting": "Connecting to your account", + "fetching": "Fetching your content", + "processing": "Preparing for search", + "storing": "Almost done", + } + + if stage or stage_message: + progress_msg = stage_message or stage_messages.get(stage, "Processing") + else: + # Legacy callers that pass neither stage nor message. + progress_msg = "Fetching your content" + + metadata_updates = {"indexed_count": indexed_count} + if total_count is not None: + metadata_updates["total_count"] = total_count + progress_percent = int((indexed_count / total_count) * 100) + metadata_updates["progress_percent"] = progress_percent + if stage: + metadata_updates["sync_stage"] = stage + + return await self.update_notification( + session=session, + notification=notification, + message=progress_msg, + status="in_progress", + metadata_updates=metadata_updates, + ) + + async def notify_retry_progress( + self, + session: AsyncSession, + notification: Notification, + indexed_count: int, + retry_reason: str, + attempt: int, + max_attempts: int, + wait_seconds: float | None = None, + service_name: str | None = None, + ) -> Notification: + """Surface that an external service is rate-limiting/retrying. + + Reusable by any connector; frames the delay as the provider's, not ours. + """ + if not service_name: + service_name = notification.notification_metadata.get( + "connector_name", "Service" + ) + # Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion". + if " - " in service_name: + service_name = service_name.split(" - ")[0] + + # Worded so the delay reads as the provider's, not ours. + retry_messages = { + "rate_limit": f"{service_name} rate limit reached", + "server_error": f"{service_name} is slow to respond", + "timeout": f"{service_name} took too long", + "temporary_error": f"{service_name} temporarily unavailable", + } + + base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}") + + # Only surface a wait time when it's long enough to be worth showing. + if wait_seconds and wait_seconds > 5: + message = f"{base_message}. Retrying in {int(wait_seconds)}s..." + else: + message = f"{base_message}. Retrying..." + + if indexed_count > 0: + item_text = "item" if indexed_count == 1 else "items" + message = f"{message} ({indexed_count} {item_text} synced so far)" + + metadata_updates = { + "indexed_count": indexed_count, + "sync_stage": "waiting_retry", + "retry_attempt": attempt, + "retry_max_attempts": max_attempts, + "retry_reason": retry_reason, + "retry_wait_seconds": wait_seconds, + } + + return await self.update_notification( + session=session, + notification=notification, + message=message, + status="in_progress", + metadata_updates=metadata_updates, + ) + + async def notify_indexing_completed( + self, + session: AsyncSession, + notification: Notification, + indexed_count: int, + error_message: str | None = None, + is_warning: bool = False, + skipped_count: int | None = None, + unsupported_count: int | None = None, + ) -> Notification: + """Finalize the notification as ready/failed when indexing ends.""" + connector_name = notification.notification_metadata.get( + "connector_name", "Connector" + ) + + unsupported_text = "" + if unsupported_count and unsupported_count > 0: + file_word = "file was" if unsupported_count == 1 else "files were" + unsupported_text = f" {unsupported_count} {file_word} not supported." + + if error_message: + if indexed_count > 0: + title = f"Ready: {connector_name}" + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}" + status = "completed" + elif is_warning: + title = f"Ready: {connector_name}" + message = f"Sync complete.{unsupported_text} {error_message}" + status = "completed" + else: + title = f"Failed: {connector_name}" + message = f"Sync failed: {error_message}" + if unsupported_text: + message += unsupported_text + status = "failed" + else: + title = f"Ready: {connector_name}" + if indexed_count == 0: + if unsupported_count and unsupported_count > 0: + message = f"Sync complete.{unsupported_text}" + else: + message = "Already up to date!" + else: + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced." + if unsupported_text: + message += unsupported_text + status = "completed" + + metadata_updates = { + "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, + "unsupported_count": unsupported_count or 0, + "sync_stage": "completed" + if (not error_message or is_warning or indexed_count > 0) + else "failed", + "error_message": error_message, + } + + return await self.update_notification( + session=session, + notification=notification, + title=title, + message=message, + status=status, + metadata_updates=metadata_updates, + ) + + async def notify_google_drive_indexing_started( + self, + session: AsyncSession, + user_id: UUID, + connector_id: int, + connector_name: str, + connector_type: str, + search_space_id: int, + folder_count: int, + file_count: int, + folder_names: list[str] | None = None, + file_names: list[str] | None = None, + ) -> Notification: + """Open (or refresh) the notification when Drive indexing starts.""" + operation_id = self._generate_google_drive_operation_id( + connector_id, folder_count, file_count + ) + title = f"Syncing: {connector_name}" + message = "Preparing your files" + + metadata = { + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "folder_count": folder_count, + "file_count": file_count, + "indexed_count": 0, + "sync_stage": "connecting", + } + + if folder_names: + metadata["folder_names"] = folder_names + if file_names: + metadata["file_names"] = file_names + + return await self.find_or_create_notification( + session=session, + user_id=user_id, + operation_id=operation_id, + title=title, + message=message, + search_space_id=search_space_id, + initial_metadata=metadata, + ) diff --git a/surfsense_backend/app/notifications/service/handlers/document_processing.py b/surfsense_backend/app/notifications/service/handlers/document_processing.py new file mode 100644 index 000000000..2b162a053 --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/document_processing.py @@ -0,0 +1,137 @@ +"""Notifications for single-document processing.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.base import BaseNotificationHandler + + +class DocumentProcessingNotificationHandler(BaseNotificationHandler): + """Notifications for single-document processing.""" + + def __init__(self): + super().__init__("document_processing") + + def _generate_operation_id( + self, document_type: str, filename: str, search_space_id: int + ) -> str: + """Build a unique id for a document processing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") + # Create a short hash of filename to ensure uniqueness + import hashlib + + filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8] + return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}" + + async def notify_processing_started( + self, + session: AsyncSession, + user_id: UUID, + document_type: str, + document_name: str, + search_space_id: int, + file_size: int | None = None, + ) -> Notification: + """Open the notification when document processing is queued.""" + operation_id = self._generate_operation_id( + document_type, document_name, search_space_id + ) + title = f"Processing: {document_name}" + message = "Waiting in queue" + + metadata = { + "document_type": document_type, + "document_name": document_name, + "processing_stage": "queued", + } + + if file_size is not None: + metadata["file_size"] = file_size + + return await self.find_or_create_notification( + session=session, + user_id=user_id, + operation_id=operation_id, + title=title, + message=message, + search_space_id=search_space_id, + initial_metadata=metadata, + ) + + async def notify_processing_progress( + self, + session: AsyncSession, + notification: Notification, + stage: str, + stage_message: str | None = None, + chunks_count: int | None = None, + ) -> Notification: + """Update the notification with the current processing stage.""" + stage_messages = { + "parsing": "Reading your file", + "chunking": "Preparing for search", + "embedding": "Preparing for search", + "storing": "Finalizing", + } + + message = stage_message or stage_messages.get(stage, "Processing") + + metadata_updates = {"processing_stage": stage} + # Store chunks_count in metadata for debugging, but don't show to user + if chunks_count is not None: + metadata_updates["chunks_count"] = chunks_count + + return await self.update_notification( + session=session, + notification=notification, + message=message, + status="in_progress", + metadata_updates=metadata_updates, + ) + + async def notify_processing_completed( + self, + session: AsyncSession, + notification: Notification, + document_id: int | None = None, + chunks_count: int | None = None, + error_message: str | None = None, + ) -> Notification: + """Finalize the notification as ready/failed when processing ends.""" + document_name = notification.notification_metadata.get( + "document_name", "Document" + ) + + if error_message: + title = f"Failed: {document_name}" + message = f"Processing failed: {error_message}" + status = "failed" + else: + title = f"Ready: {document_name}" + message = "Now searchable!" + status = "completed" + + metadata_updates = { + "processing_stage": "completed" if not error_message else "failed", + "error_message": error_message, + } + + if document_id is not None: + metadata_updates["document_id"] = document_id + # Store chunks_count in metadata for debugging, but don't show to user + if chunks_count is not None: + metadata_updates["chunks_count"] = chunks_count + + return await self.update_notification( + session=session, + notification=notification, + title=title, + message=message, + status=status, + metadata_updates=metadata_updates, + ) diff --git a/surfsense_backend/app/notifications/service/handlers/mention.py b/surfsense_backend/app/notifications/service/handlers/mention.py new file mode 100644 index 000000000..650907bab --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/mention.py @@ -0,0 +1,105 @@ +"""Notifications for @mentions in comments.""" + +from __future__ import annotations + +import logging +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.base import BaseNotificationHandler + +logger = logging.getLogger(__name__) + + +class MentionNotificationHandler(BaseNotificationHandler): + """Notifications for @mentions in comments.""" + + def __init__(self): + super().__init__("new_mention") + + async def find_notification_by_mention( + self, + session: AsyncSession, + mention_id: int, + ) -> Notification | None: + """Return the notification for ``mention_id``, if one exists.""" + query = select(Notification).where( + Notification.type == self.notification_type, + Notification.notification_metadata["mention_id"].astext == str(mention_id), + ) + result = await session.execute(query) + return result.scalar_one_or_none() + + async def notify_new_mention( + self, + session: AsyncSession, + mentioned_user_id: UUID, + mention_id: int, + comment_id: int, + message_id: int, + thread_id: int, + thread_title: str, + author_id: str, + author_name: str, + author_avatar_url: str | None, + author_email: str, + content_preview: str, + search_space_id: int, + ) -> Notification: + """Notify a mentioned user; idempotent on ``mention_id``.""" + existing = await self.find_notification_by_mention(session, mention_id) + if existing: + logger.info( + f"Notification already exists for mention {mention_id}, returning existing" + ) + return existing + + title = f"{author_name} mentioned you" + message = content_preview[:100] + ("..." if len(content_preview) > 100 else "") + + metadata = { + "mention_id": mention_id, + "comment_id": comment_id, + "message_id": message_id, + "thread_id": thread_id, + "thread_title": thread_title, + "author_id": author_id, + "author_name": author_name, + "author_avatar_url": author_avatar_url, + "author_email": author_email, + "content_preview": content_preview[:200], + } + + try: + notification = Notification( + user_id=mentioned_user_id, + search_space_id=search_space_id, + type=self.notification_type, + title=title, + message=message, + notification_metadata=metadata, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info( + f"Created new_mention notification {notification.id} for user {mentioned_user_id}" + ) + return notification + except Exception as e: + # Race: a concurrent insert won; fetch the existing row instead. + await session.rollback() + if ( + "duplicate key" in str(e).lower() + or "unique constraint" in str(e).lower() + ): + logger.warning( + f"Duplicate notification detected for mention {mention_id}, fetching existing" + ) + existing = await self.find_notification_by_mention(session, mention_id) + if existing: + return existing + raise diff --git a/surfsense_backend/app/notifications/service/handlers/page_limit.py b/surfsense_backend/app/notifications/service/handlers/page_limit.py new file mode 100644 index 000000000..00e8dfc18 --- /dev/null +++ b/surfsense_backend/app/notifications/service/handlers/page_limit.py @@ -0,0 +1,80 @@ +"""Notifications for exceeding the page limit.""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.notifications.persistence import Notification +from app.notifications.service.base import BaseNotificationHandler + +logger = logging.getLogger(__name__) + + +class PageLimitNotificationHandler(BaseNotificationHandler): + """Notifications for exceeding the page limit.""" + + def __init__(self): + super().__init__("page_limit_exceeded") + + def _generate_operation_id(self, document_name: str, search_space_id: int) -> str: + """Build a unique id for a page-limit notification.""" + import hashlib + + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") + # Create a short hash of document name to ensure uniqueness + doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8] + return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}" + + async def notify_page_limit_exceeded( + self, + session: AsyncSession, + user_id: UUID, + document_name: str, + document_type: str, + search_space_id: int, + pages_used: int, + pages_limit: int, + pages_to_add: int, + ) -> Notification: + """Notify that a document was blocked by the page limit.""" + operation_id = self._generate_operation_id(document_name, search_space_id) + + display_name = ( + document_name[:40] + "..." if len(document_name) > 40 else document_name + ) + title = f"Page limit exceeded: {display_name}" + message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents." + + metadata = { + "operation_id": operation_id, + "document_name": document_name, + "document_type": document_type, + "pages_used": pages_used, + "pages_limit": pages_limit, + "pages_to_add": pages_to_add, + "status": "failed", + "error_type": "page_limit_exceeded", + # Where the inbox item links to. + "action_url": f"/dashboard/{search_space_id}/more-pages", + "action_label": "Upgrade Plan", + } + + notification = Notification( + user_id=user_id, + search_space_id=search_space_id, + type=self.notification_type, + title=title, + message=message, + notification_metadata=metadata, + ) + session.add(notification) + await session.commit() + await session.refresh(notification) + logger.info( + f"Created page_limit_exceeded notification {notification.id} for user {user_id}" + ) + return notification diff --git a/surfsense_backend/app/notifications/types.py b/surfsense_backend/app/notifications/types.py new file mode 100644 index 000000000..bb8bcfab1 --- /dev/null +++ b/surfsense_backend/app/notifications/types.py @@ -0,0 +1,16 @@ +"""The notification types the API recognizes.""" + +from __future__ import annotations + +from typing import Literal + +NotificationType = Literal[ + "connector_indexing", + "connector_deletion", + "document_processing", + "new_mention", + "comment_reply", + "page_limit_exceeded", +] + +NotificationCategory = Literal["comments", "status"]