fix: ran formatter as per coderrabbitai

This commit is contained in:
Anish Sarkar 2025-10-17 02:44:44 +05:30
parent 0ff1b586a2
commit bbb2abfc02
2 changed files with 29 additions and 8 deletions

View file

@ -119,7 +119,11 @@ class ElasticsearchConnector:
total_hits = response.get("hits", {}).get("total", {}) total_hits = response.get("hits", {}).get("total", {})
# normalize total value (could be dict or int depending on server) # normalize total value (could be dict or int depending on server)
total_val = total_hits.get("value", total_hits) if isinstance(total_hits, dict) else total_hits total_val = (
total_hits.get("value", total_hits)
if isinstance(total_hits, dict)
else total_hits
)
logger.info( logger.info(
f"Successfully searched index '{index}', found {total_val} results" f"Successfully searched index '{index}', found {total_val} results"
) )
@ -213,7 +217,9 @@ class ElasticsearchConnector:
# Continue scrolling # Continue scrolling
if scroll_id: if scroll_id:
response = await self.client.scroll(scroll_id=scroll_id, scroll=scroll_timeout) response = await self.client.scroll(
scroll_id=scroll_id, scroll=scroll_timeout
)
scroll_id = response.get("_scroll_id") scroll_id = response.get("_scroll_id")
hits = response.get("hits", {}).get("hits", []) hits = response.get("hits", {}).get("hits", [])

View file

@ -24,6 +24,7 @@ from .base import check_document_by_unique_identifier, check_duplicate_document_
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def index_elasticsearch_documents( async def index_elasticsearch_documents(
session: AsyncSession, session: AsyncSession,
connector_id: int, connector_id: int,
@ -75,7 +76,10 @@ async def index_elasticsearch_documents(
error_msg = f"Elasticsearch connector with ID {connector_id} not found" error_msg = f"Elasticsearch connector with ID {connector_id} not found"
logger.error(error_msg) logger.error(error_msg)
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, "Connector not found", error_msg, {"connector_id": connector_id} log_entry,
"Connector not found",
error_msg,
{"connector_id": connector_id},
) )
return 0, error_msg return 0, error_msg
@ -152,7 +156,11 @@ async def index_elasticsearch_documents(
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
"Starting scroll search", "Starting scroll search",
{"index": index_name, "stage": "scroll_start", "max_documents": max_documents}, {
"index": index_name,
"stage": "scroll_start",
"max_documents": max_documents,
},
) )
# Use scroll search for large result sets # Use scroll search for large result sets
async for hit in es_connector.scroll_search( async for hit in es_connector.scroll_search(
@ -210,7 +218,9 @@ async def index_elasticsearch_documents(
# Build source-unique identifier and hash (prefer source id dedupe) # Build source-unique identifier and hash (prefer source id dedupe)
source_identifier = f"{hit.get('_index', index_name)}:{doc_id}" source_identifier = f"{hit.get('_index', index_name)}:{doc_id}"
unique_identifier_hash = generate_unique_identifier_hash( unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.ELASTICSEARCH_CONNECTOR, source_identifier, search_space_id DocumentType.ELASTICSEARCH_CONNECTOR,
source_identifier,
search_space_id,
) )
# Two-step duplicate detection: first by source-unique id, then by content hash # Two-step duplicate detection: first by source-unique id, then by content hash
@ -245,7 +255,7 @@ async def index_elasticsearch_documents(
if documents_processed % 10 == 0: if documents_processed % 10 == 0:
await session.commit() await session.commit()
continue continue
# Create document # Create document
document = Document( document = Document(
title=title, title=title,
@ -278,7 +288,10 @@ async def index_elasticsearch_documents(
log_entry, log_entry,
"Document processing error", "Document processing error",
msg, msg,
{"document_id": hit.get("_id", "unknown"), "error_type": type(e).__name__}, {
"document_id": hit.get("_id", "unknown"),
"error_type": type(e).__name__,
},
) )
continue continue
@ -297,7 +310,9 @@ async def index_elasticsearch_documents(
# Update last indexed timestamp if requested # Update last indexed timestamp if requested
if update_last_indexed and documents_processed > 0: if update_last_indexed and documents_processed > 0:
# connector.last_indexed_at = datetime.now() # connector.last_indexed_at = datetime.now()
connector.last_indexed_at = datetime.now(UTC).isoformat().replace("+00:00", "Z") connector.last_indexed_at = (
datetime.now(UTC).isoformat().replace("+00:00", "Z")
)
await session.commit() await session.commit()
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,