diff --git a/surfsense_backend/app/connectors/google_drive/change_tracker.py b/surfsense_backend/app/connectors/google_drive/change_tracker.py index dee828219..3b7f804ac 100644 --- a/surfsense_backend/app/connectors/google_drive/change_tracker.py +++ b/surfsense_backend/app/connectors/google_drive/change_tracker.py @@ -58,7 +58,7 @@ async def get_changes( params = { "pageToken": page_token, "pageSize": 100, - "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, size, webViewLink, parents, trashed))", + "fields": "nextPageToken, newStartPageToken, changes(fileId, removed, file(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, trashed))", "supportsAllDrives": True, "includeItemsFromAllDrives": True, } diff --git a/surfsense_backend/app/connectors/google_drive/client.py b/surfsense_backend/app/connectors/google_drive/client.py index aec5704b8..a001e42be 100644 --- a/surfsense_backend/app/connectors/google_drive/client.py +++ b/surfsense_backend/app/connectors/google_drive/client.py @@ -47,7 +47,7 @@ class GoogleDriveClient: async def list_files( self, query: str = "", - fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink, parents, owners, createdTime, description)", + fields: str = "nextPageToken, files(id, name, mimeType, modifiedTime, md5Checksum, size, webViewLink, parents, owners, createdTime, description)", page_size: int = 100, page_token: str | None = None, ) -> tuple[list[dict[str, Any]], str | None, str | None]: diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 28c14a757..61b427970 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -102,6 +102,8 @@ async def download_and_process_file( connector_info["metadata"]["file_size"] = file["size"] if "webViewLink" in file: connector_info["metadata"]["web_view_link"] = file["webViewLink"] + if "md5Checksum" in file: + connector_info["metadata"]["md5_checksum"] = file["md5Checksum"] if is_google_workspace_file(mime_type): connector_info["metadata"]["exported_as"] = "pdf" diff --git a/surfsense_backend/app/connectors/google_drive/folder_manager.py b/surfsense_backend/app/connectors/google_drive/folder_manager.py index e28505f11..c0535ee58 100644 --- a/surfsense_backend/app/connectors/google_drive/folder_manager.py +++ b/surfsense_backend/app/connectors/google_drive/folder_manager.py @@ -157,7 +157,7 @@ async def get_file_by_id( try: file, error = await client.get_file_metadata( file_id, - fields="id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink", + fields="id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink", ) if error: @@ -228,7 +228,7 @@ async def list_folder_contents( while True: items, next_token, error = await client.list_files( query=query, - fields="files(id, name, mimeType, parents, createdTime, modifiedTime, size, webViewLink, iconLink)", + fields="files(id, name, mimeType, parents, createdTime, modifiedTime, md5Checksum, size, webViewLink, iconLink)", page_size=1000, # Max allowed by Google Drive API page_token=page_token, ) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 6201452a9..f070f650b 100644 --- 
a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1716,7 +1716,7 @@ async def run_google_drive_indexing( connector_id: int, search_space_id: int, user_id: str, - items_dict: dict, # Dictionary with 'folders' and 'files' lists + items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options' ): """Runs the Google Drive indexing task for folders and files with notifications.""" from uuid import UUID @@ -1730,6 +1730,7 @@ async def run_google_drive_indexing( # Parse the structured data items = GoogleDriveIndexRequest(**items_dict) + indexing_options = items.indexing_options total_indexed = 0 errors = [] @@ -1765,7 +1766,7 @@ async def run_google_drive_indexing( stage="fetching", ) - # Index each folder + # Index each folder with indexing options for folder in items.folders: try: indexed_count, error_message = await index_google_drive_files( @@ -1775,8 +1776,10 @@ async def run_google_drive_indexing( user_id, folder_id=folder.id, folder_name=folder.name, - use_delta_sync=True, + use_delta_sync=indexing_options.incremental_sync, update_last_indexed=False, + max_files=indexing_options.max_files_per_folder, + include_subfolders=indexing_options.include_subfolders, ) if error_message: errors.append(f"Folder '{folder.name}': {error_message}") @@ -1837,6 +1840,8 @@ async def run_google_drive_indexing( # Update notification on completion if notification: + # Refresh notification to reload attributes that may have been expired by earlier commits + await session.refresh(notification) await NotificationService.connector_indexing.notify_indexing_completed( session=session, notification=notification, diff --git a/surfsense_backend/app/schemas/__init__.py b/surfsense_backend/app/schemas/__init__.py index 076ac5915..017c78577 100644 --- a/surfsense_backend/app/schemas/__init__.py +++ b/surfsense_backend/app/schemas/__init__.py @@ -10,7 +10,7 @@ from .documents import ( ExtensionDocumentMetadata, PaginatedResponse, ) -from .google_drive import DriveItem, GoogleDriveIndexRequest +from .google_drive import DriveItem, GoogleDriveIndexingOptions, GoogleDriveIndexRequest from .logs import LogBase, LogCreate, LogFilter, LogRead, LogUpdate from .new_chat import ( ChatMessage, @@ -94,6 +94,7 @@ __all__ = [ "ExtensionDocumentMetadata", "GlobalNewLLMConfigRead", "GoogleDriveIndexRequest", + "GoogleDriveIndexingOptions", # Base schemas "IDModel", # RBAC schemas diff --git a/surfsense_backend/app/schemas/google_drive.py b/surfsense_backend/app/schemas/google_drive.py index 3f57b92ca..04d4c33dc 100644 --- a/surfsense_backend/app/schemas/google_drive.py +++ b/surfsense_backend/app/schemas/google_drive.py @@ -10,6 +10,25 @@ class DriveItem(BaseModel): name: str = Field(..., description="Item display name") +class GoogleDriveIndexingOptions(BaseModel): + """Indexing options for Google Drive connector.""" + + max_files_per_folder: int = Field( + default=100, + ge=1, + le=1000, + description="Maximum number of files to index from each folder (1-1000)", + ) + incremental_sync: bool = Field( + default=True, + description="Only sync changes since last index (faster). 
Disable for a full re-index.", + ) + include_subfolders: bool = Field( + default=True, + description="Recursively index files in subfolders of selected folders", + ) + + class GoogleDriveIndexRequest(BaseModel): """Request body for indexing Google Drive content.""" @@ -19,6 +38,10 @@ class GoogleDriveIndexRequest(BaseModel): files: list[DriveItem] = Field( default_factory=list, description="List of specific files to index" ) + indexing_options: GoogleDriveIndexingOptions = Field( + default_factory=GoogleDriveIndexingOptions, + description="Indexing configuration options", + ) def has_items(self) -> bool: """Check if any items are selected.""" diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index a88ac32c0..78fa595de 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -461,7 +461,7 @@ def index_google_drive_files_task( connector_id: int, search_space_id: int, user_id: str, - items_dict: dict, # Dictionary with 'folders' and 'files' lists + items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options' ): """Celery task to index Google Drive folders and files.""" import asyncio @@ -486,7 +486,7 @@ async def _index_google_drive_files( connector_id: int, search_space_id: int, user_id: str, - items_dict: dict, # Dictionary with 'folders' and 'files' lists + items_dict: dict, # Dictionary with 'folders', 'files', and 'indexing_options' ): """Index Google Drive folders and files with new session.""" from app.routes.search_source_connectors_routes import ( diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index dbc326406..21855f73f 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -72,6 +72,7 @@ async def _check_and_trigger_schedules(): index_elasticsearch_documents_task, index_github_repos_task, index_google_calendar_events_task, + index_google_drive_files_task, index_google_gmail_messages_task, index_jira_issues_task, index_linear_issues_task, @@ -96,6 +97,7 @@ async def _check_and_trigger_schedules(): SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task, + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task, } # Trigger indexing for each due connector @@ -106,13 +108,57 @@ async def _check_and_trigger_schedules(): f"Triggering periodic indexing for connector {connector.id} " f"({connector.connector_type.value})" ) - task.delay( - connector.id, - connector.search_space_id, - str(connector.user_id), - None, # start_date - uses last_indexed_at - None, # end_date - uses now - ) + + # Special handling for Google Drive - uses config for folder/file selection + if ( + connector.connector_type + == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR + ): + connector_config = connector.config or {} + selected_folders = connector_config.get("selected_folders", []) + selected_files = connector_config.get("selected_files", []) + indexing_options = connector_config.get( + "indexing_options", + { + "max_files_per_folder": 100, + "incremental_sync": True, + "include_subfolders": True, + }, + ) + + if selected_folders or 
selected_files: + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + { + "folders": selected_folders, + "files": selected_files, + "indexing_options": indexing_options, + }, + ) + else: + # No folders/files selected - skip indexing but still update next_scheduled_at + # to prevent checking every minute + logger.info( + f"Google Drive connector {connector.id} has no folders or files selected, " + "skipping periodic indexing (will check again at next scheduled time)" + ) + from datetime import timedelta + + connector.next_scheduled_at = now + timedelta( + minutes=connector.indexing_frequency_minutes + ) + await session.commit() + continue + else: + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + None, # start_date - uses last_indexed_at + None, # end_date - uses now + ) # Update next_scheduled_at for next run from datetime import timedelta diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 9eeb46fc8..48282a1af 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -37,6 +37,7 @@ async def index_google_drive_files( use_delta_sync: bool = True, update_last_indexed: bool = True, max_files: int = 500, + include_subfolders: bool = False, ) -> tuple[int, str | None]: """ Index Google Drive files for a specific connector. @@ -51,6 +52,7 @@ async def index_google_drive_files( use_delta_sync: Whether to use change tracking for incremental sync update_last_indexed: Whether to update last_indexed_at timestamp max_files: Maximum number of files to index + include_subfolders: Whether to recursively index files in subfolders Returns: Tuple of (number_of_indexed_files, error_message) @@ -144,6 +146,7 @@ async def index_google_drive_files( task_logger=task_logger, log_entry=log_entry, max_files=max_files, + include_subfolders=include_subfolders, ) else: logger.info(f"Using full scan for connector {connector_id}") @@ -159,6 +162,7 @@ async def index_google_drive_files( task_logger=task_logger, log_entry=log_entry, max_files=max_files, + include_subfolders=include_subfolders, ) documents_indexed, documents_skipped = result @@ -168,6 +172,9 @@ async def index_google_drive_files( if new_token and not token_error: from sqlalchemy.orm.attributes import flag_modified + # Refresh connector to reload attributes that may have been expired by earlier commits + await session.refresh(connector) + if "folder_tokens" not in connector.config: connector.config["folder_tokens"] = {} connector.config["folder_tokens"][target_folder_id] = new_token @@ -375,60 +382,89 @@ async def _index_full_scan( task_logger: TaskLoggingService, log_entry: any, max_files: int, + include_subfolders: bool = False, ) -> tuple[int, int]: """Perform full scan indexing of a folder.""" await task_logger.log_task_progress( log_entry, - f"Starting full scan of folder: {folder_name}", - {"stage": "full_scan", "folder_id": folder_id}, + f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})", + { + "stage": "full_scan", + "folder_id": folder_id, + "include_subfolders": include_subfolders, + }, ) documents_indexed = 0 documents_skipped = 0 - page_token = None files_processed = 0 - while files_processed < max_files: - files, next_token, error = await get_files_in_folder( - drive_client, folder_id, include_subfolders=False, 
page_token=page_token - ) + # Queue of folders to process: (folder_id, folder_name) + folders_to_process = [(folder_id, folder_name)] - if error: - logger.error(f"Error listing files: {error}") - break + while folders_to_process and files_processed < max_files: + current_folder_id, current_folder_name = folders_to_process.pop(0) + logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})") + page_token = None - if not files: - break - - for file in files: - if files_processed >= max_files: - break - - files_processed += 1 - - indexed, skipped = await _process_single_file( - drive_client=drive_client, - session=session, - file=file, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, + while files_processed < max_files: + # Get files and folders in current folder + # include_subfolders=True here so we get folder items to queue them + files, next_token, error = await get_files_in_folder( + drive_client, + current_folder_id, + include_subfolders=True, + page_token=page_token, ) - documents_indexed += indexed - documents_skipped += skipped + if error: + logger.error(f"Error listing files in {current_folder_name}: {error}") + break - if documents_indexed % 10 == 0 and documents_indexed > 0: - await session.commit() - logger.info( - f"Committed batch: {documents_indexed} files indexed so far" + if not files: + break + + for file in files: + if files_processed >= max_files: + break + + mime_type = file.get("mimeType", "") + + # If this is a folder and include_subfolders is enabled, queue it for processing + if mime_type == "application/vnd.google-apps.folder": + if include_subfolders: + folders_to_process.append( + (file["id"], file.get("name", "Unknown")) + ) + logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}") + continue + + # Process the file + files_processed += 1 + + indexed, skipped = await _process_single_file( + drive_client=drive_client, + session=session, + file=file, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, ) - page_token = next_token - if not page_token: - break + documents_indexed += indexed + documents_skipped += skipped + + if documents_indexed % 10 == 0 and documents_indexed > 0: + await session.commit() + logger.info( + f"Committed batch: {documents_indexed} files indexed so far" + ) + + page_token = next_token + if not page_token: + break logger.info( f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped" @@ -448,8 +484,13 @@ async def _index_with_delta_sync( task_logger: TaskLoggingService, log_entry: any, max_files: int, + include_subfolders: bool = False, ) -> tuple[int, int]: - """Perform delta sync indexing using change tracking.""" + """Perform delta sync indexing using change tracking. + + Note: include_subfolders is accepted for API consistency but delta sync + automatically tracks changes across all folders including subfolders. + """ await task_logger.log_task_progress( log_entry, f"Starting delta sync from token: {start_page_token[:20]}...", @@ -515,6 +556,131 @@ async def _index_with_delta_sync( return documents_indexed, documents_skipped +async def _check_rename_only_update( + session: AsyncSession, + file: dict, + search_space_id: int, +) -> tuple[bool, str | None]: + """ + Check if a file only needs a rename update (no content change). 
+ + Uses md5Checksum comparison (preferred) or modifiedTime (fallback for Google Workspace files) + to detect if content has changed. This optimization prevents unnecessary ETL API calls + (Docling/LlamaCloud) for rename-only operations. + + Args: + session: Database session + file: File metadata from Google Drive API + search_space_id: ID of the search space + + Returns: + Tuple of (is_rename_only, message) + - (True, message): Only filename changed, document was updated + - (False, None): Content changed or new file, needs full processing + """ + from sqlalchemy import select + from sqlalchemy.orm.attributes import flag_modified + + from app.db import Document + + file_id = file.get("id") + file_name = file.get("name", "Unknown") + incoming_md5 = file.get("md5Checksum") # None for Google Workspace files + incoming_modified_time = file.get("modifiedTime") + + if not file_id: + return False, None + + # Try to find existing document by file_id-based hash (primary method) + primary_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id + ) + existing_document = await check_document_by_unique_identifier(session, primary_hash) + + # If not found by primary hash, try searching by metadata (for legacy documents) + if not existing_document: + result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.GOOGLE_DRIVE_FILE, + Document.document_metadata["google_drive_file_id"].astext == file_id, + ) + ) + existing_document = result.scalar_one_or_none() + if existing_document: + logger.debug(f"Found legacy document by metadata for file_id: {file_id}") + + if not existing_document: + # New file, needs full processing + return False, None + + # Get stored checksums/timestamps from document metadata + doc_metadata = existing_document.document_metadata or {} + stored_md5 = doc_metadata.get("md5_checksum") + stored_modified_time = doc_metadata.get("modified_time") + + # Determine if content changed using md5Checksum (preferred) or modifiedTime (fallback) + content_unchanged = False + + if incoming_md5 and stored_md5: + # Best case: Compare md5 checksums (only changes when content changes, not on rename) + content_unchanged = incoming_md5 == stored_md5 + logger.debug(f"MD5 comparison for {file_name}: unchanged={content_unchanged}") + elif incoming_md5 and not stored_md5: + # Have incoming md5 but no stored md5 (legacy doc) - need to reprocess to store it + logger.debug( + f"No stored md5 for {file_name}, will reprocess to store md5_checksum" + ) + return False, None + elif not incoming_md5: + # Google Workspace file (no md5Checksum available) - fall back to modifiedTime + # Note: modifiedTime is less reliable as it changes on rename too, but it's the best we have + if incoming_modified_time and stored_modified_time: + content_unchanged = incoming_modified_time == stored_modified_time + logger.debug( + f"ModifiedTime fallback for Google Workspace file {file_name}: unchanged={content_unchanged}" + ) + else: + # No stored modifiedTime (legacy) - reprocess to store it + return False, None + + if content_unchanged: + # Content hasn't changed - check if filename changed + old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get( + "google_drive_file_name" + ) + + if old_name and old_name != file_name: + # Rename-only update - update the document without re-processing + existing_document.title = file_name + if not existing_document.document_metadata: + 
existing_document.document_metadata = {} + existing_document.document_metadata["FILE_NAME"] = file_name + existing_document.document_metadata["google_drive_file_name"] = file_name + # Also update modified_time for Google Workspace files (since it changed on rename) + if incoming_modified_time: + existing_document.document_metadata["modified_time"] = ( + incoming_modified_time + ) + flag_modified(existing_document, "document_metadata") + await session.commit() + + logger.info( + f"Rename-only update: '{old_name}' → '{file_name}' (skipped ETL)" + ) + return ( + True, + f"File renamed: '{old_name}' → '{file_name}' (no content change)", + ) + else: + # Neither content nor name changed + logger.debug(f"File unchanged: {file_name}") + return True, "File unchanged (same content and name)" + + # Content changed - needs full processing + return False, None + + async def _process_single_file( drive_client: GoogleDriveClient, session: AsyncSession, @@ -537,6 +703,27 @@ async def _process_single_file( try: logger.info(f"Processing file: {file_name} ({mime_type})") + # Early check: Is this a rename-only update? + # This optimization prevents downloading and ETL processing for files + # where only the name changed but content is the same. + is_rename_only, rename_message = await _check_rename_only_update( + session=session, + file=file, + search_space_id=search_space_id, + ) + + if is_rename_only: + await task_logger.log_task_progress( + log_entry, + f"Skipped ETL for {file_name}: {rename_message}", + {"status": "rename_only", "reason": rename_message}, + ) + # Return 1 for renamed files (they are "indexed" in the sense that they're updated) + # Return 0 for unchanged files + if "renamed" in (rename_message or "").lower(): + return 1, 0 + return 0, 1 + _, error, _ = await download_and_process_file( client=drive_client, file=file, @@ -564,7 +751,15 @@ async def _process_single_file( async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): - """Remove a document that was deleted in Drive.""" + """Remove a document that was deleted in Drive. + + Handles both new (file_id-based) and legacy (filename-based) hash schemes. 
+ """ + from sqlalchemy import select + + from app.db import Document + + # First try with file_id-based hash (new method) unique_identifier_hash = generate_unique_identifier_hash( DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id ) @@ -573,6 +768,19 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id: session, unique_identifier_hash ) + # If not found, search by metadata (for legacy documents with filename-based hash) + if not existing_document: + result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.GOOGLE_DRIVE_FILE, + Document.document_metadata["google_drive_file_id"].astext == file_id, + ) + ) + existing_document = result.scalar_one_or_none() + if existing_document: + logger.info(f"Found legacy document by metadata for file_id: {file_id}") + if existing_document: await session.delete(existing_document) logger.info(f"Removed deleted file document: {file_id}") diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 7b823112e..0a22c20c2 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -31,6 +31,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document, get_current_timestamp, ) from .markdown_processor import add_received_markdown_file_document @@ -49,6 +50,160 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( ) +def get_google_drive_unique_identifier( + connector: dict | None, + filename: str, + search_space_id: int, +) -> tuple[str, str | None]: + """ + Get unique identifier hash for a file, with special handling for Google Drive. + + For Google Drive files, uses file_id as the unique identifier (doesn't change on rename). + For other files, uses filename. + + Args: + connector: Optional connector info dict with type and metadata + filename: The filename (used for non-Google Drive files or as fallback) + search_space_id: The search space ID + + Returns: + Tuple of (primary_hash, legacy_hash or None) + - For Google Drive: (file_id_based_hash, filename_based_hash for migration) + - For other sources: (filename_based_hash, None) + """ + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + metadata = connector.get("metadata", {}) + file_id = metadata.get("google_drive_file_id") + + if file_id: + # New method: use file_id as unique identifier (doesn't change on rename) + primary_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id + ) + # Legacy method: for backward compatibility with existing documents + # that were indexed with filename-based hash + legacy_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id + ) + return primary_hash, legacy_hash + + # For non-Google Drive files, use filename as before + primary_hash = generate_unique_identifier_hash( + DocumentType.FILE, filename, search_space_id + ) + return primary_hash, None + + +async def handle_existing_document_update( + session: AsyncSession, + existing_document: Document, + content_hash: str, + connector: dict | None, + filename: str, + primary_hash: str, +) -> tuple[bool, Document | None]: + """ + Handle update logic for an existing document. 
+ + Args: + session: Database session + existing_document: The existing document found in database + content_hash: Hash of the new content + connector: Optional connector info + filename: Current filename + primary_hash: The primary hash (file_id based for Google Drive) + + Returns: + Tuple of (should_skip_processing, document_to_return) + - (True, document): Content unchanged, just return existing document + - (False, None): Content changed, need to re-process + """ + # Check if this document needs hash migration (found via legacy hash) + if existing_document.unique_identifier_hash != primary_hash: + existing_document.unique_identifier_hash = primary_hash + logging.info(f"Migrated document to file_id-based identifier: {filename}") + + # Check if content has changed + if existing_document.content_hash == content_hash: + # Content unchanged - check if we need to update metadata (e.g., filename changed) + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + connector_metadata = connector.get("metadata", {}) + new_name = connector_metadata.get("google_drive_file_name") + # Check both possible keys for old name (FILE_NAME is used in stored documents) + doc_metadata = existing_document.document_metadata or {} + old_name = doc_metadata.get("FILE_NAME") or doc_metadata.get( + "google_drive_file_name" + ) + + if new_name and old_name and old_name != new_name: + # File was renamed - update title and metadata, skip expensive processing + from sqlalchemy.orm.attributes import flag_modified + + existing_document.title = new_name + if not existing_document.document_metadata: + existing_document.document_metadata = {} + existing_document.document_metadata["FILE_NAME"] = new_name + existing_document.document_metadata["google_drive_file_name"] = new_name + flag_modified(existing_document, "document_metadata") + await session.commit() + logging.info( + f"File renamed in Google Drive: '{old_name}' → '{new_name}' (no re-processing needed)" + ) + + logging.info(f"Document for file {filename} unchanged. Skipping.") + return True, existing_document + else: + # Content has changed - need to re-process + logging.info(f"Content changed for file {filename}. Updating document.") + return False, None + + +async def find_existing_document_with_migration( + session: AsyncSession, + primary_hash: str, + legacy_hash: str | None, + content_hash: str | None = None, +) -> Document | None: + """ + Find existing document, checking both new hash and legacy hash for migration, + with fallback to content_hash for cross-source deduplication. 
+ + Args: + session: Database session + primary_hash: The primary hash (file_id based for Google Drive) + legacy_hash: The legacy hash (filename based) for migration, or None + content_hash: The content hash for fallback deduplication, or None + + Returns: + Existing document if found, None otherwise + """ + # First check with primary hash (new method) + existing_document = await check_document_by_unique_identifier(session, primary_hash) + + # If not found and we have a legacy hash, check with that (migration path) + if not existing_document and legacy_hash: + existing_document = await check_document_by_unique_identifier( + session, legacy_hash + ) + if existing_document: + logging.info( + "Found legacy document (filename-based hash), will migrate to file_id-based hash" + ) + + # Fallback: check by content_hash to catch duplicates from different sources + # This prevents unique constraint violations when the same content exists + # under a different unique_identifier (e.g., manual upload vs Google Drive) + if not existing_document and content_hash: + existing_document = await check_duplicate_document(session, content_hash) + if existing_document: + logging.info( + f"Found duplicate content from different source (content_hash match). " + f"Original document ID: {existing_document.id}, type: {existing_document.document_type}" + ) + + return existing_document + + async def parse_with_llamacloud_retry( file_path: str, estimated_pages: int, @@ -158,6 +313,7 @@ async def add_received_file_document_using_unstructured( unstructured_processed_elements: list[LangChainDocument], search_space_id: int, user_id: str, + connector: dict | None = None, ) -> Document | None: """ Process and store a file document using Unstructured service. @@ -168,6 +324,7 @@ async def add_received_file_document_using_unstructured( unstructured_processed_elements: Processed elements from Unstructured search_space_id: ID of the search space user_id: ID of the user + connector: Optional connector info for Google Drive files Returns: Document object if successful, None if failed @@ -177,29 +334,32 @@ async def add_received_file_document_using_unstructured( unstructured_processed_elements ) - # Generate unique identifier hash for this file - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.FILE, file_name, search_space_id + # Generate unique identifier hash (uses file_id for Google Drive, filename for others) + primary_hash, legacy_hash = get_google_drive_unique_identifier( + connector, file_name, search_space_id ) # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash + # Check if document exists (with migration support for Google Drive and content_hash fallback) + existing_document = await find_existing_document_with_migration( + session, primary_hash, legacy_hash, content_hash ) if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logging.info(f"Document for file {file_name} unchanged. Skipping.") - return existing_document - else: - # Content has changed - update the existing document - logging.info( - f"Content changed for file {file_name}. Updating document." 
- ) + # Handle existing document (rename detection, content change check) + should_skip, doc = await handle_existing_document_update( + session, + existing_document, + content_hash, + connector, + file_name, + primary_hash, + ) + if should_skip: + return doc + # Content changed - continue to update # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -251,10 +411,15 @@ async def add_received_file_document_using_unstructured( document = existing_document else: # Create new document + # Determine document type based on connector + doc_type = DocumentType.FILE + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + doc_type = DocumentType.GOOGLE_DRIVE_FILE + document = Document( search_space_id=search_space_id, title=file_name, - document_type=DocumentType.FILE, + document_type=doc_type, document_metadata={ "FILE_NAME": file_name, "ETL_SERVICE": "UNSTRUCTURED", @@ -263,7 +428,7 @@ async def add_received_file_document_using_unstructured( embedding=summary_embedding, chunks=chunks, content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, + unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), @@ -288,6 +453,7 @@ async def add_received_file_document_using_llamacloud( llamacloud_markdown_document: str, search_space_id: int, user_id: str, + connector: dict | None = None, ) -> Document | None: """ Process and store document content parsed by LlamaCloud. @@ -298,6 +464,7 @@ async def add_received_file_document_using_llamacloud( llamacloud_markdown_document: Markdown content from LlamaCloud parsing search_space_id: ID of the search space user_id: ID of the user + connector: Optional connector info for Google Drive files Returns: Document object if successful, None if failed @@ -306,29 +473,32 @@ async def add_received_file_document_using_llamacloud( # Combine all markdown documents into one file_in_markdown = llamacloud_markdown_document - # Generate unique identifier hash for this file - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.FILE, file_name, search_space_id + # Generate unique identifier hash (uses file_id for Google Drive, filename for others) + primary_hash, legacy_hash = get_google_drive_unique_identifier( + connector, file_name, search_space_id ) # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash + # Check if document exists (with migration support for Google Drive and content_hash fallback) + existing_document = await find_existing_document_with_migration( + session, primary_hash, legacy_hash, content_hash ) if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logging.info(f"Document for file {file_name} unchanged. Skipping.") - return existing_document - else: - # Content has changed - update the existing document - logging.info( - f"Content changed for file {file_name}. Updating document." 
- ) + # Handle existing document (rename detection, content change check) + should_skip, doc = await handle_existing_document_update( + session, + existing_document, + content_hash, + connector, + file_name, + primary_hash, + ) + if should_skip: + return doc + # Content changed - continue to update # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -380,10 +550,15 @@ async def add_received_file_document_using_llamacloud( document = existing_document else: # Create new document + # Determine document type based on connector + doc_type = DocumentType.FILE + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + doc_type = DocumentType.GOOGLE_DRIVE_FILE + document = Document( search_space_id=search_space_id, title=file_name, - document_type=DocumentType.FILE, + document_type=doc_type, document_metadata={ "FILE_NAME": file_name, "ETL_SERVICE": "LLAMACLOUD", @@ -392,7 +567,7 @@ async def add_received_file_document_using_llamacloud( embedding=summary_embedding, chunks=chunks, content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, + unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), @@ -419,6 +594,7 @@ async def add_received_file_document_using_docling( docling_markdown_document: str, search_space_id: int, user_id: str, + connector: dict | None = None, ) -> Document | None: """ Process and store document content parsed by Docling. @@ -429,6 +605,7 @@ async def add_received_file_document_using_docling( docling_markdown_document: Markdown content from Docling parsing search_space_id: ID of the search space user_id: ID of the user + connector: Optional connector info for Google Drive files Returns: Document object if successful, None if failed @@ -436,35 +613,38 @@ async def add_received_file_document_using_docling( try: file_in_markdown = docling_markdown_document - # Generate unique identifier hash for this file - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.FILE, file_name, search_space_id + # Generate unique identifier hash (uses file_id for Google Drive, filename for others) + primary_hash, legacy_hash = get_google_drive_unique_identifier( + connector, file_name, search_space_id ) # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash + # Check if document exists (with migration support for Google Drive and content_hash fallback) + existing_document = await find_existing_document_with_migration( + session, primary_hash, legacy_hash, content_hash ) if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - logging.info(f"Document for file {file_name} unchanged. Skipping.") - return existing_document - else: - # Content has changed - update the existing document - logging.info( - f"Content changed for file {file_name}. Updating document." 
- ) + # Handle existing document (rename detection, content change check) + should_skip, doc = await handle_existing_document_update( + session, + existing_document, + content_hash, + connector, + file_name, + primary_hash, + ) + if should_skip: + return doc + # Content changed - continue to update # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) if not user_llm: raise RuntimeError( - f"No long context LLM configured for user {user_id} in search space {search_space_id}" + f"No long context LLM configured for user {user_id} in search_space {search_space_id}" ) # Generate summary using chunked processing for large documents @@ -534,10 +714,15 @@ async def add_received_file_document_using_docling( document = existing_document else: # Create new document + # Determine document type based on connector + doc_type = DocumentType.FILE + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + doc_type = DocumentType.GOOGLE_DRIVE_FILE + document = Document( search_space_id=search_space_id, title=file_name, - document_type=DocumentType.FILE, + document_type=doc_type, document_metadata={ "FILE_NAME": file_name, "ETL_SERVICE": "DOCLING", @@ -546,15 +731,15 @@ async def add_received_file_document_using_docling( embedding=summary_embedding, chunks=chunks, content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, + unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, content_needs_reindexing=False, updated_at=get_current_timestamp(), ) - session.add(document) - await session.commit() - await session.refresh(document) + session.add(document) + await session.commit() + await session.refresh(document) return document except SQLAlchemyError as db_error: @@ -650,7 +835,7 @@ async def process_file_in_background( # Process markdown directly through specialized function result = await add_received_markdown_file_document( - session, filename, markdown_content, search_space_id, user_id + session, filename, markdown_content, search_space_id, user_id, connector ) if connector: @@ -790,7 +975,7 @@ async def process_file_in_background( # Process transcription as markdown document result = await add_received_markdown_file_document( - session, filename, transcribed_text, search_space_id, user_id + session, filename, transcribed_text, search_space_id, user_id, connector ) if connector: @@ -955,7 +1140,7 @@ async def process_file_in_background( # Pass the documents to the existing background task result = await add_received_file_document_using_unstructured( - session, filename, docs, search_space_id, user_id + session, filename, docs, search_space_id, user_id, connector ) if connector: @@ -1103,6 +1288,7 @@ async def process_file_in_background( llamacloud_markdown_document=markdown_content, search_space_id=search_space_id, user_id=user_id, + connector=connector, ) # Track if this document was successfully created @@ -1256,6 +1442,7 @@ async def process_file_in_background( docling_markdown_document=result["content"], search_space_id=search_space_id, user_id=user_id, + connector=connector, ) if doc_result: diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index e11a6efeb..3a9867fd6 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -19,16 +19,157 @@ from 
app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document, get_current_timestamp, ) +def _get_google_drive_unique_identifier( + connector: dict | None, + filename: str, + search_space_id: int, +) -> tuple[str, str | None]: + """ + Get unique identifier hash for a file, with special handling for Google Drive. + + For Google Drive files, uses file_id as the unique identifier (doesn't change on rename). + For other files, uses filename. + + Args: + connector: Optional connector info dict with type and metadata + filename: The filename (used for non-Google Drive files or as fallback) + search_space_id: The search space ID + + Returns: + Tuple of (primary_hash, legacy_hash or None) + """ + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + metadata = connector.get("metadata", {}) + file_id = metadata.get("google_drive_file_id") + + if file_id: + primary_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id + ) + legacy_hash = generate_unique_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE, filename, search_space_id + ) + return primary_hash, legacy_hash + + primary_hash = generate_unique_identifier_hash( + DocumentType.FILE, filename, search_space_id + ) + return primary_hash, None + + +async def _find_existing_document_with_migration( + session: AsyncSession, + primary_hash: str, + legacy_hash: str | None, + content_hash: str | None = None, +) -> Document | None: + """ + Find existing document, checking both new hash and legacy hash for migration, + with fallback to content_hash for cross-source deduplication. + """ + existing_document = await check_document_by_unique_identifier(session, primary_hash) + + if not existing_document and legacy_hash: + existing_document = await check_document_by_unique_identifier( + session, legacy_hash + ) + if existing_document: + logging.info( + "Found legacy document (filename-based hash), will migrate to file_id-based hash" + ) + + # Fallback: check by content_hash to catch duplicates from different sources + if not existing_document and content_hash: + existing_document = await check_duplicate_document(session, content_hash) + if existing_document: + logging.info( + f"Found duplicate content from different source (content_hash match). " + f"Original document ID: {existing_document.id}, type: {existing_document.document_type}" + ) + + return existing_document + + +async def _handle_existing_document_update( + session: AsyncSession, + existing_document: Document, + content_hash: str, + connector: dict | None, + filename: str, + primary_hash: str, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[bool, Document | None]: + """ + Handle update logic for an existing document. 
+ + Returns: + Tuple of (should_skip_processing, document_to_return) + """ + # Check if this document needs hash migration + if existing_document.unique_identifier_hash != primary_hash: + existing_document.unique_identifier_hash = primary_hash + logging.info(f"Migrated document to file_id-based identifier: {filename}") + + # Check if content has changed + if existing_document.content_hash == content_hash: + # Content unchanged - check if we need to update metadata (e.g., filename changed) + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + connector_metadata = connector.get("metadata", {}) + new_name = connector_metadata.get("google_drive_file_name") + # Check both possible keys for old name (FILE_NAME is used in stored documents) + doc_metadata = existing_document.document_metadata or {} + old_name = ( + doc_metadata.get("FILE_NAME") + or doc_metadata.get("google_drive_file_name") + or doc_metadata.get("file_name") + ) + + if new_name and old_name and old_name != new_name: + # File was renamed - update title and metadata, skip expensive processing + from sqlalchemy.orm.attributes import flag_modified + + existing_document.title = new_name + if not existing_document.document_metadata: + existing_document.document_metadata = {} + existing_document.document_metadata["FILE_NAME"] = new_name + existing_document.document_metadata["file_name"] = new_name + existing_document.document_metadata["google_drive_file_name"] = new_name + flag_modified(existing_document, "document_metadata") + await session.commit() + logging.info( + f"File renamed in Google Drive: '{old_name}' → '{new_name}' (no re-processing needed)" + ) + + await task_logger.log_task_success( + log_entry, + f"Markdown file document unchanged: {filename}", + { + "duplicate_detected": True, + "existing_document_id": existing_document.id, + }, + ) + logging.info(f"Document for markdown file {filename} unchanged. Skipping.") + return True, existing_document + else: + logging.info( + f"Content changed for markdown file {filename}. Updating document." + ) + return False, None + + async def add_received_markdown_file_document( session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int, user_id: str, + connector: dict | None = None, ) -> Document | None: """ Process and store a markdown file document. 
@@ -39,6 +180,7 @@ async def add_received_markdown_file_document( file_in_markdown: Content of the markdown file search_space_id: ID of the search space user_id: ID of the user + connector: Optional connector info for Google Drive files Returns: Document object if successful, None if failed @@ -58,39 +200,34 @@ async def add_received_markdown_file_document( ) try: - # Generate unique identifier hash for this markdown file - unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.FILE, file_name, search_space_id + # Generate unique identifier hash (uses file_id for Google Drive, filename for others) + primary_hash, legacy_hash = _get_google_drive_unique_identifier( + connector, file_name, search_space_id ) # Generate content hash content_hash = generate_content_hash(file_in_markdown, search_space_id) - # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash + # Check if document exists (with migration support for Google Drive and content_hash fallback) + existing_document = await _find_existing_document_with_migration( + session, primary_hash, legacy_hash, content_hash ) if existing_document: - # Document exists - check if content has changed - if existing_document.content_hash == content_hash: - await task_logger.log_task_success( - log_entry, - f"Markdown file document unchanged: {file_name}", - { - "duplicate_detected": True, - "existing_document_id": existing_document.id, - }, - ) - logging.info( - f"Document for markdown file {file_name} unchanged. Skipping." - ) - return existing_document - else: - # Content has changed - update the existing document - logging.info( - f"Content changed for markdown file {file_name}. Updating document." 
- ) + # Handle existing document (rename detection, content change check) + should_skip, doc = await _handle_existing_document_update( + session, + existing_document, + content_hash, + connector, + file_name, + primary_hash, + task_logger, + log_entry, + ) + if should_skip: + return doc + # Content changed - continue to update # Get user's long context LLM (needed for both create and update) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -139,10 +276,15 @@ async def add_received_markdown_file_document( document = existing_document else: # Create new document + # Determine document type based on connector + doc_type = DocumentType.FILE + if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: + doc_type = DocumentType.GOOGLE_DRIVE_FILE + document = Document( search_space_id=search_space_id, title=file_name, - document_type=DocumentType.FILE, + document_type=doc_type, document_metadata={ "FILE_NAME": file_name, }, @@ -150,7 +292,7 @@ async def add_received_markdown_file_document( embedding=summary_embedding, chunks=chunks, content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, + unique_identifier_hash=primary_hash, blocknote_document=blocknote_json, updated_at=get_current_timestamp(), ) diff --git a/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx index f390b1d1b..1d52f0182 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/components/periodic-sync-config.tsx @@ -1,6 +1,7 @@ "use client"; import type { FC } from "react"; +import { AlertCircle } from "lucide-react"; import { Label } from "@/components/ui/label"; import { Select, @@ -16,6 +17,8 @@ interface PeriodicSyncConfigProps { frequencyMinutes: string; onEnabledChange: (enabled: boolean) => void; onFrequencyChange: (frequency: string) => void; + disabled?: boolean; + disabledMessage?: string; } export const PeriodicSyncConfig: FC = ({ @@ -23,6 +26,8 @@ export const PeriodicSyncConfig: FC = ({ frequencyMinutes, onEnabledChange, onFrequencyChange, + disabled = false, + disabledMessage, }) => { return (
@@ -33,9 +38,17 @@ export const PeriodicSyncConfig: FC<PeriodicSyncConfigProps> = ({
Automatically re-index at regular intervals

- + + {/* Show disabled message when periodic sync can't be enabled */} + {disabled && disabledMessage && ( +
+ +

{disabledMessage}

+
+ )} + {enabled && (
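For reference, a minimal sketch of how a parent is expected to wire up the new optional props added to PeriodicSyncConfigProps above (this mirrors the connector-edit-view change further below; the state names and the message string here are illustrative, not part of this diff):

// Hypothetical parent usage of the extended PeriodicSyncConfig props.
// `disabled` and `disabledMessage` are the new optional props introduced in this hunk;
// `isDisabled` / setters are assumed local state of the parent component.
<PeriodicSyncConfig
  enabled={periodicEnabled}
  frequencyMinutes={frequencyMinutes}
  onEnabledChange={setPeriodicEnabled}
  onFrequencyChange={setFrequencyMinutes}
  disabled={isDisabled}
  disabledMessage="Select at least one folder or file to enable periodic sync."
/>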
diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx index 18b1819c1..d0bc96872 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/google-drive-config.tsx @@ -1,11 +1,19 @@ "use client"; -import { Info } from "lucide-react"; +import { File, FileText, FileSpreadsheet, FolderClosed, Image, Presentation } from "lucide-react"; import type { FC } from "react"; import { useEffect, useState } from "react"; import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree"; -import { Alert, AlertDescription } from "@/components/ui/alert"; import { Button } from "@/components/ui/button"; +import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Switch } from "@/components/ui/switch"; import type { ConnectorConfigProps } from "../index"; interface SelectedFolder { @@ -13,128 +21,292 @@ interface SelectedFolder { name: string; } +interface IndexingOptions { + max_files_per_folder: number; + incremental_sync: boolean; + include_subfolders: boolean; +} + +const DEFAULT_INDEXING_OPTIONS: IndexingOptions = { + max_files_per_folder: 100, + incremental_sync: true, + include_subfolders: true, +}; + +// Helper to get appropriate icon for file type based on file name +function getFileIconFromName(fileName: string, className: string = "size-3.5 shrink-0") { + const lowerName = fileName.toLowerCase(); + // Spreadsheets + if ( + lowerName.endsWith(".xlsx") || + lowerName.endsWith(".xls") || + lowerName.endsWith(".csv") || + lowerName.includes("spreadsheet") + ) { + return ; + } + // Presentations + if ( + lowerName.endsWith(".pptx") || + lowerName.endsWith(".ppt") || + lowerName.includes("presentation") + ) { + return ; + } + // Documents (word, text only - not PDF) + if ( + lowerName.endsWith(".docx") || + lowerName.endsWith(".doc") || + lowerName.endsWith(".txt") || + lowerName.includes("document") || + lowerName.includes("word") || + lowerName.includes("text") + ) { + return ; + } + // Images + if ( + lowerName.endsWith(".png") || + lowerName.endsWith(".jpg") || + lowerName.endsWith(".jpeg") || + lowerName.endsWith(".gif") || + lowerName.endsWith(".webp") || + lowerName.endsWith(".svg") + ) { + return ; + } + // Default (including PDF) + return ; +} + export const GoogleDriveConfig: FC = ({ connector, onConfigChange }) => { // Initialize with existing selected folders and files from connector config const existingFolders = (connector.config?.selected_folders as SelectedFolder[] | undefined) || []; const existingFiles = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; + const existingIndexingOptions = + (connector.config?.indexing_options as IndexingOptions | undefined) || DEFAULT_INDEXING_OPTIONS; + const [selectedFolders, setSelectedFolders] = useState(existingFolders); const [selectedFiles, setSelectedFiles] = useState(existingFiles); const [showFolderSelector, setShowFolderSelector] = useState(false); + const [indexingOptions, setIndexingOptions] = useState(existingIndexingOptions); // Update selected folders and files when connector config changes useEffect(() => { const folders = (connector.config?.selected_folders 
as SelectedFolder[] | undefined) || []; const files = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; + const options = + (connector.config?.indexing_options as IndexingOptions | undefined) || + DEFAULT_INDEXING_OPTIONS; setSelectedFolders(folders); setSelectedFiles(files); + setIndexingOptions(options); }, [connector.config]); - const handleSelectFolders = (folders: SelectedFolder[]) => { - setSelectedFolders(folders); + const updateConfig = ( + folders: SelectedFolder[], + files: SelectedFolder[], + options: IndexingOptions + ) => { if (onConfigChange) { - // Store folder IDs and names in config for indexing onConfigChange({ ...connector.config, selected_folders: folders, - selected_files: selectedFiles, // Preserve existing files + selected_files: files, + indexing_options: options, }); } }; + const handleSelectFolders = (folders: SelectedFolder[]) => { + setSelectedFolders(folders); + updateConfig(folders, selectedFiles, indexingOptions); + }; + const handleSelectFiles = (files: SelectedFolder[]) => { setSelectedFiles(files); - if (onConfigChange) { - // Store file IDs and names in config for indexing - onConfigChange({ - ...connector.config, - selected_folders: selectedFolders, // Preserve existing folders - selected_files: files, - }); - } + updateConfig(selectedFolders, files, indexingOptions); + }; + + const handleIndexingOptionChange = (key: keyof IndexingOptions, value: number | boolean) => { + const newOptions = { ...indexingOptions, [key]: value }; + setIndexingOptions(newOptions); + updateConfig(selectedFolders, selectedFiles, newOptions); }; const totalSelected = selectedFolders.length + selectedFiles.length; return ( -
-
-

Folder & File Selection

-

- Select specific folders and/or individual files to index. Only files directly in each - folder will be processed—subfolders must be selected separately. -

-
- - {totalSelected > 0 && ( -
-

- Selected {totalSelected} item{totalSelected > 1 ? "s" : ""}: - {selectedFolders.length > 0 && - ` ${selectedFolders.length} folder${selectedFolders.length > 1 ? "s" : ""}`} - {selectedFiles.length > 0 && - ` ${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`} +

+ {/* Folder & File Selection */} +
+
+

Folder & File Selection

+

+ Select specific folders and/or individual files to index.

-
- {selectedFolders.map((folder) => ( -

- 📁 {folder.name} -

- ))} - {selectedFiles.map((file) => ( -

- 📄 {file.name} -

- ))} -
- )} - {showFolderSelector ? ( -
- + {totalSelected > 0 && ( +
+

+ Selected {totalSelected} item{totalSelected > 1 ? "s" : ""}: {(() => { + const parts: string[] = []; + if (selectedFolders.length > 0) { + parts.push( + `${selectedFolders.length} folder${selectedFolders.length > 1 ? "s" : ""}` + ); + } + if (selectedFiles.length > 0) { + parts.push(`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`); + } + return parts.length > 0 ? `(${parts.join(" ")})` : ""; + })()} +

+
+ {selectedFolders.map((folder) => ( +

+ + {folder.name} +

+ ))} + {selectedFiles.map((file) => ( +

+ {getFileIconFromName(file.name)} + {file.name} +

+ ))} +
+
+ )} + + {showFolderSelector ? ( +
+ + +
+ ) : ( -
- ) : ( - - )} + )} +
- - - - Folder and file selection is used when indexing. You can change this selection when you - start indexing. - - + {/* Indexing Options */} +
+
+

Indexing Options

+

+ Configure how files are indexed from your Google Drive. +

+
+ + {/* Max files per folder */} +
+
+
+ +

+ Maximum number of files to index from each folder +

+
+ +
+
+ + {/* Incremental sync toggle */} +
+
+ +

+ Only sync changes since last index (faster). Disable for a full re-index. +

+
+ handleIndexingOptionChange("incremental_sync", checked)} + /> +
+ + {/* Include subfolders toggle */} +
+
+ +

+ Recursively index files in subfolders of selected folders +

+
+ handleIndexingOptionChange("include_subfolders", checked)} + /> +
+
); }; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx index 17e24cd62..515a3a47b 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx @@ -222,15 +222,36 @@ export const ConnectorEditView: FC = ({ /> )} - {/* Periodic sync - not shown for Google Drive */} - {connector.connector_type !== "GOOGLE_DRIVE_CONNECTOR" && ( - - )} + {/* Periodic sync - shown for all indexable connectors */} + {(() => { + // Check if Google Drive has folders/files selected + const isGoogleDrive = connector.connector_type === "GOOGLE_DRIVE_CONNECTOR"; + const selectedFolders = + (connector.config?.selected_folders as + | Array<{ id: string; name: string }> + | undefined) || []; + const selectedFiles = + (connector.config?.selected_files as + | Array<{ id: string; name: string }> + | undefined) || []; + const hasItemsSelected = selectedFolders.length > 0 || selectedFiles.length > 0; + const isDisabled = isGoogleDrive && !hasItemsSelected; + + return ( + + ); + })()} )} diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts index 1344abfce..7ac0d3e0f 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts @@ -219,11 +219,9 @@ export const useConnectorDialog = () => { setEditingConnector(connector); setConnectorConfig(connector.config); setConnectorName(connector.name); - // Load existing periodic sync settings (disabled for Google Drive and non-indexable connectors) + // Load existing periodic sync settings (disabled for non-indexable connectors) setPeriodicEnabled( - connector.connector_type === "GOOGLE_DRIVE_CONNECTOR" || !connector.is_indexable - ? false - : connector.periodic_indexing_enabled + !connector.is_indexable ? false : connector.periodic_indexing_enabled ); setFrequencyMinutes(connector.indexing_frequency_minutes?.toString() || "1440"); // Reset dates - user can set new ones for re-indexing @@ -882,20 +880,14 @@ export const useConnectorDialog = () => { const endDateStr = endDate ? format(endDate, "yyyy-MM-dd") : undefined; // Update connector with periodic sync settings and config changes - // Note: Periodic sync is disabled for Google Drive connectors if (periodicEnabled || indexingConnectorConfig) { const frequency = periodicEnabled ? 
parseInt(frequencyMinutes, 10) : undefined; await updateConnector({ id: indexingConfig.connectorId, data: { - ...(periodicEnabled && - indexingConfig.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && { - periodic_indexing_enabled: true, - indexing_frequency_minutes: frequency, - }), - ...(indexingConfig.connectorType === "GOOGLE_DRIVE_CONNECTOR" && { - periodic_indexing_enabled: false, - indexing_frequency_minutes: null, + ...(periodicEnabled && { + periodic_indexing_enabled: true, + indexing_frequency_minutes: frequency, }), ...(indexingConnectorConfig && { config: indexingConnectorConfig, @@ -912,11 +904,18 @@ export const useConnectorDialog = () => { const selectedFiles = indexingConnectorConfig.selected_files as | Array<{ id: string; name: string }> | undefined; + const indexingOptions = indexingConnectorConfig.indexing_options as + | { + max_files_per_folder: number; + incremental_sync: boolean; + include_subfolders: boolean; + } + | undefined; if ( (selectedFolders && selectedFolders.length > 0) || (selectedFiles && selectedFiles.length > 0) ) { - // Index with folder/file selection + // Index with folder/file selection and indexing options await indexConnector({ connector_id: indexingConfig.connectorId, queryParams: { @@ -925,6 +924,11 @@ export const useConnectorDialog = () => { body: { folders: selectedFolders || [], files: selectedFiles || [], + indexing_options: indexingOptions || { + max_files_per_folder: 100, + incremental_sync: true, + include_subfolders: true, + }, }, }); } else { @@ -964,7 +968,7 @@ export const useConnectorDialog = () => { ); // Track periodic indexing started if enabled - if (periodicEnabled && indexingConfig.connectorType !== "GOOGLE_DRIVE_CONNECTOR") { + if (periodicEnabled) { trackPeriodicIndexingStarted( Number(searchSpaceId), indexingConfig.connectorType, @@ -1072,12 +1076,8 @@ export const useConnectorDialog = () => { setEditingConnector(connector); setConnectorName(connector.name); - // Load existing periodic sync settings (disabled for Google Drive and non-indexable connectors) - setPeriodicEnabled( - connector.connector_type === "GOOGLE_DRIVE_CONNECTOR" || !connector.is_indexable - ? false - : connector.periodic_indexing_enabled - ); + // Load existing periodic sync settings (disabled for non-indexable connectors) + setPeriodicEnabled(!connector.is_indexable ? 
false : connector.periodic_indexing_enabled);
     setFrequencyMinutes(connector.indexing_frequency_minutes?.toString() || "1440");
     // Reset dates - user can set new ones for re-indexing
     setStartDate(undefined);
@@ -1117,6 +1117,24 @@
       return;
     }
 
+    // Prevent periodic indexing for Google Drive without folders/files selected
+    if (periodicEnabled && editingConnector.connector_type === "GOOGLE_DRIVE_CONNECTOR") {
+      const selectedFolders = (connectorConfig || editingConnector.config)?.selected_folders as
+        | Array<{ id: string; name: string }>
+        | undefined;
+      const selectedFiles = (connectorConfig || editingConnector.config)?.selected_files as
+        | Array<{ id: string; name: string }>
+        | undefined;
+      const hasItemsSelected =
+        (selectedFolders && selectedFolders.length > 0) ||
+        (selectedFiles && selectedFiles.length > 0);
+
+      if (!hasItemsSelected) {
+        toast.error("Select at least one folder or file to enable periodic sync");
+        return;
+      }
+    }
+
     // Validate frequency minutes if periodic is enabled (only for indexable connectors)
     if (periodicEnabled && editingConnector.is_indexable) {
       const frequencyValidation = frequencyMinutesSchema.safeParse(frequencyMinutes);
@@ -1132,23 +1150,14 @@
     const endDateStr = endDate ? format(endDate, "yyyy-MM-dd") : undefined;
 
     // Update connector with periodic sync settings, config changes, and name
-    // Note: Periodic sync is disabled for Google Drive connectors and non-indexable connectors
     const frequency =
       periodicEnabled && editingConnector.is_indexable ? parseInt(frequencyMinutes, 10) : null;
     await updateConnector({
       id: editingConnector.id,
       data: {
         name: connectorName || editingConnector.name,
-        periodic_indexing_enabled:
-          editingConnector.connector_type === "GOOGLE_DRIVE_CONNECTOR" ||
-          !editingConnector.is_indexable
-            ? false
-            : periodicEnabled,
-        indexing_frequency_minutes:
-          editingConnector.connector_type === "GOOGLE_DRIVE_CONNECTOR" ||
-          !editingConnector.is_indexable
-            ? null
-            : frequency,
+        periodic_indexing_enabled: !editingConnector.is_indexable ? false : periodicEnabled,
+        indexing_frequency_minutes: !editingConnector.is_indexable ?
null : frequency, config: connectorConfig || editingConnector.config, }, }); @@ -1166,6 +1175,13 @@ export const useConnectorDialog = () => { const selectedFiles = (connectorConfig || editingConnector.config)?.selected_files as | Array<{ id: string; name: string }> | undefined; + const indexingOptions = (connectorConfig || editingConnector.config)?.indexing_options as + | { + max_files_per_folder: number; + incremental_sync: boolean; + include_subfolders: boolean; + } + | undefined; if ( (selectedFolders && selectedFolders.length > 0) || (selectedFiles && selectedFiles.length > 0) @@ -1178,6 +1194,11 @@ export const useConnectorDialog = () => { body: { folders: selectedFolders || [], files: selectedFiles || [], + indexing_options: indexingOptions || { + max_files_per_folder: 100, + incremental_sync: true, + include_subfolders: true, + }, }, }); const totalItems = (selectedFolders?.length || 0) + (selectedFiles?.length || 0); @@ -1221,12 +1242,8 @@ export const useConnectorDialog = () => { ); } - // Track periodic indexing if enabled (for non-Google Drive connectors) - if ( - periodicEnabled && - editingConnector.is_indexable && - editingConnector.connector_type !== "GOOGLE_DRIVE_CONNECTOR" - ) { + // Track periodic indexing if enabled + if (periodicEnabled && editingConnector.is_indexable) { trackPeriodicIndexingStarted( Number(searchSpaceId), editingConnector.connector_type, diff --git a/surfsense_web/components/connectors/google-drive-folder-tree.tsx b/surfsense_web/components/connectors/google-drive-folder-tree.tsx index 4a3b0ad52..894564167 100644 --- a/surfsense_web/components/connectors/google-drive-folder-tree.tsx +++ b/surfsense_web/components/connectors/google-drive-folder-tree.tsx @@ -5,13 +5,13 @@ import { ChevronRight, File, FileText, - Folder, + FolderClosed, FolderOpen, HardDrive, Image, Loader2, Presentation, - Sheet, + FileSpreadsheet, } from "lucide-react"; import { useState } from "react"; import { Checkbox } from "@/components/ui/checkbox"; @@ -53,16 +53,16 @@ interface GoogleDriveFolderTreeProps { // Helper to get appropriate icon for file type function getFileIcon(mimeType: string, className: string = "h-4 w-4") { if (mimeType.includes("spreadsheet") || mimeType.includes("excel")) { - return ; + return ; } if (mimeType.includes("presentation") || mimeType.includes("powerpoint")) { - return ; + return ; } if (mimeType.includes("document") || mimeType.includes("word") || mimeType.includes("text")) { - return ; + return ; } if (mimeType.includes("image")) { - return ; + return ; } return ; } @@ -280,9 +280,9 @@ export function GoogleDriveFolderTree({
{isFolder ? ( isExpanded ? ( - + ) : ( - + ) ) : ( getFileIcon(item.mimeType, "h-3 w-3 sm:h-4 sm:w-4")
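Both indexing paths in use-connector-dialog.ts now attach an indexing_options object to the folder/file indexing request, falling back to the defaults when nothing is stored in the connector config. A minimal standalone sketch of that payload shape follows; the buildDriveIndexRequestBody helper and the SelectedItem type name are illustrative only and not part of this patch, while the field names and default values are taken from the diff above.

// Illustrative sketch — mirrors the request body assembled in use-connector-dialog.ts.
// Helper and type names below are hypothetical; only field names and defaults come from the patch.
interface SelectedItem {
  id: string;
  name: string;
}

interface IndexingOptions {
  max_files_per_folder: number; // cap on files indexed from each selected folder
  incremental_sync: boolean; // sync only changes since the last index instead of a full re-index
  include_subfolders: boolean; // recursively index files in subfolders of selected folders
}

const DEFAULT_INDEXING_OPTIONS: IndexingOptions = {
  max_files_per_folder: 100,
  incremental_sync: true,
  include_subfolders: true,
};

function buildDriveIndexRequestBody(
  folders: SelectedItem[] = [],
  files: SelectedItem[] = [],
  options?: IndexingOptions
) {
  return {
    folders,
    files,
    indexing_options: options ?? DEFAULT_INDEXING_OPTIONS,
  };
}

// Example: one folder, a smaller per-folder cap, and a forced full re-index.
const body = buildDriveIndexRequestBody(
  [{ id: "folder-id", name: "Team Docs" }],
  [],
  { max_files_per_folder: 50, incremental_sync: false, include_subfolders: true }
);
console.log(JSON.stringify(body, null, 2));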