feat: update Google indexing functions to track skipped messages

- Modified the indexing functions for Google Calendar and Gmail to return the count of skipped messages alongside indexed messages, enhancing performance tracking.
- Updated related tests to accommodate the new return values, ensuring comprehensive coverage of the indexing process.
- Improved error handling to maintain consistency in returned values across different indexing functions.
This commit is contained in:
Anish Sarkar 2026-03-19 20:56:40 +05:30
parent 80f7d5f34a
commit 8e7cda31c5
5 changed files with 28 additions and 33 deletions

View file

@ -2287,10 +2287,9 @@ async def run_google_gmail_indexing(
end_date: str | None, end_date: str | None,
update_last_indexed: bool, update_last_indexed: bool,
on_heartbeat_callback=None, on_heartbeat_callback=None,
) -> tuple[int, str | None]: ) -> tuple[int, int, str | None]:
# Use a reasonable default for max_messages
max_messages = 1000 max_messages = 1000
indexed_count, error_message = await index_google_gmail_messages( indexed_count, skipped_count, error_message = await index_google_gmail_messages(
session=session, session=session,
connector_id=connector_id, connector_id=connector_id,
search_space_id=search_space_id, search_space_id=search_space_id,
@ -2301,8 +2300,7 @@ async def run_google_gmail_indexing(
max_messages=max_messages, max_messages=max_messages,
on_heartbeat_callback=on_heartbeat_callback, on_heartbeat_callback=on_heartbeat_callback,
) )
# index_google_gmail_messages returns (int, str) but we need (int, str | None) return indexed_count, skipped_count, error_message if error_message else None
return indexed_count, error_message if error_message else None
await _run_indexing_with_notifications( await _run_indexing_with_notifications(
session=session, session=session,

View file

@ -62,7 +62,7 @@ async def index_google_calendar_events(
end_date: str | None = None, end_date: str | None = None,
update_last_indexed: bool = True, update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None, on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]: ) -> tuple[int, int, str | None]:
""" """
Index Google Calendar events. Index Google Calendar events.
@ -78,7 +78,7 @@ async def index_google_calendar_events(
on_heartbeat_callback: Optional callback to update notification during long-running indexing. on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns: Returns:
Tuple containing (number of documents indexed, error message or None) Tuple containing (number of documents indexed, number of documents skipped, error message or None)
""" """
task_logger = TaskLoggingService(session, search_space_id) task_logger = TaskLoggingService(session, search_space_id)
@ -110,7 +110,7 @@ async def index_google_calendar_events(
"Connector not found", "Connector not found",
{"error_type": "ConnectorNotFound"}, {"error_type": "ConnectorNotFound"},
) )
return 0, f"Connector with ID {connector_id} not found" return 0, 0, f"Connector with ID {connector_id} not found"
# Build credentials based on connector type # Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
@ -124,7 +124,7 @@ async def index_google_calendar_events(
"Missing Composio account", "Missing Composio account",
{"error_type": "MissingComposioAccount"}, {"error_type": "MissingComposioAccount"},
) )
return 0, "Composio connected_account_id not found" return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id) credentials = build_composio_credentials(connected_account_id)
else: else:
config_data = connector.config config_data = connector.config
@ -158,7 +158,7 @@ async def index_google_calendar_events(
"Credential decryption failed", "Credential decryption failed",
{"error_type": "CredentialDecryptionError"}, {"error_type": "CredentialDecryptionError"},
) )
return 0, f"Failed to decrypt Google Calendar credentials: {e!s}" return 0, 0, f"Failed to decrypt Google Calendar credentials: {e!s}"
exp = config_data.get("expiry", "") exp = config_data.get("expiry", "")
if exp: if exp:
@ -184,7 +184,7 @@ async def index_google_calendar_events(
"Missing Google Calendar credentials", "Missing Google Calendar credentials",
{"error_type": "MissingCredentials"}, {"error_type": "MissingCredentials"},
) )
return 0, "Google Calendar credentials not found in connector config" return 0, 0, "Google Calendar credentials not found in connector config"
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
@ -303,7 +303,7 @@ async def index_google_calendar_events(
f"No Google Calendar events found in date range {start_date_str} to {end_date_str}", f"No Google Calendar events found in date range {start_date_str} to {end_date_str}",
{"events_found": 0}, {"events_found": 0},
) )
return 0, None return 0, 0, None
else: else:
logger.error(f"Failed to get Google Calendar events: {error}") logger.error(f"Failed to get Google Calendar events: {error}")
# Check if this is an authentication error that requires re-authentication # Check if this is an authentication error that requires re-authentication
@ -323,13 +323,13 @@ async def index_google_calendar_events(
error, error,
{"error_type": error_type}, {"error_type": error_type},
) )
return 0, error_message return 0, 0, error_message
logger.info(f"Retrieved {len(events)} events from Google Calendar API") logger.info(f"Retrieved {len(events)} events from Google Calendar API")
except Exception as e: except Exception as e:
logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True) logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True)
return 0, f"Error fetching Google Calendar events: {e!s}" return 0, 0, f"Error fetching Google Calendar events: {e!s}"
documents_indexed = 0 documents_indexed = 0
documents_skipped = 0 documents_skipped = 0
@ -631,7 +631,7 @@ async def index_google_calendar_events(
f"{documents_skipped} skipped, {documents_failed} failed " f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)" f"({duplicate_content_count} duplicate content)"
) )
return total_processed, warning_message return total_processed, documents_skipped, warning_message
except SQLAlchemyError as db_error: except SQLAlchemyError as db_error:
await session.rollback() await session.rollback()
@ -642,7 +642,7 @@ async def index_google_calendar_events(
{"error_type": "SQLAlchemyError"}, {"error_type": "SQLAlchemyError"},
) )
logger.error(f"Database error: {db_error!s}", exc_info=True) logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}" return 0, 0, f"Database error: {db_error!s}"
except Exception as e: except Exception as e:
await session.rollback() await session.rollback()
await task_logger.log_task_failure( await task_logger.log_task_failure(
@ -652,4 +652,4 @@ async def index_google_calendar_events(
{"error_type": type(e).__name__}, {"error_type": type(e).__name__},
) )
logger.error(f"Failed to index Google Calendar events: {e!s}", exc_info=True) logger.error(f"Failed to index Google Calendar events: {e!s}", exc_info=True)
return 0, f"Failed to index Google Calendar events: {e!s}" return 0, 0, f"Failed to index Google Calendar events: {e!s}"

View file

@ -68,7 +68,7 @@ async def index_google_gmail_messages(
update_last_indexed: bool = True, update_last_indexed: bool = True,
max_messages: int = 1000, max_messages: int = 1000,
on_heartbeat_callback: HeartbeatCallbackType | None = None, on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]: ) -> tuple[int, int, str | None]:
""" """
Index Gmail messages for a specific connector. Index Gmail messages for a specific connector.
@ -84,7 +84,7 @@ async def index_google_gmail_messages(
on_heartbeat_callback: Optional callback to update notification during long-running indexing. on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns: Returns:
Tuple of (number_of_indexed_messages, status_message) Tuple of (number_of_indexed_messages, number_of_skipped_messages, status_message)
""" """
task_logger = TaskLoggingService(session, search_space_id) task_logger = TaskLoggingService(session, search_space_id)
@ -115,7 +115,7 @@ async def index_google_gmail_messages(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
) )
return 0, error_msg return 0, 0, error_msg
# Build credentials based on connector type # Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
@ -129,7 +129,7 @@ async def index_google_gmail_messages(
"Missing Composio account", "Missing Composio account",
{"error_type": "MissingComposioAccount"}, {"error_type": "MissingComposioAccount"},
) )
return 0, "Composio connected_account_id not found" return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id) credentials = build_composio_credentials(connected_account_id)
else: else:
config_data = connector.config config_data = connector.config
@ -163,7 +163,7 @@ async def index_google_gmail_messages(
"Credential decryption failed", "Credential decryption failed",
{"error_type": "CredentialDecryptionError"}, {"error_type": "CredentialDecryptionError"},
) )
return 0, f"Failed to decrypt Google Gmail credentials: {e!s}" return 0, 0, f"Failed to decrypt Google Gmail credentials: {e!s}"
exp = config_data.get("expiry", "") exp = config_data.get("expiry", "")
if exp: if exp:
@ -189,7 +189,7 @@ async def index_google_gmail_messages(
"Missing Google gmail credentials", "Missing Google gmail credentials",
{"error_type": "MissingCredentials"}, {"error_type": "MissingCredentials"},
) )
return 0, "Google gmail credentials not found in connector config" return 0, 0, "Google gmail credentials not found in connector config"
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
@ -234,14 +234,14 @@ async def index_google_gmail_messages(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_message, error, {"error_type": error_type} log_entry, error_message, error, {"error_type": error_type}
) )
return 0, error_message return 0, 0, error_message
if not messages: if not messages:
success_msg = "No Google gmail messages found in the specified date range" success_msg = "No Google gmail messages found in the specified date range"
await task_logger.log_task_success( await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0} log_entry, success_msg, {"messages_count": 0}
) )
return 0, success_msg return 0, 0, success_msg
logger.info(f"Found {len(messages)} Google gmail messages to index") logger.info(f"Found {len(messages)} Google gmail messages to index")
@ -550,10 +550,7 @@ async def index_google_gmail_messages(
f"{documents_skipped} skipped, {documents_failed} failed " f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)" f"({duplicate_content_count} duplicate content)"
) )
return ( return total_processed, documents_skipped, warning_message
total_processed,
warning_message,
) # Return warning_message (None on success)
except SQLAlchemyError as db_error: except SQLAlchemyError as db_error:
await session.rollback() await session.rollback()
@ -564,7 +561,7 @@ async def index_google_gmail_messages(
{"error_type": "SQLAlchemyError"}, {"error_type": "SQLAlchemyError"},
) )
logger.error(f"Database error: {db_error!s}", exc_info=True) logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}" return 0, 0, f"Database error: {db_error!s}"
except Exception as e: except Exception as e:
await session.rollback() await session.rollback()
await task_logger.log_task_failure( await task_logger.log_task_failure(
@ -574,4 +571,4 @@ async def index_google_gmail_messages(
{"error_type": type(e).__name__}, {"error_type": type(e).__name__},
) )
logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True) logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True)
return 0, f"Failed to index Google gmail emails: {e!s}" return 0, 0, f"Failed to index Google gmail emails: {e!s}"

View file

@ -106,7 +106,7 @@ async def test_composio_calendar_without_account_id_returns_error(
maker = make_session_factory(async_engine) maker = make_session_factory(async_engine)
async with maker() as session: async with maker() as session:
count, error = await index_google_calendar_events( count, _skipped, error = await index_google_calendar_events(
session=session, connector_id=data["connector_id"], session=session, connector_id=data["connector_id"],
search_space_id=data["search_space_id"], user_id=data["user_id"], search_space_id=data["search_space_id"], user_id=data["user_id"],
) )

View file

@ -106,7 +106,7 @@ async def test_composio_gmail_without_account_id_returns_error(
maker = make_session_factory(async_engine) maker = make_session_factory(async_engine)
async with maker() as session: async with maker() as session:
count, error = await index_google_gmail_messages( count, _skipped, error = await index_google_gmail_messages(
session=session, connector_id=data["connector_id"], session=session, connector_id=data["connector_id"],
search_space_id=data["search_space_id"], user_id=data["user_id"], search_space_id=data["search_space_id"], user_id=data["user_id"],
) )