From c23bdc4a5e0a4ed6d9dcffed606e67fe48eac424 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 3 Jun 2026 21:52:58 +0200 Subject: [PATCH] feat: add pure notification message helpers --- .../service/messages/__init__.py | 6 + .../service/messages/connector_indexing.py | 164 ++++++++++++++++++ .../service/messages/document_processing.py | 64 +++++++ .../service/messages/page_limit.py | 25 +++ .../notifications/service/messages/text.py | 8 + 5 files changed, 267 insertions(+) create mode 100644 surfsense_backend/app/notifications/service/messages/__init__.py create mode 100644 surfsense_backend/app/notifications/service/messages/connector_indexing.py create mode 100644 surfsense_backend/app/notifications/service/messages/document_processing.py create mode 100644 surfsense_backend/app/notifications/service/messages/page_limit.py create mode 100644 surfsense_backend/app/notifications/service/messages/text.py diff --git a/surfsense_backend/app/notifications/service/messages/__init__.py b/surfsense_backend/app/notifications/service/messages/__init__.py new file mode 100644 index 000000000..95373537d --- /dev/null +++ b/surfsense_backend/app/notifications/service/messages/__init__.py @@ -0,0 +1,6 @@ +"""Pure, side-effect-free presentation logic for notifications. + +Handlers compute their user-facing title/message/status/metadata here, then +persist the result. Keeping this layer free of I/O makes it unit-testable +without a database. +""" diff --git a/surfsense_backend/app/notifications/service/messages/connector_indexing.py b/surfsense_backend/app/notifications/service/messages/connector_indexing.py new file mode 100644 index 000000000..8a2926211 --- /dev/null +++ b/surfsense_backend/app/notifications/service/messages/connector_indexing.py @@ -0,0 +1,164 @@ +"""Pure presentation logic for connector-indexing notifications.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + + +def operation_id( + connector_id: int, + start_date: str | None = None, + end_date: str | None = None, +) -> str: + """Build a unique id for a connector indexing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + date_range = "" + if start_date or end_date: + date_range = f"_{start_date or 'none'}_{end_date or 'none'}" + return f"connector_{connector_id}_{timestamp}{date_range}" + + +def google_drive_operation_id( + connector_id: int, folder_count: int, file_count: int +) -> str: + """Build a unique id for a Google Drive indexing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + items_info = f"_{folder_count}f_{file_count}files" + return f"drive_{connector_id}_{timestamp}{items_info}" + + +def progress( + indexed_count: int, + total_count: int | None = None, + stage: str | None = None, + stage_message: str | None = None, +) -> tuple[str, dict[str, Any]]: + """Compute the progress message and metadata updates for an indexing run.""" + stage_messages = { + "connecting": "Connecting to your account", + "fetching": "Fetching your content", + "processing": "Preparing for search", + "storing": "Almost done", + } + + if stage or stage_message: + progress_msg = stage_message or stage_messages.get(stage, "Processing") + else: + # Legacy callers that pass neither stage nor message. + progress_msg = "Fetching your content" + + metadata_updates: dict[str, Any] = {"indexed_count": indexed_count} + if total_count is not None: + metadata_updates["total_count"] = total_count + progress_percent = int((indexed_count / total_count) * 100) + metadata_updates["progress_percent"] = progress_percent + if stage: + metadata_updates["sync_stage"] = stage + + return progress_msg, metadata_updates + + +def retry( + connector_name: str, + indexed_count: int, + retry_reason: str, + attempt: int, + max_attempts: int, + wait_seconds: float | None = None, + service_name: str | None = None, +) -> tuple[str, dict[str, Any]]: + """Compute the retry message and metadata, framing the delay as the provider's.""" + if not service_name: + service_name = connector_name + # Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion". + if " - " in service_name: + service_name = service_name.split(" - ")[0] + + # Worded so the delay reads as the provider's, not ours. + retry_messages = { + "rate_limit": f"{service_name} rate limit reached", + "server_error": f"{service_name} is slow to respond", + "timeout": f"{service_name} took too long", + "temporary_error": f"{service_name} temporarily unavailable", + } + + base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}") + + # Only surface a wait time when it's long enough to be worth showing. + if wait_seconds and wait_seconds > 5: + message = f"{base_message}. Retrying in {int(wait_seconds)}s..." + else: + message = f"{base_message}. Retrying..." + + if indexed_count > 0: + item_text = "item" if indexed_count == 1 else "items" + message = f"{message} ({indexed_count} {item_text} synced so far)" + + metadata_updates = { + "indexed_count": indexed_count, + "sync_stage": "waiting_retry", + "retry_attempt": attempt, + "retry_max_attempts": max_attempts, + "retry_reason": retry_reason, + "retry_wait_seconds": wait_seconds, + } + + return message, metadata_updates + + +def completion( + connector_name: str, + indexed_count: int, + error_message: str | None = None, + is_warning: bool = False, + skipped_count: int | None = None, + unsupported_count: int | None = None, +) -> tuple[str, str, str, dict[str, Any]]: + """Compute the final title, message, status, and metadata for a finished run.""" + unsupported_text = "" + if unsupported_count and unsupported_count > 0: + file_word = "file was" if unsupported_count == 1 else "files were" + unsupported_text = f" {unsupported_count} {file_word} not supported." + + if error_message: + if indexed_count > 0: + title = f"Ready: {connector_name}" + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}" + status = "completed" + elif is_warning: + title = f"Ready: {connector_name}" + message = f"Sync complete.{unsupported_text} {error_message}" + status = "completed" + else: + title = f"Failed: {connector_name}" + message = f"Sync failed: {error_message}" + if unsupported_text: + message += unsupported_text + status = "failed" + else: + title = f"Ready: {connector_name}" + if indexed_count == 0: + if unsupported_count and unsupported_count > 0: + message = f"Sync complete.{unsupported_text}" + else: + message = "Already up to date!" + else: + file_text = "file" if indexed_count == 1 else "files" + message = f"Now searchable! {indexed_count} {file_text} synced." + if unsupported_text: + message += unsupported_text + status = "completed" + + metadata_updates = { + "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, + "unsupported_count": unsupported_count or 0, + "sync_stage": "completed" + if (not error_message or is_warning or indexed_count > 0) + else "failed", + "error_message": error_message, + } + + return title, message, status, metadata_updates diff --git a/surfsense_backend/app/notifications/service/messages/document_processing.py b/surfsense_backend/app/notifications/service/messages/document_processing.py new file mode 100644 index 000000000..3805c2847 --- /dev/null +++ b/surfsense_backend/app/notifications/service/messages/document_processing.py @@ -0,0 +1,64 @@ +"""Pure presentation logic for document-processing notifications.""" + +from __future__ import annotations + +import hashlib +from datetime import UTC, datetime +from typing import Any + + +def operation_id(document_type: str, filename: str, search_space_id: int) -> str: + """Build a unique id for a document processing run.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") + filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8] + return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}" + + +def progress( + stage: str, + stage_message: str | None = None, + chunks_count: int | None = None, +) -> tuple[str, dict[str, Any]]: + """Compute the progress message and metadata updates for a processing run.""" + stage_messages = { + "parsing": "Reading your file", + "chunking": "Preparing for search", + "embedding": "Preparing for search", + "storing": "Finalizing", + } + + message = stage_message or stage_messages.get(stage, "Processing") + + metadata_updates: dict[str, Any] = {"processing_stage": stage} + if chunks_count is not None: + metadata_updates["chunks_count"] = chunks_count + + return message, metadata_updates + + +def completion( + document_name: str, + error_message: str | None = None, + document_id: int | None = None, + chunks_count: int | None = None, +) -> tuple[str, str, str, dict[str, Any]]: + """Compute the final title, message, status, and metadata for a finished run.""" + if error_message: + title = f"Failed: {document_name}" + message = f"Processing failed: {error_message}" + status = "failed" + else: + title = f"Ready: {document_name}" + message = "Now searchable!" + status = "completed" + + metadata_updates: dict[str, Any] = { + "processing_stage": "completed" if not error_message else "failed", + "error_message": error_message, + } + if document_id is not None: + metadata_updates["document_id"] = document_id + if chunks_count is not None: + metadata_updates["chunks_count"] = chunks_count + + return title, message, status, metadata_updates diff --git a/surfsense_backend/app/notifications/service/messages/page_limit.py b/surfsense_backend/app/notifications/service/messages/page_limit.py new file mode 100644 index 000000000..54e5cbdec --- /dev/null +++ b/surfsense_backend/app/notifications/service/messages/page_limit.py @@ -0,0 +1,25 @@ +"""Pure presentation logic for page-limit notifications.""" + +from __future__ import annotations + +import hashlib +from datetime import UTC, datetime + +from app.notifications.service.messages.text import truncate + + +def operation_id(document_name: str, search_space_id: int) -> str: + """Build a unique id for a page-limit notification.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") + doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8] + return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}" + + +def summary( + document_name: str, pages_used: int, pages_limit: int, pages_to_add: int +) -> tuple[str, str]: + """Compute the title and message for a blocked-by-page-limit document.""" + display_name = truncate(document_name, 40) + title = f"Page limit exceeded: {display_name}" + message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents." + return title, message diff --git a/surfsense_backend/app/notifications/service/messages/text.py b/surfsense_backend/app/notifications/service/messages/text.py new file mode 100644 index 000000000..98d5284cb --- /dev/null +++ b/surfsense_backend/app/notifications/service/messages/text.py @@ -0,0 +1,8 @@ +"""Shared text helpers for notification copy.""" + +from __future__ import annotations + + +def truncate(text: str, limit: int) -> str: + """Return ``text`` capped at ``limit`` chars, appending an ellipsis if cut.""" + return text[:limit] + "..." if len(text) > limit else text