feat: gogole drive custom notification system

This commit is contained in:
Anish Sarkar 2026-01-13 11:39:22 +05:30
parent 271ddffbf5
commit b0bdbf158b
2 changed files with 138 additions and 3 deletions

View file

@ -1660,7 +1660,10 @@ async def run_google_drive_indexing(
user_id: str, user_id: str,
items_dict: dict, # Dictionary with 'folders' and 'files' lists items_dict: dict, # Dictionary with 'folders' and 'files' lists
): ):
"""Runs the Google Drive indexing task for folders and files and updates the timestamp.""" """Runs the Google Drive indexing task for folders and files with notifications."""
from uuid import UUID
notification = None
try: try:
from app.tasks.connector_indexers.google_drive_indexer import ( from app.tasks.connector_indexers.google_drive_indexer import (
index_google_drive_files, index_google_drive_files,
@ -1672,6 +1675,27 @@ async def run_google_drive_indexing(
total_indexed = 0 total_indexed = 0
errors = [] errors = []
# Get connector info for notification
connector_result = await session.execute(
select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id)
)
connector = connector_result.scalar_one_or_none()
if connector:
# Create notification when indexing starts
notification = await NotificationService.connector_indexing.notify_google_drive_indexing_started(
session=session,
user_id=UUID(user_id),
connector_id=connector_id,
connector_name=connector.name,
connector_type=connector.connector_type.value,
search_space_id=search_space_id,
folder_count=len(items.folders),
file_count=len(items.files),
folder_names=items.get_folder_names() if items.folders else None,
file_names=items.get_file_names() if items.files else None,
)
# Index each folder # Index each folder
for folder in items.folders: for folder in items.folders:
try: try:
@ -1718,9 +1742,12 @@ async def run_google_drive_indexing(
exc_info=True, exc_info=True,
) )
# Prepare error message for notification
error_message = None
if errors: if errors:
error_message = "; ".join(errors)
logger.error( logger.error(
f"Google Drive indexing completed with errors for connector {connector_id}: {'; '.join(errors)}" f"Google Drive indexing completed with errors for connector {connector_id}: {error_message}"
) )
else: else:
logger.info( logger.info(
@ -1729,12 +1756,33 @@ async def run_google_drive_indexing(
# Update the last indexed timestamp only on full success # Update the last indexed timestamp only on full success
await _update_connector_timestamp_by_id(session, connector_id) await _update_connector_timestamp_by_id(session, connector_id)
await session.commit() # Commit timestamp update await session.commit() # Commit timestamp update
# Update notification on completion
if notification:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=total_indexed,
error_message=error_message,
)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}", f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}",
exc_info=True, exc_info=True,
) )
# Optionally update status in DB to indicate failure
# Update notification on exception
if notification:
try:
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=str(e),
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")
# Add new helper functions for luma indexing # Add new helper functions for luma indexing

View file

@ -199,6 +199,24 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
date_range = f"_{start_date or 'none'}_{end_date or 'none'}" date_range = f"_{start_date or 'none'}_{end_date or 'none'}"
return f"connector_{connector_id}_{timestamp}{date_range}" return f"connector_{connector_id}_{timestamp}{date_range}"
def _generate_google_drive_operation_id(
self, connector_id: int, folder_count: int, file_count: int
) -> str:
"""
Generate a unique operation ID for a Google Drive indexing operation.
Args:
connector_id: Connector ID
folder_count: Number of folders to index
file_count: Number of files to index
Returns:
Unique operation ID string
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
items_info = f"_{folder_count}f_{file_count}files"
return f"drive_{connector_id}_{timestamp}{items_info}"
async def notify_indexing_started( async def notify_indexing_started(
self, self,
session: AsyncSession, session: AsyncSession,
@ -332,6 +350,75 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
metadata_updates=metadata_updates, metadata_updates=metadata_updates,
) )
async def notify_google_drive_indexing_started(
self,
session: AsyncSession,
user_id: UUID,
connector_id: int,
connector_name: str,
connector_type: str,
search_space_id: int,
folder_count: int,
file_count: int,
folder_names: list[str] | None = None,
file_names: list[str] | None = None,
) -> Notification:
"""
Create or update notification when Google Drive indexing starts.
Args:
session: Database session
user_id: User ID
connector_id: Connector ID
connector_name: Connector name
connector_type: Connector type
search_space_id: Search space ID
folder_count: Number of folders to index
file_count: Number of files to index
folder_names: List of folder names (optional)
file_names: List of file names (optional)
Returns:
Notification: The created or updated notification
"""
operation_id = self._generate_google_drive_operation_id(
connector_id, folder_count, file_count
)
title = f"Indexing: {connector_name}"
# Create descriptive message
items_desc = []
if folder_count > 0:
items_desc.append(f"{folder_count} folder{'s' if folder_count != 1 else ''}")
if file_count > 0:
items_desc.append(f"{file_count} file{'s' if file_count != 1 else ''}")
message = f'Indexing "{connector_name}" ({", ".join(items_desc)}) in progress...'
metadata = {
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"folder_count": folder_count,
"file_count": file_count,
"indexed_count": 0,
}
if folder_names:
metadata["folder_names"] = folder_names
if file_names:
metadata["file_names"] = file_names
return await self.find_or_create_notification(
session=session,
user_id=user_id,
operation_id=operation_id,
title=title,
message=message,
search_space_id=search_space_id,
initial_metadata=metadata,
)
class NotificationService: class NotificationService:
"""Service for creating and managing notifications that sync via Electric SQL.""" """Service for creating and managing notifications that sync via Electric SQL."""