Merge pull request #749 from AnishSarkar22/fix/drive-connector

fix: composio drive connector
This commit is contained in:
Rohan Verma 2026-01-28 16:39:40 -08:00 committed by GitHub
commit f774aa4af0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 203 additions and 30 deletions

View file

@ -4,6 +4,8 @@ Composio Google Drive Connector Module.
Provides Google Drive specific methods for data retrieval and indexing via Composio.
"""
import hashlib
import json
import logging
import os
import tempfile
@ -464,6 +466,55 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()
async def check_document_by_content_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
"""Check if a document with the given content hash already exists.
This is used to prevent duplicate content from being indexed, regardless
of which connector originally indexed it.
"""
from sqlalchemy.future import select
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()
async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
"""Check if a document with this Google Drive file ID exists (from any connector).
This checks both metadata key formats:
- 'google_drive_file_id' (normal Google Drive connector)
- 'file_id' (Composio Google Drive connector)
This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls.
"""
from sqlalchemy import String, cast, or_
from sqlalchemy.future import select
# When casting JSON to String, the result includes quotes: "value" instead of value
# So we need to compare with the quoted version
quoted_file_id = f'"{file_id}"'
existing_doc_result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
or_(
# Normal Google Drive connector format
cast(Document.document_metadata["google_drive_file_id"], String)
== quoted_file_id,
# Composio Google Drive connector format
cast(Document.document_metadata["file_id"], String) == quoted_file_id,
),
)
)
return existing_doc_result.scalars().first()
async def update_connector_last_indexed(
session: AsyncSession,
connector,
@ -477,6 +528,33 @@ async def update_connector_last_indexed(
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
def generate_indexing_settings_hash(
selected_folders: list[dict],
selected_files: list[dict],
indexing_options: dict,
) -> str:
"""Generate a hash of indexing settings to detect configuration changes.
This hash is used to determine if indexing settings have changed since
the last index, which would require a full re-scan instead of delta sync.
Args:
selected_folders: List of {id, name} for folders to index
selected_files: List of {id, name} for individual files to index
indexing_options: Dict with max_files_per_folder, include_subfolders, etc.
Returns:
MD5 hash string of the settings
"""
settings = {
"folders": sorted([f.get("id", "") for f in selected_folders]),
"files": sorted([f.get("id", "") for f in selected_files]),
"include_subfolders": indexing_options.get("include_subfolders", True),
"max_files_per_folder": indexing_options.get("max_files_per_folder", 100),
}
return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()
async def index_composio_google_drive(
session: AsyncSession,
connector,
@ -487,12 +565,16 @@ async def index_composio_google_drive(
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""Index Google Drive files via Composio with delta sync support.
Returns:
Tuple of (documents_indexed, documents_skipped, error_message or None)
Delta Sync Flow:
1. First sync: Full scan + get initial page token
2. Subsequent syncs: Use LIST_CHANGES to process only changed files
(unless settings changed or incremental_sync is disabled)
Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index
@ -508,12 +590,42 @@ async def index_composio_google_drive(
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {})
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
use_delta_sync = stored_page_token and connector.last_indexed_at
max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
incremental_sync = indexing_options.get("incremental_sync", True)
# Generate current settings hash to detect configuration changes
current_settings_hash = generate_indexing_settings_hash(
selected_folders, selected_files, indexing_options
)
last_settings_hash = connector_config.get("last_indexed_settings_hash")
# Detect if settings changed since last index
settings_changed = (
last_settings_hash is not None
and current_settings_hash != last_settings_hash
)
if settings_changed:
logger.info(
f"Indexing settings changed for connector {connector_id}. "
f"Will perform full re-scan to apply new configuration."
)
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
# Determine whether to use delta sync:
# - Must have a stored page token
# - Must have been indexed before (last_indexed_at exists)
# - User must have incremental_sync enabled
# - Settings must not have changed (folder/subfolder config)
use_delta_sync = (
incremental_sync
and stored_page_token
and connector.last_indexed_at
and not settings_changed
)
# Route to delta sync or full scan
if use_delta_sync:
@ -588,6 +700,14 @@ async def index_composio_google_drive(
elif token_error:
logger.warning(f"Failed to get new page token: {token_error}")
# Save current settings hash for future change detection
# This allows detecting when folder/subfolder settings change
if not connector.config:
connector.config = {}
connector.config["last_indexed_settings_hash"] = current_settings_hash
flag_modified(connector, "config")
logger.info(f"Saved indexing settings hash for connector {connector_id}")
# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
@ -628,11 +748,11 @@ async def index_composio_google_drive(
},
)
return documents_indexed, error_message
return documents_indexed, documents_skipped, error_message
except Exception as e:
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive via Composio: {e!s}"
return 0, 0, f"Failed to index Google Drive via Composio: {e!s}"
async def _index_composio_drive_delta_sync(
@ -953,13 +1073,28 @@ async def _process_single_drive_file(
"""
processing_errors = []
# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
# Check if this Google Drive file was already indexed by ANY connector
# This happens BEFORE download/ETL to save expensive API calls
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
f"(saved download & ETL cost)"
)
return 0, 1, processing_errors # Skip - NO download, NO ETL!
# ======================================================
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists
# Check if document exists by unique identifier (same connector, same file)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
@ -1000,7 +1135,7 @@ async def _process_single_drive_file(
if existing_document:
if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped
return 0, 1, processing_errors # Skipped - unchanged
# Update existing document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1039,7 +1174,19 @@ async def _process_single_drive_file(
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - updated
# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(
session, content_hash
)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "
f"already indexed as '{existing_by_content_hash.title}'"
)
return 0, 1, processing_errors # Skipped - duplicate content
# Create new document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
@ -1085,7 +1232,7 @@ async def _process_single_drive_file(
)
session.add(document)
return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - new
async def _fetch_folder_files_recursively(

View file

@ -1180,7 +1180,8 @@ async def _run_indexing_with_notifications(
)
# Run the indexing function
documents_processed, error_or_warning = await indexing_function(
# Some indexers return (indexed, error), others return (indexed, skipped, error)
result = await indexing_function(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
@ -1190,6 +1191,13 @@ async def _run_indexing_with_notifications(
update_last_indexed=False,
)
# Handle both 2-tuple and 3-tuple returns for backwards compatibility
if len(result) == 3:
documents_processed, documents_skipped, error_or_warning = result
else:
documents_processed, error_or_warning = result
documents_skipped = None
# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
# Update notification to storing stage
@ -1216,6 +1224,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1242,6 +1251,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1283,6 +1293,7 @@ async def _run_indexing_with_notifications(
indexed_count=0,
error_message=notification_message, # Pass as warning, not error
is_warning=True, # Flag to indicate this is a warning, not an error
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1298,6 +1309,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=error_or_warning,
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1319,6 +1331,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=None, # No error - sync succeeded
skipped_count=documents_skipped,
)
await (
session.commit()
@ -1336,6 +1349,7 @@ async def _run_indexing_with_notifications(
notification=notification,
indexed_count=0,
error_message=str(e),
skipped_count=None, # Unknown on exception
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")

View file

@ -336,6 +336,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: int,
error_message: str | None = None,
is_warning: bool = False,
skipped_count: int | None = None,
) -> Notification:
"""
Update notification when connector indexing completes.
@ -346,6 +347,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
indexed_count: Total number of items indexed
error_message: Error message if indexing failed, or warning message (optional)
is_warning: If True, treat error_message as a warning (success case) rather than an error
skipped_count: Number of items skipped (e.g., duplicates) - optional
Returns:
Updated notification
@ -354,6 +356,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
"connector_name", "Connector"
)
# Build the skipped text if there are skipped items
skipped_text = ""
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
skipped_text = (
f" ({skipped_count} {skipped_item_text} skipped - already indexed)"
)
# If there's an error message but items were indexed, treat it as a warning (partial success)
# If is_warning is True, treat it as success even with 0 items (e.g., duplicates found)
# Otherwise, treat it as a failure
@ -362,12 +372,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
# Partial success with warnings (e.g., duplicate content from other connectors)
title = f"Ready: {connector_name}"
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}"
message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}"
status = "completed"
elif is_warning:
# Warning case (e.g., duplicates found) - treat as success
title = f"Ready: {connector_name}"
message = f"Sync completed. {error_message}"
message = f"Sync completed{skipped_text}. {error_message}"
status = "completed"
else:
# Complete failure
@ -377,14 +387,21 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
else:
title = f"Ready: {connector_name}"
if indexed_count == 0:
message = "Already up to date! No new items to sync."
if skipped_count and skipped_count > 0:
skipped_item_text = "item" if skipped_count == 1 else "items"
message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)."
else:
message = "Already up to date! No new items to sync."
else:
item_text = "item" if indexed_count == 1 else "items"
message = f"Now searchable! {indexed_count} {item_text} synced."
message = (
f"Now searchable! {indexed_count} {item_text} synced{skipped_text}."
)
status = "completed"
metadata_updates = {
"indexed_count": indexed_count,
"skipped_count": skipped_count or 0,
"sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",

View file

@ -86,7 +86,7 @@ async def index_composio_connector(
end_date: str | None = None,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""
Index content from a Composio connector.
@ -104,7 +104,7 @@ async def index_composio_connector(
max_items: Maximum number of items to fetch
Returns:
Tuple of (number_of_indexed_items, error_message or None)
Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
@ -132,14 +132,14 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "InvalidConnectorType"}
)
return 0, error_msg
return 0, 0, error_msg
if not connector:
error_msg = f"Composio connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
return 0, 0, error_msg
# Get toolkit ID from config
toolkit_id = connector.config.get("toolkit_id")
@ -150,7 +150,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingToolkitId"}
)
return 0, error_msg
return 0, 0, error_msg
# Check if toolkit is indexable
if toolkit_id not in INDEXABLE_TOOLKITS:
@ -158,7 +158,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ToolkitNotIndexable"}
)
return 0, error_msg
return 0, 0, error_msg
# Get indexer function from registry
try:
@ -167,7 +167,7 @@ async def index_composio_connector(
await task_logger.log_task_failure(
log_entry, str(e), {"error_type": "NoIndexerImplemented"}
)
return 0, str(e)
return 0, 0, str(e)
# Build kwargs for the indexer function
kwargs = {
@ -199,7 +199,7 @@ async def index_composio_connector(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -209,4 +209,4 @@ async def index_composio_connector(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True)
return 0, f"Failed to index Composio connector: {e!s}"
return 0, 0, f"Failed to index Composio connector: {e!s}"

View file

@ -24,11 +24,6 @@
"enabled": true,
"status": "warning",
"statusMessage": "Some requests may be blocked if not using Firecrawl."
},
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": {
"enabled": false,
"status": "disabled",
"statusMessage": "Not available yet."
}
},
"globalSettings": {