resolve conflicts

Manoj Aggarwal 2026-01-16 15:39:57 -08:00
commit 11b160cee9
85 changed files with 8411 additions and 4637 deletions


@ -574,6 +574,12 @@ class SearchSpace(BaseModel, TimestampMixin):
order_by="Log.id",
cascade="all, delete-orphan",
)
notifications = relationship(
"Notification",
back_populates="search_space",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
search_source_connectors = relationship(
"SearchSourceConnector",
back_populates="search_space",
@ -712,6 +718,39 @@ class Log(BaseModel, TimestampMixin):
search_space = relationship("SearchSpace", back_populates="logs")
class Notification(BaseModel, TimestampMixin):
__tablename__ = "notifications"
user_id = Column(
UUID(as_uuid=True),
ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=True
)
type = Column(
String(50), nullable=False
) # 'connector_indexing', 'document_processing', etc.
title = Column(String(200), nullable=False)
message = Column(Text, nullable=False)
read = Column(
Boolean, nullable=False, default=False, server_default=text("false"), index=True
)
notification_metadata = Column("metadata", JSONB, nullable=True, default={})
updated_at = Column(
TIMESTAMP(timezone=True),
nullable=True,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
index=True,
)
user = relationship("User", back_populates="notifications")
search_space = relationship("SearchSpace", back_populates="notifications")
class SearchSpaceRole(BaseModel, TimestampMixin):
"""
Custom roles that can be defined per search space.
@ -856,6 +895,12 @@ if config.AUTH_TYPE == "GOOGLE":
"OAuthAccount", lazy="joined"
)
search_spaces = relationship("SearchSpace", back_populates="user")
notifications = relationship(
"Notification",
back_populates="user",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
# RBAC relationships
search_space_memberships = relationship(
@ -893,6 +938,12 @@ else:
class User(SQLAlchemyBaseUserTableUUID, Base):
search_spaces = relationship("SearchSpace", back_populates="user")
notifications = relationship(
"Notification",
back_populates="user",
order_by="Notification.created_at.desc()",
cascade="all, delete-orphan",
)
# RBAC relationships
search_space_memberships = relationship(
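
A minimal usage sketch for the new Notification model (not part of this commit): fetching a user's unread notifications with an AsyncSession, using only columns defined above plus the created_at timestamp from TimestampMixin. The function name and session handling are illustrative.

from sqlalchemy import select
from app.db import Notification

async def get_unread_notifications(session, user_id):
    # Newest first, mirroring the created_at ordering used by the relationships above.
    result = await session.execute(
        select(Notification)
        .where(Notification.user_id == user_id, Notification.read == False)  # noqa: E712
        .order_by(Notification.created_at.desc())
    )
    return result.scalars().all()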


@ -25,6 +25,7 @@ from .luma_add_connector_route import router as luma_add_connector_router
from .new_chat_routes import router as new_chat_router
from .new_llm_config_routes import router as new_llm_config_router
from .notes_routes import router as notes_router
from .notifications_routes import router as notifications_router
from .notion_add_connector_route import router as notion_add_connector_router
from .podcasts_routes import router as podcasts_router
from .rbac_routes import router as rbac_router
@ -61,3 +62,4 @@ router.include_router(new_llm_config_router) # LLM configs with prompt configur
router.include_router(logs_router)
router.include_router(circleback_webhook_router) # Circleback meeting webhooks
router.include_router(surfsense_docs_router) # Surfsense documentation for citations
router.include_router(notifications_router) # Notifications with Electric SQL sync


@ -0,0 +1,102 @@
"""
Notifications API routes.
These endpoints allow marking notifications as read.
Electric SQL automatically syncs the changes to all connected clients.
"""
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Notification, User, get_async_session
from app.users import current_active_user
router = APIRouter(prefix="/notifications", tags=["notifications"])
class MarkReadResponse(BaseModel):
"""Response for mark as read operations."""
success: bool
message: str
class MarkAllReadResponse(BaseModel):
"""Response for mark all as read operation."""
success: bool
message: str
updated_count: int
@router.patch("/{notification_id}/read", response_model=MarkReadResponse)
async def mark_notification_as_read(
notification_id: int,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> MarkReadResponse:
"""
Mark a single notification as read.
Electric SQL will automatically sync this change to all connected clients.
"""
# Verify the notification belongs to the user
result = await session.execute(
select(Notification).where(
Notification.id == notification_id,
Notification.user_id == user.id,
)
)
notification = result.scalar_one_or_none()
if not notification:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Notification not found",
)
if notification.read:
return MarkReadResponse(
success=True,
message="Notification already marked as read",
)
# Update the notification
notification.read = True
await session.commit()
return MarkReadResponse(
success=True,
message="Notification marked as read",
)
@router.patch("/read-all", response_model=MarkAllReadResponse)
async def mark_all_notifications_as_read(
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
) -> MarkAllReadResponse:
"""
Mark all notifications as read for the current user.
Electric SQL will automatically sync these changes to all connected clients.
"""
# Update all unread notifications for the user
result = await session.execute(
update(Notification)
.where(
Notification.user_id == user.id,
Notification.read == False, # noqa: E712
)
.values(read=True)
)
await session.commit()
updated_count = result.rowcount
return MarkAllReadResponse(
success=True,
message=f"Marked {updated_count} notification(s) as read",
updated_count=updated_count,
)
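
A hedged client-side sketch of calling these endpoints. The base URL, bearer token, and use of httpx are assumptions for illustration; only the two PATCH paths come from the routes above. Electric SQL then syncs the updated rows to connected clients, so the frontend does not need to re-fetch.

import httpx

async def mark_notifications_read(token: str, notification_id: int) -> None:
    # Assumed deployment details: backend on localhost:8000, bearer auth from fastapi-users.
    headers = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Mark a single notification as read.
        resp = await client.patch(f"/notifications/{notification_id}/read", headers=headers)
        resp.raise_for_status()
        # Mark all remaining notifications as read.
        resp = await client.patch("/notifications/read-all", headers=headers)
        resp.raise_for_status()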


@ -0,0 +1,664 @@
"""Service for creating and managing notifications with Electric SQL sync."""
import logging
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.db import Notification
logger = logging.getLogger(__name__)
class BaseNotificationHandler:
"""Base class for notification handlers - provides common functionality."""
def __init__(self, notification_type: str):
"""
Initialize the notification handler.
Args:
notification_type: Type of notification (e.g., 'connector_indexing', 'document_processing')
"""
self.notification_type = notification_type
async def find_notification_by_operation(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
search_space_id: int | None = None,
) -> Notification | None:
"""
Find an existing notification by operation ID.
Args:
session: Database session
user_id: User ID
operation_id: Unique operation identifier
search_space_id: Optional search space ID
Returns:
Notification if found, None otherwise
"""
query = select(Notification).where(
Notification.user_id == user_id,
Notification.type == self.notification_type,
Notification.notification_metadata["operation_id"].astext == operation_id,
)
if search_space_id is not None:
query = query.where(Notification.search_space_id == search_space_id)
result = await session.execute(query)
return result.scalar_one_or_none()
async def find_or_create_notification(
self,
session: AsyncSession,
user_id: UUID,
operation_id: str,
title: str,
message: str,
search_space_id: int | None = None,
initial_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Find an existing notification or create a new one.
Args:
session: Database session
user_id: User ID
operation_id: Unique operation identifier
title: Notification title
message: Notification message
search_space_id: Optional search space ID
initial_metadata: Initial metadata dictionary
Returns:
Notification: The found or created notification
"""
# Try to find existing notification
notification = await self.find_notification_by_operation(
session, user_id, operation_id, search_space_id
)
if notification:
# Update existing notification
notification.title = title
notification.message = message
if initial_metadata:
notification.notification_metadata = {
**notification.notification_metadata,
**initial_metadata,
}
# Mark JSONB column as modified so SQLAlchemy detects the change
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(
f"Updated notification {notification.id} for operation {operation_id}"
)
return notification
# Create new notification
metadata = initial_metadata or {}
metadata["operation_id"] = operation_id
metadata["status"] = "in_progress"
metadata["started_at"] = datetime.now(UTC).isoformat()
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=self.notification_type,
title=title,
message=message,
notification_metadata=metadata,
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(
f"Created notification {notification.id} for operation {operation_id}"
)
return notification
async def update_notification(
self,
session: AsyncSession,
notification: Notification,
title: str | None = None,
message: str | None = None,
status: str | None = None,
metadata_updates: dict[str, Any] | None = None,
) -> Notification:
"""
Update an existing notification.
Args:
session: Database session
notification: Notification to update
title: New title (optional)
message: New message (optional)
status: New status (optional)
metadata_updates: Additional metadata to merge (optional)
Returns:
Updated notification
"""
if title is not None:
notification.title = title
if message is not None:
notification.message = message
if status is not None:
notification.notification_metadata["status"] = status
if status in ("completed", "failed"):
notification.notification_metadata["completed_at"] = datetime.now(
UTC
).isoformat()
# Mark JSONB column as modified so SQLAlchemy detects the change
flag_modified(notification, "notification_metadata")
if metadata_updates:
notification.notification_metadata = {
**notification.notification_metadata,
**metadata_updates,
}
# Mark JSONB column as modified
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
logger.info(f"Updated notification {notification.id}")
return notification
class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"""Handler for connector indexing notifications."""
def __init__(self):
super().__init__("connector_indexing")
def _generate_operation_id(
self,
connector_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> str:
"""
Generate a unique operation ID for a connector indexing operation.
Args:
connector_id: Connector ID
start_date: Start date (optional)
end_date: End date (optional)
Returns:
Unique operation ID string
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
date_range = ""
if start_date or end_date:
date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
return f"connector_{connector_id}_{timestamp}{date_range}"
def _generate_google_drive_operation_id(
self, connector_id: int, folder_count: int, file_count: int
) -> str:
"""
Generate a unique operation ID for a Google Drive indexing operation.
Args:
connector_id: Connector ID
folder_count: Number of folders to index
file_count: Number of files to index
Returns:
Unique operation ID string
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
items_info = f"_{folder_count}f_{file_count}files"
return f"drive_{connector_id}_{timestamp}{items_info}"
async def notify_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
start_date: str | None = None,
end_date: str | None = None,
) -> Notification:
"""
Create or update notification when connector indexing starts.
Args:
session: Database session
user_id: User ID
connector_id: Connector ID
connector_name: Connector name
connector_type: Connector type
search_space_id: Search space ID
start_date: Start date for indexing
end_date: End date for indexing
Returns:
Notification: The created or updated notification
"""
operation_id = self._generate_operation_id(connector_id, start_date, end_date)
title = f"Syncing: {connector_name}"
message = "Connecting to your account"
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"start_date": start_date,
"end_date": end_date,
"indexed_count": 0,
"sync_stage": "connecting",
}
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
async def notify_indexing_progress(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
total_count: int | None = None,
stage: str | None = None,
stage_message: str | None = None,
) -> Notification:
"""
Update notification with indexing progress.
Args:
session: Database session
notification: Notification to update
indexed_count: Number of items indexed so far
total_count: Total number of items (optional)
stage: Current sync stage (fetching, processing, storing) (optional)
stage_message: Optional custom message for the stage
Returns:
Updated notification
"""
# User-friendly stage messages (clean, no ellipsis - spinner shows activity)
stage_messages = {
"connecting": "Connecting to your account",
"fetching": "Fetching your content",
"processing": "Preparing for search",
"storing": "Almost done",
}
# Use stage-based message if stage provided, otherwise fallback
if stage or stage_message:
progress_msg = stage_message or stage_messages.get(stage, "Processing")
else:
# Fallback for backward compatibility
progress_msg = "Fetching your content"
metadata_updates = {"indexed_count": indexed_count}
if total_count is not None:
metadata_updates["total_count"] = total_count
# Guard against division by zero when a caller reports a zero total
if total_count > 0:
progress_percent = int((indexed_count / total_count) * 100)
metadata_updates["progress_percent"] = progress_percent
if stage:
metadata_updates["sync_stage"] = stage
return await self.update_notification(
session=session,
notification=notification,
message=progress_msg,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_indexing_completed(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
error_message: str | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
Args:
session: Database session
notification: Notification to update
indexed_count: Total number of items indexed
error_message: Error message if indexing failed (optional)
Returns:
Updated notification
"""
connector_name = notification.notification_metadata.get(
"connector_name", "Connector"
)
if error_message:
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
message = "Already up to date! No new items to sync."
else:
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced."
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"sync_stage": "completed" if not error_message else "failed",
"error_message": error_message,
}
return await self.update_notification(
session=session,
notification=notification,
title=title,
message=message,
status=status,
metadata_updates=metadata_updates,
)
async def notify_google_drive_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
folder_count: int,
file_count: int,
folder_names: list[str] | None = None,
file_names: list[str] | None = None,
) -> Notification:
"""
Create or update notification when Google Drive indexing starts.
Args:
session: Database session
user_id: User ID
connector_id: Connector ID
connector_name: Connector name
connector_type: Connector type
search_space_id: Search space ID
folder_count: Number of folders to index
file_count: Number of files to index
folder_names: List of folder names (optional)
file_names: List of file names (optional)
Returns:
Notification: The created or updated notification
"""
operation_id = self._generate_google_drive_operation_id(
connector_id, folder_count, file_count
)
title = f"Syncing: {connector_name}"
message = "Preparing your files"
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"folder_count": folder_count,
"file_count": file_count,
"indexed_count": 0,
"sync_stage": "connecting",
}
if folder_names:
metadata["folder_names"] = folder_names
if file_names:
metadata["file_names"] = file_names
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
class DocumentProcessingNotificationHandler(BaseNotificationHandler):
"""Handler for document processing notifications."""
def __init__(self):
super().__init__("document_processing")
def _generate_operation_id(
self, document_type: str, filename: str, search_space_id: int
) -> str:
"""
Generate a unique operation ID for a document processing operation.
Args:
document_type: Type of document (FILE, YOUTUBE_VIDEO, CRAWLED_URL, etc.)
filename: Name of the file/document
search_space_id: Search space ID
Returns:
Unique operation ID string
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S_%f")
# Create a short hash of filename to ensure uniqueness
import hashlib
filename_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
return f"doc_{document_type}_{search_space_id}_{timestamp}_{filename_hash}"
async def notify_processing_started(
self,
session: AsyncSession,
user_id: UUID,
document_type: str,
document_name: str,
search_space_id: int,
file_size: int | None = None,
) -> Notification:
"""
Create notification when document processing starts.
Args:
session: Database session
user_id: User ID
document_type: Type of document (FILE, YOUTUBE_VIDEO, CRAWLED_URL, etc.)
document_name: Name/title of the document
search_space_id: Search space ID
file_size: Size of file in bytes (optional)
Returns:
Notification: The created notification
"""
operation_id = self._generate_operation_id(
document_type, document_name, search_space_id
)
title = f"Processing: {document_name}"
message = "Waiting in queue"
metadata = {
"document_type": document_type,
"document_name": document_name,
"processing_stage": "queued",
}
if file_size is not None:
metadata["file_size"] = file_size
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
async def notify_processing_progress(
self,
session: AsyncSession,
notification: Notification,
stage: str,
stage_message: str | None = None,
chunks_count: int | None = None,
) -> Notification:
"""
Update notification with processing progress.
Args:
session: Database session
notification: Notification to update
stage: Current processing stage (parsing, chunking, embedding, storing)
stage_message: Optional custom message for the stage
chunks_count: Number of chunks created (optional, stored in metadata only)
Returns:
Updated notification
"""
# User-friendly stage messages
stage_messages = {
"parsing": "Reading your file",
"chunking": "Preparing for search",
"embedding": "Preparing for search",
"storing": "Finalizing",
}
message = stage_message or stage_messages.get(stage, "Processing")
metadata_updates = {"processing_stage": stage}
# Store chunks_count in metadata for debugging, but don't show to user
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return await self.update_notification(
session=session,
notification=notification,
message=message,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_processing_completed(
self,
session: AsyncSession,
notification: Notification,
document_id: int | None = None,
chunks_count: int | None = None,
error_message: str | None = None,
) -> Notification:
"""
Update notification when document processing completes.
Args:
session: Database session
notification: Notification to update
document_id: ID of the created document (optional)
chunks_count: Total number of chunks created (optional)
error_message: Error message if processing failed (optional)
Returns:
Updated notification
"""
document_name = notification.notification_metadata.get(
"document_name", "Document"
)
if error_message:
title = f"Failed: {document_name}"
message = f"Processing failed: {error_message}"
status = "failed"
else:
title = f"Ready: {document_name}"
message = "Now searchable!"
status = "completed"
metadata_updates = {
"processing_stage": "completed" if not error_message else "failed",
"error_message": error_message,
}
if document_id is not None:
metadata_updates["document_id"] = document_id
# Store chunks_count in metadata for debugging, but don't show to user
if chunks_count is not None:
metadata_updates["chunks_count"] = chunks_count
return await self.update_notification(
session=session,
notification=notification,
title=title,
message=message,
status=status,
metadata_updates=metadata_updates,
)
class NotificationService:
"""Service for creating and managing notifications that sync via Electric SQL."""
# Handler instances
connector_indexing = ConnectorIndexingNotificationHandler()
document_processing = DocumentProcessingNotificationHandler()
@staticmethod
async def create_notification(
session: AsyncSession,
user_id: UUID,
notification_type: str,
title: str,
message: str,
search_space_id: int | None = None,
notification_metadata: dict[str, Any] | None = None,
) -> Notification:
"""
Create a notification - Electric SQL will automatically sync it to frontend.
Args:
session: Database session
user_id: User to notify
notification_type: Type of notification (e.g., 'document_processing', 'connector_indexing')
title: Notification title
message: Notification message
search_space_id: Optional search space ID
notification_metadata: Optional metadata dictionary
Returns:
Notification: The created notification
"""
notification = Notification(
user_id=user_id,
search_space_id=search_space_id,
type=notification_type,
title=title,
message=message,
notification_metadata=notification_metadata or {},
)
session.add(notification)
await session.commit()
await session.refresh(notification)
logger.info(f"Created notification {notification.id} for user {user_id}")
return notification
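
A minimal sketch of driving the connector-indexing handler from a background task. The connector name, type string, counts, and the session/ID arguments are placeholders; the real call sites are in the Celery connector tasks.

from app.services.notification_service import NotificationService

async def example_indexing_flow(session, user_id, connector_id, search_space_id):
    handler = NotificationService.connector_indexing
    # Creates (or reuses) a notification keyed by an operation_id such as
    # "connector_<id>_<YYYYMMDD_HHMMSS>", stored in notification_metadata.
    notification = await handler.notify_indexing_started(
        session=session,
        user_id=user_id,
        connector_id=connector_id,
        connector_name="Slack",  # placeholder name
        connector_type="SLACK_CONNECTOR",  # placeholder type string
        search_space_id=search_space_id,
    )
    # Progress updates merge into the JSONB metadata and compute progress_percent.
    await handler.notify_indexing_progress(
        session, notification, indexed_count=25, total_count=100, stage="fetching"
    )
    # Completion sets sync_stage and status and stamps completed_at in the metadata.
    await handler.notify_indexing_completed(session, notification, indexed_count=100)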


@ -445,31 +445,13 @@ async def _index_google_gmail_messages(
end_date: str,
):
"""Index Google Gmail messages with new session."""
from datetime import datetime
from app.routes.search_source_connectors_routes import (
run_google_gmail_indexing,
)
# Parse dates to calculate days_back
max_messages = 100
days_back = 30 # Default
if start_date:
try:
# Parse start_date (format: YYYY-MM-DD)
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
# Calculate days back from now
days_back = (datetime.now() - start_dt).days
# Ensure at least 1 day
days_back = max(1, days_back)
except ValueError:
# If parsing fails, use default
days_back = 30
async with get_celery_session_maker()() as session:
await run_google_gmail_indexing(
session, connector_id, search_space_id, user_id, max_messages, days_back
session, connector_id, search_space_id, user_id, start_date, end_date
)


@ -1,12 +1,14 @@
"""Celery tasks for document processing."""
import logging
from uuid import UUID
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.document_processors import (
add_extension_received_document,
@ -84,6 +86,22 @@ async def _process_extension_document(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Truncate title for notification display
page_title = individual_document.metadata.VisitedWebPageTitle[:50]
if len(individual_document.metadata.VisitedWebPageTitle) > 50:
page_title += "..."
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="EXTENSION",
document_name=page_title,
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_extension_document",
source="document_processor",
@ -97,6 +115,14 @@ async def _process_extension_document(
)
try:
# Update notification: parsing stage
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading page content",
)
result = await add_extension_received_document(
session, individual_document, search_space_id, user_id
)
@ -107,12 +133,31 @@ async def _process_extension_document(
f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
{"document_id": result.id, "content_hash": result.content_hash},
)
# Update notification on success
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
await task_logger.log_task_success(
log_entry,
f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
{"duplicate_detected": True},
)
# Update notification for duplicate
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Page already saved (duplicate)",
)
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -120,6 +165,23 @@ async def _process_extension_document(
str(e),
{"error_type": type(e).__name__},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing extension document: {e!s}")
raise
@ -150,6 +212,20 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Extract video title from URL for notification (will be updated later)
video_name = url.split("v=")[-1][:11] if "v=" in url else url
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="YOUTUBE_VIDEO",
document_name=f"YouTube: {video_name}",
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_youtube_video",
source="document_processor",
@ -158,6 +234,14 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
)
try:
# Update notification: parsing (fetching transcript)
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Fetching video transcript",
)
result = await add_youtube_video_document(
session, url, search_space_id, user_id
)
@ -172,12 +256,31 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
"content_hash": result.content_hash,
},
)
# Update notification on success
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
await task_logger.log_task_success(
log_entry,
f"YouTube video document already exists (duplicate): {url}",
{"duplicate_detected": True},
)
# Update notification for duplicate
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Video already exists (duplicate)",
)
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -185,6 +288,23 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
str(e),
{"error_type": type(e).__name__},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing YouTube video: {e!s}")
raise
@ -219,11 +339,31 @@ async def _process_file_upload(
file_path: str, filename: str, search_space_id: int, user_id: str
):
"""Process file upload with new session."""
import os
from app.tasks.document_processors.file_processors import process_file_in_background
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Get file size for notification metadata
try:
file_size = os.path.getsize(file_path)
except Exception:
file_size = None
# Create notification for document processing
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
source="document_processor",
@ -237,7 +377,7 @@ async def _process_file_upload(
)
try:
await process_file_in_background(
result = await process_file_in_background(
file_path,
filename,
search_space_id,
@ -245,7 +385,29 @@ async def _process_file_upload(
session,
task_logger,
log_entry,
notification=notification,
)
# Update notification on success
if result:
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
# Duplicate detected
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Document already exists (duplicate)",
)
)
except Exception as e:
# Import here to avoid circular dependencies
from fastapi import HTTPException
@ -258,7 +420,23 @@ async def _process_file_upload(
elif isinstance(e, HTTPException) and "page limit" in str(e.detail).lower():
error_message = str(e.detail)
else:
error_message = f"Failed to process file: {filename}"
error_message = str(e)[:100]
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await (
NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
await task_logger.log_task_failure(
log_entry,
@ -323,6 +501,22 @@ async def _process_circleback_meeting(
async with get_celery_session_maker()() as session:
task_logger = TaskLoggingService(session, search_space_id)
# Get user_id from metadata if available
user_id = metadata.get("user_id")
# Create notification if user_id is available
notification = None
if user_id:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="CIRCLEBACK",
document_name=f"Meeting: {meeting_name[:40]}",
search_space_id=search_space_id,
)
)
log_entry = await task_logger.log_task_start(
task_name="process_circleback_meeting",
source="circleback_webhook",
@ -336,6 +530,17 @@ async def _process_circleback_meeting(
)
try:
# Update notification: parsing stage
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading meeting notes",
)
)
result = await add_circleback_meeting_document(
session=session,
meeting_id=meeting_id,
@ -355,12 +560,29 @@ async def _process_circleback_meeting(
"content_hash": result.content_hash,
},
)
# Update notification on success
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
else:
await task_logger.log_task_success(
log_entry,
f"Circleback meeting document already exists (duplicate): {meeting_name}",
{"duplicate_detected": True, "meeting_id": meeting_id},
)
# Update notification for duplicate
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Meeting already saved (duplicate)",
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
@ -368,5 +590,21 @@ async def _process_circleback_meeting(
str(e),
{"error_type": type(e).__name__, "meeting_id": meeting_id},
)
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
if notification:
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing Circleback meeting: {e!s}")
raise
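
Each task above follows the same notification lifecycle. A distilled sketch of that pattern (the helper name and the do_work callable are illustrative, not something this commit adds):

from uuid import UUID
from app.services.notification_service import NotificationService

async def notify_document_lifecycle(session, user_id: str, name: str, search_space_id: int, do_work):
    notification = await NotificationService.document_processing.notify_processing_started(
        session=session,
        user_id=UUID(user_id),
        document_type="FILE",  # placeholder document type
        document_name=name,
        search_space_id=search_space_id,
    )
    try:
        await NotificationService.document_processing.notify_processing_progress(
            session, notification, stage="parsing"
        )
        result = await do_work()  # stands in for the actual processor call
        await NotificationService.document_processing.notify_processing_completed(
            session=session,
            notification=notification,
            document_id=result.id if result else None,
            error_message=None if result else "Document already exists (duplicate)",
        )
    except Exception as e:
        # Refresh so the ORM object is not stale after any rollback, then record the
        # failure; the original exception is still re-raised afterwards.
        await session.refresh(notification)
        await NotificationService.document_processing.notify_processing_completed(
            session=session, notification=notification, error_message=str(e)[:100]
        )
        raise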


@ -423,9 +423,9 @@ async def stream_new_chat(
title = title[:27] + "..."
doc_names.append(title)
if len(doc_names) == 1:
processing_parts.append(f"[📖 {doc_names[0]}]")
processing_parts.append(f"[{doc_names[0]}]")
else:
processing_parts.append(f"[📖 {len(doc_names)} docs]")
processing_parts.append(f"[{len(doc_names)} docs]")
last_active_step_items = [f"{action_verb}: {' '.join(processing_parts)}"]


@ -549,7 +549,10 @@ async def index_discord_messages(
logger.info(
f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped"
)
return documents_indexed, result_message
return (
documents_indexed,
None,
) # Return None on success (result_message is for logging only)
except SQLAlchemyError as db_error:
await session.rollback()


@ -464,7 +464,10 @@ async def index_notion_pages(
# Clean up the async client
await notion_client.close()
return total_processed, result_message
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
except SQLAlchemyError as db_error:
await session.rollback()


@ -413,7 +413,10 @@ async def index_slack_messages(
logger.info(
f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped"
)
return total_processed, result_message
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
except SQLAlchemyError as db_error:
await session.rollback()


@ -460,7 +460,10 @@ async def index_teams_messages(
documents_indexed,
documents_skipped,
)
return total_processed, result_message
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
except SQLAlchemyError as db_error:
await session.rollback()


@ -371,17 +371,14 @@ async def index_crawled_urls(
)
await session.commit()
# Build result message
result_message = None
# Log failed URLs if any (for debugging purposes)
if failed_urls:
failed_summary = "; ".join(
[f"{url}: {error}" for url, error in failed_urls[:5]]
)
if len(failed_urls) > 5:
failed_summary += f" (and {len(failed_urls) - 5} more)"
result_message = (
f"Completed with {len(failed_urls)} failures: {failed_summary}"
)
logger.warning(f"Some URLs failed to index: {failed_summary}")
await task_logger.log_task_success(
log_entry,
@ -400,7 +397,10 @@ async def index_crawled_urls(
f"{documents_updated} updated, {documents_skipped} skipped, "
f"{len(failed_urls)} failed"
)
return total_processed, result_message
return (
total_processed,
None,
) # Return None on success (result_message is for logging only)
except SQLAlchemyError as db_error:
await session.rollback()


@ -17,8 +17,9 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import Document, DocumentType, Log
from app.db import Document, DocumentType, Log, Notification
from app.services.llm_service import get_user_long_context_llm
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
convert_document_to_markdown,
@ -594,10 +595,23 @@ async def process_file_in_background(
log_entry: Log,
connector: dict
| None = None, # Optional: {"type": "GOOGLE_DRIVE_FILE", "metadata": {...}}
):
notification: Notification
| None = None, # Optional notification for progress updates
) -> Document | None:
try:
# Check if the file is a markdown or text file
if filename.lower().endswith((".md", ".markdown", ".txt")):
# Update notification: parsing stage
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Reading file",
)
)
await task_logger.log_task_progress(
log_entry,
f"Processing markdown/text file: {filename}",
@ -617,6 +631,14 @@ async def process_file_in_background(
print("Error deleting temp file", e)
pass
# Update notification: chunking stage
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session, notification, stage="chunking"
)
)
await task_logger.log_task_progress(
log_entry,
f"Creating document from markdown content: {filename}",
@ -644,17 +666,30 @@ async def process_file_in_background(
"file_type": "markdown",
},
)
return result
else:
await task_logger.log_task_success(
log_entry,
f"Markdown file already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "markdown"},
)
return None
# Check if the file is an audio file
elif filename.lower().endswith(
(".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
):
# Update notification: parsing stage (transcription)
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Transcribing audio",
)
)
await task_logger.log_task_progress(
log_entry,
f"Processing audio file for transcription: {filename}",
@ -738,6 +773,14 @@ async def process_file_in_background(
},
)
# Update notification: chunking stage
if notification:
await (
NotificationService.document_processing.notify_processing_progress(
session, notification, stage="chunking"
)
)
# Clean up the temp file
try:
os.unlink(file_path)
@ -765,12 +808,14 @@ async def process_file_in_background(
"stt_service": stt_service_type,
},
)
return result
else:
await task_logger.log_task_success(
log_entry,
f"Audio file transcript already exists (duplicate): {filename}",
{"duplicate_detected": True, "file_type": "audio"},
)
return None
else:
# Import page limit service
@ -835,6 +880,15 @@ async def process_file_in_background(
) from e
if app_config.ETL_SERVICE == "UNSTRUCTURED":
# Update notification: parsing stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
await task_logger.log_task_progress(
log_entry,
f"Processing file with Unstructured ETL: {filename}",
@ -860,6 +914,12 @@ async def process_file_in_background(
docs = await loader.aload()
# Update notification: chunking stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="chunking", chunks_count=len(docs)
)
await task_logger.log_task_progress(
log_entry,
f"Unstructured ETL completed, creating document: {filename}",
@ -919,6 +979,7 @@ async def process_file_in_background(
"pages_processed": final_page_count,
},
)
return result
else:
await task_logger.log_task_success(
log_entry,
@ -929,8 +990,18 @@ async def process_file_in_background(
"etl_service": "UNSTRUCTURED",
},
)
return None
elif app_config.ETL_SERVICE == "LLAMACLOUD":
# Update notification: parsing stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
await task_logger.log_task_progress(
log_entry,
f"Processing file with LlamaCloud ETL: {filename}",
@ -964,6 +1035,15 @@ async def process_file_in_background(
split_by_page=False
)
# Update notification: chunking stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="chunking",
chunks_count=len(markdown_documents),
)
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud parsing completed, creating documents: {filename}",
@ -1056,6 +1136,7 @@ async def process_file_in_background(
"documents_count": len(markdown_documents),
},
)
return last_created_doc
else:
# All documents were duplicates (markdown_documents was not empty, but all returned None)
await task_logger.log_task_success(
@ -1068,8 +1149,18 @@ async def process_file_in_background(
"documents_count": len(markdown_documents),
},
)
return None
elif app_config.ETL_SERVICE == "DOCLING":
# Update notification: parsing stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session,
notification,
stage="parsing",
stage_message="Extracting content",
)
await task_logger.log_task_progress(
log_entry,
f"Processing file with Docling ETL: {filename}",
@ -1152,6 +1243,12 @@ async def process_file_in_background(
},
)
# Update notification: chunking stage
if notification:
await NotificationService.document_processing.notify_processing_progress(
session, notification, stage="chunking"
)
# Process the document using our Docling background task
doc_result = await add_received_file_document_using_docling(
session,
@ -1184,6 +1281,7 @@ async def process_file_in_background(
"pages_processed": final_page_count,
},
)
return doc_result
else:
await task_logger.log_task_success(
log_entry,
@ -1194,6 +1292,7 @@ async def process_file_in_background(
"etl_service": "DOCLING",
},
)
return None
except Exception as e:
await session.rollback()