From aab547264eeaded07f536fffbad25caa621787e4 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 28 Jan 2026 09:09:58 +0530 Subject: [PATCH] feat(connector): implement duplicate detection by Google Drive file ID and generate settings hash for indexing configuration changes --- .../composio_google_drive_connector.py | 125 +++++++++++++++++- 1 file changed, 120 insertions(+), 5 deletions(-) diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 9a1937d6b..912f63d54 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,8 @@ Composio Google Drive Connector Module. Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import hashlib +import json import logging import os import tempfile @@ -480,6 +482,38 @@ async def check_document_by_content_hash( return existing_doc_result.scalars().first() +async def check_document_by_google_drive_file_id( + session: AsyncSession, file_id: str, search_space_id: int +) -> Document | None: + """Check if a document with this Google Drive file ID exists (from any connector). + + This checks both metadata key formats: + - 'google_drive_file_id' (normal Google Drive connector) + - 'file_id' (Composio Google Drive connector) + + This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls. + """ + from sqlalchemy import String, cast, or_ + from sqlalchemy.future import select + + # When casting JSON to String, the result includes quotes: "value" instead of value + # So we need to compare with the quoted version + quoted_file_id = f'"{file_id}"' + + existing_doc_result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + or_( + # Normal Google Drive connector format + cast(Document.document_metadata["google_drive_file_id"], String) == quoted_file_id, + # Composio Google Drive connector format + cast(Document.document_metadata["file_id"], String) == quoted_file_id, + ) + ) + ) + return existing_doc_result.scalars().first() + + async def update_connector_last_indexed( session: AsyncSession, connector, @@ -493,6 +527,33 @@ async def update_connector_last_indexed( logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") +def generate_indexing_settings_hash( + selected_folders: list[dict], + selected_files: list[dict], + indexing_options: dict, +) -> str: + """Generate a hash of indexing settings to detect configuration changes. + + This hash is used to determine if indexing settings have changed since + the last index, which would require a full re-scan instead of delta sync. + + Args: + selected_folders: List of {id, name} for folders to index + selected_files: List of {id, name} for individual files to index + indexing_options: Dict with max_files_per_folder, include_subfolders, etc. + + Returns: + MD5 hash string of the settings + """ + settings = { + "folders": sorted([f.get("id", "") for f in selected_folders]), + "files": sorted([f.get("id", "") for f in selected_files]), + "include_subfolders": indexing_options.get("include_subfolders", True), + "max_files_per_folder": indexing_options.get("max_files_per_folder", 100), + } + return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest() + + async def index_composio_google_drive( session: AsyncSession, connector, @@ -512,6 +573,7 @@ async def index_composio_google_drive( Delta Sync Flow: 1. First sync: Full scan + get initial page token 2. Subsequent syncs: Use LIST_CHANGES to process only changed files + (unless settings changed or incremental_sync is disabled) Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index @@ -527,12 +589,42 @@ async def index_composio_google_drive( selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) - # Check for stored page token for delta sync - stored_page_token = connector_config.get("drive_page_token") - use_delta_sync = stored_page_token and connector.last_indexed_at - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) include_subfolders = indexing_options.get("include_subfolders", True) + incremental_sync = indexing_options.get("incremental_sync", True) + + # Generate current settings hash to detect configuration changes + current_settings_hash = generate_indexing_settings_hash( + selected_folders, selected_files, indexing_options + ) + last_settings_hash = connector_config.get("last_indexed_settings_hash") + + # Detect if settings changed since last index + settings_changed = ( + last_settings_hash is not None and + current_settings_hash != last_settings_hash + ) + + if settings_changed: + logger.info( + f"Indexing settings changed for connector {connector_id}. " + f"Will perform full re-scan to apply new configuration." + ) + + # Check for stored page token for delta sync + stored_page_token = connector_config.get("drive_page_token") + + # Determine whether to use delta sync: + # - Must have a stored page token + # - Must have been indexed before (last_indexed_at exists) + # - User must have incremental_sync enabled + # - Settings must not have changed (folder/subfolder config) + use_delta_sync = ( + incremental_sync and + stored_page_token and + connector.last_indexed_at and + not settings_changed + ) # Route to delta sync or full scan if use_delta_sync: @@ -607,6 +699,14 @@ async def index_composio_google_drive( elif token_error: logger.warning(f"Failed to get new page token: {token_error}") + # Save current settings hash for future change detection + # This allows detecting when folder/subfolder settings change + if not connector.config: + connector.config = {} + connector.config["last_indexed_settings_hash"] = current_settings_hash + flag_modified(connector, "config") + logger.info(f"Saved indexing settings hash for connector {connector_id}") + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) @@ -972,13 +1072,28 @@ async def _process_single_drive_file( """ processing_errors = [] + # ========== EARLY DUPLICATE CHECK BY FILE ID ========== + # Check if this Google Drive file was already indexed by ANY connector + # This happens BEFORE download/ETL to save expensive API calls + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id + ) + if existing_by_file_id: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already indexed " + f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' " + f"(saved download & ETL cost)" + ) + return 0, 1, processing_errors # Skip - NO download, NO ETL! + # ====================================================== + # Generate unique identifier hash document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) unique_identifier_hash = generate_unique_identifier_hash( document_type, f"drive_{file_id}", search_space_id ) - # Check if document exists + # Check if document exists by unique identifier (same connector, same file) existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash )