From 778ab9b254e1b3ad961b2727be8cfba42b9b7e85 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 3 Jun 2026 21:53:03 +0200 Subject: [PATCH] refactor: route indexing handlers through message helpers --- .../service/handlers/connector_indexing.py | 161 +++--------------- .../service/handlers/document_processing.py | 54 +----- .../service/handlers/page_limit.py | 20 +-- 3 files changed, 38 insertions(+), 197 deletions(-) diff --git a/surfsense_backend/app/notifications/service/handlers/connector_indexing.py b/surfsense_backend/app/notifications/service/handlers/connector_indexing.py index d75e7e6fc..9ebfae2ea 100644 --- a/surfsense_backend/app/notifications/service/handlers/connector_indexing.py +++ b/surfsense_backend/app/notifications/service/handlers/connector_indexing.py @@ -2,13 +2,13 @@ from __future__ import annotations -from datetime import UTC, datetime from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from app.notifications.persistence import Notification from app.notifications.service.base import BaseNotificationHandler +from app.notifications.service.messages import connector_indexing as msg class ConnectorIndexingNotificationHandler(BaseNotificationHandler): @@ -17,27 +17,6 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): def __init__(self): super().__init__("connector_indexing") - def _generate_operation_id( - self, - connector_id: int, - start_date: str | None = None, - end_date: str | None = None, - ) -> str: - """Build a unique id for a connector indexing run.""" - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") - date_range = "" - if start_date or end_date: - date_range = f"_{start_date or 'none'}_{end_date or 'none'}" - return f"connector_{connector_id}_{timestamp}{date_range}" - - def _generate_google_drive_operation_id( - self, connector_id: int, folder_count: int, file_count: int - ) -> str: - """Build a unique id for a Google Drive indexing run.""" - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") - items_info = f"_{folder_count}f_{file_count}files" - return f"drive_{connector_id}_{timestamp}{items_info}" - async def notify_indexing_started( self, session: AsyncSession, @@ -50,7 +29,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): end_date: str | None = None, ) -> Notification: """Open (or refresh) the notification when indexing starts.""" - operation_id = self._generate_operation_id(connector_id, start_date, end_date) + operation_id = msg.operation_id(connector_id, start_date, end_date) title = f"Syncing: {connector_name}" message = "Connecting to your account" @@ -84,31 +63,13 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): stage_message: str | None = None, ) -> Notification: """Update the notification with indexing progress.""" - stage_messages = { - "connecting": "Connecting to your account", - "fetching": "Fetching your content", - "processing": "Preparing for search", - "storing": "Almost done", - } - - if stage or stage_message: - progress_msg = stage_message or stage_messages.get(stage, "Processing") - else: - # Legacy callers that pass neither stage nor message. - progress_msg = "Fetching your content" - - metadata_updates = {"indexed_count": indexed_count} - if total_count is not None: - metadata_updates["total_count"] = total_count - progress_percent = int((indexed_count / total_count) * 100) - metadata_updates["progress_percent"] = progress_percent - if stage: - metadata_updates["sync_stage"] = stage - + message, metadata_updates = msg.progress( + indexed_count, total_count, stage, stage_message + ) return await self.update_notification( session=session, notification=notification, - message=progress_msg, + message=message, status="in_progress", metadata_updates=metadata_updates, ) @@ -124,47 +85,19 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): wait_seconds: float | None = None, service_name: str | None = None, ) -> Notification: - """Surface that an external service is rate-limiting/retrying. - - Reusable by any connector; frames the delay as the provider's, not ours. - """ - if not service_name: - service_name = notification.notification_metadata.get( - "connector_name", "Service" - ) - # Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion". - if " - " in service_name: - service_name = service_name.split(" - ")[0] - - # Worded so the delay reads as the provider's, not ours. - retry_messages = { - "rate_limit": f"{service_name} rate limit reached", - "server_error": f"{service_name} is slow to respond", - "timeout": f"{service_name} took too long", - "temporary_error": f"{service_name} temporarily unavailable", - } - - base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}") - - # Only surface a wait time when it's long enough to be worth showing. - if wait_seconds and wait_seconds > 5: - message = f"{base_message}. Retrying in {int(wait_seconds)}s..." - else: - message = f"{base_message}. Retrying..." - - if indexed_count > 0: - item_text = "item" if indexed_count == 1 else "items" - message = f"{message} ({indexed_count} {item_text} synced so far)" - - metadata_updates = { - "indexed_count": indexed_count, - "sync_stage": "waiting_retry", - "retry_attempt": attempt, - "retry_max_attempts": max_attempts, - "retry_reason": retry_reason, - "retry_wait_seconds": wait_seconds, - } - + """Surface that an external service is rate-limiting/retrying.""" + connector_name = notification.notification_metadata.get( + "connector_name", "Service" + ) + message, metadata_updates = msg.retry( + connector_name, + indexed_count, + retry_reason, + attempt, + max_attempts, + wait_seconds, + service_name, + ) return await self.update_notification( session=session, notification=notification, @@ -187,52 +120,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): connector_name = notification.notification_metadata.get( "connector_name", "Connector" ) - - unsupported_text = "" - if unsupported_count and unsupported_count > 0: - file_word = "file was" if unsupported_count == 1 else "files were" - unsupported_text = f" {unsupported_count} {file_word} not supported." - - if error_message: - if indexed_count > 0: - title = f"Ready: {connector_name}" - file_text = "file" if indexed_count == 1 else "files" - message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}" - status = "completed" - elif is_warning: - title = f"Ready: {connector_name}" - message = f"Sync complete.{unsupported_text} {error_message}" - status = "completed" - else: - title = f"Failed: {connector_name}" - message = f"Sync failed: {error_message}" - if unsupported_text: - message += unsupported_text - status = "failed" - else: - title = f"Ready: {connector_name}" - if indexed_count == 0: - if unsupported_count and unsupported_count > 0: - message = f"Sync complete.{unsupported_text}" - else: - message = "Already up to date!" - else: - file_text = "file" if indexed_count == 1 else "files" - message = f"Now searchable! {indexed_count} {file_text} synced." - if unsupported_text: - message += unsupported_text - status = "completed" - - metadata_updates = { - "indexed_count": indexed_count, - "skipped_count": skipped_count or 0, - "unsupported_count": unsupported_count or 0, - "sync_stage": "completed" - if (not error_message or is_warning or indexed_count > 0) - else "failed", - "error_message": error_message, - } - + title, message, status, metadata_updates = msg.completion( + connector_name, + indexed_count, + error_message, + is_warning, + skipped_count, + unsupported_count, + ) return await self.update_notification( session=session, notification=notification, @@ -256,7 +151,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): file_names: list[str] | None = None, ) -> Notification: """Open (or refresh) the notification when Drive indexing starts.""" - operation_id = self._generate_google_drive_operation_id( + operation_id = msg.google_drive_operation_id( connector_id, folder_count, file_count ) title = f"Syncing: {connector_name}" diff --git a/surfsense_backend/app/notifications/service/handlers/document_processing.py b/surfsense_backend/app/notifications/service/handlers/document_processing.py index 2b162a053..8644df2c8 100644 --- a/surfsense_backend/app/notifications/service/handlers/document_processing.py +++ b/surfsense_backend/app/notifications/service/handlers/document_processing.py @@ -2,13 +2,13 @@ from __future__ import annotations -from datetime import UTC, datetime from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from app.notifications.persistence import Notification from app.notifications.service.base import BaseNotificationHandler +from app.notifications.service.messages import document_processing as msg class DocumentProcessingNotificationHandler(BaseNotificationHandler): @@ -17,17 +17,6 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): def __init__(self): super().__init__("document_processing") - def _generate_operation_id( - self, document_type: str, filename: str, search_space_id: int - ) -> str: - """Build a unique id for a document processing run.""" - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") - # Create a short hash of filename to ensure uniqueness - import hashlib - - filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8] - return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}" - async def notify_processing_started( self, session: AsyncSession, @@ -38,9 +27,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): file_size: int | None = None, ) -> Notification: """Open the notification when document processing is queued.""" - operation_id = self._generate_operation_id( - document_type, document_name, search_space_id - ) + operation_id = msg.operation_id(document_type, document_name, search_space_id) title = f"Processing: {document_name}" message = "Waiting in queue" @@ -72,19 +59,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): chunks_count: int | None = None, ) -> Notification: """Update the notification with the current processing stage.""" - stage_messages = { - "parsing": "Reading your file", - "chunking": "Preparing for search", - "embedding": "Preparing for search", - "storing": "Finalizing", - } - - message = stage_message or stage_messages.get(stage, "Processing") - - metadata_updates = {"processing_stage": stage} - # Store chunks_count in metadata for debugging, but don't show to user - if chunks_count is not None: - metadata_updates["chunks_count"] = chunks_count + message, metadata_updates = msg.progress(stage, stage_message, chunks_count) return await self.update_notification( session=session, @@ -106,26 +81,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler): document_name = notification.notification_metadata.get( "document_name", "Document" ) - - if error_message: - title = f"Failed: {document_name}" - message = f"Processing failed: {error_message}" - status = "failed" - else: - title = f"Ready: {document_name}" - message = "Now searchable!" - status = "completed" - - metadata_updates = { - "processing_stage": "completed" if not error_message else "failed", - "error_message": error_message, - } - - if document_id is not None: - metadata_updates["document_id"] = document_id - # Store chunks_count in metadata for debugging, but don't show to user - if chunks_count is not None: - metadata_updates["chunks_count"] = chunks_count + title, message, status, metadata_updates = msg.completion( + document_name, error_message, document_id, chunks_count + ) return await self.update_notification( session=session, diff --git a/surfsense_backend/app/notifications/service/handlers/page_limit.py b/surfsense_backend/app/notifications/service/handlers/page_limit.py index 00e8dfc18..90722dc62 100644 --- a/surfsense_backend/app/notifications/service/handlers/page_limit.py +++ b/surfsense_backend/app/notifications/service/handlers/page_limit.py @@ -3,13 +3,13 @@ from __future__ import annotations import logging -from datetime import UTC, datetime from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from app.notifications.persistence import Notification from app.notifications.service.base import BaseNotificationHandler +from app.notifications.service.messages import page_limit as msg logger = logging.getLogger(__name__) @@ -20,15 +20,6 @@ class PageLimitNotificationHandler(BaseNotificationHandler): def __init__(self): super().__init__("page_limit_exceeded") - def _generate_operation_id(self, document_name: str, search_space_id: int) -> str: - """Build a unique id for a page-limit notification.""" - import hashlib - - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f") - # Create a short hash of document name to ensure uniqueness - doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8] - return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}" - async def notify_page_limit_exceeded( self, session: AsyncSession, @@ -41,13 +32,10 @@ class PageLimitNotificationHandler(BaseNotificationHandler): pages_to_add: int, ) -> Notification: """Notify that a document was blocked by the page limit.""" - operation_id = self._generate_operation_id(document_name, search_space_id) - - display_name = ( - document_name[:40] + "..." if len(document_name) > 40 else document_name + operation_id = msg.operation_id(document_name, search_space_id) + title, message = msg.summary( + document_name, pages_used, pages_limit, pages_to_add ) - title = f"Page limit exceeded: {display_name}" - message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents." metadata = { "operation_id": operation_id,