diff --git a/surfsense_backend/app/indexing_pipeline/exceptions.py b/surfsense_backend/app/indexing_pipeline/exceptions.py index ba6b68520..8c9c6f2d5 100644 --- a/surfsense_backend/app/indexing_pipeline/exceptions.py +++ b/surfsense_backend/app/indexing_pipeline/exceptions.py @@ -38,7 +38,6 @@ EMBEDDING_ERRORS = ( RuntimeError, # local device failure or API backend normalization OSError, # model files missing or corrupted (local backends) MemoryError, # document too large for available RAM - ValueError, # invalid input to encode() ) @@ -57,13 +56,9 @@ class PipelineMessages: LLM_UNPROCESSABLE = "Document exceeds the LLM context window even after optimization." LLM_RESPONSE = "LLM returned an invalid response." - DB_TRANSIENT = "Database error during indexing. Will retry on next sync." - DB_SESSION_DEAD = "Database session is in an unrecoverable state." - EMBEDDING_FAILED = "Embedding failed. Check your embedding model configuration or service." EMBEDDING_MODEL = "Embedding model files are missing or corrupted." EMBEDDING_MEMORY = "Not enough memory to embed this document." - EMBEDDING_INPUT = "Document content is invalid for the embedding model." CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk." @@ -121,8 +116,6 @@ def embedding_message(exc: Exception) -> str: return PipelineMessages.EMBEDDING_MODEL if isinstance(exc, MemoryError): return PipelineMessages.EMBEDDING_MEMORY - if isinstance(exc, ValueError): - return PipelineMessages.EMBEDDING_INPUT return safe_exception_message(exc) except Exception: return "Something went wrong when generating the embedding." diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 23f4022b0..9c51d99da 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -23,6 +23,7 @@ from app.indexing_pipeline.exceptions import ( ) from app.indexing_pipeline.pipeline_logger import ( PipelineLogContext, + log_batch_aborted, log_chunking_overflow, log_doc_skipped_unknown, log_document_queued, @@ -129,13 +130,17 @@ class IndexingPipelineService: try: await self.session.commit() return documents - except IntegrityError as e: + except IntegrityError: # A concurrent worker committed a document with the same content_hash # or unique_identifier_hash between our check and our INSERT. # The document already exists — roll back and let the next sync run handle it. log_race_condition(batch_ctx) await self.session.rollback() return [] + except Exception as e: + log_batch_aborted(batch_ctx, e) + await self.session.rollback() + return [] async def index( self, document: Document, connector_doc: ConnectorDocument, llm diff --git a/surfsense_backend/app/indexing_pipeline/pipeline_logger.py b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py index fb1c7e5e0..a445e4e49 100644 --- a/surfsense_backend/app/indexing_pipeline/pipeline_logger.py +++ b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py @@ -17,7 +17,6 @@ class LogMessages: DOCUMENT_QUEUED = "New document queued for indexing." DOCUMENT_UPDATED = "Document content changed, re-queued for indexing." DOCUMENT_REQUEUED = "Stuck document re-queued for indexing." - DOC_SKIPPED_DB = "Transient DB error — document skipped, will retry on next sync." DOC_SKIPPED_UNKNOWN = "Unexpected error — document skipped." BATCH_ABORTED = "Fatal DB error — aborting prepare batch." RACE_CONDITION = "Concurrent worker beat us to the commit — rolling back batch." @@ -29,8 +28,6 @@ class LogMessages: LLM_PERMANENT = "Permanent LLM error — document marked failed." EMBEDDING_FAILED = "Embedding error — document marked failed." CHUNKING_OVERFLOW = "Chunking overflow — document marked failed." - DB_TRANSIENT = "Transient DB error — document marked failed." - DB_FATAL = "Fatal DB error — session is dead, re-raising." UNEXPECTED = "Unexpected error — document marked failed." @@ -87,6 +84,10 @@ def log_race_condition(ctx: PipelineLogContext) -> None: _safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx) +def log_batch_aborted(ctx: PipelineLogContext, exc: Exception) -> None: + _safe_log(logger.error, LogMessages.BATCH_ABORTED, ctx, exc_info=exc, error=exc) + + # ── index ───────────────────────────────────────────────────────────────────── def log_index_started(ctx: PipelineLogContext) -> None: @@ -98,7 +99,7 @@ def log_index_success(ctx: PipelineLogContext, chunk_count: int) -> None: def log_retryable_llm_error(ctx: PipelineLogContext, exc: Exception) -> None: - _safe_log(logger.warning, LogMessages.LLM_RETRYABLE, ctx, error=exc) + _safe_log(logger.warning, LogMessages.LLM_RETRYABLE, ctx, exc_info=exc, error=exc) def log_permanent_llm_error(ctx: PipelineLogContext, exc: Exception) -> None: