mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
refactor: route indexing handlers through message helpers
This commit is contained in:
parent
7618c3aafb
commit
778ab9b254
3 changed files with 38 additions and 197 deletions
|
|
@ -2,13 +2,13 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.notifications.persistence import Notification
|
||||
from app.notifications.service.base import BaseNotificationHandler
|
||||
from app.notifications.service.messages import connector_indexing as msg
|
||||
|
||||
|
||||
class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
||||
|
|
@ -17,27 +17,6 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
def __init__(self):
|
||||
super().__init__("connector_indexing")
|
||||
|
||||
def _generate_operation_id(
|
||||
self,
|
||||
connector_id: int,
|
||||
start_date: str | None = None,
|
||||
end_date: str | None = None,
|
||||
) -> str:
|
||||
"""Build a unique id for a connector indexing run."""
|
||||
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
|
||||
date_range = ""
|
||||
if start_date or end_date:
|
||||
date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
|
||||
return f"connector_{connector_id}_{timestamp}{date_range}"
|
||||
|
||||
def _generate_google_drive_operation_id(
|
||||
self, connector_id: int, folder_count: int, file_count: int
|
||||
) -> str:
|
||||
"""Build a unique id for a Google Drive indexing run."""
|
||||
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
|
||||
items_info = f"_{folder_count}f_{file_count}files"
|
||||
return f"drive_{connector_id}_{timestamp}{items_info}"
|
||||
|
||||
async def notify_indexing_started(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
|
|
@ -50,7 +29,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
end_date: str | None = None,
|
||||
) -> Notification:
|
||||
"""Open (or refresh) the notification when indexing starts."""
|
||||
operation_id = self._generate_operation_id(connector_id, start_date, end_date)
|
||||
operation_id = msg.operation_id(connector_id, start_date, end_date)
|
||||
title = f"Syncing: {connector_name}"
|
||||
message = "Connecting to your account"
|
||||
|
||||
|
|
@ -84,31 +63,13 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
stage_message: str | None = None,
|
||||
) -> Notification:
|
||||
"""Update the notification with indexing progress."""
|
||||
stage_messages = {
|
||||
"connecting": "Connecting to your account",
|
||||
"fetching": "Fetching your content",
|
||||
"processing": "Preparing for search",
|
||||
"storing": "Almost done",
|
||||
}
|
||||
|
||||
if stage or stage_message:
|
||||
progress_msg = stage_message or stage_messages.get(stage, "Processing")
|
||||
else:
|
||||
# Legacy callers that pass neither stage nor message.
|
||||
progress_msg = "Fetching your content"
|
||||
|
||||
metadata_updates = {"indexed_count": indexed_count}
|
||||
if total_count is not None:
|
||||
metadata_updates["total_count"] = total_count
|
||||
progress_percent = int((indexed_count / total_count) * 100)
|
||||
metadata_updates["progress_percent"] = progress_percent
|
||||
if stage:
|
||||
metadata_updates["sync_stage"] = stage
|
||||
|
||||
message, metadata_updates = msg.progress(
|
||||
indexed_count, total_count, stage, stage_message
|
||||
)
|
||||
return await self.update_notification(
|
||||
session=session,
|
||||
notification=notification,
|
||||
message=progress_msg,
|
||||
message=message,
|
||||
status="in_progress",
|
||||
metadata_updates=metadata_updates,
|
||||
)
|
||||
|
|
@ -124,47 +85,19 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
wait_seconds: float | None = None,
|
||||
service_name: str | None = None,
|
||||
) -> Notification:
|
||||
"""Surface that an external service is rate-limiting/retrying.
|
||||
|
||||
Reusable by any connector; frames the delay as the provider's, not ours.
|
||||
"""
|
||||
if not service_name:
|
||||
service_name = notification.notification_metadata.get(
|
||||
"connector_name", "Service"
|
||||
)
|
||||
# Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion".
|
||||
if " - " in service_name:
|
||||
service_name = service_name.split(" - ")[0]
|
||||
|
||||
# Worded so the delay reads as the provider's, not ours.
|
||||
retry_messages = {
|
||||
"rate_limit": f"{service_name} rate limit reached",
|
||||
"server_error": f"{service_name} is slow to respond",
|
||||
"timeout": f"{service_name} took too long",
|
||||
"temporary_error": f"{service_name} temporarily unavailable",
|
||||
}
|
||||
|
||||
base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}")
|
||||
|
||||
# Only surface a wait time when it's long enough to be worth showing.
|
||||
if wait_seconds and wait_seconds > 5:
|
||||
message = f"{base_message}. Retrying in {int(wait_seconds)}s..."
|
||||
else:
|
||||
message = f"{base_message}. Retrying..."
|
||||
|
||||
if indexed_count > 0:
|
||||
item_text = "item" if indexed_count == 1 else "items"
|
||||
message = f"{message} ({indexed_count} {item_text} synced so far)"
|
||||
|
||||
metadata_updates = {
|
||||
"indexed_count": indexed_count,
|
||||
"sync_stage": "waiting_retry",
|
||||
"retry_attempt": attempt,
|
||||
"retry_max_attempts": max_attempts,
|
||||
"retry_reason": retry_reason,
|
||||
"retry_wait_seconds": wait_seconds,
|
||||
}
|
||||
|
||||
"""Surface that an external service is rate-limiting/retrying."""
|
||||
connector_name = notification.notification_metadata.get(
|
||||
"connector_name", "Service"
|
||||
)
|
||||
message, metadata_updates = msg.retry(
|
||||
connector_name,
|
||||
indexed_count,
|
||||
retry_reason,
|
||||
attempt,
|
||||
max_attempts,
|
||||
wait_seconds,
|
||||
service_name,
|
||||
)
|
||||
return await self.update_notification(
|
||||
session=session,
|
||||
notification=notification,
|
||||
|
|
@ -187,52 +120,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
connector_name = notification.notification_metadata.get(
|
||||
"connector_name", "Connector"
|
||||
)
|
||||
|
||||
unsupported_text = ""
|
||||
if unsupported_count and unsupported_count > 0:
|
||||
file_word = "file was" if unsupported_count == 1 else "files were"
|
||||
unsupported_text = f" {unsupported_count} {file_word} not supported."
|
||||
|
||||
if error_message:
|
||||
if indexed_count > 0:
|
||||
title = f"Ready: {connector_name}"
|
||||
file_text = "file" if indexed_count == 1 else "files"
|
||||
message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}"
|
||||
status = "completed"
|
||||
elif is_warning:
|
||||
title = f"Ready: {connector_name}"
|
||||
message = f"Sync complete.{unsupported_text} {error_message}"
|
||||
status = "completed"
|
||||
else:
|
||||
title = f"Failed: {connector_name}"
|
||||
message = f"Sync failed: {error_message}"
|
||||
if unsupported_text:
|
||||
message += unsupported_text
|
||||
status = "failed"
|
||||
else:
|
||||
title = f"Ready: {connector_name}"
|
||||
if indexed_count == 0:
|
||||
if unsupported_count and unsupported_count > 0:
|
||||
message = f"Sync complete.{unsupported_text}"
|
||||
else:
|
||||
message = "Already up to date!"
|
||||
else:
|
||||
file_text = "file" if indexed_count == 1 else "files"
|
||||
message = f"Now searchable! {indexed_count} {file_text} synced."
|
||||
if unsupported_text:
|
||||
message += unsupported_text
|
||||
status = "completed"
|
||||
|
||||
metadata_updates = {
|
||||
"indexed_count": indexed_count,
|
||||
"skipped_count": skipped_count or 0,
|
||||
"unsupported_count": unsupported_count or 0,
|
||||
"sync_stage": "completed"
|
||||
if (not error_message or is_warning or indexed_count > 0)
|
||||
else "failed",
|
||||
"error_message": error_message,
|
||||
}
|
||||
|
||||
title, message, status, metadata_updates = msg.completion(
|
||||
connector_name,
|
||||
indexed_count,
|
||||
error_message,
|
||||
is_warning,
|
||||
skipped_count,
|
||||
unsupported_count,
|
||||
)
|
||||
return await self.update_notification(
|
||||
session=session,
|
||||
notification=notification,
|
||||
|
|
@ -256,7 +151,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
file_names: list[str] | None = None,
|
||||
) -> Notification:
|
||||
"""Open (or refresh) the notification when Drive indexing starts."""
|
||||
operation_id = self._generate_google_drive_operation_id(
|
||||
operation_id = msg.google_drive_operation_id(
|
||||
connector_id, folder_count, file_count
|
||||
)
|
||||
title = f"Syncing: {connector_name}"
|
||||
|
|
|
|||
|
|
@ -2,13 +2,13 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.notifications.persistence import Notification
|
||||
from app.notifications.service.base import BaseNotificationHandler
|
||||
from app.notifications.service.messages import document_processing as msg
|
||||
|
||||
|
||||
class DocumentProcessingNotificationHandler(BaseNotificationHandler):
|
||||
|
|
@ -17,17 +17,6 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
|
|||
def __init__(self):
|
||||
super().__init__("document_processing")
|
||||
|
||||
def _generate_operation_id(
|
||||
self, document_type: str, filename: str, search_space_id: int
|
||||
) -> str:
|
||||
"""Build a unique id for a document processing run."""
|
||||
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
|
||||
# Create a short hash of filename to ensure uniqueness
|
||||
import hashlib
|
||||
|
||||
filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
|
||||
return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"
|
||||
|
||||
async def notify_processing_started(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
|
|
@ -38,9 +27,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
|
|||
file_size: int | None = None,
|
||||
) -> Notification:
|
||||
"""Open the notification when document processing is queued."""
|
||||
operation_id = self._generate_operation_id(
|
||||
document_type, document_name, search_space_id
|
||||
)
|
||||
operation_id = msg.operation_id(document_type, document_name, search_space_id)
|
||||
title = f"Processing: {document_name}"
|
||||
message = "Waiting in queue"
|
||||
|
||||
|
|
@ -72,19 +59,7 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
|
|||
chunks_count: int | None = None,
|
||||
) -> Notification:
|
||||
"""Update the notification with the current processing stage."""
|
||||
stage_messages = {
|
||||
"parsing": "Reading your file",
|
||||
"chunking": "Preparing for search",
|
||||
"embedding": "Preparing for search",
|
||||
"storing": "Finalizing",
|
||||
}
|
||||
|
||||
message = stage_message or stage_messages.get(stage, "Processing")
|
||||
|
||||
metadata_updates = {"processing_stage": stage}
|
||||
# Store chunks_count in metadata for debugging, but don't show to user
|
||||
if chunks_count is not None:
|
||||
metadata_updates["chunks_count"] = chunks_count
|
||||
message, metadata_updates = msg.progress(stage, stage_message, chunks_count)
|
||||
|
||||
return await self.update_notification(
|
||||
session=session,
|
||||
|
|
@ -106,26 +81,9 @@ class DocumentProcessingNotificationHandler(BaseNotificationHandler):
|
|||
document_name = notification.notification_metadata.get(
|
||||
"document_name", "Document"
|
||||
)
|
||||
|
||||
if error_message:
|
||||
title = f"Failed: {document_name}"
|
||||
message = f"Processing failed: {error_message}"
|
||||
status = "failed"
|
||||
else:
|
||||
title = f"Ready: {document_name}"
|
||||
message = "Now searchable!"
|
||||
status = "completed"
|
||||
|
||||
metadata_updates = {
|
||||
"processing_stage": "completed" if not error_message else "failed",
|
||||
"error_message": error_message,
|
||||
}
|
||||
|
||||
if document_id is not None:
|
||||
metadata_updates["document_id"] = document_id
|
||||
# Store chunks_count in metadata for debugging, but don't show to user
|
||||
if chunks_count is not None:
|
||||
metadata_updates["chunks_count"] = chunks_count
|
||||
title, message, status, metadata_updates = msg.completion(
|
||||
document_name, error_message, document_id, chunks_count
|
||||
)
|
||||
|
||||
return await self.update_notification(
|
||||
session=session,
|
||||
|
|
|
|||
|
|
@ -3,13 +3,13 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.notifications.persistence import Notification
|
||||
from app.notifications.service.base import BaseNotificationHandler
|
||||
from app.notifications.service.messages import page_limit as msg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -20,15 +20,6 @@ class PageLimitNotificationHandler(BaseNotificationHandler):
|
|||
def __init__(self):
|
||||
super().__init__("page_limit_exceeded")
|
||||
|
||||
def _generate_operation_id(self, document_name: str, search_space_id: int) -> str:
|
||||
"""Build a unique id for a page-limit notification."""
|
||||
import hashlib
|
||||
|
||||
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
|
||||
# Create a short hash of document name to ensure uniqueness
|
||||
doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8]
|
||||
return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}"
|
||||
|
||||
async def notify_page_limit_exceeded(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
|
|
@ -41,13 +32,10 @@ class PageLimitNotificationHandler(BaseNotificationHandler):
|
|||
pages_to_add: int,
|
||||
) -> Notification:
|
||||
"""Notify that a document was blocked by the page limit."""
|
||||
operation_id = self._generate_operation_id(document_name, search_space_id)
|
||||
|
||||
display_name = (
|
||||
document_name[:40] + "..." if len(document_name) > 40 else document_name
|
||||
operation_id = msg.operation_id(document_name, search_space_id)
|
||||
title, message = msg.summary(
|
||||
document_name, pages_used, pages_limit, pages_to_add
|
||||
)
|
||||
title = f"Page limit exceeded: {display_name}"
|
||||
message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents."
|
||||
|
||||
metadata = {
|
||||
"operation_id": operation_id,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue