feat: add pure notification message helpers

This commit is contained in:
CREDO23 2026-06-03 21:52:58 +02:00
parent f3ed1b85d0
commit c23bdc4a5e
5 changed files with 267 additions and 0 deletions

View file

@ -0,0 +1,6 @@
"""Pure, side-effect-free presentation logic for notifications.
Handlers compute their user-facing title/message/status/metadata here, then
persist the result. Keeping this layer free of I/O makes it unit-testable
without a database.
"""

View file

@ -0,0 +1,164 @@
"""Pure presentation logic for connector-indexing notifications."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
def operation_id(
connector_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> str:
"""Build a unique id for a connector indexing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
date_range = ""
if start_date or end_date:
date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
return f"connector_{connector_id}_{timestamp}{date_range}"
def google_drive_operation_id(
connector_id: int, folder_count: int, file_count: int
) -> str:
"""Build a unique id for a Google Drive indexing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
items_info = f"_{folder_count}f_{file_count}files"
return f"drive_{connector_id}_{timestamp}{items_info}"
def progress(
indexed_count: int,
total_count: int | None = None,
stage: str | None = None,
stage_message: str | None = None,
) -> tuple[str, dict[str, Any]]:
"""Compute the progress message and metadata updates for an indexing run."""
stage_messages = {
"connecting": "Connecting to your account",
"fetching": "Fetching your content",
"processing": "Preparing for search",
"storing": "Almost done",
}
if stage or stage_message:
progress_msg = stage_message or stage_messages.get(stage, "Processing")
else:
# Legacy callers that pass neither stage nor message.
progress_msg = "Fetching your content"
metadata_updates: dict[str, Any] = {"indexed_count": indexed_count}
if total_count is not None:
metadata_updates["total_count"] = total_count
progress_percent = int((indexed_count / total_count) * 100)
metadata_updates["progress_percent"] = progress_percent
if stage:
metadata_updates["sync_stage"] = stage
return progress_msg, metadata_updates
def retry(
connector_name: str,
indexed_count: int,
retry_reason: str,
attempt: int,
max_attempts: int,
wait_seconds: float | None = None,
service_name: str | None = None,
) -> tuple[str, dict[str, Any]]:
"""Compute the retry message and metadata, framing the delay as the provider's."""
if not service_name:
service_name = connector_name
# Strip the workspace suffix, e.g. "Notion - My Workspace" -> "Notion".
if " - " in service_name:
service_name = service_name.split(" - ")[0]
# Worded so the delay reads as the provider's, not ours.
retry_messages = {
"rate_limit": f"{service_name} rate limit reached",
"server_error": f"{service_name} is slow to respond",
"timeout": f"{service_name} took too long",
"temporary_error": f"{service_name} temporarily unavailable",
}
base_message = retry_messages.get(retry_reason, f"Waiting for {service_name}")
# Only surface a wait time when it's long enough to be worth showing.
if wait_seconds and wait_seconds > 5:
message = f"{base_message}. Retrying in {int(wait_seconds)}s..."
else:
message = f"{base_message}. Retrying..."
if indexed_count > 0:
item_text = "item" if indexed_count == 1 else "items"
message = f"{message} ({indexed_count} {item_text} synced so far)"
metadata_updates = {
"indexed_count": indexed_count,
"sync_stage": "waiting_retry",
"retry_attempt": attempt,
"retry_max_attempts": max_attempts,
"retry_reason": retry_reason,
"retry_wait_seconds": wait_seconds,
}
return message, metadata_updates
def completion(
connector_name: str,
indexed_count: int,
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
unsupported_count: int | None = None,
) -> tuple[str, str, str, dict[str, Any]]:
"""Compute the final title, message, status, and metadata for a finished run."""
unsupported_text = ""
if unsupported_count and unsupported_count > 0:
file_word = "file was" if unsupported_count == 1 else "files were"
unsupported_text = f" {unsupported_count} {file_word} not supported."
if error_message:
if indexed_count > 0:
title = f"Ready: {connector_name}"
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced.{unsupported_text} Note: {error_message}"
status = "completed"
elif is_warning:
title = f"Ready: {connector_name}"
message = f"Sync complete.{unsupported_text} {error_message}"
status = "completed"
else:
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
if unsupported_text:
message += unsupported_text
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
if unsupported_count and unsupported_count > 0:
message = f"Sync complete.{unsupported_text}"
else:
message = "Already up to date!"
else:
file_text = "file" if indexed_count == 1 else "files"
message = f"Now searchable! {indexed_count} {file_text} synced."
if unsupported_text:
message += unsupported_text
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"unsupported_count": unsupported_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",
"error_message": error_message,
}
return title, message, status, metadata_updates

View file

@ -0,0 +1,64 @@
"""Pure presentation logic for document-processing notifications."""
from __future__ import annotations
import hashlib
from datetime import UTC, datetime
from typing import Any
def operation_id(document_type: str, filename: str, search_space_id: int) -> str:
"""Build a unique id for a document processing run."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"
def progress(
stage: str,
stage_message: str | None = None,
chunks_count: int | None = None,
) -> tuple[str, dict[str, Any]]:
"""Compute the progress message and metadata updates for a processing run."""
stage_messages = {
"parsing": "Reading your file",
"chunking": "Preparing for search",
"embedding": "Preparing for search",
"storing": "Finalizing",
}
message = stage_message or stage_messages.get(stage, "Processing")
metadata_updates: dict[str, Any] = {"processing_stage": stage}
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return message, metadata_updates
def completion(
document_name: str,
error_message: str | None = None,
document_id: int | None = None,
chunks_count: int | None = None,
) -> tuple[str, str, str, dict[str, Any]]:
"""Compute the final title, message, status, and metadata for a finished run."""
if error_message:
title = f"Failed: {document_name}"
message = f"Processing failed: {error_message}"
status = "failed"
else:
title = f"Ready: {document_name}"
message = "Now searchable!"
status = "completed"
metadata_updates: dict[str, Any] = {
"processing_stage": "completed" if not error_message else "failed",
"error_message": error_message,
}
if document_id is not None:
metadata_updates["document_id"] = document_id
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return title, message, status, metadata_updates

View file

@ -0,0 +1,25 @@
"""Pure presentation logic for page-limit notifications."""
from __future__ import annotations
import hashlib
from datetime import UTC, datetime
from app.notifications.service.messages.text import truncate
def operation_id(document_name: str, search_space_id: int) -> str:
"""Build a unique id for a page-limit notification."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
doc_hash = hashlib.md5(document_name.encode()).hexdigest()[:8]
return f"page_limit_{search_space_id}_{timestamp}_{doc_hash}"
def summary(
document_name: str, pages_used: int, pages_limit: int, pages_to_add: int
) -> tuple[str, str]:
"""Compute the title and message for a blocked-by-page-limit document."""
display_name = truncate(document_name, 40)
title = f"Page limit exceeded: {display_name}"
message = f"This document has ~{pages_to_add} page(s) but you've used {pages_used}/{pages_limit} pages. Upgrade to process more documents."
return title, message

View file

@ -0,0 +1,8 @@
"""Shared text helpers for notification copy."""
from __future__ import annotations
def truncate(text: str, limit: int) -> str:
"""Return ``text`` capped at ``limit`` chars, appending an ellipsis if cut."""
return text[:limit] + "..." if len(text) > limit else text