feat: improve indexing logic and duplicate handling in connectors

- Enhanced Google Calendar and Composio connector indexing to track and log duplicate content, preventing re-indexing of already processed events.
- Implemented robust error handling during final commits to manage integrity errors gracefully, ensuring successful indexing despite potential duplicates.
- Updated notification service to differentiate between actual errors and warnings for duplicate content, improving user feedback.
- Refactored date handling to ensure valid date ranges and adjusted end dates when necessary for better indexing accuracy.
This commit is contained in:
Anish Sarkar 2026-01-23 23:36:14 +05:30
parent d20bb385b5
commit c48ba36fa4
6 changed files with 198 additions and 35 deletions

View file

@ -18,7 +18,10 @@ from app.db import Document, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import calculate_date_range
from app.tasks.connector_indexers.base import (
calculate_date_range,
check_duplicate_document_by_hash,
)
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
@ -256,6 +259,7 @@ async def index_composio_google_calendar(
documents_indexed = 0
documents_skipped = 0
duplicate_content_count = 0 # Track events skipped due to duplicate content_hash
for event in events:
try:
@ -349,7 +353,25 @@ async def index_composio_google_calendar(
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
)
await session.commit()
await session.commit( )
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from standard connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
# A document with the same content already exists (likely from standard connector)
logger.info(
f"Event {summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document
@ -429,10 +451,28 @@ async def index_composio_google_calendar(
logger.info(
f"Final commit: Total {documents_indexed} Google Calendar events processed"
)
await session.commit()
logger.info(
"Successfully committed all Composio Google Calendar document changes to database"
)
try:
await session.commit()
logger.info(
"Successfully committed all Composio Google Calendar document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower():
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if duplicates were found
warning_message = None
if duplicate_content_count > 0:
warning_message = f"{duplicate_content_count} skipped (duplicate)"
await task_logger.log_task_success(
log_entry,
@ -440,10 +480,15 @@ async def index_composio_google_calendar(
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"duplicate_content_count": duplicate_content_count,
},
)
return documents_indexed, None
logger.info(
f"Composio Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
f"({duplicate_content_count} due to duplicate content from other connectors)"
)
return documents_indexed, warning_message
except Exception as e:
logger.error(

View file

@ -22,6 +22,8 @@ import logging
from datetime import UTC, datetime, timedelta
from typing import Any
import pytz
from dateutil.parser import isoparse
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy.exc import IntegrityError
@ -681,6 +683,22 @@ async def index_connector_content(
]:
# Default to today if no end_date provided (users can manually select future dates)
indexing_to = today_str if end_date is None else end_date
# If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date)
if indexing_from == indexing_to:
dt = isoparse(indexing_to)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
else:
dt = dt.astimezone(pytz.UTC)
# Add one day to end_date to make it strictly after start_date
dt_end = dt + timedelta(days=1)
indexing_to = dt_end.strftime("%Y-%m-%d")
logger.info(
f"Adjusted end_date from {end_date} to {indexing_to} "
f"to ensure valid date range (start_date must be strictly before end_date)"
)
else:
# For non-calendar connectors, cap at today
indexing_to = end_date if end_date else today_str
@ -1231,20 +1249,48 @@ async def _run_indexing_with_notifications(
else:
# No new documents processed - check if this is an error or just no changes
if error_or_warning:
# Actual failure
logger.error(f"Indexing failed: {error_or_warning}")
if notification:
# Refresh notification to ensure it's not stale after indexing function commits
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=error_or_warning,
# Check if this is a duplicate warning (success case) or an actual error
# Handle both normal and Composio calendar connectors
error_or_warning_lower = str(error_or_warning).lower() if error_or_warning else ""
is_duplicate_warning = "skipped (duplicate)" in error_or_warning_lower
if is_duplicate_warning:
# Duplicate warnings are success cases - sync worked, just found duplicates
logger.info(
f"Indexing completed successfully: {error_or_warning}"
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
await session.commit() # Commit timestamp update
if notification:
# Refresh notification to ensure it's not stale after timestamp update commit
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=error_or_warning, # Pass as warning, not error
is_warning=True, # Flag to indicate this is a warning, not an error
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
else:
# Actual failure
logger.error(f"Indexing failed: {error_or_warning}")
if notification:
# Refresh notification to ensure it's not stale after indexing function commits
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=error_or_warning,
)
await (
session.commit()
) # Commit to ensure Electric SQL syncs the notification update
else:
# Success - just no new documents to index (all skipped/unchanged)
logger.info(

View file

@ -335,6 +335,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
notification: Notification,
indexed_count: int,
error_message: str | None = None,
is_warning: bool = False,
) -> Notification:
"""
Update notification when connector indexing completes.
@ -343,7 +344,8 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
session: Database session
notification: Notification to update
indexed_count: Total number of items indexed
error_message: Error message if indexing failed (optional)
error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error
Returns:
Updated notification
@ -352,10 +354,26 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector"
)
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure
if error_message:
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
status = "failed"
if indexed_count > 0:
# Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}"
status = "completed"
elif is_warning:
# Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}"
message = f"Sync completed. {error_message}"
status = "completed"
else:
# Complete failure
title = f"Failed: {connector_name}"
message = f"Sync failed: {error_message}"
status = "failed"
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
@ -367,7 +385,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
metadata_updates = {
"indexed_count": indexed_count,
"sync_stage": "completed" if not error_message else "failed",
"sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed",
"error_message": error_message,
}

View file

@ -23,6 +23,7 @@ from app.utils.document_converters import (
from .base import (
check_document_by_unique_identifier,
check_duplicate_document_by_hash,
get_connector_by_id,
get_current_timestamp,
logger,
@ -289,6 +290,7 @@ async def index_google_calendar_events(
documents_indexed = 0
documents_skipped = 0
skipped_events = []
duplicate_content_count = 0 # Track events skipped due to duplicate content_hash
for event in events:
try:
@ -409,6 +411,27 @@ async def index_google_calendar_events(
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
if duplicate_by_content:
# A document with the same content already exists (likely from Composio connector)
logger.info(
f"Event {event_summary} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content."
)
duplicate_content_count += 1
documents_skipped += 1
skipped_events.append(
f"{event_summary} (already indexed by another connector)"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
@ -501,7 +524,25 @@ async def index_google_calendar_events(
logger.info(
f"Final commit: Total {documents_indexed} Google Calendar events processed"
)
await session.commit()
try:
await session.commit()
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower():
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if duplicates were found
warning_message = None
if duplicate_content_count > 0:
warning_message = f"{duplicate_content_count} skipped (duplicate)"
await task_logger.log_task_success(
log_entry,
@ -510,14 +551,16 @@ async def index_google_calendar_events(
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"duplicate_content_count": duplicate_content_count,
"skipped_events_count": len(skipped_events),
},
)
logger.info(
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped"
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
f"({duplicate_content_count} due to duplicate content from other connectors)"
)
return total_processed, None
return total_processed, warning_message
except SQLAlchemyError as db_error:
await session.rollback()

View file

@ -100,10 +100,14 @@ export const ConnectorEditView: FC<ConnectorEditViewProps> = ({
// Reset local quick indexing state when indexing completes or fails
useEffect(() => {
if (!isIndexing) {
setIsQuickIndexing(false);
if (!isIndexing && isQuickIndexing) {
// Small delay to ensure smooth transition
const timer = setTimeout(() => {
setIsQuickIndexing(false);
}, 100);
return () => clearTimeout(timer);
}
}, [isIndexing]);
}, [isIndexing, isQuickIndexing]);
const handleDisconnectClick = () => {
setShowDisconnectConfirm(true);
@ -119,11 +123,11 @@ export const ConnectorEditView: FC<ConnectorEditViewProps> = ({
};
const handleQuickIndex = useCallback(() => {
if (onQuickIndex) {
if (onQuickIndex && !isQuickIndexing && !isIndexing) {
setIsQuickIndexing(true);
onQuickIndex();
}
}, [onQuickIndex]);
}, [onQuickIndex, isQuickIndexing, isIndexing]);
return (
<div className="flex-1 flex flex-col min-h-0 overflow-hidden">

View file

@ -1409,7 +1409,12 @@ export const useConnectorDialog = () => {
startDate?: Date,
endDate?: Date
) => {
if (!searchSpaceId) return;
if (!searchSpaceId) {
if (stopIndexing) {
stopIndexing(connectorId);
}
return;
}
// Track quick index clicked event
if (connectorType) {
@ -1437,6 +1442,8 @@ export const useConnectorDialog = () => {
queryClient.invalidateQueries({
queryKey: cacheKeys.logs.summary(Number(searchSpaceId)),
});
// Note: Don't call stopIndexing here - let useIndexingConnectors hook
// detect when last_indexed_at changes via Electric SQL
} catch (error) {
console.error("Error indexing connector content:", error);
toast.error(error instanceof Error ? error.message : "Failed to start indexing");
@ -1446,7 +1453,7 @@ export const useConnectorDialog = () => {
}
}
},
[searchSpaceId, indexConnector]
[searchSpaceId, indexConnector, queryClient]
);
// Handle going back from edit view