refactor: add notifications module

This commit is contained in:
CREDO23 2026-06-03 18:04:47 +02:00
parent 309bd9a2dd
commit 9575b9d4db
17 changed files with 1555 additions and 0 deletions

View file

@ -0,0 +1,15 @@
"""User notifications: persistence, service, and HTTP API.
Emit notifications via :class:`~app.notifications.service.NotificationService`;
the router in :mod:`app.notifications.api` exposes the inbox endpoints.
"""
from __future__ import annotations
from app.notifications.persistence import Notification
from app.notifications.service import NotificationService
__all__ = [
"Notification",
"NotificationService",
]

View file

@ -0,0 +1,7 @@
"""Notifications HTTP API."""
from __future__ import annotations
from app.notifications.api.api import router
__all__ = ["router"]

View file

@ -0,0 +1,412 @@
"""HTTP routes for the notifications inbox (list, counts, mark-read)."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import case, desc, func, literal, literal_column, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import User, get_async_session
from app.notifications.api.schemas import (
BatchUnreadCountResponse,
CategoryUnreadCount,
MarkAllReadResponse,
MarkReadResponse,
NotificationListResponse,
NotificationResponse,
SourceTypeItem,
SourceTypesResponse,
UnreadCountResponse,
)
from app.notifications.constants import CATEGORY_TYPES, SYNC_WINDOW_DAYS
from app.notifications.persistence import Notification
from app.notifications.types import NotificationCategory, NotificationType
from app.users import current_active_user
router = APIRouter(prefix="/notifications", tags=["notifications"])
@router.get("/unread-counts-batch", response_model=BatchUnreadCountResponse)
async def get_unread_counts_batch(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> BatchUnreadCountResponse:
"""Unread counts for every category in a single query."""
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
base_filter = [
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
]
if search_space_id is not None:
# Include global (null search-space) notifications.
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
is_comments = Notification.type.in_(CATEGORY_TYPES["comments"])
is_status = Notification.type.in_(CATEGORY_TYPES["status"])
is_recent = Notification.created_at > cutoff_date
query = select(
func.count(case((is_comments, Notification.id))).label("comments_total"),
func.count(case((is_comments & is_recent, Notification.id))).label(
"comments_recent"
),
func.count(case((is_status, Notification.id))).label("status_total"),
func.count(case((is_status & is_recent, Notification.id))).label(
"status_recent"
),
).where(*base_filter)
result = await session.execute(query)
row = result.one()
return BatchUnreadCountResponse(
comments=CategoryUnreadCount(
total_unread=row.comments_total,
recent_unread=row.comments_recent,
),
status=CategoryUnreadCount(
total_unread=row.status_total,
recent_unread=row.status_recent,
),
)
@router.get("/source-types", response_model=SourceTypesResponse)
async def get_notification_source_types(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> SourceTypesResponse:
"""Distinct connector/document source types for the Status tab filter."""
base_filter = [Notification.user_id == user.id]
if search_space_id is not None:
# Include global (null search-space) notifications.
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
connector_type_expr = Notification.notification_metadata["connector_type"].astext
connector_query = (
select(
connector_type_expr.label("source_type"),
literal("connector").label("category"),
func.count(Notification.id).label("cnt"),
)
.where(
*base_filter,
Notification.type.in_(("connector_indexing", "connector_deletion")),
connector_type_expr.isnot(None),
)
.group_by(literal_column("source_type"))
)
document_type_expr = Notification.notification_metadata["document_type"].astext
document_query = (
select(
document_type_expr.label("source_type"),
literal("document").label("category"),
func.count(Notification.id).label("cnt"),
)
.where(
*base_filter,
Notification.type.in_(("document_processing",)),
document_type_expr.isnot(None),
)
.group_by(literal_column("source_type"))
)
connector_result = await session.execute(connector_query)
document_result = await session.execute(document_query)
sources = []
for source_type, category, count in [
*connector_result.all(),
*document_result.all(),
]:
if not source_type:
continue
sources.append(
SourceTypeItem(
key=f"{category}:{source_type}",
type=source_type,
category=category,
count=count,
)
)
return SourceTypesResponse(sources=sources)
@router.get("/unread-count", response_model=UnreadCountResponse)
async def get_unread_count(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
type_filter: NotificationType | None = Query(
None, alias="type", description="Filter by notification type"
),
category: NotificationCategory | None = Query(
None, description="Filter by category: 'comments' or 'status'"
),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> UnreadCountResponse:
"""Total and recent (within sync window) unread counts for the user.
Returning both lets a client hold the older count static while
live-syncing the recent ones.
"""
cutoff_date = datetime.now(UTC) - timedelta(days=SYNC_WINDOW_DAYS)
base_filter = [
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
]
if search_space_id is not None:
# Include global (null search-space) notifications.
base_filter.append(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
if type_filter:
base_filter.append(Notification.type == type_filter)
if category:
base_filter.append(Notification.type.in_(CATEGORY_TYPES[category]))
total_query = select(func.count(Notification.id)).where(*base_filter)
total_result = await session.execute(total_query)
total_unread = total_result.scalar() or 0
recent_query = select(func.count(Notification.id)).where(
*base_filter,
Notification.created_at > cutoff_date,
)
recent_result = await session.execute(recent_query)
recent_unread = recent_result.scalar() or 0
return UnreadCountResponse(
total_unread=total_unread,
recent_unread=recent_unread,
)
@router.get("", response_model=NotificationListResponse)
async def list_notifications(
search_space_id: int | None = Query(None, description="Filter by search space ID"),
type_filter: NotificationType | None = Query(
None, alias="type", description="Filter by notification type"
),
category: NotificationCategory | None = Query(
None, description="Filter by category: 'comments' or 'status'"
),
source_type: str | None = Query(
None,
description="Filter by source type, e.g. 'connector:GITHUB_CONNECTOR' or 'doctype:FILE'",
),
filter: str | None = Query(
None,
description="Filter preset: 'unread' for unread only, 'errors' for failed/error items only",
),
before_date: str | None = Query(
None, description="Get notifications before this ISO date (for pagination)"
),
search: str | None = Query(
None, description="Search notifications by title or message (case-insensitive)"
),
limit: int = Query(50, ge=1, le=100, description="Number of items to return"),
offset: int = Query(0, ge=0, description="Number of items to skip"),
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> NotificationListResponse:
"""Paginated inbox fallback for items outside the Zero sync window."""
query = select(Notification).where(Notification.user_id == user.id)
count_query = select(func.count(Notification.id)).where(
Notification.user_id == user.id
)
if search_space_id is not None:
# Include global (null search-space) notifications.
query = query.where(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
count_query = count_query.where(
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
if type_filter:
query = query.where(Notification.type == type_filter)
count_query = count_query.where(Notification.type == type_filter)
if category:
cat_types = CATEGORY_TYPES[category]
query = query.where(Notification.type.in_(cat_types))
count_query = count_query.where(Notification.type.in_(cat_types))
# source_type encodes the JSONB facet to match: 'connector:<type>' or 'doctype:<type>'.
if source_type:
if source_type.startswith("connector:"):
connector_val = source_type[len("connector:") :]
source_filter = Notification.type.in_(
("connector_indexing", "connector_deletion")
) & (
Notification.notification_metadata["connector_type"].astext
== connector_val
)
query = query.where(source_filter)
count_query = count_query.where(source_filter)
elif source_type.startswith("doctype:"):
doctype_val = source_type[len("doctype:") :]
source_filter = Notification.type.in_(("document_processing",)) & (
Notification.notification_metadata["document_type"].astext
== doctype_val
)
query = query.where(source_filter)
count_query = count_query.where(source_filter)
if filter == "unread":
unread_filter = Notification.read == False # noqa: E712
query = query.where(unread_filter)
count_query = count_query.where(unread_filter)
elif filter == "errors":
error_filter = (Notification.type == "page_limit_exceeded") | (
Notification.notification_metadata["status"].astext == "failed"
)
query = query.where(error_filter)
count_query = count_query.where(error_filter)
if before_date:
try:
before_datetime = datetime.fromisoformat(before_date.replace("Z", "+00:00"))
query = query.where(Notification.created_at < before_datetime)
count_query = count_query.where(Notification.created_at < before_datetime)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid date format. Use ISO format (e.g., 2024-01-15T00:00:00Z)",
) from None
if search:
search_term = f"%{search}%"
search_filter = Notification.title.ilike(
search_term
) | Notification.message.ilike(search_term)
query = query.where(search_filter)
count_query = count_query.where(search_filter)
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# Over-fetch by one to tell whether another page exists.
query = (
query.order_by(desc(Notification.created_at)).offset(offset).limit(limit + 1)
)
result = await session.execute(query)
notifications = result.scalars().all()
has_more = len(notifications) > limit
if has_more:
notifications = notifications[:limit]
items = []
for notification in notifications:
items.append(
NotificationResponse(
id=notification.id,
user_id=str(notification.user_id),
search_space_id=notification.search_space_id,
type=notification.type,
title=notification.title,
message=notification.message,
read=notification.read,
metadata=notification.notification_metadata or {},
created_at=notification.created_at.isoformat()
if notification.created_at
else "",
updated_at=notification.updated_at.isoformat()
if notification.updated_at
else None,
)
)
return NotificationListResponse(
items=items,
total=total,
has_more=has_more,
next_offset=offset + limit if has_more else None,
)
@router.patch("/{notification_id}/read", response_model=MarkReadResponse)
async def mark_notification_as_read(
notification_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> MarkReadResponse:
"""Mark one of the user's notifications read; Zero syncs the change."""
# Scope to the caller's own notifications.
result = await session.execute(
select(Notification).where(
Notification.id == notification_id,
Notification.user_id == user.id,
)
)
notification = result.scalar_one_or_none()
if not notification:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Notification not found",
)
if notification.read:
return MarkReadResponse(
success=True,
message="Notification already marked as read",
)
notification.read = True
await session.commit()
return MarkReadResponse(
success=True,
message="Notification marked as read",
)
@router.patch("/read-all", response_model=MarkAllReadResponse)
async def mark_all_notifications_as_read(
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> MarkAllReadResponse:
"""Mark all of the user's notifications read; Zero syncs the changes."""
result = await session.execute(
update(Notification)
.where(
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
)
.values(read=True)
)
await session.commit()
updated_count = result.rowcount
return MarkAllReadResponse(
success=True,
message=f"Marked {updated_count} notification(s) as read",
updated_count=updated_count,
)

View file

@ -0,0 +1,81 @@
"""Response shapes for the notifications API."""
from __future__ import annotations
from pydantic import BaseModel
class NotificationResponse(BaseModel):
"""A single notification."""
id: int
user_id: str
search_space_id: int | None
type: str
title: str
message: str
read: bool
metadata: dict
created_at: str
updated_at: str | None
class Config:
from_attributes = True
class NotificationListResponse(BaseModel):
"""A page of notifications."""
items: list[NotificationResponse]
total: int
has_more: bool
next_offset: int | None
class MarkReadResponse(BaseModel):
"""Outcome of marking one notification read."""
success: bool
message: str
class MarkAllReadResponse(BaseModel):
"""Outcome of marking every notification read."""
success: bool
message: str
updated_count: int
class SourceTypeItem(BaseModel):
"""A source type with its category and count."""
key: str
type: str
category: str # "connector" or "document"
count: int
class SourceTypesResponse(BaseModel):
"""Source types available for the Status tab filter."""
sources: list[SourceTypeItem]
class UnreadCountResponse(BaseModel):
"""Unread totals, split by sync-window recency."""
total_unread: int
recent_unread: int
class CategoryUnreadCount(BaseModel):
total_unread: int
recent_unread: int
class BatchUnreadCountResponse(BaseModel):
"""Per-category unread counts in one response."""
comments: CategoryUnreadCount
status: CategoryUnreadCount

View file

@ -0,0 +1,17 @@
"""Notification policy constants."""
from __future__ import annotations
# Notifications newer than this are live-synced; older ones load via the list endpoint.
SYNC_WINDOW_DAYS = 14
# Maps an inbox tab to the notification types it shows.
CATEGORY_TYPES: dict[str, tuple[str, ...]] = {
"comments": ("new_mention", "comment_reply"),
"status": (
"connector_indexing",
"connector_deletion",
"document_processing",
"page_limit_exceeded",
),
}

View file

@ -0,0 +1,7 @@
"""Notification persistence models."""
from __future__ import annotations
from .models import Notification
__all__ = ["Notification"]

View file

@ -0,0 +1,72 @@
"""Per-user inbox notifications, synced to clients via Zero."""
from __future__ import annotations
from datetime import UTC, datetime
from sqlalchemy import (
TIMESTAMP,
Boolean,
Column,
ForeignKey,
Index,
Integer,
String,
Text,
text,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship
from app.db import BaseModel, TimestampMixin
class Notification(BaseModel, TimestampMixin):
__tablename__ = "notifications"
__table_args__ = (
# Serves unread-count queries.
Index(
"ix_notifications_user_read_type_created",
"user_id",
"read",
"type",
"created_at",
),
# Serves the paginated inbox list query.
Index(
"ix_notifications_user_space_created",
"user_id",
"search_space_id",
"created_at",
),
)
user_id = Column(
UUID(as_uuid=True),
ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
search_space_id = Column(
Integer,
ForeignKey("searchspaces.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
type = Column(String(50), nullable=False, index=True)
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
read = Column(
Boolean, nullable=False, default=False, server_default=text("false"), index=True
)
notification_metadata = Column("metadata", JSONB, nullable=True, default={})
updated_at = Column(
TIMESTAMP(timezone=True),
nullable=True,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
index=True,
)
user = relationship("User", back_populates="notifications")
search_space = relationship("SearchSpace", back_populates="notifications")

View file

@ -0,0 +1,7 @@
"""Notification creation/update service."""
from __future__ import annotations
from app.notifications.service.facade import NotificationService
__all__ = ["NotificationService"]

View file

@ -0,0 +1,132 @@
"""Shared find/upsert/update logic for a single notification type."""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.notifications.persistence import Notification
logger = logging.getLogger(__name__)
class BaseNotificationHandler:
"""Find, upsert, and update notifications of one ``type``."""
def __init__(self, notification_type: str):
self.notification_type = notification_type
async def find_notification_by_operation(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
search_space_id: int | None = None,
) -> Notification | None:
"""Return the notification for ``operation_id``, if one exists."""
query = select(Notification).where(
Notification.user_id == user_id,
Notification.type == self.notification_type,
Notification.notification_metadata["operation_id"].astext == operation_id,
)
if search_space_id is not None:
query = query.where(Notification.search_space_id == search_space_id)
result = await session.execute(query)
return result.scalar_one_or_none()
async def find_or_create_notification(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
title: str,
message: str,
search_space_id: int | None = None,
initial_metadata: dict[str, Any] | None = None,
) -> Notification:
"""Upsert a notification keyed by ``operation_id``."""
notification = await self.find_notification_by_operation(
session, user_id, operation_id, search_space_id
)
if notification:
notification.title = title
notification.message = message
if initial_metadata:
notification.notification_metadata = {
**notification.notification_metadata,
**initial_metadata,
}
# Tell SQLAlchemy the JSONB dict changed in place.
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(
f"Updated notification {notification.id} for operation {operation_id}"
)
return notification
metadata = initial_metadata or {}
metadata["operation_id"] = operation_id
metadata["status"] = "in_progress"
metadata["started_at"] = datetime.now(UTC).isoformat()
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(
f"Created notification {notification.id} for operation {operation_id}"
)
return notification
async def update_notification(
self,
session: AsyncSession,
notification: Notification,
title: str | None = None,
message: str | None = None,
status: str | None = None,
metadata_updates: dict[str, Any] | None = None,
) -> Notification:
"""Apply field/status/metadata changes and persist."""
if title is not None:
notification.title = title
if message is not None:
notification.message = message
if status is not None:
notification.notification_metadata["status"] = status
if status in ("completed", "failed"):
notification.notification_metadata["completed_at"] = datetime.now(
UTC
).isoformat()
# Tell SQLAlchemy the JSONB dict changed in place.
flag_modified(notification, "notification_metadata")
if metadata_updates:
notification.notification_metadata = {
**notification.notification_metadata,
**metadata_updates,
}
# Tell SQLAlchemy the JSONB dict changed in place.
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(f"Updated notification {notification.id}")
return notification

View file

@ -0,0 +1,55 @@
"""Single entry point that composes the per-type notification handlers."""
from __future__ import annotations
import logging
from typing import Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.handlers import (
CommentReplyNotificationHandler,
ConnectorIndexingNotificationHandler,
DocumentProcessingNotificationHandler,
MentionNotificationHandler,
PageLimitNotificationHandler,
)
logger = logging.getLogger(__name__)
class NotificationService:
"""Facade over the per-type handlers; mutations sync via Zero."""
connector_indexing = ConnectorIndexingNotificationHandler()
document_processing = DocumentProcessingNotificationHandler()
mention = MentionNotificationHandler()
comment_reply = CommentReplyNotificationHandler()
page_limit = PageLimitNotificationHandler()
@staticmethod
async def create_notification(
session: AsyncSession,
user_id: UUID,
notification_type: str,
title: str,
message: str,
search_space_id: int | None = None,
notification_metadata: dict[str, Any] | None = None,
) -> Notification:
"""Create a generic notification of any ``notification_type``."""
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=notification_type,
title=title,
message=message,
notification_metadata=notification_metadata or {},
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(f"Created notification {notification.id} for user {user_id}")
return notification

View file

@ -0,0 +1,17 @@
"""Per-type notification handlers."""
from __future__ import annotations
from .comment_reply import CommentReplyNotificationHandler
from .connector_indexing import ConnectorIndexingNotificationHandler
from .document_processing import DocumentProcessingNotificationHandler
from .mention import MentionNotificationHandler
from .page_limit import PageLimitNotificationHandler
__all__ = [
"CommentReplyNotificationHandler",
"ConnectorIndexingNotificationHandler",
"DocumentProcessingNotificationHandler",
"MentionNotificationHandler",
"PageLimitNotificationHandler",
]

View file

@ -0,0 +1,107 @@
"""Notifications for replies to a user's comments."""
from __future__ import annotations
import logging
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.base import BaseNotificationHandler
logger = logging.getLogger(__name__)
class CommentReplyNotificationHandler(BaseNotificationHandler):
"""Notifications for replies to a user's comments."""
def __init__(self):
super().__init__("comment_reply")
async def find_notification_by_reply(
self,
session: AsyncSession,
reply_id: int,
user_id: UUID,
) -> Notification | None:
query = select(Notification).where(
Notification.type == self.notification_type,
Notification.user_id == user_id,
Notification.notification_metadata["reply_id"].astext == str(reply_id),
)
result = await session.execute(query)
return result.scalar_one_or_none()
async def notify_comment_reply(
self,
session: AsyncSession,
user_id: UUID,
reply_id: int,
parent_comment_id: int,
message_id: int,
thread_id: int,
thread_title: str,
author_id: str,
author_name: str,
author_avatar_url: str | None,
author_email: str,
content_preview: str,
search_space_id: int,
) -> Notification:
"""Notify of a reply; idempotent on ``reply_id`` per user."""
existing = await self.find_notification_by_reply(session, reply_id, user_id)
if existing:
logger.info(
f"Notification already exists for reply {reply_id} to user {user_id}"
)
return existing
title = f"{author_name} replied in a thread"
message = content_preview[:100] + ("..." if len(content_preview) > 100 else "")
metadata = {
"reply_id": reply_id,
"parent_comment_id": parent_comment_id,
"message_id": message_id,
"thread_id": thread_id,
"thread_title": thread_title,
"author_id": author_id,
"author_name": author_name,
"author_avatar_url": author_avatar_url,
"author_email": author_email,
"content_preview": content_preview[:200],
}
try:
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(
f"Created comment_reply notification {notification.id} for user {user_id}"
)
return notification
except Exception as e:
await session.rollback()
if (
"duplicate key" in str(e).lower()
or "unique constraint" in str(e).lower()
):
logger.warning(
f"Duplicate notification for reply {reply_id} to user {user_id}"
)
existing = await self.find_notification_by_reply(
session, reply_id, user_id
)
if existing:
return existing
raise

View file

@ -0,0 +1,288 @@
"""Notifications for connector indexing runs."""
from __future__ import annotations
from datetime import UTC, datetime
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.base import BaseNotificationHandler
class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"""Notifications for connector indexing runs."""
def __init__(self):
super().__init__("connector_indexing")
def _generate_operation_id(
self,
connector_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> str:
"""Build a unique id for a connector indexing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
date_range = ""
if start_date or end_date:
date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
return f"connector_{connector_id}_{timestamp}{date_range}"
def _generate_google_drive_operation_id(
self, connector_id: int, folder_count: int, file_count: int
) -> str:
"""Build a unique id for a Google Drive indexing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
items_info = f"_{folder_count}f_{file_count}files"
return f"drive_{connector_id}_{timestamp}{items_info}"
async def notify_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> Notification:
"""Open (or refresh) the notification when indexing starts."""
operation_id = self._generate_operation_id(connector_id, start_date, end_date)
title = f"Syncing: {connector_name}"
message = "Connecting to your account"
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"start_date": start_date,
"end_date": end_date,
"indexed_count": 0,
"sync_stage": "connecting",
}
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
async def notify_indexing_progress(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
total_count: int | None = None,
stage: str | None = None,
stage_message: str | None = None,
) -> Notification:
"""Update the notification with indexing progress."""
stage_messages = {
"connecting": "Connecting to your account",
"fetching": "Fetching your content",
"processing": "Preparing for search",
"storing": "Almost done",
}
if stage or stage_message:
progress_msg = stage_message or stage_messages.get(stage, "Processing")
else:
# Legacy callers that pass neither stage nor message.
progress_msg = "Fetching your content"
metadata_updates = {"indexed_count": indexed_count}
if total_count is not None:
metadata_updates["total_count"] = total_count
progress_percent = int((indexed_count / total_count) * 100)
metadata_updates["progress_percent"] = progress_percent
if stage:
metadata_updates["sync_stage"] = stage
return await self.update_notification(
session=session,
notification=notification,
message=progress_msg,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_retry_progress(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
retry_reason: str,
attempt: int,
max_attempts: int,
wait_seconds: float | None = None,
service_name: str | None = None,
) -> Notification:
"""Surface that an external service is rate-limiting/retrying.
Reusable by any connector; frames the delay as the provider's, not ours.
"""
if not service_name:
service_name = notification.notification_metadata.get(
"connector_name", "Service"
)
# Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion".
if " - " in service_name:
service_name = service_name.split(" - ")[0]
# Worded so the delay reads as the provider's, not ours.
retry_messages = {
"rate_limit": f"{service_name} rate limit reached",
"server_error": f"{service_name} is slow to respond",
"timeout": f"{service_name} took too long",
"temporary_error": f"{service_name} temporarily unavailable",
}
base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}")
# Only surface a wait time when it's long enough to be worth showing.
if wait_seconds and wait_seconds > 5:
message = f"{base_message}. Retrying in {int(wait_seconds)}s..."
else:
message = f"{base_message}. Retrying..."
if indexed_count > 0:
item_text = "item" if indexed_count == 1 else "items"
message = f"{message} ({indexed_count} {item_text} synced so far)"
metadata_updates = {
"indexed_count": indexed_count,
"sync_stage": "waiting_retry",
"retry_attempt": attempt,
"retry_max_attempts": max_attempts,
"retry_reason": retry_reason,
"retry_wait_seconds": wait_seconds,
}
return await self.update_notification(
session=session,
notification=notification,
message=message,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_indexing_completed(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
unsupported_count: int | None = None,
) -> Notification:
"""Finalize the notification as ready/failed when indexing ends."""
connector_name = notification.notification_metadata.get(
"connector_name", "Connector"
)
unsupported_text = ""
if unsupported_count and unsupported_count > 0:
file_word = "file was" if unsupported_count == 1 else "files were"
unsupported_text = f" {unsupported_count} {file_word} not supported."
if error_message:
if indexed_count > 0:
title = f"Ready: {connector_name}"
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}"
status = "completed"
elif is_warning:
title = f"Ready: {connector_name}"
message = f"Sync complete.{unsupported_text} {error_message}"
status = "completed"
else:
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
if unsupported_text:
message += unsupported_text
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
if unsupported_count and unsupported_count > 0:
message = f"Sync complete.{unsupported_text}"
else:
message = "Already up to date!"
else:
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced."
if unsupported_text:
message += unsupported_text
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"unsupported_count": unsupported_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",
"error_message": error_message,
}
return await self.update_notification(
session=session,
notification=notification,
title=title,
message=message,
status=status,
metadata_updates=metadata_updates,
)
async def notify_google_drive_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
folder_count: int,
file_count: int,
folder_names: list[str] | None = None,
file_names: list[str] | None = None,
) -> Notification:
"""Open (or refresh) the notification when Drive indexing starts."""
operation_id = self._generate_google_drive_operation_id(
connector_id, folder_count, file_count
)
title = f"Syncing: {connector_name}"
message = "Preparing your files"
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"folder_count": folder_count,
"file_count": file_count,
"indexed_count": 0,
"sync_stage": "connecting",
}
if folder_names:
metadata["folder_names"] = folder_names
if file_names:
metadata["file_names"] = file_names
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)

View file

@ -0,0 +1,137 @@
"""Notifications for single-document processing."""
from __future__ import annotations
from datetime import UTC, datetime
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.base import BaseNotificationHandler
class DocumentProcessingNotificationHandler(BaseNotificationHandler):
"""Notifications for single-document processing."""
def __init__(self):
super().__init__("document_processing")
def _generate_operation_id(
self, document_type: str, filename: str, search_space_id: int
) -> str:
"""Build a unique id for a document processing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
# Create a short hash of filename to ensure uniqueness
import hashlib
filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"
async def notify_processing_started(
self,
session: AsyncSession,
user_id: UUID,
document_type: str,
document_name: str,
search_space_id: int,
file_size: int | None = None,
) -> Notification:
"""Open the notification when document processing is queued."""
operation_id = self._generate_operation_id(
document_type, document_name, search_space_id
)
title = f"Processing: {document_name}"
message = "Waiting in queue"
metadata = {
"document_type": document_type,
"document_name": document_name,
"processing_stage": "queued",
}
if file_size is not None:
metadata["file_size"] = file_size
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
async def notify_processing_progress(
self,
session: AsyncSession,
notification: Notification,
stage: str,
stage_message: str | None = None,
chunks_count: int | None = None,
) -> Notification:
"""Update the notification with the current processing stage."""
stage_messages = {
"parsing": "Reading your file",
"chunking": "Preparing for search",
"embedding": "Preparing for search",
"storing": "Finalizing",
}
message = stage_message or stage_messages.get(stage, "Processing")
metadata_updates = {"processing_stage": stage}
# Store chunks_count in metadata for debugging, but don't show to user
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return await self.update_notification(
session=session,
notification=notification,
message=message,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_processing_completed(
self,
session: AsyncSession,
notification: Notification,
document_id: int | None = None,
chunks_count: int | None = None,
error_message: str | None = None,
) -> Notification:
"""Finalize the notification as ready/failed when processing ends."""
document_name = notification.notification_metadata.get(
"document_name", "Document"
)
if error_message:
title = f"Failed: {document_name}"
message = f"Processing failed: {error_message}"
status = "failed"
else:
title = f"Ready: {document_name}"
message = "Now searchable!"
status = "completed"
metadata_updates = {
"processing_stage": "completed" if not error_message else "failed",
"error_message": error_message,
}
if document_id is not None:
metadata_updates["document_id"] = document_id
# Store chunks_count in metadata for debugging, but don't show to user
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return await self.update_notification(
session=session,
notification=notification,
title=title,
message=message,
status=status,
metadata_updates=metadata_updates,
)

View file

@ -0,0 +1,105 @@
"""Notifications for @mentions in comments."""
from __future__ import annotations
import logging
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.base import BaseNotificationHandler
logger = logging.getLogger(__name__)
class MentionNotificationHandler(BaseNotificationHandler):
"""Notifications for @mentions in comments."""
def __init__(self):
super().__init__("new_mention")
async def find_notification_by_mention(
self,
session: AsyncSession,
mention_id: int,
) -> Notification | None:
"""Return the notification for ``mention_id``, if one exists."""
query = select(Notification).where(
Notification.type == self.notification_type,
Notification.notification_metadata["mention_id"].astext == str(mention_id),
)
result = await session.execute(query)
return result.scalar_one_or_none()
async def notify_new_mention(
self,
session: AsyncSession,
mentioned_user_id: UUID,
mention_id: int,
comment_id: int,
message_id: int,
thread_id: int,
thread_title: str,
author_id: str,
author_name: str,
author_avatar_url: str | None,
author_email: str,
content_preview: str,
search_space_id: int,
) -> Notification:
"""Notify a mentioned user; idempotent on ``mention_id``."""
existing = await self.find_notification_by_mention(session, mention_id)
if existing:
logger.info(
f"Notification already exists for mention {mention_id}, returning existing"
)
return existing
title = f"{author_name} mentioned you"
message = content_preview[:100] + ("..." if len(content_preview) > 100 else "")
metadata = {
"mention_id": mention_id,
"comment_id": comment_id,
"message_id": message_id,
"thread_id": thread_id,
"thread_title": thread_title,
"author_id": author_id,
"author_name": author_name,
"author_avatar_url": author_avatar_url,
"author_email": author_email,
"content_preview": content_preview[:200],
}
try:
notification = Notification(
user_id=mentioned_user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(
f"Created new_mention notification {notification.id} for user {mentioned_user_id}"
)
return notification
except Exception as e:
# Race: a concurrent insert won; fetch the existing row instead.
await session.rollback()
if (
"duplicate key" in str(e).lower()
or "unique constraint" in str(e).lower()
):
logger.warning(
f"Duplicate notification detected for mention {mention_id}, fetching existing"
)
existing = await self.find_notification_by_mention(session, mention_id)
if existing:
return existing
raise

View file

@ -0,0 +1,80 @@
"""Notifications for exceeding the page limit."""
from __future__ import annotations
import logging
from datetime import UTC, datetime
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.notifications.persistence import Notification
from app.notifications.service.base import BaseNotificationHandler
logger = logging.getLogger(__name__)
class PageLimitNotificationHandler(BaseNotificationHandler):
"""Notifications for exceeding the page limit."""
def __init__(self):
super().__init__("page_limit_exceeded")
def _generate_operation_id(self, document_name: str, search_space_id: int) -> str:
"""Build a unique id for a page-limit notification."""
import hashlib
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
# Create a short hash of document name to ensure uniqueness
doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8]
return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}"
async def notify_page_limit_exceeded(
self,
session: AsyncSession,
user_id: UUID,
document_name: str,
document_type: str,
search_space_id: int,
pages_used: int,
pages_limit: int,
pages_to_add: int,
) -> Notification:
"""Notify that a document was blocked by the page limit."""
operation_id = self._generate_operation_id(document_name, search_space_id)
display_name = (
document_name[:40] + "..." if len(document_name) > 40 else document_name
)
title = f"Page limit exceeded: {display_name}"
message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents."
metadata = {
"operation_id": operation_id,
"document_name": document_name,
"document_type": document_type,
"pages_used": pages_used,
"pages_limit": pages_limit,
"pages_to_add": pages_to_add,
"status": "failed",
"error_type": "page_limit_exceeded",
# Where the inbox item links to.
"action_url": f"/dashboard/{search_space_id}/more-pages",
"action_label": "Upgrade Plan",
}
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(
f"Created page_limit_exceeded notification {notification.id} for user {user_id}"
)
return notification

View file

@ -0,0 +1,16 @@
"""The notification types the API recognizes."""
from __future__ import annotations
from typing import Literal
NotificationType = Literal[
"connector_indexing",
"connector_deletion",
"document_processing",
"new_mention",
"comment_reply",
"page_limit_exceeded",
]
NotificationCategory = Literal["comments", "status"]