diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index c5be0d1fd..c14fe5951 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -30,7 +30,6 @@ from app.db import ( async_session_maker, get_async_session, ) -from app.services.notification_service import NotificationService from app.schemas import ( GoogleDriveIndexRequest, SearchSourceConnectorBase, @@ -38,6 +37,7 @@ from app.schemas import ( SearchSourceConnectorRead, SearchSourceConnectorUpdate, ) +from app.services.notification_service import NotificationService from app.tasks.connector_indexers import ( index_airtable_records, index_clickup_tasks, @@ -948,30 +948,16 @@ async def run_slack_indexing( start_date: Start date for indexing end_date: End date for indexing """ - try: - # Index Slack messages without updating last_indexed_at (we'll do it separately) - documents_processed, error_or_warning = await index_slack_messages( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Only update last_indexed_at if indexing was successful (either new docs or updated docs) - if documents_processed > 0: - await _update_connector_timestamp_by_id(session, connector_id) - logger.info( - f"Slack indexing completed successfully: {documents_processed} documents processed" - ) - else: - logger.error( - f"Slack indexing failed or no documents processed: {error_or_warning}" - ) - except Exception as e: - logger.error(f"Error in background Slack indexing task: {e!s}") + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_slack_messages, + update_timestamp_func=_update_connector_timestamp_by_id, + ) async def _run_indexing_with_notifications( @@ -1168,36 +1154,27 @@ async def run_github_indexing( start_date: str, end_date: str, ): - """Runs the GitHub indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_github_repos( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"GitHub indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"GitHub indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - await session.rollback() - logger.error( - f"Critical error in run_github_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run GitHub indexing. + + Args: + session: Database session + connector_id: ID of the GitHub connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_github_repos, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for Linear indexing @@ -1278,6 +1255,7 @@ async def run_discord_indexing( ): """ Background task to run Discord indexing. + Args: session: Database session connector_id: ID of the Discord connector @@ -1286,30 +1264,16 @@ async def run_discord_indexing( start_date: Start date for indexing end_date: End date for indexing """ - try: - # Index Discord messages without updating last_indexed_at (we'll do it separately) - documents_processed, error_or_warning = await index_discord_messages( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Only update last_indexed_at if indexing was successful (either new docs or updated docs) - if documents_processed > 0: - await _update_connector_timestamp_by_id(session, connector_id) - logger.info( - f"Discord indexing completed successfully: {documents_processed} documents processed" - ) - else: - logger.error( - f"Discord indexing failed or no documents processed: {error_or_warning}" - ) - except Exception as e: - logger.error(f"Error in background Discord indexing task: {e!s}") + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_discord_messages, + update_timestamp_func=_update_connector_timestamp_by_id, + ) async def run_teams_indexing_with_new_session( @@ -1339,6 +1303,7 @@ async def run_teams_indexing( ): """ Background task to run Microsoft Teams indexing. + Args: session: Database session connector_id: ID of the Teams connector @@ -1347,27 +1312,18 @@ async def run_teams_indexing( start_date: Start date for indexing end_date: End date for indexing """ - try: - from app.tasks.connector_indexers.teams_indexer import index_teams_messages + from app.tasks.connector_indexers.teams_indexer import index_teams_messages - # Index Teams messages without updating last_indexed_at (we'll do it separately) - documents_processed, error_or_warning = await index_teams_messages( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Update last_indexed_at after successful indexing (even if 0 new docs - they were checked) - await _update_connector_timestamp_by_id(session, connector_id) - logger.info( - f"Teams indexing completed successfully: {documents_processed} documents processed. {error_or_warning or ''}" - ) - except Exception as e: - logger.error(f"Error in background Teams indexing task: {e!s}") + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_teams_messages, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for Jira indexing @@ -1397,35 +1353,27 @@ async def run_jira_indexing( start_date: str, end_date: str, ): - """Runs the Jira indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_jira_issues( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Jira indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Jira indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_jira_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run Jira indexing. + + Args: + session: Database session + connector_id: ID of the Jira connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_jira_issues, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for Confluence indexing @@ -1457,35 +1405,27 @@ async def run_confluence_indexing( start_date: str, end_date: str, ): - """Runs the Confluence indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_confluence_pages( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Confluence indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"Confluence indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_confluence_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run Confluence indexing. + + Args: + session: Database session + connector_id: ID of the Confluence connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_confluence_pages, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for ClickUp indexing @@ -1515,35 +1455,27 @@ async def run_clickup_indexing( start_date: str, end_date: str, ): - """Runs the ClickUp indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_clickup_tasks( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"ClickUp indexing failed for connector {connector_id}: {error_message}" - ) - # Optionally update status in DB to indicate failure - else: - logger.info( - f"ClickUp indexing successful for connector {connector_id}. Indexed {indexed_count} tasks." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_clickup_indexing for connector {connector_id}: {e}", - exc_info=True, - ) - # Optionally update status in DB to indicate failure + """ + Background task to run ClickUp indexing. + + Args: + session: Database session + connector_id: ID of the ClickUp connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_clickup_tasks, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for Airtable indexing @@ -1833,6 +1765,7 @@ async def run_luma_indexing( ): """ Background task to run Luma indexing. + Args: session: Database session connector_id: ID of the Luma connector @@ -1841,30 +1774,16 @@ async def run_luma_indexing( start_date: Start date for indexing end_date: End date for indexing """ - try: - # Index Luma events without updating last_indexed_at (we'll do it separately) - documents_processed, error_or_warning = await index_luma_events( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Only update last_indexed_at if indexing was successful (either new docs or updated docs) - if documents_processed > 0: - await _update_connector_timestamp_by_id(session, connector_id) - logger.info( - f"Luma indexing completed successfully: {documents_processed} documents processed" - ) - else: - logger.error( - f"Luma indexing failed or no documents processed: {error_or_warning}" - ) - except Exception as e: - logger.error(f"Error in background Luma indexing task: {e!s}") + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_luma_events, + update_timestamp_func=_update_connector_timestamp_by_id, + ) async def run_elasticsearch_indexing_with_new_session( @@ -1895,34 +1814,27 @@ async def run_elasticsearch_indexing( start_date: str, end_date: str, ): - """Runs the Elasticsearch indexing task and updates the timestamp.""" - try: - indexed_count, error_message = await index_elasticsearch_documents( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"Elasticsearch indexing failed for connector {connector_id}: {error_message}" - ) - else: - logger.info( - f"Elasticsearch indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() - except Exception as e: - await session.rollback() - logger.error( - f"Critical error in run_elasticsearch_indexing for connector {connector_id}: {e}", - exc_info=True, - ) + """ + Background task to run Elasticsearch indexing. + + Args: + session: Database session + connector_id: ID of the Elasticsearch connector + search_space_id: ID of the search space + user_id: ID of the user + start_date: Start date for indexing + end_date: End date for indexing + """ + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_elasticsearch_documents, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for crawled web page indexing @@ -1953,6 +1865,7 @@ async def run_web_page_indexing( ): """ Background task to run Web page indexing. + Args: session: Database session connector_id: ID of the webcrawler connector @@ -1961,29 +1874,16 @@ async def run_web_page_indexing( start_date: Start date for indexing end_date: End date for indexing """ - try: - documents_processed, error_or_warning = await index_crawled_urls( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, # Don't update timestamp in the indexing function - ) - - # Only update last_indexed_at if indexing was successful (either new docs or updated docs) - if documents_processed > 0: - await _update_connector_timestamp_by_id(session, connector_id) - logger.info( - f"Web page indexing completed successfully: {documents_processed} documents processed" - ) - else: - logger.error( - f"Web page indexing failed or no documents processed: {error_or_warning}" - ) - except Exception as e: - logger.error(f"Error in background Web page indexing task: {e!s}") + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_crawled_urls, + update_timestamp_func=_update_connector_timestamp_by_id, + ) # Add new helper functions for BookStack indexing @@ -2028,29 +1928,13 @@ async def run_bookstack_indexing( """ from app.tasks.connector_indexers import index_bookstack_pages - try: - indexed_count, error_message = await index_bookstack_pages( - session, - connector_id, - search_space_id, - user_id, - start_date, - end_date, - update_last_indexed=False, - ) - if error_message: - logger.error( - f"BookStack indexing failed for connector {connector_id}: {error_message}" - ) - else: - logger.info( - f"BookStack indexing successful for connector {connector_id}. Indexed {indexed_count} documents." - ) - # Update the last indexed timestamp only on success - await _update_connector_timestamp_by_id(session, connector_id) - await session.commit() # Commit timestamp update - except Exception as e: - logger.error( - f"Critical error in run_bookstack_indexing for connector {connector_id}: {e}", - exc_info=True, - ) + await _run_indexing_with_notifications( + session=session, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + start_date=start_date, + end_date=end_date, + indexing_function=index_bookstack_pages, + update_timestamp_func=_update_connector_timestamp_by_id, + )