feat(connector): implement duplicate detection by Google Drive file ID and generate settings hash for indexing configuration changes

This commit is contained in:
Anish Sarkar 2026-01-28 09:09:58 +05:30
parent 3af4fd0533
commit aab547264e

View file

@ -4,6 +4,8 @@ Composio Google Drive Connector Module.
Provides Google Drive specific methods for data retrieval and indexing via Composio. Provides Google Drive specific methods for data retrieval and indexing via Composio.
""" """
import hashlib
import json
import logging import logging
import os import os
import tempfile import tempfile
@ -480,6 +482,38 @@ async def check_document_by_content_hash(
return existing_doc_result.scalars().first() return existing_doc_result.scalars().first()
async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
"""Check if a document with this Google Drive file ID exists (from any connector).
This checks both metadata key formats:
- 'google_drive_file_id' (normal Google Drive connector)
- 'file_id' (Composio Google Drive connector)
This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls.
"""
from sqlalchemy import String, cast, or_
from sqlalchemy.future import select
# When casting JSON to String, the result includes quotes: "value" instead of value
# So we need to compare with the quoted version
quoted_file_id = f'"{file_id}"'
existing_doc_result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
or_(
# Normal Google Drive connector format
cast(Document.document_metadata["google_drive_file_id"], String) == quoted_file_id,
# Composio Google Drive connector format
cast(Document.document_metadata["file_id"], String) == quoted_file_id,
)
)
)
return existing_doc_result.scalars().first()
async def update_connector_last_indexed( async def update_connector_last_indexed(
session: AsyncSession, session: AsyncSession,
connector, connector,
@ -493,6 +527,33 @@ async def update_connector_last_indexed(
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
def generate_indexing_settings_hash(
selected_folders: list[dict],
selected_files: list[dict],
indexing_options: dict,
) -> str:
"""Generate a hash of indexing settings to detect configuration changes.
This hash is used to determine if indexing settings have changed since
the last index, which would require a full re-scan instead of delta sync.
Args:
selected_folders: List of {id, name} for folders to index
selected_files: List of {id, name} for individual files to index
indexing_options: Dict with max_files_per_folder, include_subfolders, etc.
Returns:
MD5 hash string of the settings
"""
settings = {
"folders": sorted([f.get("id", "") for f in selected_folders]),
"files": sorted([f.get("id", "") for f in selected_files]),
"include_subfolders": indexing_options.get("include_subfolders", True),
"max_files_per_folder": indexing_options.get("max_files_per_folder", 100),
}
return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()
async def index_composio_google_drive( async def index_composio_google_drive(
session: AsyncSession, session: AsyncSession,
connector, connector,
@ -512,6 +573,7 @@ async def index_composio_google_drive(
Delta Sync Flow: Delta Sync Flow:
1. First sync: Full scan + get initial page token 1. First sync: Full scan + get initial page token
2. Subsequent syncs: Use LIST_CHANGES to process only changed files 2. Subsequent syncs: Use LIST_CHANGES to process only changed files
(unless settings changed or incremental_sync is disabled)
Supports folder/file selection via connector config: Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index - selected_folders: List of {id, name} for folders to index
@ -527,12 +589,42 @@ async def index_composio_google_drive(
selected_files = connector_config.get("selected_files", []) selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {}) indexing_options = connector_config.get("indexing_options", {})
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
use_delta_sync = stored_page_token and connector.last_indexed_at
max_files_per_folder = indexing_options.get("max_files_per_folder", 100) max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True) include_subfolders = indexing_options.get("include_subfolders", True)
incremental_sync = indexing_options.get("incremental_sync", True)
# Generate current settings hash to detect configuration changes
current_settings_hash = generate_indexing_settings_hash(
selected_folders, selected_files, indexing_options
)
last_settings_hash = connector_config.get("last_indexed_settings_hash")
# Detect if settings changed since last index
settings_changed = (
last_settings_hash is not None and
current_settings_hash != last_settings_hash
)
if settings_changed:
logger.info(
f"Indexing settings changed for connector {connector_id}. "
f"Will perform full re-scan to apply new configuration."
)
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
# Determine whether to use delta sync:
# - Must have a stored page token
# - Must have been indexed before (last_indexed_at exists)
# - User must have incremental_sync enabled
# - Settings must not have changed (folder/subfolder config)
use_delta_sync = (
incremental_sync and
stored_page_token and
connector.last_indexed_at and
not settings_changed
)
# Route to delta sync or full scan # Route to delta sync or full scan
if use_delta_sync: if use_delta_sync:
@ -607,6 +699,14 @@ async def index_composio_google_drive(
elif token_error: elif token_error:
logger.warning(f"Failed to get new page token: {token_error}") logger.warning(f"Failed to get new page token: {token_error}")
# Save current settings hash for future change detection
# This allows detecting when folder/subfolder settings change
if not connector.config:
connector.config = {}
connector.config["last_indexed_settings_hash"] = current_settings_hash
flag_modified(connector, "config")
logger.info(f"Saved indexing settings hash for connector {connector_id}")
# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed) await update_connector_last_indexed(session, connector, update_last_indexed)
@ -972,13 +1072,28 @@ async def _process_single_drive_file(
""" """
processing_errors = [] processing_errors = []
# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
# Check if this Google Drive file was already indexed by ANY connector
# This happens BEFORE download/ETL to save expensive API calls
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
f"(saved download & ETL cost)"
)
return 0, 1, processing_errors # Skip - NO download, NO ETL!
# ======================================================
# Generate unique identifier hash # Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash( unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id document_type, f"drive_{file_id}", search_space_id
) )
# Check if document exists # Check if document exists by unique identifier (same connector, same file)
existing_document = await check_document_by_unique_identifier( existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash session, unique_identifier_hash
) )