diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py
index ec3f78a4b..4dbc09cf8 100644
--- a/surfsense_backend/app/db.py
+++ b/surfsense_backend/app/db.py
@@ -684,7 +684,7 @@ class Notification(BaseModel, TimestampMixin):
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
)
- type = Column(String(50), nullable=False) # 'document_processed', 'connector_indexed', 'user_mentioned', etc.
+ type = Column(String(50), nullable=False) # 'connector_indexing', 'document_processing', etc.
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
read = Column(Boolean, nullable=False, default=False, server_default=text("false"), index=True)
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index fa9bdcda5..9be9895eb 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -1003,6 +1003,15 @@ async def _run_indexing_with_notifications(
end_date=end_date,
)
+ # Update notification to fetching stage
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_progress(
+ session=session,
+ notification=notification,
+ indexed_count=0,
+ stage="fetching",
+ )
+
# Run the indexing function
documents_processed, error_or_warning = await indexing_function(
session=session,
@@ -1016,6 +1025,15 @@ async def _run_indexing_with_notifications(
# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
+ # Update notification to storing stage
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_progress(
+ session=session,
+ notification=notification,
+ indexed_count=documents_processed,
+ stage="storing",
+ )
+
await update_timestamp_func(session, connector_id)
logger.info(
f"Indexing completed successfully: {documents_processed} documents processed"
@@ -1030,6 +1048,15 @@ async def _run_indexing_with_notifications(
error_message=None,
)
elif documents_processed > 0:
+ # Update notification to storing stage
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_progress(
+ session=session,
+ notification=notification,
+ indexed_count=documents_processed,
+ stage="storing",
+ )
+
# Success but no timestamp update function
logger.info(
f"Indexing completed successfully: {documents_processed} documents processed"
@@ -1693,6 +1720,15 @@ async def run_google_drive_indexing(
file_names=items.get_file_names() if items.files else None,
)
+ # Update notification to fetching stage
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_progress(
+ session=session,
+ notification=notification,
+ indexed_count=0,
+ stage="fetching",
+ )
+
# Index each folder
for folder in items.folders:
try:
@@ -1747,6 +1783,15 @@ async def run_google_drive_indexing(
f"Google Drive indexing completed with errors for connector {connector_id}: {error_message}"
)
else:
+ # Update notification to storing stage
+ if notification:
+ await NotificationService.connector_indexing.notify_indexing_progress(
+ session=session,
+ notification=notification,
+ indexed_count=total_indexed,
+ stage="storing",
+ )
+
logger.info(
f"Google Drive indexing successful for connector {connector_id}. Indexed {total_indexed} documents from {len(items.folders)} folder(s) and {len(items.files)} file(s)."
)
diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py
index fd53c7e63..0988eb034 100644
--- a/surfsense_backend/app/services/notification_service.py
+++ b/surfsense_backend/app/services/notification_service.py
@@ -245,8 +245,8 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
Notification: The created or updated notification
"""
operation_id = self._generate_operation_id(connector_id, start_date, end_date)
- title = f"Indexing: {connector_name}"
- message = f'Indexing "{connector_name}" in progress...'
+ title = f"Syncing: {connector_name}"
+ message = "Connecting to your account"
metadata = {
"connector_id": connector_id,
@@ -255,6 +255,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"start_date": start_date,
"end_date": end_date,
"indexed_count": 0,
+ "sync_stage": "connecting",
}
return await self.find_or_create_notification(
@@ -273,6 +274,8 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
notification: Notification,
indexed_count: int,
total_count: int | None = None,
+ stage: str | None = None,
+ stage_message: str | None = None,
) -> Notification:
"""
Update notification with indexing progress.
@@ -282,21 +285,34 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
notification: Notification to update
indexed_count: Number of items indexed so far
total_count: Total number of items (optional)
+ stage: Current sync stage ("connecting", "fetching", "processing", or "storing"), optional
+ stage_message: Optional custom message for the stage
Returns:
Updated notification
"""
- connector_name = notification.notification_metadata.get("connector_name", "Connector")
- progress_msg = f'Indexing "{connector_name}": {indexed_count} items'
- if total_count is not None:
- progress_msg += f" of {total_count}"
- progress_msg += " indexed..."
+ # User-friendly stage messages; no trailing ellipsis, since the spinner already conveys activity
+ stage_messages = {
+ "connecting": "Connecting to your account",
+ "fetching": "Fetching your content",
+ "processing": "Preparing for search",
+ "storing": "Almost done",
+ }
+
+ # Prefer an explicit stage_message, then the stage lookup; otherwise fall back
+ if stage or stage_message:
+ progress_msg = stage_message or stage_messages.get(stage, "Processing")
+ else:
+ # Fallback for backward compatibility
+ progress_msg = "Fetching your content"
metadata_updates = {"indexed_count": indexed_count}
if total_count is not None:
metadata_updates["total_count"] = total_count
progress_percent = int((indexed_count / total_count) * 100)
metadata_updates["progress_percent"] = progress_percent
+ if stage:
+ metadata_updates["sync_stage"] = stage
return await self.update_notification(
session=session,
@@ -328,16 +344,18 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
connector_name = notification.notification_metadata.get("connector_name", "Connector")
if error_message:
- title = f"Indexing failed: {connector_name}"
- message = f'Indexing "{connector_name}" failed: {error_message}'
+ title = f"Failed: {connector_name}"
+ message = f"Sync failed: {error_message}"
status = "failed"
else:
- title = f"Indexing completed: {connector_name}"
- message = f'Indexing "{connector_name}" completed successfully. {indexed_count} items indexed.'
+ title = f"Ready: {connector_name}"
+ item_text = "item" if indexed_count == 1 else "items"
+ message = f"Now searchable! {indexed_count} {item_text} synced."
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
+ "sync_stage": "completed" if not error_message else "failed",
"error_message": error_message,
}
@@ -384,16 +402,8 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
operation_id = self._generate_google_drive_operation_id(
connector_id, folder_count, file_count
)
- title = f"Indexing: {connector_name}"
-
- # Create descriptive message
- items_desc = []
- if folder_count > 0:
- items_desc.append(f"{folder_count} folder{'s' if folder_count != 1 else ''}")
- if file_count > 0:
- items_desc.append(f"{file_count} file{'s' if file_count != 1 else ''}")
-
- message = f'Indexing "{connector_name}" ({", ".join(items_desc)}) in progress...'
+ title = f"Syncing: {connector_name}"
+ message = "Preparing your files"
metadata = {
"connector_id": connector_id,
@@ -402,6 +412,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"folder_count": folder_count,
"file_count": file_count,
"indexed_count": 0,
+ "sync_stage": "connecting",
}
if folder_names:
@@ -420,11 +431,181 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
)
+class DocumentProcessingNotificationHandler(BaseNotificationHandler):
+ """Handler for document processing notifications."""
+
+ def __init__(self):
+ super().__init__("document_processing")
+
+ def _generate_operation_id(
+ self, document_type: str, filename: str, search_space_id: int
+ ) -> str:
+ """
+ Generate a unique operation ID for a document processing operation.
+
+ Args:
+ document_type: Type of document (FILE, YOUTUBE_VIDEO, CRAWLED_URL, etc.)
+ filename: Name of the file/document
+ search_space_id: Search space ID
+
+ Returns:
+ Unique operation ID string
+ """
+ import hashlib
+
+ timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
+ # Short hash of the filename keeps operation IDs unique for same-second uploads
+ filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
+ return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"
+
+ async def notify_processing_started(
+ self,
+ session: AsyncSession,
+ user_id: UUID,
+ document_type: str,
+ document_name: str,
+ search_space_id: int,
+ file_size: int | None = None,
+ ) -> Notification:
+ """
+ Create notification when document processing starts.
+
+ Args:
+ session: Database session
+ user_id: User ID
+ document_type: Type of document (FILE, YOUTUBE_VIDEO, CRAWLED_URL, etc.)
+ document_name: Name/title of the document
+ search_space_id: Search space ID
+ file_size: Size of file in bytes (optional)
+
+ Returns:
+ Notification: The created notification
+ """
+ operation_id = self._generate_operation_id(document_type, document_name, search_space_id)
+ title = f"Processing: {document_name}"
+ message = "Waiting in queue"
+
+ metadata = {
+ "document_type": document_type,
+ "document_name": document_name,
+ "processing_stage": "queued",
+ }
+
+ if file_size is not None:
+ metadata["file_size"] = file_size
+
+ return await self.find_or_create_notification(
+ session=session,
+ user_id=user_id,
+ operation_id=operation_id,
+ title=title,
+ message=message,
+ search_space_id=search_space_id,
+ initial_metadata=metadata,
+ )
+
+ async def notify_processing_progress(
+ self,
+ session: AsyncSession,
+ notification: Notification,
+ stage: str,
+ stage_message: str | None = None,
+ chunks_count: int | None = None,
+ ) -> Notification:
+ """
+ Update notification with processing progress.
+
+ Args:
+ session: Database session
+ notification: Notification to update
+ stage: Current processing stage (parsing, chunking, embedding, storing)
+ stage_message: Optional custom message for the stage
+ chunks_count: Number of chunks created (optional, stored in metadata only)
+
+ Returns:
+ Updated notification
+ """
+ # User-friendly stage messages
+ stage_messages = {
+ "parsing": "Reading your file",
+ "chunking": "Preparing for search",
+ "embedding": "Preparing for search",
+ "storing": "Finalizing",
+ }
+
+ message = stage_message or stage_messages.get(stage, "Processing")
+
+ metadata_updates = {"processing_stage": stage}
+ # Store chunks_count in metadata for debugging, but don't show to user
+ if chunks_count is not None:
+ metadata_updates["chunks_count"] = chunks_count
+
+ return await self.update_notification(
+ session=session,
+ notification=notification,
+ message=message,
+ status="in_progress",
+ metadata_updates=metadata_updates,
+ )
+
+ async def notify_processing_completed(
+ self,
+ session: AsyncSession,
+ notification: Notification,
+ document_id: int | None = None,
+ chunks_count: int | None = None,
+ error_message: str | None = None,
+ ) -> Notification:
+ """
+ Update notification when document processing completes.
+
+ Args:
+ session: Database session
+ notification: Notification to update
+ document_id: ID of the created document (optional)
+ chunks_count: Total number of chunks created (optional)
+ error_message: Error message if processing failed (optional)
+
+ Returns:
+ Updated notification
+ """
+ document_name = notification.notification_metadata.get("document_name", "Document")
+
+ if error_message:
+ title = f"Failed: {document_name}"
+ message = f"Processing failed: {error_message}"
+ status = "failed"
+ else:
+ title = f"Ready: {document_name}"
+ message = "Now searchable!"
+ status = "completed"
+
+ metadata_updates = {
+ "processing_stage": "completed" if not error_message else "failed",
+ "error_message": error_message,
+ }
+
+ if document_id is not None:
+ metadata_updates["document_id"] = document_id
+ # Store chunks_count in metadata for debugging, but don't show to user
+ if chunks_count is not None:
+ metadata_updates["chunks_count"] = chunks_count
+
+ return await self.update_notification(
+ session=session,
+ notification=notification,
+ title=title,
+ message=message,
+ status=status,
+ metadata_updates=metadata_updates,
+ )
+
+
class NotificationService:
"""Service for creating and managing notifications that sync via Electric SQL."""
# Handler instances
connector_indexing = ConnectorIndexingNotificationHandler()
+ document_processing = DocumentProcessingNotificationHandler()
@staticmethod
async def create_notification(
@@ -442,7 +623,7 @@ class NotificationService:
Args:
session: Database session
user_id: User to notify
- notification_type: Type of notification (e.g., 'document_processed', 'connector_indexed')
+ notification_type: Type of notification (e.g., 'document_processing', 'connector_indexing')
title: Notification title
message: Notification message
search_space_id: Optional search space ID
@@ -465,43 +646,3 @@ class NotificationService:
logger.info(f"Created notification {notification.id} for user {user_id}")
return notification
- @staticmethod
- async def create_document_processed_notification(
- session: AsyncSession,
- user_id: UUID,
- document_id: int,
- document_title: str,
- status: str,
- search_space_id: int,
- ) -> Notification:
- """
- Create notification when document processing completes.
-
- Args:
- session: Database session
- user_id: User to notify
- document_id: ID of the processed document
- document_title: Title of the document
- status: Processing status ('SUCCESS', 'FAILED')
- search_space_id: Search space ID
-
- Returns:
- Notification: The created notification
- """
- status_lower = status.lower()
- title = f"Document processed: {document_title}"
- message = f'Your document "{document_title}" has been {status_lower}.'
-
- return await NotificationService.create_notification(
- session=session,
- user_id=user_id,
- notification_type="document_processed",
- title=title,
- message=message,
- search_space_id=search_space_id,
- notification_metadata={
- "document_id": document_id,
- "status": status,
- },
- )
-
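Reviewer note: the new `DocumentProcessingNotificationHandler` gives callers a three-call lifecycle: started, progress, completed. A minimal sketch of that flow; the ids, names, and counts here are invented for illustration:

```python
from uuid import UUID

from app.services.notification_service import NotificationService

# Illustrative lifecycle only; values are made up for the example.
async def demo_document_lifecycle(session, user_id: UUID):
    notification = await NotificationService.document_processing.notify_processing_started(
        session=session,
        user_id=user_id,
        document_type="FILE",
        document_name="report.pdf",
        search_space_id=1,
        file_size=52_431,
    )
    await NotificationService.document_processing.notify_processing_progress(
        session, notification, stage="parsing", stage_message="Reading your file"
    )
    await NotificationService.document_processing.notify_processing_completed(
        session=session,
        notification=notification,
        document_id=42,
        chunks_count=7,
    )
```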
diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
index bb53fd042..cf5a06789 100644
--- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
+++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py
@@ -1,12 +1,14 @@
"""Celery tasks for document processing."""
import logging
+from uuid import UUID
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
+from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_extension_received_document,
@@ -84,6 +86,20 @@ async def _process_extension_document(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
+ # Truncate the page title for notification display
+ full_title = individual_document.metadata.VisitedWebPageTitle
+ page_title = full_title[:50] + ("..." if len(full_title) > 50 else "")
+
+ # Create notification for document processing
+ notification = await NotificationService.document_processing.notify_processing_started(
+ session=session,
+ user_id=UUID(user_id),
+ document_type="EXTENSION",
+ document_name=page_title,
+ search_space_id=search_space_id,
+ )
+
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
@@ -97,6 +113,11 @@ async def _process_extension_document(
)
try:
+ # Update notification: parsing stage
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Reading page content"
+ )
+
result = await add_extension_received_document(
session, individual_document, search_space_id, user_id
)
@@ -107,12 +128,28 @@ async def _process_extension_document(
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash},
)
+
+ # Update notification on success
+ chunks_count = len(result.chunks) if hasattr(result, 'chunks') and result.chunks else None
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ document_id=result.id,
+ chunks_count=chunks_count,
+ )
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True},
)
+
+ # Update notification for duplicate
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message="Page already saved (duplicate)",
+ )
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@@ -120,6 +157,14 @@ async def _process_extension_document(
str(e),
{"error_type": type(e).__name__},
)
+
+ # Update notification on failure
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message=str(e)[:100],
+ )
+
logger.error(f"Error processing extension document: {e!s}")
raise
@@ -150,6 +195,18 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
+ # No title is available yet, so fall back to the 11-character video ID
+ # parsed from the URL (the document itself gets the real title later)
+ video_name = url.split("v=")[-1][:11] if "v=" in url else url
+
+ # Create notification for document processing
+ notification = await NotificationService.document_processing.notify_processing_started(
+ session=session,
+ user_id=UUID(user_id),
+ document_type="YOUTUBE_VIDEO",
+ document_name=f"YouTube: {video_name}",
+ search_space_id=search_space_id,
+ )
+
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
@@ -158,6 +215,11 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
)
try:
+ # Update notification: parsing (fetching transcript)
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Fetching video transcript"
+ )
+
result = await add_youtube_video_document(
session, url, search_space_id, user_id
)
@@ -172,12 +234,28 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
"content_hash": result.content_hash,
},
)
+
+ # Update notification on success
+ chunks_count = len(result.chunks) if hasattr(result, 'chunks') and result.chunks else None
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ document_id=result.id,
+ chunks_count=chunks_count,
+ )
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
+
+ # Update notification for duplicate
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message="Video already exists (duplicate)",
+ )
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@@ -185,6 +263,14 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
str(e),
{"error_type": type(e).__name__},
)
+
+ # Update notification on failure
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message=str(e)[:100],
+ )
+
logger.error(f"Error processing YouTube video: {e!s}")
raise
@@ -219,11 +305,29 @@ async def _process_file_upload(
file_path: str, filename: str, search_space_id: int, user_id: str
):
"""Process file upload with new session."""
+ import os
+
from app.tasks.document_processors.file_processors import process_file_in_background
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
+ # Get file size for notification metadata
+ try:
+ file_size = os.path.getsize(file_path)
+ except OSError:
+ file_size = None
+
+ # Create notification for document processing
+ notification = await NotificationService.document_processing.notify_processing_started(
+ session=session,
+ user_id=UUID(user_id),
+ document_type="FILE",
+ document_name=filename,
+ search_space_id=search_space_id,
+ file_size=file_size,
+ )
+
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
@@ -237,7 +341,7 @@ async def _process_file_upload(
)
try:
- await process_file_in_background(
+ result = await process_file_in_background(
file_path,
filename,
search_space_id,
@@ -245,7 +349,26 @@ async def _process_file_upload(
session,
task_logger,
log_entry,
+ notification=notification,
)
+
+ # Update notification on success
+ if result:
+ chunks_count = len(result.chunks) if hasattr(result, 'chunks') and result.chunks else None
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ document_id=result.id,
+ chunks_count=chunks_count,
+ )
+ else:
+ # Duplicate detected
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message="Document already exists (duplicate)",
+ )
+
except Exception as e:
# Import here to avoid circular dependencies
from fastapi import HTTPException
@@ -258,7 +381,14 @@ async def _process_file_upload(
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
error_message = str(e.detail)
else:
- error_message = f"Failed to process file: {filename}"
+ error_message = str(e)[:100]
+
+ # Update notification on failure
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message=error_message,
+ )
await task_logger.log_task_failure(
log_entry,
@@ -323,6 +453,20 @@ async def _process_circleback_meeting(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
+ # Get user_id from metadata if available
+ user_id = metadata.get("user_id")
+
+ # Create notification if user_id is available
+ notification = None
+ if user_id:
+ notification = await NotificationService.document_processing.notify_processing_started(
+ session=session,
+ user_id=UUID(user_id),
+ document_type="CIRCLEBACK",
+ document_name=f"Meeting: {meeting_name[:40]}",
+ search_space_id=search_space_id,
+ )
+
log_entry = await task_logger.log_task_start(
task_name="process_circleback_meeting",
source="circleback_webhook",
@@ -336,6 +480,12 @@ async def _process_circleback_meeting(
)
try:
+ # Update notification: parsing stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Reading meeting notes"
+ )
+
result = await add_circleback_meeting_document(
session=session,
meeting_id=meeting_id,
@@ -355,12 +505,30 @@ async def _process_circleback_meeting(
"content_hash": result.content_hash,
},
)
+
+ # Update notification on success
+ if notification:
+ chunks_count = len(result.chunks) if hasattr(result, 'chunks') and result.chunks else None
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ document_id=result.id,
+ chunks_count=chunks_count,
+ )
else:
await task_logger.log_task_success(
log_entry,
f"Circleback meeting document already exists (duplicate): {meeting_name}",
{"duplicate_detected": True, "meeting_id": meeting_id},
)
+
+ # Update notification for duplicate
+ if notification:
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message="Meeting already saved (duplicate)",
+ )
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@@ -368,5 +536,14 @@ async def _process_circleback_meeting(
str(e),
{"error_type": type(e).__name__, "meeting_id": meeting_id},
)
+
+ # Update notification on failure
+ if notification:
+ await NotificationService.document_processing.notify_processing_completed(
+ session=session,
+ notification=notification,
+ error_message=str(e)[:100],
+ )
+
logger.error(f"Error processing Circleback meeting: {e!s}")
raise
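Reviewer note: each task above funnels its three outcomes through `notify_processing_completed`: success passes `document_id`, while duplicates and exceptions pass `error_message`. One consequence worth flagging is that duplicates therefore surface with status `failed`. A sketch of the shared pattern, where `process` is a hypothetical stand-in for any task body:

```python
from app.services.notification_service import NotificationService

# Sketch of the outcome handling the tasks above repeat; `process` is hypothetical.
async def finish_with_notification(session, notification, process):
    try:
        result = await process()
        if result:
            await NotificationService.document_processing.notify_processing_completed(
                session=session, notification=notification, document_id=result.id
            )
        else:
            # Duplicate: routed through error_message, so it lands in "failed" status
            await NotificationService.document_processing.notify_processing_completed(
                session=session,
                notification=notification,
                error_message="Document already exists (duplicate)",
            )
    except Exception as e:
        await NotificationService.document_processing.notify_processing_completed(
            session=session, notification=notification, error_message=str(e)[:100]
        )
        raise
```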
diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 596cd9830..eecb1ac1a 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -14,8 +14,9 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
-from app.db import Document, DocumentType, Log
+from app.db import Document, DocumentType, Log, Notification
from app.services.llm_service import get_user_long_context_llm
+from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
convert_document_to_markdown,
@@ -475,10 +476,17 @@ async def process_file_in_background(
log_entry: Log,
connector: dict
| None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}}
-):
+ notification: Notification | None = None, # Optional notification for progress updates
+) -> Document | None:
try:
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
+ # Update notification: parsing stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Reading file"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
@@ -498,6 +506,12 @@ async def process_file_in_background(
print("Error deleting temp file", e)
pass
+ # Update notification: chunking stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="chunking"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Creating document from markdown content: {filename}",
@@ -525,17 +539,25 @@ async def process_file_in_background(
"file_type": "markdown",
},
)
+ return result
else:
await task_logger.log_task_success(
log_entry,
f"Markdown file already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "markdown"},
)
+ return None
# Check if the file is an audio file
elif filename.lower().endswith(
(".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
):
+ # Update notification: parsing stage (transcription)
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Transcribing audio"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
@@ -619,6 +641,12 @@ async def process_file_in_background(
},
)
+ # Update notification: chunking stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="chunking"
+ )
+
# Clean up the temp file
try:
os.unlink(file_path)
@@ -646,12 +674,14 @@ async def process_file_in_background(
"stt_service": stt_service_type,
},
)
+ return result
else:
await task_logger.log_task_success(
log_entry,
f"Audio file transcript already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "audio"},
)
+ return None
else:
# Import page limit service
@@ -716,6 +746,12 @@ async def process_file_in_background(
) from e
if app_config.ETL_SERVICE == "UNSTRUCTURED":
+ # Update notification: parsing stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Extracting content"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Processing file with Unstructured ETL: {filename}",
@@ -741,6 +777,12 @@ async def process_file_in_background(
docs = await loader.aload()
+ # Update notification: chunking stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="chunking", chunks_count=len(docs)
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Unstructured ETL completed, creating document: {filename}",
@@ -800,6 +842,7 @@ async def process_file_in_background(
"pages_processed": final_page_count,
},
)
+ return result
else:
await task_logger.log_task_success(
log_entry,
@@ -810,8 +853,15 @@ async def process_file_in_background(
"etl_service": "UNSTRUCTURED",
},
)
+ return None
elif app_config.ETL_SERVICE == "LLAMACLOUD":
+ # Update notification: parsing stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Extracting content"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Processing file with LlamaCloud ETL: {filename}",
@@ -851,6 +901,12 @@ async def process_file_in_background(
split_by_page=False
)
+ # Update notification: chunking stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="chunking", chunks_count=len(markdown_documents)
+ )
+
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud parsing completed, creating documents: {filename}",
@@ -943,6 +999,7 @@ async def process_file_in_background(
"documents_count": len(markdown_documents),
},
)
+ return last_created_doc
else:
# All documents were duplicates (markdown_documents was not empty, but all returned None)
await task_logger.log_task_success(
@@ -955,8 +1012,15 @@ async def process_file_in_background(
"documents_count": len(markdown_documents),
},
)
+ return None
elif app_config.ETL_SERVICE == "DOCLING":
+ # Update notification: parsing stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="parsing", stage_message="Extracting content"
+ )
+
await task_logger.log_task_progress(
log_entry,
f"Processing file with Docling ETL: {filename}",
@@ -1039,6 +1103,12 @@ async def process_file_in_background(
},
)
+ # Update notification: chunking stage
+ if notification:
+ await NotificationService.document_processing.notify_processing_progress(
+ session, notification, stage="chunking"
+ )
+
# Process the document using our Docling background task
doc_result = await add_received_file_document_using_docling(
session,
@@ -1071,6 +1141,7 @@ async def process_file_in_background(
"pages_processed": final_page_count,
},
)
+ return doc_result
else:
await task_logger.log_task_success(
log_entry,
@@ -1081,6 +1152,7 @@ async def process_file_in_background(
"etl_service": "DOCLING",
},
)
+ return None
except Exception as e:
await session.rollback()
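Reviewer note: `process_file_in_background` now repeats the same `if notification:` guard before every stage update. A small helper, purely a suggestion and not part of this diff, would collapse the pattern:

```python
from app.db import Notification
from app.services.notification_service import NotificationService

# Hypothetical helper, not in this diff: one guarded call per stage.
async def _notify_stage(
    session,
    notification: Notification | None,
    stage: str,
    stage_message: str | None = None,
    chunks_count: int | None = None,
) -> None:
    if notification is None:
        return
    await NotificationService.document_processing.notify_processing_progress(
        session,
        notification,
        stage=stage,
        stage_message=stage_message,
        chunks_count=chunks_count,
    )

# e.g. await _notify_stage(session, notification, "parsing", "Transcribing audio")
```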
diff --git a/surfsense_web/components/assistant-ui/attachment.tsx b/surfsense_web/components/assistant-ui/attachment.tsx
index 659790592..6c106fd80 100644
--- a/surfsense_web/components/assistant-ui/attachment.tsx
+++ b/surfsense_web/components/assistant-ui/attachment.tsx
@@ -357,7 +357,7 @@ export const ComposerAddAttachment: FC = () => {
- Upload Files
+ Upload Documents
diff --git a/surfsense_web/contracts/types/notification.types.ts b/surfsense_web/contracts/types/notification.types.ts
index bd832c2c9..cac49b09c 100644
--- a/surfsense_web/contracts/types/notification.types.ts
+++ b/surfsense_web/contracts/types/notification.types.ts
@@ -1,12 +1,13 @@
import { z } from "zod";
import { searchSourceConnectorTypeEnum } from "./connector.types";
+import { documentTypeEnum } from "./document.types";
/**
* Notification type enum - matches backend notification types
*/
export const notificationTypeEnum = z.enum([
"connector_indexing",
- "document_processed",
+ "document_processing",
]);
/**
@@ -18,6 +19,19 @@ export const notificationStatusEnum = z.enum([
"failed",
]);
+/**
+ * Document processing stage enum
+ */
+export const documentProcessingStageEnum = z.enum([
+ "queued",
+ "parsing",
+ "chunking",
+ "embedding",
+ "storing",
+ "completed",
+ "failed",
+]);
+
/**
* Base metadata schema shared across notification types
*/
@@ -49,11 +63,16 @@ export const connectorIndexingMetadata = baseNotificationMetadata.extend({
});
/**
- * Document processed metadata schema
+ * Document processing metadata schema
*/
-export const documentProcessedMetadata = baseNotificationMetadata.extend({
- document_id: z.number(),
- status: z.string(),
+export const documentProcessingMetadata = baseNotificationMetadata.extend({
+ document_type: documentTypeEnum,
+ document_name: z.string(),
+ processing_stage: documentProcessingStageEnum,
+ file_size: z.number().optional(),
+ chunks_count: z.number().optional(),
+ document_id: z.number().optional(),
+ error_message: z.string().nullable().optional(),
});
/**
@@ -62,7 +81,7 @@ export const documentProcessedMetadata = baseNotificationMetadata.extend({
*/
export const notificationMetadata = z.union([
connectorIndexingMetadata,
- documentProcessedMetadata,
+ documentProcessingMetadata,
baseNotificationMetadata,
]);
@@ -90,19 +109,20 @@ export const connectorIndexingNotification = notification.extend({
metadata: connectorIndexingMetadata,
});
-export const documentProcessedNotification = notification.extend({
- type: z.literal("document_processed"),
- metadata: documentProcessedMetadata,
+export const documentProcessingNotification = notification.extend({
+ type: z.literal("document_processing"),
+ metadata: documentProcessingMetadata,
});
// Inferred types
export type NotificationTypeEnum = z.infer<typeof notificationTypeEnum>;
export type NotificationStatusEnum = z.infer<typeof notificationStatusEnum>;
+export type DocumentProcessingStageEnum = z.infer<typeof documentProcessingStageEnum>;
export type BaseNotificationMetadata = z.infer<typeof baseNotificationMetadata>;
export type ConnectorIndexingMetadata = z.infer<typeof connectorIndexingMetadata>;
-export type DocumentProcessedMetadata = z.infer<typeof documentProcessedMetadata>;
+export type DocumentProcessingMetadata = z.infer<typeof documentProcessingMetadata>;
export type NotificationMetadata = z.infer<typeof notificationMetadata>;
export type Notification = z.infer<typeof notification>;
export type ConnectorIndexingNotification = z.infer<typeof connectorIndexingNotification>;
-export type DocumentProcessedNotification = z.infer<typeof documentProcessedNotification>;
+export type DocumentProcessingNotification = z.infer<typeof documentProcessingNotification>;
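Reviewer note: these schemas mirror the metadata dicts the Python handlers write. For cross-checking, a sketch of the payload the backend emits for a completed document, which `documentProcessingMetadata` above is meant to accept; the values are invented:

```python
# Illustrative backend payload; field names match the handler code in this diff.
document_processing_metadata = {
    "document_type": "FILE",
    "document_name": "report.pdf",
    "processing_stage": "completed",  # documentProcessingStageEnum value
    "file_size": 52431,               # optional
    "chunks_count": 7,                # optional, kept for debugging
    "document_id": 42,                # optional
    "error_message": None,            # nullable
}
```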