feat(indexing): add content hash check to prevent duplicate indexing and update return values for indexing functions

This commit is contained in:
Anish Sarkar 2026-01-28 03:55:25 +05:30
parent 8a5f6ecce1
commit 3af4fd0533
5 changed files with 76 additions and 25 deletions

View file

@ -464,6 +464,22 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first() return existing_doc_result.scalars().first()
async def check_document_by_content_hash(
    session: AsyncSession, content_hash: str
) -> Document | None:
    """Look up an already-stored document by its content hash.

    Indexers call this before creating a new document so that identical
    content is not indexed twice. The query filters on the hash alone, so a
    match may have been indexed by any connector.

    Args:
        session: Async database session the lookup query is executed on.
        content_hash: Hash value compared against ``Document.content_hash``.

    Returns:
        The first ``Document`` whose ``content_hash`` equals the given value,
        or ``None`` when no row matches.
    """
    from sqlalchemy.future import select

    # Build the statement first, then execute it, so each step reads on its own.
    hash_matches = select(Document).where(Document.content_hash == content_hash)
    lookup = await session.execute(hash_matches)
    return lookup.scalars().first()
async def update_connector_last_indexed( async def update_connector_last_indexed(
session: AsyncSession, session: AsyncSession,
connector, connector,
@ -487,8 +503,11 @@ async def index_composio_google_drive(
log_entry, log_entry,
update_last_indexed: bool = True, update_last_indexed: bool = True,
max_items: int = 1000, max_items: int = 1000,
) -> tuple[int, str]: ) -> tuple[int, int, str | None]:
"""Index Google Drive files via Composio with delta sync support. """Index Google Drive files via Composio with delta sync support.
Returns:
Tuple of (documents_indexed, documents_skipped, error_message or None)
Delta Sync Flow: Delta Sync Flow:
1. First sync: Full scan + get initial page token 1. First sync: Full scan + get initial page token
@ -628,11 +647,11 @@ async def index_composio_google_drive(
}, },
) )
return documents_indexed, error_message return documents_indexed, documents_skipped, error_message
except Exception as e: except Exception as e:
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive via Composio: {e!s}" return 0, 0, f"Failed to index Google Drive via Composio: {e!s}"
async def _index_composio_drive_delta_sync( async def _index_composio_drive_delta_sync(
@ -1000,7 +1019,7 @@ async def _process_single_drive_file(
if existing_document: if existing_document:
if existing_document.content_hash == content_hash: if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped return 0, 1, processing_errors # Skipped - unchanged
# Update existing document # Update existing document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id) user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1039,7 +1058,17 @@ async def _process_single_drive_file(
existing_document.chunks = chunks existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp() existing_document.updated_at = get_current_timestamp()
return 1, 0, processing_errors # Indexed return 1, 0, processing_errors # Indexed - updated
# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(session, content_hash)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "
f"already indexed as '{existing_by_content_hash.title}'"
)
return 0, 1, processing_errors # Skipped - duplicate content
# Create new document # Create new document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id) user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1085,7 +1114,7 @@ async def _process_single_drive_file(
) )
session.add(document) session.add(document)
return 1, 0, processing_errors # Indexed return 1, 0, processing_errors # Indexed - new
async def _fetch_folder_files_recursively( async def _fetch_folder_files_recursively(

View file

@ -1180,7 +1180,8 @@ async def _run_indexing_with_notifications(
) )
# Run the indexing function # Run the indexing function
documents_processed, error_or_warning = await indexing_function( # Some indexers return (indexed, error), others return (indexed, skipped, error)
result = await indexing_function(
session=session, session=session,
connector_id=connector_id, connector_id=connector_id,
search_space_id=search_space_id, search_space_id=search_space_id,
@ -1189,6 +1190,13 @@ async def _run_indexing_with_notifications(
end_date=end_date, end_date=end_date,
update_last_indexed=False, update_last_indexed=False,
) )
# Handle both 2-tuple and 3-tuple returns for backwards compatibility
if len(result) == 3:
documents_processed, documents_skipped, error_or_warning = result
else:
documents_processed, error_or_warning = result
documents_skipped = None
# Update connector timestamp if function provided and indexing was successful # Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func: if documents_processed > 0 and update_timestamp_func:
@ -1216,6 +1224,7 @@ async def _run_indexing_with_notifications(
notification=notification, notification=notification,
indexed_count=documents_processed, indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
) )
await ( await (
session.commit() session.commit()
@ -1242,6 +1251,7 @@ async def _run_indexing_with_notifications(
notification=notification, notification=notification,
indexed_count=documents_processed, indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
) )
await ( await (
session.commit() session.commit()
@ -1283,6 +1293,7 @@ async def _run_indexing_with_notifications(
indexed_count=0, indexed_count=0,
error_message=notification_message, # Pass as warning, not error error_message=notification_message, # Pass as warning, not error
is_warning=True, # Flag to indicate this is a warning, not an error is_warning=True, # Flag to indicate this is a warning, not an error
skipped_count=documents_skipped,
) )
await ( await (
session.commit() session.commit()
@ -1298,6 +1309,7 @@ async def _run_indexing_with_notifications(
notification=notification, notification=notification,
indexed_count=0, indexed_count=0,
error_message=error_or_warning, error_message=error_or_warning,
skipped_count=documents_skipped,
) )
await ( await (
session.commit() session.commit()
@ -1319,6 +1331,7 @@ async def _run_indexing_with_notifications(
notification=notification, notification=notification,
indexed_count=0, indexed_count=0,
error_message=None, # No error - sync succeeded error_message=None, # No error - sync succeeded
skipped_count=documents_skipped,
) )
await ( await (
session.commit() session.commit()
@ -1336,6 +1349,7 @@ async def _run_indexing_with_notifications(
notification=notification, notification=notification,
indexed_count=0, indexed_count=0,
error_message=str(e), error_message=str(e),
skipped_count=None, # Unknown on exception
) )
except Exception as notif_error: except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}") logger.error(f"Failed to update notification: {notif_error!s}")

View file

@ -336,6 +336,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: int, indexed_count: int,
error_message: str | None = None, error_message: str | None = None,
is_warning: bool = False, is_warning: bool = False,
skipped_count: int | None = None,
) -> Notification: ) -> Notification:
""" """
Update notification when connector indexing completes. Update notification when connector indexing completes.
@ -346,6 +347,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: Total number of items indexed indexed_count: Total number of items indexed
error_message: Error message if indexing failed, or warning message (optional) error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error is_warning: If True, treat error_message as a warning (success case) rather than an error
skipped_count: Number of items skipped (e.g., duplicates) - optional
Returns: Returns:
Updated notification Updated notification
@ -354,6 +356,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector" "connector_name", "Connector"
) )
# Build the skipped text if there are skipped items
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
# If there's an error message but items were indexed, treat it as a warning (partial success) # If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure # Otherwise, treat it as a failure
@ -362,12 +370,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
# Partial success with warnings (e.g., duplicate content from other connectors) # Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}" title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items" item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}"
status = "completed" status = "completed"
elif is_warning: elif is_warning:
# Warning case (e.g., duplicates found) - treat as success # Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}" title = f"Ready: {connector_name}"
message = f"Sync completed. {error_message}" message = f"Sync completed{skipped_text}. {error_message}"
status = "completed" status = "completed"
else: else:
# Complete failure # Complete failure
@ -377,14 +385,19 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
else: else:
title = f"Ready: {connector_name}" title = f"Ready: {connector_name}"
if indexed_count == 0: if indexed_count == 0:
message = "Already up to date! No new items to sync." if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)."
else:
message = "Already up to date! No new items to sync."
else: else:
item_text = "item" if indexed_count == 1 else "items" item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced." message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
status = "completed" status = "completed"
metadata_updates = { metadata_updates = {
"indexed_count": indexed_count, "indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"sync_stage": "completed" "sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0) if (not error_message or is_warning or indexed_count > 0)
else "failed", else "failed",

View file

@ -86,7 +86,7 @@ async def index_composio_connector(
end_date: str | None = None, end_date: str | None = None,
update_last_indexed: bool = True, update_last_indexed: bool = True,
max_items: int = 1000, max_items: int = 1000,
) -> tuple[int, str]: ) -> tuple[int, int, str | None]:
""" """
Index content from a Composio connector. Index content from a Composio connector.
@ -104,7 +104,7 @@ async def index_composio_connector(
max_items: Maximum number of items to fetch max_items: Maximum number of items to fetch
Returns: Returns:
Tuple of (number_of_indexed_items, error_message or None) Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None)
""" """
task_logger = TaskLoggingService(session, search_space_id) task_logger = TaskLoggingService(session, search_space_id)
@ -132,14 +132,14 @@ async def index_composio_connector(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "InvalidConnectorType"} log_entry, error_msg, {"error_type": "InvalidConnectorType"}
) )
return 0, error_msg return 0, 0, error_msg
if not connector: if not connector:
error_msg = f"Composio connector with ID {connector_id} not found" error_msg = f"Composio connector with ID {connector_id} not found"
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"} log_entry, error_msg, {"error_type": "ConnectorNotFound"}
) )
return 0, error_msg return 0, 0, error_msg
# Get toolkit ID from config # Get toolkit ID from config
toolkit_id = connector.config.get("toolkit_id") toolkit_id = connector.config.get("toolkit_id")
@ -150,7 +150,7 @@ async def index_composio_connector(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingToolkitId"} log_entry, error_msg, {"error_type": "MissingToolkitId"}
) )
return 0, error_msg return 0, 0, error_msg
# Check if toolkit is indexable # Check if toolkit is indexable
if toolkit_id not in INDEXABLE_TOOLKITS: if toolkit_id not in INDEXABLE_TOOLKITS:
@ -158,7 +158,7 @@ async def index_composio_connector(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ToolkitNotIndexable"} log_entry, error_msg, {"error_type": "ToolkitNotIndexable"}
) )
return 0, error_msg return 0, 0, error_msg
# Get indexer function from registry # Get indexer function from registry
try: try:
@ -167,7 +167,7 @@ async def index_composio_connector(
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, str(e), {"error_type": "NoIndexerImplemented"} log_entry, str(e), {"error_type": "NoIndexerImplemented"}
) )
return 0, str(e) return 0, 0, str(e)
# Build kwargs for the indexer function # Build kwargs for the indexer function
kwargs = { kwargs = {
@ -199,7 +199,7 @@ async def index_composio_connector(
{"error_type": "SQLAlchemyError"}, {"error_type": "SQLAlchemyError"},
) )
logger.error(f"Database error: {db_error!s}", exc_info=True) logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}" return 0, 0, f"Database error: {db_error!s}"
except Exception as e: except Exception as e:
await session.rollback() await session.rollback()
await task_logger.log_task_failure( await task_logger.log_task_failure(
@ -209,4 +209,4 @@ async def index_composio_connector(
{"error_type": type(e).__name__}, {"error_type": type(e).__name__},
) )
logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True)
return 0, f"Failed to index Composio connector: {e!s}" return 0, 0, f"Failed to index Composio connector: {e!s}"

View file

@ -24,11 +24,6 @@
"enabled": true, "enabled": true,
"status": "warning", "status": "warning",
"statusMessage": "Some requests may be blocked if not using Firecrawl." "statusMessage": "Some requests may be blocked if not using Firecrawl."
},
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": {
"enabled": false,
"status": "disabled",
"statusMessage": "Not available yet."
} }
}, },
"globalSettings": { "globalSettings": {