From 8e7cda31c59ed49d301cd4129727c674ba2dffcc Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Thu, 19 Mar 2026 20:56:40 +0530 Subject: [PATCH] feat: update Google indexing functions to track skipped messages - Modified the indexing functions for Google Calendar and Gmail to return the count of skipped messages alongside indexed messages, enhancing performance tracking. - Updated related tests to accommodate the new return values, ensuring comprehensive coverage of the indexing process. - Improved error handling to maintain consistency in returned values across different indexing functions. --- .../routes/search_source_connectors_routes.py | 8 +++--- .../google_calendar_indexer.py | 24 +++++++++--------- .../google_gmail_indexer.py | 25 ++++++++----------- .../test_calendar_indexer_credentials.py | 2 +- .../test_gmail_indexer_credentials.py | 2 +- 5 files changed, 28 insertions(+), 33 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 0252d0882..e6ed007d7 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -2287,10 +2287,9 @@ async def run_google_gmail_indexing( end_date: str | None, update_last_indexed: bool, on_heartbeat_callback=None, - ) -> tuple[int, str | None]: - # Use a reasonable default for max_messages + ) -> tuple[int, int, str | None]: max_messages = 1000 - indexed_count, error_message = await index_google_gmail_messages( + indexed_count, skipped_count, error_message = await index_google_gmail_messages( session=session, connector_id=connector_id, search_space_id=search_space_id, @@ -2301,8 +2300,7 @@ async def run_google_gmail_indexing( max_messages=max_messages, on_heartbeat_callback=on_heartbeat_callback, ) - # index_google_gmail_messages returns (int, str) but we need (int, str | None) - return indexed_count, error_message if error_message else None + return indexed_count, skipped_count, error_message if error_message else None await _run_indexing_with_notifications( session=session, diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 9ea76c851..2a866e411 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -62,7 +62,7 @@ async def index_google_calendar_events( end_date: str | None = None, update_last_indexed: bool = True, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str | None]: +) -> tuple[int, int, str | None]: """ Index Google Calendar events. @@ -78,7 +78,7 @@ async def index_google_calendar_events( on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: - Tuple containing (number of documents indexed, error message or None) + Tuple containing (number of documents indexed, number of documents skipped, error message or None) """ task_logger = TaskLoggingService(session, search_space_id) @@ -110,7 +110,7 @@ async def index_google_calendar_events( "Connector not found", {"error_type": "ConnectorNotFound"}, ) - return 0, f"Connector with ID {connector_id} not found" + return 0, 0, f"Connector with ID {connector_id} not found" # Build credentials based on connector type if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: @@ -124,7 +124,7 @@ async def index_google_calendar_events( "Missing Composio account", {"error_type": "MissingComposioAccount"}, ) - return 0, "Composio connected_account_id not found" + return 0, 0, "Composio connected_account_id not found" credentials = build_composio_credentials(connected_account_id) else: config_data = connector.config @@ -158,7 +158,7 @@ async def index_google_calendar_events( "Credential decryption failed", {"error_type": "CredentialDecryptionError"}, ) - return 0, f"Failed to decrypt Google Calendar credentials: {e!s}" + return 0, 0, f"Failed to decrypt Google Calendar credentials: {e!s}" exp = config_data.get("expiry", "") if exp: @@ -184,7 +184,7 @@ async def index_google_calendar_events( "Missing Google Calendar credentials", {"error_type": "MissingCredentials"}, ) - return 0, "Google Calendar credentials not found in connector config" + return 0, 0, "Google Calendar credentials not found in connector config" await task_logger.log_task_progress( log_entry, @@ -303,7 +303,7 @@ async def index_google_calendar_events( f"No Google Calendar events found in date range {start_date_str} to {end_date_str}", {"events_found": 0}, ) - return 0, None + return 0, 0, None else: logger.error(f"Failed to get Google Calendar events: {error}") # Check if this is an authentication error that requires re-authentication @@ -323,13 +323,13 @@ async def index_google_calendar_events( error, {"error_type": error_type}, ) - return 0, error_message + return 0, 0, error_message logger.info(f"Retrieved {len(events)} events from Google Calendar API") except Exception as e: logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True) - return 0, f"Error fetching Google Calendar events: {e!s}" + return 0, 0, f"Error fetching Google Calendar events: {e!s}" documents_indexed = 0 documents_skipped = 0 @@ -631,7 +631,7 @@ async def index_google_calendar_events( f"{documents_skipped} skipped, {documents_failed} failed " f"({duplicate_content_count} duplicate content)" ) - return total_processed, warning_message + return total_processed, documents_skipped, warning_message except SQLAlchemyError as db_error: await session.rollback() @@ -642,7 +642,7 @@ async def index_google_calendar_events( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -652,4 +652,4 @@ async def index_google_calendar_events( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Google Calendar events: {e!s}", exc_info=True) - return 0, f"Failed to index Google Calendar events: {e!s}" + return 0, 0, f"Failed to index Google Calendar events: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index a1eee91d9..88b25adaa 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -68,7 +68,7 @@ async def index_google_gmail_messages( update_last_indexed: bool = True, max_messages: int = 1000, on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """ Index Gmail messages for a specific connector. @@ -84,7 +84,7 @@ async def index_google_gmail_messages( on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: - Tuple of (number_of_indexed_messages, status_message) + Tuple of (number_of_indexed_messages, number_of_skipped_messages, status_message) """ task_logger = TaskLoggingService(session, search_space_id) @@ -115,7 +115,7 @@ async def index_google_gmail_messages( await task_logger.log_task_failure( log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) - return 0, error_msg + return 0, 0, error_msg # Build credentials based on connector type if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: @@ -129,7 +129,7 @@ async def index_google_gmail_messages( "Missing Composio account", {"error_type": "MissingComposioAccount"}, ) - return 0, "Composio connected_account_id not found" + return 0, 0, "Composio connected_account_id not found" credentials = build_composio_credentials(connected_account_id) else: config_data = connector.config @@ -163,7 +163,7 @@ async def index_google_gmail_messages( "Credential decryption failed", {"error_type": "CredentialDecryptionError"}, ) - return 0, f"Failed to decrypt Google Gmail credentials: {e!s}" + return 0, 0, f"Failed to decrypt Google Gmail credentials: {e!s}" exp = config_data.get("expiry", "") if exp: @@ -189,7 +189,7 @@ async def index_google_gmail_messages( "Missing Google gmail credentials", {"error_type": "MissingCredentials"}, ) - return 0, "Google gmail credentials not found in connector config" + return 0, 0, "Google gmail credentials not found in connector config" await task_logger.log_task_progress( log_entry, @@ -234,14 +234,14 @@ async def index_google_gmail_messages( await task_logger.log_task_failure( log_entry, error_message, error, {"error_type": error_type} ) - return 0, error_message + return 0, 0, error_message if not messages: success_msg = "No Google gmail messages found in the specified date range" await task_logger.log_task_success( log_entry, success_msg, {"messages_count": 0} ) - return 0, success_msg + return 0, 0, success_msg logger.info(f"Found {len(messages)} Google gmail messages to index") @@ -550,10 +550,7 @@ async def index_google_gmail_messages( f"{documents_skipped} skipped, {documents_failed} failed " f"({duplicate_content_count} duplicate content)" ) - return ( - total_processed, - warning_message, - ) # Return warning_message (None on success) + return total_processed, documents_skipped, warning_message except SQLAlchemyError as db_error: await session.rollback() @@ -564,7 +561,7 @@ async def index_google_gmail_messages( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -574,4 +571,4 @@ async def index_google_gmail_messages( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True) - return 0, f"Failed to index Google gmail emails: {e!s}" + return 0, 0, f"Failed to index Google gmail emails: {e!s}" diff --git a/surfsense_backend/tests/integration/google_unification/test_calendar_indexer_credentials.py b/surfsense_backend/tests/integration/google_unification/test_calendar_indexer_credentials.py index b5d60e55b..7e4723dce 100644 --- a/surfsense_backend/tests/integration/google_unification/test_calendar_indexer_credentials.py +++ b/surfsense_backend/tests/integration/google_unification/test_calendar_indexer_credentials.py @@ -106,7 +106,7 @@ async def test_composio_calendar_without_account_id_returns_error( maker = make_session_factory(async_engine) async with maker() as session: - count, error = await index_google_calendar_events( + count, _skipped, error = await index_google_calendar_events( session=session, connector_id=data["connector_id"], search_space_id=data["search_space_id"], user_id=data["user_id"], ) diff --git a/surfsense_backend/tests/integration/google_unification/test_gmail_indexer_credentials.py b/surfsense_backend/tests/integration/google_unification/test_gmail_indexer_credentials.py index 8fbf1f320..4097b2e95 100644 --- a/surfsense_backend/tests/integration/google_unification/test_gmail_indexer_credentials.py +++ b/surfsense_backend/tests/integration/google_unification/test_gmail_indexer_credentials.py @@ -106,7 +106,7 @@ async def test_composio_gmail_without_account_id_returns_error( maker = make_session_factory(async_engine) async with maker() as session: - count, error = await index_google_gmail_messages( + count, _skipped, error = await index_google_gmail_messages( session=session, connector_id=data["connector_id"], search_space_id=data["search_space_id"], user_id=data["user_id"], )