diff --git a/surfsense_backend/app/agents/new_chat/tools/podcast.py b/surfsense_backend/app/agents/new_chat/tools/podcast.py index 424b04f77..1048ed881 100644 --- a/surfsense_backend/app/agents/new_chat/tools/podcast.py +++ b/surfsense_backend/app/agents/new_chat/tools/podcast.py @@ -54,7 +54,9 @@ def set_generating_podcast(search_space_id: int, podcast_id: int) -> None: client = get_redis_client() client.setex(_redis_key(search_space_id), 1800, str(podcast_id)) except Exception as e: - print(f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}") + print( + f"[generate_podcast] Warning: Could not set generating podcast in Redis: {e}" + ) def create_generate_podcast_tool( diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 541e25a75..38352d348 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -670,7 +670,9 @@ async def delete_thread( ) from None -@router.post("/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse) +@router.post( + "/threads/{thread_id}/complete-clone", response_model=CompleteCloneResponse +) async def complete_clone( thread_id: int, session: AsyncSession = Depends(get_async_session), @@ -702,7 +704,9 @@ async def complete_clone( raise HTTPException(status_code=400, detail="Clone already completed") if not thread.cloned_from_thread_id: - raise HTTPException(status_code=400, detail="No source thread to clone from") + raise HTTPException( + status_code=400, detail="No source thread to clone from" + ) message_count = await complete_clone_content( session=session, diff --git a/surfsense_backend/app/routes/public_chat_routes.py b/surfsense_backend/app/routes/public_chat_routes.py index 8b4f42559..4676f2ad0 100644 --- a/surfsense_backend/app/routes/public_chat_routes.py +++ b/surfsense_backend/app/routes/public_chat_routes.py @@ -53,7 +53,9 @@ async def clone_public_chat_endpoint( source_thread = await get_thread_by_share_token(session, share_token) if not source_thread: - raise HTTPException(status_code=404, detail="Chat not found or no longer public") + raise HTTPException( + status_code=404, detail="Chat not found or no longer public" + ) target_search_space_id = await get_user_default_search_space(session, user.id) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 2237ddfa8..c4d4fdb3b 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -187,6 +187,7 @@ async def create_search_source_connector( user_id=str(user.id), connector_type=db_connector.connector_type, frequency_minutes=db_connector.indexing_frequency_minutes, + connector_config=db_connector.config, ) if not success: logger.warning( @@ -646,6 +647,7 @@ async def index_connector_content( # Handle different connector types response_message = "" + indexing_started = True # Use UTC for consistency with last_indexed_at storage today_str = datetime.now(UTC).strftime("%Y-%m-%d") @@ -921,14 +923,31 @@ async def index_connector_content( elif connector.connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import index_crawled_urls_task + from app.utils.webcrawler_utils import parse_webcrawler_urls - logger.info( - f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" - ) - index_crawled_urls_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to - ) - response_message = "Web page indexing started in the background." + # Check if URLs are configured before triggering indexing + connector_config = connector.config or {} + urls = parse_webcrawler_urls(connector_config.get("INITIAL_URLS")) + + if not urls: + # URLs are optional - skip indexing gracefully + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, skipping indexing" + ) + response_message = "No URLs configured for this connector. Add URLs in the connector settings to enable indexing." + indexing_started = False + else: + logger.info( + f"Triggering web pages indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_crawled_urls_task.delay( + connector_id, + search_space_id, + str(user.id), + indexing_from, + indexing_to, + ) + response_message = "Web page indexing started in the background." elif connector.connector_type == SearchSourceConnectorType.OBSIDIAN_CONNECTOR: from app.config import config as app_config @@ -1025,6 +1044,7 @@ async def index_connector_content( return { "message": response_message, + "indexing_started": indexing_started, "connector_id": connector_id, "search_space_id": search_space_id, "indexing_from": indexing_from, diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index b420b1b91..ab6be9c9f 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -257,14 +257,11 @@ class PublicChatResponse(BaseModel): class CloneInitResponse(BaseModel): - - thread_id: int search_space_id: int share_token: str class CompleteCloneResponse(BaseModel): - status: str message_count: int diff --git a/surfsense_backend/app/schemas/podcasts.py b/surfsense_backend/app/schemas/podcasts.py index 9e5cb0262..60f9d7dc0 100644 --- a/surfsense_backend/app/schemas/podcasts.py +++ b/surfsense_backend/app/schemas/podcasts.py @@ -59,6 +59,8 @@ class PodcastRead(PodcastBase): "search_space_id": obj.search_space_id, "status": obj.status, "created_at": obj.created_at, - "transcript_entries": len(obj.podcast_transcript) if obj.podcast_transcript else None, + "transcript_entries": len(obj.podcast_transcript) + if obj.podcast_transcript + else None, } return cls(**data) diff --git a/surfsense_backend/app/services/public_chat_service.py b/surfsense_backend/app/services/public_chat_service.py index 79618974f..a5b8c9ffe 100644 --- a/surfsense_backend/app/services/public_chat_service.py +++ b/surfsense_backend/app/services/public_chat_service.py @@ -291,6 +291,9 @@ async def complete_clone_content( if old_podcast_id and old_podcast_id in podcast_id_map: result_data["podcast_id"] = podcast_id_map[old_podcast_id] + elif old_podcast_id: + # Podcast couldn't be cloned (not ready), remove reference + result_data.pop("podcast_id", None) new_message = NewChatMessage( thread_id=target_thread.id, diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 0ce714cdc..2ce8716e0 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -55,7 +55,9 @@ def _clear_generating_podcast(search_space_id: int) -> None: client = redis.from_url(redis_url, decode_responses=True) key = f"podcast:generating:{search_space_id}" client.delete(key) - logger.info(f"Cleared generating podcast key for search_space_id={search_space_id}") + logger.info( + f"Cleared generating podcast key for search_space_id={search_space_id}" + ) except Exception as e: logger.warning(f"Could not clear generating podcast key: {e}") @@ -119,9 +121,7 @@ async def _generate_content_podcast( ) -> dict: """Generate content-based podcast and update existing record.""" async with get_celery_session_maker()() as session: - result = await session.execute( - select(Podcast).filter(Podcast.id == podcast_id) - ) + result = await session.execute(select(Podcast).filter(Podcast.id == podcast_id)) podcast = result.scalars().first() if not podcast: diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index bf80cbe78..b33e25170 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -156,6 +156,41 @@ async def _check_and_trigger_schedules(): ) await session.commit() continue + + # Special handling for Webcrawler - skip if no URLs configured + elif ( + connector.connector_type + == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR + ): + from app.utils.webcrawler_utils import parse_webcrawler_urls + + connector_config = connector.config or {} + urls = parse_webcrawler_urls( + connector_config.get("INITIAL_URLS") + ) + + if urls: + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + None, # start_date + None, # end_date + ) + else: + # No URLs configured - skip indexing but still update next_scheduled_at + logger.info( + f"Webcrawler connector {connector.id} has no URLs configured, " + "skipping periodic indexing (will check again at next scheduled time)" + ) + from datetime import timedelta + + connector.next_scheduled_at = now + timedelta( + minutes=connector.indexing_frequency_minutes + ) + await session.commit() + continue + else: task.delay( connector.id, diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 4d5a33b79..6bb62d716 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -20,6 +20,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -317,6 +318,24 @@ async def index_airtable_records( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Airtable record {record_id} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate document summary user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index a1067255d..e183ab333 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -308,6 +309,22 @@ async def index_bookstack_pages( logger.info(f"Successfully updated BookStack page {page_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"BookStack page {page_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index e459584f8..887c3e2e5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -302,6 +303,22 @@ async def index_clickup_tasks( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"ClickUp task {task_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index ddbefafb9..5673839bb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -306,6 +307,22 @@ async def index_confluence_pages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Confluence page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 8f0c76e53..9e401b335 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_markdown, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -454,6 +455,24 @@ async def index_discord_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 4a8df4918..fb6989bb9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -24,6 +24,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -319,6 +320,21 @@ async def _process_repository_digest( # Delete existing document to replace with new one await session.delete(existing_document) await session.flush() + else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Repository {repo_full_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + return 0 # Generate summary using LLM (ONE call per repository!) user_llm = await get_user_long_context_llm(session, user_id, search_space_id) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 08d2904d6..e599abd22 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -24,7 +24,9 @@ from app.utils.document_converters import ( ) from .base import ( + calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -163,10 +165,22 @@ async def index_google_gmail_messages( credentials, session, user_id, connector_id ) + # Calculate date range using last_indexed_at if dates not provided + # This ensures Gmail uses the same date logic as other connectors + # (uses last_indexed_at → now, or 365 days back for first-time indexing) + calculated_start_date, calculated_end_date = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + # Fetch recent Google gmail messages - logger.info(f"Fetching recent emails for connector {connector_id}") + logger.info( + f"Fetching emails for connector {connector_id} " + f"from {calculated_start_date} to {calculated_end_date}" + ) messages, error = await gmail_connector.get_recent_messages( - max_results=max_messages, start_date=start_date, end_date=end_date + max_results=max_messages, + start_date=calculated_start_date, + end_date=calculated_end_date, ) if error: @@ -316,6 +330,22 @@ async def index_google_gmail_messages( logger.info(f"Successfully updated Gmail message {subject}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Gmail message {subject} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index 4851a6466..d6095d20e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -284,6 +285,22 @@ async def index_jira_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Jira issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 7d8e0c30e..d00a39160 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -22,6 +22,7 @@ from app.utils.document_converters import ( from .base import ( calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -315,6 +316,22 @@ async def index_linear_issues( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Linear issue {issue_identifier} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index ead259a44..59890dbe4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -21,6 +21,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -363,6 +364,22 @@ async def index_luma_events( logger.info(f"Successfully updated Luma event {event_name}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Luma event {event_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 2d36351fa..70c4917da 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -22,6 +22,7 @@ from .base import ( build_document_metadata_string, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -360,6 +361,22 @@ async def index_notion_pages( continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Notion page {page_title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Get user's long context LLM user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py index 4c4dab4c2..a603d3fba 100644 --- a/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/obsidian_indexer.py @@ -28,6 +28,7 @@ from app.utils.document_converters import ( from .base import ( build_document_metadata_string, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -426,6 +427,22 @@ async def index_obsidian_vault( indexed_count += 1 else: + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Obsidian note {title} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + skipped_count += 1 + continue + # Create new document logger.info(f"Indexing new note: {title}") diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index f6ed4f567..f244c97f8 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -22,6 +22,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -325,6 +326,22 @@ async def index_slack_messages( logger.info(f"Successfully updated Slack message {msg_ts}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks(combined_document_string) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index b879ddfcb..66b709ddc 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -21,6 +21,7 @@ from .base import ( build_document_metadata_markdown, calculate_date_range, check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -354,6 +355,27 @@ async def index_teams_messages( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) + ) + + if duplicate_by_content: + logger.info( + "Teams message %s in channel %s already indexed by another connector " + "(existing document ID: %s, type: %s). Skipping.", + message_id, + channel_name, + duplicate_by_content.id, + duplicate_by_content.document_type, + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Process chunks chunks = await create_document_chunks( diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py index fb1aae5f2..0c63fd2f0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py @@ -18,9 +18,11 @@ from app.utils.document_converters import ( generate_document_summary, generate_unique_identifier_hash, ) +from app.utils.webcrawler_utils import parse_webcrawler_urls from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -96,13 +98,7 @@ async def index_crawled_urls( api_key = connector.config.get("FIRECRAWL_API_KEY") # Get URLs from connector config - initial_urls = connector.config.get("INITIAL_URLS", "") - if isinstance(initial_urls, str): - urls = [url.strip() for url in initial_urls.split("\n") if url.strip()] - elif isinstance(initial_urls, list): - urls = [url.strip() for url in initial_urls if url.strip()] - else: - urls = [] + urls = parse_webcrawler_urls(connector.config.get("INITIAL_URLS")) logger.info( f"Starting crawled web page indexing for connector {connector_id} with {len(urls)} URLs" @@ -281,6 +277,22 @@ async def index_crawled_urls( logger.info(f"Successfully updated URL {url}") continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + logger.info( + f"URL {url} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping." + ) + documents_skipped += 1 + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 5161fb569..6c4be0cb8 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -55,7 +55,9 @@ LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( ) # Timeout calculation constants -UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024 # 100 KB/s (conservative for slow connections) +UPLOAD_BYTES_PER_SECOND_SLOW = ( + 100 * 1024 +) # 100 KB/s (conservative for slow connections) MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing @@ -219,19 +221,19 @@ async def find_existing_document_with_migration( def calculate_upload_timeout(file_size_bytes: int) -> float: """ Calculate appropriate upload timeout based on file size. - + Assumes a conservative slow connection speed to handle worst-case scenarios. - + Args: file_size_bytes: Size of the file in bytes - + Returns: Timeout in seconds """ # Calculate time needed at slow connection speed # Add 50% buffer for network variability and SSL overhead estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 - + # Clamp to reasonable bounds return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) @@ -239,21 +241,21 @@ def calculate_upload_timeout(file_size_bytes: int) -> float: def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: """ Calculate job processing timeout based on page count and file size. - + Args: estimated_pages: Estimated number of pages file_size_bytes: Size of the file in bytes - + Returns: Timeout in seconds """ # Base timeout + time per page page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT) - + # Also consider file size (large images take longer to process) # ~1 minute per 10MB of file size size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60 - + # Use the larger of the two estimates return max(page_based_timeout, size_based_timeout) @@ -284,18 +286,18 @@ async def parse_with_llamacloud_retry( """ import os import random - + from llama_cloud_services import LlamaParse from llama_cloud_services.parse.utils import ResultType # Get file size for timeout calculations file_size_bytes = os.path.getsize(file_path) file_size_mb = file_size_bytes / (1024 * 1024) - + # Calculate dynamic timeouts based on file size and page count upload_timeout = calculate_upload_timeout(file_size_bytes) job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) - + # HTTP client timeouts - scaled based on file size # Write timeout is critical for large file uploads custom_timeout = httpx.Timeout( @@ -304,7 +306,7 @@ async def parse_with_llamacloud_retry( write=upload_timeout, # Dynamic based on file size (upload time) pool=120.0, # 2 minutes to acquire connection from pool ) - + logging.info( f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " @@ -335,14 +337,14 @@ async def parse_with_llamacloud_retry( # Parse the file asynchronously result = await parser.aparse(file_path) - + # Success - log if we had previous failures if attempt > 1: logging.info( f"LlamaCloud upload succeeded on attempt {attempt} after " f"{len(attempt_errors)} failures" ) - + return result except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: @@ -355,8 +357,7 @@ async def parse_with_llamacloud_retry( # Calculate exponential backoff with jitter # Base delay doubles each attempt, capped at max delay base_delay = min( - LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), - LLAMACLOUD_MAX_DELAY + LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), LLAMACLOUD_MAX_DELAY ) # Add random jitter (±25%) to prevent thundering herd jitter = base_delay * 0.25 * (2 * random.random() - 1) diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 219641933..aa8c07ce4 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -43,6 +43,7 @@ def create_periodic_schedule( user_id: str, connector_type: SearchSourceConnectorType, frequency_minutes: int, + connector_config: dict | None = None, ) -> bool: """ Trigger the first indexing run immediately when periodic indexing is enabled. @@ -57,11 +58,26 @@ def create_periodic_schedule( user_id: User ID connector_type: Type of connector frequency_minutes: Frequency in minutes (used for logging) + connector_config: Optional connector config dict for validation Returns: True if successful, False otherwise """ try: + # Special handling for connectors that require config validation + if connector_type == SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: + from app.utils.webcrawler_utils import parse_webcrawler_urls + + config = connector_config or {} + urls = parse_webcrawler_urls(config.get("INITIAL_URLS")) + + if not urls: + logger.info( + f"Webcrawler connector {connector_id} has no URLs configured, " + "skipping first indexing run (will run when URLs are added)" + ) + return True # Return success - schedule is created, just no first run + logger.info( f"Periodic indexing enabled for connector {connector_id} " f"(frequency: {frequency_minutes} minutes). Triggering first run..." diff --git a/surfsense_backend/app/utils/webcrawler_utils.py b/surfsense_backend/app/utils/webcrawler_utils.py new file mode 100644 index 000000000..31d2ebe50 --- /dev/null +++ b/surfsense_backend/app/utils/webcrawler_utils.py @@ -0,0 +1,28 @@ +""" +Utility functions for webcrawler connector. +""" + + +def parse_webcrawler_urls(initial_urls: str | list | None) -> list[str]: + """ + Parse URLs from webcrawler INITIAL_URLS value. + + Handles both string (newline-separated) and list formats. + + Args: + initial_urls: The INITIAL_URLS value (string, list, or None) + + Returns: + List of parsed, stripped, non-empty URLs + """ + if initial_urls is None: + return [] + + if isinstance(initial_urls, str): + return [url.strip() for url in initial_urls.split("\n") if url.strip()] + elif isinstance(initial_urls, list): + return [ + url.strip() for url in initial_urls if isinstance(url, str) and url.strip() + ] + else: + return [] diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index f6f70f83b..1f9dd433d 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -143,6 +143,7 @@ export default function NewChatPage() { const queryClient = useQueryClient(); const [isInitializing, setIsInitializing] = useState(true); const [isCompletingClone, setIsCompletingClone] = useState(false); + const [cloneError, setCloneError] = useState(false); const [threadId, setThreadId] = useState(null); const [currentThread, setCurrentThread] = useState(null); const [messages, setMessages] = useState([]); @@ -333,7 +334,7 @@ export default function NewChatPage() { // Handle clone completion when thread has clone_pending flag useEffect(() => { - if (!currentThread?.clone_pending || isCompletingClone) return; + if (!currentThread?.clone_pending || isCompletingClone || cloneError) return; const completeClone = async () => { setIsCompletingClone(true); @@ -351,13 +352,14 @@ export default function NewChatPage() { } catch (error) { console.error("[NewChatPage] Failed to complete clone:", error); toast.error("Failed to copy chat content. Please try again."); + setCloneError(true); } finally { setIsCompletingClone(false); } }; completeClone(); - }, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, initializeThread, queryClient]); + }, [currentThread?.clone_pending, currentThread?.id, isCompletingClone, cloneError, initializeThread, queryClient]); // Handle scroll to comment from URL query params (e.g., from inbox item click) const searchParams = useSearchParams();