diff --git a/surfsense_backend/app/agents/new_chat/tools/podcast.py b/surfsense_backend/app/agents/new_chat/tools/podcast.py index 424b04f77..1048ed881 100644 --- a/surfsense_backend/app/agents/new_chat/tools/podcast.py +++ b/surfsense_backend/app/agents/new_chat/tools/podcast.py @@ -54,7 +54,9 @@ def set_generating_podcast(search_space_id: int, podcast_id: int) -> None: client = get_redis_client() client.setex(_redis_key(search_space_id), 1800, str(podcast_id)) except Exception as e: - print(f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}") + print( + f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}" + ) def create_generate_podcast_tool( diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index e3b988676..5b8c4b993 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,8 @@ Composio Google Drive Connector Module. Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import hashlib +import json import logging import os import tempfile @@ -464,6 +466,55 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def check_document_by_content_hash( + session: AsyncSession, content_hash: str +) -> Document | None: + """Check if a document with the given content hash already exists. + + This is used to prevent duplicate content from being indexed, regardless + of which connector originally indexed it. + """ + from sqlalchemy.future import select + + existing_doc_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + return existing_doc_result.scalars().first() + + +async def check_document_by_google_drive_file_id( + session: AsyncSession, file_id: str, search_space_id: int +) -> Document | None: + """Check if a document with this Google Drive file ID exists (from any connector). + + This checks both metadata key formats: + - 'google_drive_file_id' (normal Google Drive connector) + - 'file_id' (Composio Google Drive connector) + + This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls. + """ + from sqlalchemy import String, cast, or_ + from sqlalchemy.future import select + + # When casting JSON to String, the result includes quotes: "value" instead of value + # So we need to compare with the quoted version + quoted_file_id = f'"{file_id}"' + + existing_doc_result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + or_( + # Normal Google Drive connector format + cast(Document.document_metadata["google_drive_file_id"], String) + == quoted_file_id, + # Composio Google Drive connector format + cast(Document.document_metadata["file_id"], String) == quoted_file_id, + ), + ) + ) + return existing_doc_result.scalars().first() + + async def update_connector_last_indexed( session: AsyncSession, connector, @@ -477,6 +528,33 @@ async def update_connector_last_indexed( logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") +def generate_indexing_settings_hash( + selected_folders: list[dict], + selected_files: list[dict], + indexing_options: dict, +) -> str: + """Generate a hash of indexing settings to detect configuration changes. 
+ + This hash is used to determine if indexing settings have changed since + the last index, which would require a full re-scan instead of delta sync. + + Args: + selected_folders: List of {id, name} for folders to index + selected_files: List of {id, name} for individual files to index + indexing_options: Dict with max_files_per_folder, include_subfolders, etc. + + Returns: + MD5 hash string of the settings + """ + settings = { + "folders": sorted([f.get("id", "") for f in selected_folders]), + "files": sorted([f.get("id", "") for f in selected_files]), + "include_subfolders": indexing_options.get("include_subfolders", True), + "max_files_per_folder": indexing_options.get("max_files_per_folder", 100), + } + return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest() + + async def index_composio_google_drive( session: AsyncSession, connector, @@ -487,12 +565,16 @@ async def index_composio_google_drive( log_entry, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """Index Google Drive files via Composio with delta sync support. + Returns: + Tuple of (documents_indexed, documents_skipped, error_message or None) + Delta Sync Flow: 1. First sync: Full scan + get initial page token 2. Subsequent syncs: Use LIST_CHANGES to process only changed files + (unless settings changed or incremental_sync is disabled) Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index @@ -508,12 +590,42 @@ async def index_composio_google_drive( selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) - # Check for stored page token for delta sync - stored_page_token = connector_config.get("drive_page_token") - use_delta_sync = stored_page_token and connector.last_indexed_at - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) include_subfolders = indexing_options.get("include_subfolders", True) + incremental_sync = indexing_options.get("incremental_sync", True) + + # Generate current settings hash to detect configuration changes + current_settings_hash = generate_indexing_settings_hash( + selected_folders, selected_files, indexing_options + ) + last_settings_hash = connector_config.get("last_indexed_settings_hash") + + # Detect if settings changed since last index + settings_changed = ( + last_settings_hash is not None + and current_settings_hash != last_settings_hash + ) + + if settings_changed: + logger.info( + f"Indexing settings changed for connector {connector_id}. " + f"Will perform full re-scan to apply new configuration." 
+ ) + + # Check for stored page token for delta sync + stored_page_token = connector_config.get("drive_page_token") + + # Determine whether to use delta sync: + # - Must have a stored page token + # - Must have been indexed before (last_indexed_at exists) + # - User must have incremental_sync enabled + # - Settings must not have changed (folder/subfolder config) + use_delta_sync = ( + incremental_sync + and stored_page_token + and connector.last_indexed_at + and not settings_changed + ) # Route to delta sync or full scan if use_delta_sync: @@ -588,6 +700,14 @@ async def index_composio_google_drive( elif token_error: logger.warning(f"Failed to get new page token: {token_error}") + # Save current settings hash for future change detection + # This allows detecting when folder/subfolder settings change + if not connector.config: + connector.config = {} + connector.config["last_indexed_settings_hash"] = current_settings_hash + flag_modified(connector, "config") + logger.info(f"Saved indexing settings hash for connector {connector_id}") + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) @@ -628,11 +748,11 @@ async def index_composio_google_drive( }, ) - return documents_indexed, error_message + return documents_indexed, documents_skipped, error_message except Exception as e: logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Google Drive via Composio: {e!s}" + return 0, 0, f"Failed to index Google Drive via Composio: {e!s}" async def _index_composio_drive_delta_sync( @@ -953,13 +1073,28 @@ async def _process_single_drive_file( """ processing_errors = [] + # ========== EARLY DUPLICATE CHECK BY FILE ID ========== + # Check if this Google Drive file was already indexed by ANY connector + # This happens BEFORE download/ETL to save expensive API calls + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id + ) + if existing_by_file_id: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already indexed " + f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' " + f"(saved download & ETL cost)" + ) + return 0, 1, processing_errors # Skip - NO download, NO ETL! 
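# The lookup above relies on a JSON-cast quirk worth spelling out: in
# PostgreSQL, CAST(document_metadata->'file_id' AS VARCHAR) returns the JSON
# text form of the value, quotes included, which is why
# check_document_by_google_drive_file_id compares against f'"{file_id}"'.
# A minimal alternative sketch, assuming Document.document_metadata is a
# PostgreSQL JSON/JSONB column: the astext accessor (SQL ->>) strips the
# quotes, so no manual quoting is needed.
#
#     from sqlalchemy.future import select
#
#     stmt = select(Document).where(
#         Document.search_space_id == search_space_id,
#         Document.document_metadata["file_id"].astext == file_id,
#     )
#
# Both forms match the same rows; the cast-based helper defined earlier in
# this file is the one the connector actually uses.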
+ # ====================================================== + # Generate unique identifier hash document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) unique_identifier_hash = generate_unique_identifier_hash( document_type, f"drive_{file_id}", search_space_id ) - # Check if document exists + # Check if document exists by unique identifier (same connector, same file) existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash ) @@ -1000,7 +1135,7 @@ async def _process_single_drive_file( if existing_document: if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped + return 0, 1, processing_errors # Skipped - unchanged # Update existing document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1039,7 +1174,19 @@ async def _process_single_drive_file( existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - updated + + # Check if content_hash already exists (from any connector) + # This prevents duplicate content and avoids IntegrityError on unique constraint + existing_by_content_hash = await check_document_by_content_hash( + session, content_hash + ) + if existing_by_content_hash: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): identical content " + f"already indexed as '{existing_by_content_hash.title}'" + ) + return 0, 1, processing_errors # Skipped - duplicate content # Create new document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1085,7 +1232,7 @@ async def _process_single_drive_file( ) session.add(document) - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - new async def _fetch_folder_files_recursively( diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 541e25a75..38352d348 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -670,7 +670,9 @@ async def delete_thread( ) from None -@router.post("/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse) +@router.post( + "/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse +) async def complete_clone( thread_id: int, session: AsyncSession = Depends(get_async_session), @@ -702,7 +704,9 @@ async def complete_clone( raise HTTPException(status_code=400, detail="Clone already completed") if not thread.cloned_from_thread_id: - raise HTTPException(status_code=400, detail="No source thread to clone from") + raise HTTPException( + status_code=400, detail="No source thread to clone from" + ) message_count = await complete_clone_content( session=session, diff --git a/surfsense_backend/app/routes/public_chat_routes.py b/surfsense_backend/app/routes/public_chat_routes.py index 8b4f42559..4676f2ad0 100644 --- a/surfsense_backend/app/routes/public_chat_routes.py +++ b/surfsense_backend/app/routes/public_chat_routes.py @@ -53,7 +53,9 @@ async def clone_public_chat_endpoint( source_thread = await get_thread_by_share_token(session, share_token) if not source_thread: - raise HTTPException(status_code=404, detail="Chat not found or no longer public") + raise HTTPException( + status_code=404, detail="Chat not found or no longer public" + ) target_search_space_id = await get_user_default_search_space(session, user.id) diff --git 
a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 535f579a5..26cf82e81 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -187,6 +187,7 @@ async def create_search_source_connector( user_id=str(user.id), connector_type=db_connector.connector_type, frequency_minutes=db_connector.indexing_frequency_minutes, + connector_config=db_connector.config, ) if not success: logger.warning( @@ -646,6 +647,7 @@ async def index_connector_content( # Handle different connector types response_message = "" + indexing_started = True # Use UTC for consistency with last_indexed_at storage today_str = datetime.now(UTC).strftime("%Y-%m-%d") @@ -921,14 +923,31 @@ async def index_connector_content( elif connector.connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import index_crawled_urls_task + from app.utils.webcrawler_utils import parse_webcrawler_urls - logger.info( - f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" - ) - index_crawled_urls_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to - ) - response_message = "Web page indexing started in the background." + # Check if URLs are configured before triggering indexing + connector_config = connector.config or {} + urls = parse_webcrawler_urls(connector_config.get("INITIAL_URLS")) + + if not urls: + # URLs are optional - skip indexing gracefully + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, skipping indexing" + ) + response_message = "No URLs configured for this connector. Add URLs in the connector settings to enable indexing." + indexing_started = False + else: + logger.info( + f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_crawled_urls_task.delay( + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) + response_message = "Web page indexing started in the background." 
elif connector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR: from app.config import config as app_config @@ -1025,6 +1044,7 @@ async def index_connector_content( return { "message": response_message, + "indexing_started": indexing_started, "connector_id": connector_id, "search_space_id": search_space_id, "indexing_from": indexing_from, @@ -1223,8 +1243,15 @@ async def _run_indexing_with_notifications( indexing_kwargs["on_retry_callback"] = on_retry_callback # Run the indexing function - documents_processed, error_or_warning = await indexing_function(**indexing_kwargs) - current_indexed_count = documents_processed + # Some indexers return (indexed, error), others return (indexed, skipped, error) + result = await indexing_function(**indexing_kwargs) + + # Handle both 2-tuple and 3-tuple returns for backwards compatibility + if len(result) == 3: + documents_processed, documents_skipped, error_or_warning = result + else: + documents_processed, error_or_warning = result + documents_skipped = None # Update connector timestamp if function provided and indexing was successful if documents_processed > 0 and update_timestamp_func: @@ -1252,6 +1279,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1278,6 +1306,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1319,6 +1348,7 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=notification_message, # Pass as warning, not error is_warning=True, # Flag to indicate this is a warning, not an error + skipped_count=documents_skipped, ) await ( session.commit() @@ -1334,6 +1364,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=error_or_warning, + skipped_count=documents_skipped, ) await ( session.commit() @@ -1355,6 +1386,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=None, # No error - sync succeeded + skipped_count=documents_skipped, ) await ( session.commit() @@ -1372,6 +1404,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=str(e), + skipped_count=None, # Unknown on exception ) except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index b420b1b91..ab6be9c9f 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -257,14 +257,11 @@ class PublicChatResponse(BaseModel): class CloneInitResponse(BaseModel): - - thread_id: int search_space_id: int share_token: str class CompleteCloneResponse(BaseModel): - status: str message_count: int diff --git a/surfsense_backend/app/schemas/podcasts.py b/surfsense_backend/app/schemas/podcasts.py index 9e5cb0262..60f9d7dc0 100644 --- a/surfsense_backend/app/schemas/podcasts.py +++ b/surfsense_backend/app/schemas/podcasts.py @@ -59,6 +59,8 @@ class PodcastRead(PodcastBase): "search_space_id": obj.search_space_id, "status": obj.status, "created_at": obj.created_at, - "transcript_entries": 
len(obj.podcast_transcript) if obj.podcast_transcript else None, + "transcript_entries": len(obj.podcast_transcript) + if obj.podcast_transcript + else None, } return cls(**data) diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 6a3db566b..1a91d000f 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -422,6 +422,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: int, error_message: str | None = None, is_warning: bool = False, + skipped_count: int | None = None, ) -> Notification: """ Update notification when connector indexing completes. @@ -432,6 +433,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: Total number of items indexed error_message: Error message if indexing failed, or warning message (optional) is_warning: If True, treat error_message as a warning (success case) rather than an error + skipped_count: Number of items skipped (e.g., duplicates) - optional Returns: Updated notification @@ -440,6 +442,14 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "connector_name", "Connector" ) + # Build the skipped text if there are skipped items + skipped_text = "" + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + skipped_text = ( + f" ({skipped_count} {skipped_item_text} skipped - already indexed)" + ) + # If there's an error message but items were indexed, treat it as a warning (partial success) # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) # Otherwise, treat it as a failure @@ -448,12 +458,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): # Partial success with warnings (e.g., duplicate content from other connectors) title = f"Ready: {connector_name}" item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" + message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}" status = "completed" elif is_warning: # Warning case (e.g., duplicates found) - treat as success title = f"Ready: {connector_name}" - message = f"Sync completed. {error_message}" + message = f"Sync completed{skipped_text}. {error_message}" status = "completed" else: # Complete failure @@ -463,14 +473,21 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): else: title = f"Ready: {connector_name}" if indexed_count == 0: - message = "Already up to date! No new items to sync." + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)." + else: + message = "Already up to date! No new items to sync." else: item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced." + message = ( + f"Now searchable! {indexed_count} {item_text} synced{skipped_text}." 
+ ) status = "completed" metadata_updates = { "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", diff --git a/surfsense_backend/app/services/public_chat_service.py b/surfsense_backend/app/services/public_chat_service.py index 79618974f..a5b8c9ffe 100644 --- a/surfsense_backend/app/services/public_chat_service.py +++ b/surfsense_backend/app/services/public_chat_service.py @@ -291,6 +291,9 @@ async def complete_clone_content( if old_podcast_id and old_podcast_id in podcast_id_map: result_data["podcast_id"] = podcast_id_map[old_podcast_id] + elif old_podcast_id: + # Podcast couldn't be cloned (not ready), remove reference + result_data.pop("podcast_id", None) new_message = NewChatMessage( thread_id=target_thread.id, diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 0ce714cdc..2ce8716e0 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -55,7 +55,9 @@ def _clear_generating_podcast(search_space_id: int) -> None: client = redis.from_url(redis_url, decode_responses=True) key = f"podcast:generating:{search_space_id}" client.delete(key) - logger.info(f"Cleared generating podcast key for search_space_id={search_space_id}") + logger.info( + f"Cleared generating podcast key for search_space_id={search_space_id}" + ) except Exception as e: logger.warning(f"Could not clear generating podcast key: {e}") @@ -119,9 +121,7 @@ async def _generate_content_podcast( ) -> dict: """Generate content-based podcast and update existing record.""" async with get_celery_session_maker()() as session: - result = await session.execute( - select(Podcast).filter(Podcast.id == podcast_id) - ) + result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) podcast = result.scalars().first() if not podcast: diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index bf80cbe78..b33e25170 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -156,6 +156,41 @@ async def _check_and_trigger_schedules(): ) await session.commit() continue + + # Special handling for Webcrawler - skip if no URLs configured + elif ( + connector.connector_type + == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR + ): + from app.utils.webcrawler_utils import parse_webcrawler_urls + + connector_config = connector.config or {} + urls = parse_webcrawler_urls( + connector_config.get("INITIAL_URLS") + ) + + if urls: + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + None, # start_date + None, # end_date + ) + else: + # No URLs configured - skip indexing but still update next_scheduled_at + logger.info( + f"Webcrawler connector {connector.id} has no URLs configured, " + "skipping periodic indexing (will check again at next scheduled time)" + ) + from datetime import timedelta + + connector.next_scheduled_at = now + timedelta( + minutes=connector.indexing_frequency_minutes + ) + await session.commit() + continue + else: task.delay( connector.id, diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index f97652114..ffc4a1f27 100644 --- 
a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -86,7 +86,7 @@ async def index_composio_connector( end_date: str | None = None, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """ Index content from a Composio connector. @@ -104,7 +104,7 @@ async def index_composio_connector( max_items: Maximum number of items to fetch Returns: - Tuple of (number_of_indexed_items, error_message or None) + Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None) """ task_logger = TaskLoggingService(session, search_space_id) @@ -132,14 +132,14 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "InvalidConnectorType"} ) - return 0, error_msg + return 0, 0, error_msg if not connector: error_msg = f"Composio connector with ID {connector_id} not found" await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ConnectorNotFound"} ) - return 0, error_msg + return 0, 0, error_msg # Get toolkit ID from config toolkit_id = connector.config.get("toolkit_id") @@ -150,7 +150,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingToolkitId"} ) - return 0, error_msg + return 0, 0, error_msg # Check if toolkit is indexable if toolkit_id not in INDEXABLE_TOOLKITS: @@ -158,7 +158,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ToolkitNotIndexable"} ) - return 0, error_msg + return 0, 0, error_msg # Get indexer function from registry try: @@ -167,7 +167,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, str(e), {"error_type": "NoIndexerImplemented"} ) - return 0, str(e) + return 0, 0, str(e) # Build kwargs for the indexer function kwargs = { @@ -199,7 +199,7 @@ async def index_composio_connector( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -209,4 +209,4 @@ async def index_composio_connector( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) - return 0, f"Failed to index Composio connector: {e!s}" + return 0, 0, f"Failed to index Composio connector: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 4d5a33b79..6bb62d716 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -20,6 +20,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -317,6 +318,24 @@ async def index_airtable_records( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Airtable record {record_id} already indexed by another connector " + 
f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate document summary user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index a1067255d..e183ab333 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -308,6 +309,22 @@ async def index_bookstack_pages( logger.info(f"Successfully updated BookStack page {page_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"BookStack page {page_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index e459584f8..887c3e2e5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -302,6 +303,22 @@ async def index_clickup_tasks( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"ClickUp task {task_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index ddbefafb9..5673839bb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -306,6 +307,22 @@ async def index_confluence_pages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Confluence page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 8f0c76e53..9e401b335 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_markdown, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -454,6 +455,24 @@ async def index_discord_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 4a8df4918..fb6989bb9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -24,6 +24,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -319,6 +320,21 @@ async def _process_repository_digest( # Delete existing document to replace with new one await session.delete(existing_document) await session.flush() + else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Repository {repo_full_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + return 0 # Generate summary using LLM (ONE call per repository!) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 08d2904d6..e599abd22 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -24,7 +24,9 @@ from app.utils.document_converters import ( ) from .base import ( + calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -163,10 +165,22 @@ async def index_google_gmail_messages( credentials, session, user_id, connector_id ) + # Calculate date range using last_indexed_at if dates not provided + # This ensures Gmail uses the same date logic as other connectors + # (uses last_indexed_at → now, or 365 days back for first-time indexing) + calculated_start_date, calculated_end_date = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + # Fetch recent Google gmail messages - logger.info(f"Fetching recent emails for connector {connector_id}") + logger.info( + f"Fetching emails for connector {connector_id} " + f"from {calculated_start_date} to {calculated_end_date}" + ) messages, error = await gmail_connector.get_recent_messages( - max_results=max_messages, start_date=start_date, end_date=end_date + max_results=max_messages, + start_date=calculated_start_date, + end_date=calculated_end_date, ) if error: @@ -316,6 +330,22 @@ async def index_google_gmail_messages( logger.info(f"Successfully updated Gmail message {subject}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Gmail message {subject} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: 
{duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 4851a6466..d6095d20e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -284,6 +285,22 @@ async def index_jira_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Jira issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 7d8e0c30e..d00a39160 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -315,6 +316,22 @@ async def index_linear_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Linear issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index ead259a44..59890dbe4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -363,6 +364,22 @@ async def index_luma_events( logger.info(f"Successfully updated Luma event {event_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Luma event {event_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 7704dec95..b8d2297c5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -23,6 +23,7 @@ from .base import ( build_document_metadata_string, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -388,6 +389,22 @@ async def index_notion_pages( continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Notion page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Get user's long context LLM user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index 4c4dab4c2..a603d3fba 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -28,6 +28,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_string, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -426,6 +427,22 @@ async def index_obsidian_vault( indexed_count += 1 else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Obsidian note {title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + skipped_count += 1 + continue + # Create new document logger.info(f"Indexing new note: {title}") diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index f6ed4f567..f244c97f8 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -22,6 +22,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -325,6 +326,22 @@ async def index_slack_messages( logger.info(f"Successfully updated Slack message {msg_ts}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks(combined_document_string) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index b879ddfcb..66b709ddc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -21,6 +21,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -354,6 +355,27 @@ async def index_teams_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + "Teams message %s in channel %s already indexed by another connector " + "(existing document ID: %s, type: %s). Skipping.", + message_id, + channel_name, + duplicate_by_content.id, + duplicate_by_content.document_type, + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index fb1aae5f2..0c63fd2f0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -18,9 +18,11 @@ from app.utils.document_converters import ( generate_document_summary, generate_unique_identifier_hash, ) +from app.utils.webcrawler_utils import parse_webcrawler_urls from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -96,13 +98,7 @@ async def index_crawled_urls( api_key = connector.config.get("FIRECRAWL_API_KEY") # Get URLs from connector config - initial_urls = connector.config.get("INITIAL_URLS", "") - if isinstance(initial_urls, str): - urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] - elif isinstance(initial_urls, list): - urls = [url.strip() for url in initial_urls if url.strip()] - else: - urls = [] + urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS")) logger.info( f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs" @@ -281,6 +277,22 @@ async def index_crawled_urls( logger.info(f"Successfully updated URL {url}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"URL {url} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." 
+ ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 5161fb569..6c4be0cb8 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -55,7 +55,9 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( ) # Timeout calculation constants -UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024 # 100 KB/s (conservative for slow connections) +UPLOAD_BYTES_PER_SECOND_SLOW = ( + 100 * 1024 +) # 100 KB/s (conservative for slow connections) MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing @@ -219,19 +221,19 @@ async def find_existing_document_with_migration( def calculate_upload_timeout(file_size_bytes: int) -> float: """ Calculate appropriate upload timeout based on file size. - + Assumes a conservative slow connection speed to handle worst-case scenarios. - + Args: file_size_bytes: Size of the file in bytes - + Returns: Timeout in seconds """ # Calculate time needed at slow connection speed # Add 50% buffer for network variability and SSL overhead estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 - + # Clamp to reasonable bounds return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) @@ -239,21 +241,21 @@ def calculate_upload_timeout(file_size_bytes: int) -> float: def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: """ Calculate job processing timeout based on page count and file size. 
- + Args: estimated_pages: Estimated number of pages file_size_bytes: Size of the file in bytes - + Returns: Timeout in seconds """ # Base timeout + time per page page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT) - + # Also consider file size (large images take longer to process) # ~1 minute per 10MB of file size size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60 - + # Use the larger of the two estimates return max(page_based_timeout, size_based_timeout) @@ -284,18 +286,18 @@ async def parse_with_llamacloud_retry( """ import os import random - + from llama_cloud_services import LlamaParse from llama_cloud_services.parse.utils import ResultType # Get file size for timeout calculations file_size_bytes = os.path.getsize(file_path) file_size_mb = file_size_bytes / (1024 * 1024) - + # Calculate dynamic timeouts based on file size and page count upload_timeout = calculate_upload_timeout(file_size_bytes) job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) - + # HTTP client timeouts - scaled based on file size # Write timeout is critical for large file uploads custom_timeout = httpx.Timeout( @@ -304,7 +306,7 @@ async def parse_with_llamacloud_retry( write=upload_timeout, # Dynamic based on file size (upload time) pool=120.0, # 2 minutes to acquire connection from pool ) - + logging.info( f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " @@ -335,14 +337,14 @@ async def parse_with_llamacloud_retry( # Parse the file asynchronously result = await parser.aparse(file_path) - + # Success - log if we had previous failures if attempt > 1: logging.info( f"LlamaCloud upload succeeded on attempt {attempt} after " f"{len(attempt_errors)} failures" ) - + return result except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: @@ -355,8 +357,7 @@ async def parse_with_llamacloud_retry( # Calculate exponential backoff with jitter # Base delay doubles each attempt, capped at max delay base_delay = min( - LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), - LLAMACLOUD_MAX_DELAY + LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY ) # Add random jitter (±25%) to prevent thundering herd jitter = base_delay * 0.25 * (2 * random.random() - 1) diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 219641933..aa8c07ce4 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -43,6 +43,7 @@ def create_periodic_schedule( user_id: str, connector_type: SearchSourceConnectorType, frequency_minutes: int, + connector_config: dict | None = None, ) -> bool: """ Trigger the first indexing run immediately when periodic indexing is enabled. 
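To sanity-check the sizing logic in file_processors.py above, here is a compact standalone sketch of the two timeout formulas. PER_PAGE_JOB_TIMEOUT is not shown in this diff, so the 30-second value below is an assumption:

UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # 100 KB/s, worst-case connection
MIN_UPLOAD_TIMEOUT = 120                   # seconds
MAX_UPLOAD_TIMEOUT = 1800
BASE_JOB_TIMEOUT = 600
PER_PAGE_JOB_TIMEOUT = 30                  # assumption; defined outside this diff

def upload_timeout(size_bytes: int) -> float:
    # transfer time at the slow rate plus a 50% buffer, clamped to [2 min, 30 min]
    estimated = (size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
    return max(MIN_UPLOAD_TIMEOUT, min(estimated, MAX_UPLOAD_TIMEOUT))

def job_timeout(pages: int, size_bytes: int) -> float:
    # larger of the page-based and size-based (~1 minute per 10 MB) estimates
    by_pages = BASE_JOB_TIMEOUT + pages * PER_PAGE_JOB_TIMEOUT
    by_size = BASE_JOB_TIMEOUT + (size_bytes / (10 * 1024 * 1024)) * 60
    return max(by_pages, by_size)

mb = 1024 * 1024
print(upload_timeout(10 * mb))   # ~154 s upload budget for a 10 MB file
print(job_timeout(20, 10 * mb))  # max(1200, 660) = 1200 s for 20 pages
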
@@ -57,11 +58,26 @@ def create_periodic_schedule( user_id: User ID connector_type: Type of connector frequency_minutes: Frequency in minutes (used for logging) + connector_config: Optional connector config dict for validation Returns: True if successful, False otherwise """ try: + # Special handling for connectors that require config validation + if connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: + from app.utils.webcrawler_utils import parse_webcrawler_urls + + config = connector_config or {} + urls = parse_webcrawler_urls(config.get("INITIAL_URLS")) + + if not urls: + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, " + "skipping first indexing run (will run when URLs are added)" + ) + return True # Return success - schedule is created, just no first run + logger.info( f"Periodic indexing enabled for connector {connector_id} " f"(frequency: {frequency_minutes} minutes). Triggering first run..." diff --git a/surfsense_backend/app/utils/webcrawler_utils.py b/surfsense_backend/app/utils/webcrawler_utils.py new file mode 100644 index 000000000..31d2ebe50 --- /dev/null +++ b/surfsense_backend/app/utils/webcrawler_utils.py @@ -0,0 +1,28 @@ +""" +Utility functions for webcrawler connector. +""" + + +def parse_webcrawler_urls(initial_urls: str | list | None) -> list[str]: + """ + Parse URLs from webcrawler INITIAL_URLS value. + + Handles both string (newline-separated) and list formats. + + Args: + initial_urls: The INITIAL_URLS value (string, list, or None) + + Returns: + List of parsed, stripped, non-empty URLs + """ + if initial_urls is None: + return [] + + if isinstance(initial_urls, str): + return [url.strip() for url in initial_urls.split("\n") if url.strip()] + elif isinstance(initial_urls, list): + return [ + url.strip() for url in initial_urls if isinstance(url, str) and url.strip() + ] + else: + return [] diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index f6f70f83b..1f9dd433d 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -143,6 +143,7 @@ export default function NewChatPage() { const queryClient = useQueryClient(); const [isInitializing, setIsInitializing] = useState(true); const [isCompletingClone, setIsCompletingClone] = useState(false); + const [cloneError, setCloneError] = useState(false); const [threadId, setThreadId] = useState(null); const [currentThread, setCurrentThread] = useState(null); const [messages, setMessages] = useState([]); @@ -333,7 +334,7 @@ export default function NewChatPage() { // Handle clone completion when thread has clone_pending flag useEffect(() => { - if (!currentThread?.clone_pending || isCompletingClone) return; + if (!currentThread?.clone_pending || isCompletingClone || cloneError) return; const completeClone = async () => { setIsCompletingClone(true); @@ -351,13 +352,14 @@ export default function NewChatPage() { } catch (error) { console.error("[NewChatPage] Failed to complete clone:", error); toast.error("Failed to copy chat content. 
Please try again.");
+				setCloneError(true);
 			} finally {
 				setIsCompletingClone(false);
 			}
 		};

 		completeClone();
-	}, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, initializeThread, queryClient]);
+	}, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, cloneError, initializeThread, queryClient]);

 	// Handle scroll to comment from URL query params (e.g., from inbox item click)
 	const searchParams = useSearchParams();
diff --git a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json
index 9c8585a0f..b729c3f8b 100644
--- a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json
+++ b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json
@@ -24,11 +24,6 @@
 		"enabled": true,
 		"status": "warning",
 		"statusMessage": "Some requests may be blocked if not using Firecrawl."
-	},
-	"COMPOSIO_GOOGLE_DRIVE_CONNECTOR": {
-		"enabled": false,
-		"status": "disabled",
-		"statusMessage": "Not available yet."
 	}
 },
 "globalSettings": {
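The delta-sync gating added in composio_google_drive_connector.py reduces to a few lines of standalone logic. A minimal sketch, with hypothetical folder ids and page token, showing why a folder-selection change forces a full re-scan:

import hashlib
import json

def settings_hash(folders: list[dict], files: list[dict], options: dict) -> str:
    # mirrors generate_indexing_settings_hash from the diff above
    settings = {
        "folders": sorted(f.get("id", "") for f in folders),
        "files": sorted(f.get("id", "") for f in files),
        "include_subfolders": options.get("include_subfolders", True),
        "max_files_per_folder": options.get("max_files_per_folder", 100),
    }
    return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()

# hypothetical ids: the user adds folderB after the first full index
old_hash = settings_hash([{"id": "folderA"}], [], {})
new_hash = settings_hash([{"id": "folderA"}, {"id": "folderB"}], [], {})

settings_changed = new_hash != old_hash  # True
stored_page_token = "token123"           # hypothetical saved Drive cursor
incremental_sync = True

use_delta_sync = bool(
    incremental_sync and stored_page_token and not settings_changed
)
print(use_delta_sync)  # False -> adding folderB invalidates the delta cursor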
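parse_webcrawler_urls, the helper factored out for the webcrawler route, scheduler, and indexer, accepts both config shapes. A quick demonstration of the behavior defined in webcrawler_utils.py above:

from app.utils.webcrawler_utils import parse_webcrawler_urls

# newline-separated string, the format the old inline parsing handled
print(parse_webcrawler_urls("https://a.example\n\n  https://b.example  "))
# ['https://a.example', 'https://b.example']

# list form; non-string and blank entries are dropped
print(parse_webcrawler_urls(["https://a.example", None, "   "]))
# ['https://a.example']

# missing config key
print(parse_webcrawler_urls(None))
# []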