From bbb2abfc027e61c92974e3371b93c483eadca5ea Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 17 Oct 2025 02:44:44 +0530 Subject: [PATCH] fix: ran formatter as per coderrabbitai --- .../app/connectors/elasticsearch_connector.py | 10 +++++-- .../elasticsearch_indexer.py | 27 ++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/surfsense_backend/app/connectors/elasticsearch_connector.py b/surfsense_backend/app/connectors/elasticsearch_connector.py index 23b58c4f6..1a4f53a46 100644 --- a/surfsense_backend/app/connectors/elasticsearch_connector.py +++ b/surfsense_backend/app/connectors/elasticsearch_connector.py @@ -119,7 +119,11 @@ class ElasticsearchConnector: total_hits = response.get("hits", {}).get("total", {}) # normalize total value (could be dict or int depending on server) - total_val = total_hits.get("value", total_hits) if isinstance(total_hits, dict) else total_hits + total_val = ( + total_hits.get("value", total_hits) + if isinstance(total_hits, dict) + else total_hits + ) logger.info( f"Successfully searched index '{index}', found {total_val} results" ) @@ -213,7 +217,9 @@ class ElasticsearchConnector: # Continue scrolling if scroll_id: - response = await self.client.scroll(scroll_id=scroll_id, scroll=scroll_timeout) + response = await self.client.scroll( + scroll_id=scroll_id, scroll=scroll_timeout + ) scroll_id = response.get("_scroll_id") hits = response.get("hits", {}).get("hits", []) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 43dccf093..b99c77e95 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -24,6 +24,7 @@ from .base import check_document_by_unique_identifier, check_duplicate_document_ logger = logging.getLogger(__name__) + async def index_elasticsearch_documents( session: AsyncSession, connector_id: int, @@ -75,7 +76,10 @@ async def index_elasticsearch_documents( error_msg = f"Elasticsearch connector with ID {connector_id} not found" logger.error(error_msg) await task_logger.log_task_failure( - log_entry, "Connector not found", error_msg, {"connector_id": connector_id} + log_entry, + "Connector not found", + error_msg, + {"connector_id": connector_id}, ) return 0, error_msg @@ -152,7 +156,11 @@ async def index_elasticsearch_documents( await task_logger.log_task_progress( log_entry, "Starting scroll search", - {"index": index_name, "stage": "scroll_start", "max_documents": max_documents}, + { + "index": index_name, + "stage": "scroll_start", + "max_documents": max_documents, + }, ) # Use scroll search for large result sets async for hit in es_connector.scroll_search( @@ -210,7 +218,9 @@ async def index_elasticsearch_documents( # Build source-unique identifier and hash (prefer source id dedupe) source_identifier = f"{hit.get('_index', index_name)}:{doc_id}" unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.ELASTICSEARCH_CONNECTOR, source_identifier, search_space_id + DocumentType.ELASTICSEARCH_CONNECTOR, + source_identifier, + search_space_id, ) # Two-step duplicate detection: first by source-unique id, then by content hash @@ -245,7 +255,7 @@ async def index_elasticsearch_documents( if documents_processed % 10 == 0: await session.commit() continue - + # Create document document = Document( title=title, @@ -278,7 +288,10 @@ async def index_elasticsearch_documents( log_entry, "Document processing error", msg, - {"document_id": hit.get("_id", "unknown"), "error_type": type(e).__name__}, + { + "document_id": hit.get("_id", "unknown"), + "error_type": type(e).__name__, + }, ) continue @@ -297,7 +310,9 @@ async def index_elasticsearch_documents( # Update last indexed timestamp if requested if update_last_indexed and documents_processed > 0: # connector.last_indexed_at = datetime.now() - connector.last_indexed_at = datetime.now(UTC).isoformat().replace("+00:00", "Z") + connector.last_indexed_at = ( + datetime.now(UTC).isoformat().replace("+00:00", "Z") + ) await session.commit() await task_logger.log_task_progress( log_entry,