diff --git a/surfsense_backend/app/indexing_pipeline/exceptions.py b/surfsense_backend/app/indexing_pipeline/exceptions.py
index bf4d80062..14f2df066 100644
--- a/surfsense_backend/app/indexing_pipeline/exceptions.py
+++ b/surfsense_backend/app/indexing_pipeline/exceptions.py
@@ -12,7 +12,7 @@ from litellm.exceptions import (
     Timeout,
     UnprocessableEntityError,
 )
-from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError, SQLAlchemyError
+from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
 from sqlalchemy.orm.exc import DetachedInstanceError
 
 # Tuples for use directly in except clauses.
@@ -48,10 +48,10 @@ FATAL_DB_ERRORS = (
 # (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError).
 EMBEDDING_ERRORS = (
-    RuntimeError,
-    OSError,
-    MemoryError,
-    ValueError,
+    RuntimeError,  # local device failure or API backend normalization
+    OSError,  # model files missing or corrupted (local backends)
+    MemoryError,  # document too large for available RAM
+    ValueError,  # invalid input to encode()
 )
 
@@ -81,45 +81,61 @@ class PipelineMessages:
     CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk."
 
 
+def safe_exception_message(exc: Exception) -> str:
+    try:
+        return str(exc)
+    except Exception:
+        return "Something went wrong during indexing. Error details could not be retrieved."
+
+
 def llm_retryable_message(exc: Exception) -> str:
-    if isinstance(exc, RateLimitError):
-        return PipelineMessages.RATE_LIMIT
-    if isinstance(exc, Timeout):
-        return PipelineMessages.LLM_TIMEOUT
-    if isinstance(exc, ServiceUnavailableError):
-        return PipelineMessages.LLM_UNAVAILABLE
-    if isinstance(exc, BadGatewayError):
-        return PipelineMessages.LLM_BAD_GATEWAY
-    if isinstance(exc, InternalServerError):
-        return PipelineMessages.LLM_SERVER_ERROR
-    if isinstance(exc, APIConnectionError):
-        return PipelineMessages.LLM_CONNECTION
-    return str(exc)
+    try:
+        if isinstance(exc, RateLimitError):
+            return PipelineMessages.RATE_LIMIT
+        if isinstance(exc, Timeout):
+            return PipelineMessages.LLM_TIMEOUT
+        if isinstance(exc, ServiceUnavailableError):
+            return PipelineMessages.LLM_UNAVAILABLE
+        if isinstance(exc, BadGatewayError):
+            return PipelineMessages.LLM_BAD_GATEWAY
+        if isinstance(exc, InternalServerError):
+            return PipelineMessages.LLM_SERVER_ERROR
+        if isinstance(exc, APIConnectionError):
+            return PipelineMessages.LLM_CONNECTION
+        return safe_exception_message(exc)
+    except Exception:
+        return "Something went wrong when calling the LLM."
 
 
 def llm_permanent_message(exc: Exception) -> str:
-    if isinstance(exc, AuthenticationError):
-        return PipelineMessages.LLM_AUTH
-    if isinstance(exc, PermissionDeniedError):
-        return PipelineMessages.LLM_PERMISSION
-    if isinstance(exc, NotFoundError):
-        return PipelineMessages.LLM_NOT_FOUND
-    if isinstance(exc, BadRequestError):
-        return PipelineMessages.LLM_BAD_REQUEST
-    if isinstance(exc, UnprocessableEntityError):
-        return PipelineMessages.LLM_UNPROCESSABLE
-    if isinstance(exc, APIResponseValidationError):
-        return PipelineMessages.LLM_RESPONSE
-    return str(exc)
+    try:
+        if isinstance(exc, AuthenticationError):
+            return PipelineMessages.LLM_AUTH
+        if isinstance(exc, PermissionDeniedError):
+            return PipelineMessages.LLM_PERMISSION
+        if isinstance(exc, NotFoundError):
+            return PipelineMessages.LLM_NOT_FOUND
+        if isinstance(exc, BadRequestError):
+            return PipelineMessages.LLM_BAD_REQUEST
+        if isinstance(exc, UnprocessableEntityError):
+            return PipelineMessages.LLM_UNPROCESSABLE
+        if isinstance(exc, APIResponseValidationError):
+            return PipelineMessages.LLM_RESPONSE
+        return safe_exception_message(exc)
+    except Exception:
+        return "Something went wrong when calling the LLM."
 
 
 def embedding_message(exc: Exception) -> str:
-    if isinstance(exc, RuntimeError):
-        return PipelineMessages.EMBEDDING_FAILED
-    if isinstance(exc, OSError):
-        return PipelineMessages.EMBEDDING_MODEL
-    if isinstance(exc, MemoryError):
-        return PipelineMessages.EMBEDDING_MEMORY
-    if isinstance(exc, ValueError):
-        return PipelineMessages.EMBEDDING_INPUT
-    return str(exc)
+    try:
+        if isinstance(exc, RuntimeError):
+            return PipelineMessages.EMBEDDING_FAILED
+        if isinstance(exc, OSError):
+            return PipelineMessages.EMBEDDING_MODEL
+        if isinstance(exc, MemoryError):
+            return PipelineMessages.EMBEDDING_MEMORY
+        if isinstance(exc, ValueError):
+            return PipelineMessages.EMBEDDING_INPUT
+        return safe_exception_message(exc)
+    except Exception:
+        return "Something went wrong when generating the embedding."
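
Reviewer note on the try/except wrappers above: they look redundant at first glance, but str(exc) can itself raise when an exception class overrides __str__ badly, and an exception escaping a message helper would mask the original error. A minimal, self-contained sketch of the failure mode safe_exception_message() guards against; the BrokenStr class is hypothetical, for illustration only:

    def safe_exception_message(exc: Exception) -> str:
        # Same logic as the helper added above, restated so this snippet runs standalone.
        try:
            return str(exc)
        except Exception:
            return "Something went wrong during indexing. Error details could not be retrieved."


    class BrokenStr(Exception):
        """Hypothetical exception whose __str__ raises."""

        def __str__(self) -> str:
            raise RuntimeError("cannot render message")


    print(safe_exception_message(ValueError("bad input")))  # -> bad input
    print(safe_exception_message(BrokenStr()))  # -> the generic fallback message
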
diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
index 3a9a1de5c..2fff9358c 100644
--- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
+++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
@@ -21,6 +21,26 @@ from app.indexing_pipeline.exceptions import (
     embedding_message,
     llm_permanent_message,
     llm_retryable_message,
+    safe_exception_message,
+)
+from app.indexing_pipeline.pipeline_logger import (
+    PipelineLogContext,
+    log_batch_aborted,
+    log_chunking_overflow,
+    log_db_fatal_error,
+    log_db_transient_error,
+    log_doc_skipped_db,
+    log_doc_skipped_unknown,
+    log_document_queued,
+    log_document_requeued,
+    log_document_updated,
+    log_embedding_error,
+    log_index_started,
+    log_index_success,
+    log_permanent_llm_error,
+    log_race_condition,
+    log_retryable_llm_error,
+    log_unexpected_error,
 )
 
@@ -38,8 +58,18 @@ class IndexingPipelineService:
         """
         documents = []
         seen_hashes: set[str] = set()
+        batch_ctx = PipelineLogContext(
+            connector_id=connector_docs[0].connector_id if connector_docs else 0,
+            search_space_id=connector_docs[0].search_space_id if connector_docs else 0,
+            unique_id="batch",
+        )
 
         for connector_doc in connector_docs:
+            ctx = PipelineLogContext(
+                connector_id=connector_doc.connector_id,
+                search_space_id=connector_doc.search_space_id,
+                unique_id=connector_doc.unique_id,
+            )
             try:
                 unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
                 content_hash = compute_content_hash(connector_doc)
@@ -62,6 +92,7 @@ class IndexingPipelineService:
                     existing.status = DocumentStatus.pending()
                     existing.updated_at = datetime.now(UTC)
                     documents.append(existing)
+                    log_document_requeued(ctx)
                     continue
 
                 existing.title = connector_doc.title
@@ -71,6 +102,7 @@ class IndexingPipelineService:
                 existing.updated_at = datetime.now(UTC)
                 existing.status = DocumentStatus.pending()
                 documents.append(existing)
+                log_document_updated(ctx)
                 continue
 
             duplicate = await self.session.execute(
@@ -95,23 +127,27 @@ class IndexingPipelineService:
                 )
                 self.session.add(document)
                 documents.append(document)
+                log_document_queued(ctx)
 
-            except FATAL_DB_ERRORS:
-                # Session is broken — abort the entire batch, nothing else is safe to do.
+            except FATAL_DB_ERRORS as e:
+                log_batch_aborted(ctx, e)
                 await self.session.rollback()
                 return []
-            except TRANSIENT_DB_ERRORS:
+            except TRANSIENT_DB_ERRORS as e:
+                log_doc_skipped_db(ctx, e)
                 await self.session.rollback()
-            except Exception:
+            except Exception as e:
+                log_doc_skipped_unknown(ctx, e)
                 continue
 
         try:
            await self.session.commit()
            return documents
         except IntegrityError:
             # A concurrent worker committed a document with the same content_hash
             # or unique_identifier_hash between our check and our INSERT.
             # The document already exists — roll back and let the next sync run handle it.
+            log_race_condition(batch_ctx)
             await self.session.rollback()
             return []
@@ -121,7 +157,14 @@ class IndexingPipelineService:
         """
         Run summarization, embedding, and chunking for a document and persist the results.
""" + ctx = PipelineLogContext( + connector_id=connector_doc.connector_id, + search_space_id=connector_doc.search_space_id, + unique_id=connector_doc.unique_id, + doc_id=document.id, + ) try: + log_index_started(ctx) document.status = DocumentStatus.processing() await self.session.commit() @@ -154,24 +197,32 @@ class IndexingPipelineService: document.updated_at = datetime.now(UTC) document.status = DocumentStatus.ready() await self.session.commit() + log_index_success(ctx, chunk_count=len(chunks)) except RETRYABLE_LLM_ERRORS as e: + log_retryable_llm_error(ctx, e) await rollback_and_persist_failure(self.session, document, llm_retryable_message(e)) except PERMANENT_LLM_ERRORS as e: + log_permanent_llm_error(ctx, e) await rollback_and_persist_failure(self.session, document, llm_permanent_message(e)) except EMBEDDING_ERRORS as e: + log_embedding_error(ctx, e) await rollback_and_persist_failure(self.session, document, embedding_message(e)) - except RecursionError: + except RecursionError as e: + log_chunking_overflow(ctx, e) await rollback_and_persist_failure(self.session, document, PipelineMessages.CHUNKING_OVERFLOW) - except FATAL_DB_ERRORS: + except FATAL_DB_ERRORS as e: + log_db_fatal_error(ctx, e) raise - except TRANSIENT_DB_ERRORS: + except TRANSIENT_DB_ERRORS as e: + log_db_transient_error(ctx, e) await rollback_and_persist_failure(self.session, document, PipelineMessages.DB_TRANSIENT) except Exception as e: - await rollback_and_persist_failure(self.session, document, str(e)) + log_unexpected_error(ctx, e) + await rollback_and_persist_failure(self.session, document, safe_exception_message(e)) diff --git a/surfsense_backend/app/indexing_pipeline/pipeline_logger.py b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py new file mode 100644 index 000000000..c0df3f130 --- /dev/null +++ b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py @@ -0,0 +1,133 @@ +import logging +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +@dataclass +class PipelineLogContext: + connector_id: int + search_space_id: int + unique_id: str # always available from ConnectorDocument + doc_id: int | None = None # set once the DB row exists (index phase only) + + +class LogMessages: + # prepare_for_indexing + DOCUMENT_QUEUED = "New document queued for indexing." + DOCUMENT_UPDATED = "Document content changed, re-queued for indexing." + DOCUMENT_REQUEUED = "Stuck document re-queued for indexing." + DOC_SKIPPED_DB = "Transient DB error — document skipped, will retry on next sync." + DOC_SKIPPED_UNKNOWN = "Unexpected error — document skipped." + BATCH_ABORTED = "Fatal DB error — aborting prepare batch." + RACE_CONDITION = "Concurrent worker beat us to the commit — rolling back batch." + + # index + INDEX_STARTED = "Document indexing started." + INDEX_SUCCESS = "Document indexed successfully." + LLM_RETRYABLE = "Retryable LLM error — document marked failed, will retry on next sync." + LLM_PERMANENT = "Permanent LLM error — document marked failed." + EMBEDDING_FAILED = "Embedding error — document marked failed." + CHUNKING_OVERFLOW = "Chunking overflow — document marked failed." + DB_TRANSIENT = "Transient DB error — document marked failed." + DB_FATAL = "Fatal DB error — session is dead, re-raising." + UNEXPECTED = "Unexpected error — document marked failed." 
diff --git a/surfsense_backend/app/indexing_pipeline/pipeline_logger.py b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py
new file mode 100644
index 000000000..c0df3f130
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/pipeline_logger.py
@@ -0,0 +1,133 @@
+import logging
+from dataclasses import dataclass
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class PipelineLogContext:
+    connector_id: int
+    search_space_id: int
+    unique_id: str  # always available from ConnectorDocument
+    doc_id: int | None = None  # set once the DB row exists (index phase only)
+
+
+class LogMessages:
+    # prepare_for_indexing
+    DOCUMENT_QUEUED = "New document queued for indexing."
+    DOCUMENT_UPDATED = "Document content changed, re-queued for indexing."
+    DOCUMENT_REQUEUED = "Stuck document re-queued for indexing."
+    DOC_SKIPPED_DB = "Transient DB error — document skipped, will retry on next sync."
+    DOC_SKIPPED_UNKNOWN = "Unexpected error — document skipped."
+    BATCH_ABORTED = "Fatal DB error — aborting prepare batch."
+    RACE_CONDITION = "Concurrent worker beat us to the commit — rolling back batch."
+
+    # index
+    INDEX_STARTED = "Document indexing started."
+    INDEX_SUCCESS = "Document indexed successfully."
+    LLM_RETRYABLE = "Retryable LLM error — document marked failed, will retry on next sync."
+    LLM_PERMANENT = "Permanent LLM error — document marked failed."
+    EMBEDDING_FAILED = "Embedding error — document marked failed."
+    CHUNKING_OVERFLOW = "Chunking overflow — document marked failed."
+    DB_TRANSIENT = "Transient DB error — document marked failed."
+    DB_FATAL = "Fatal DB error — session is dead, re-raising."
+    UNEXPECTED = "Unexpected error — document marked failed."
+
+
+def _format_context(ctx: PipelineLogContext) -> str:
+    parts = [
+        f"connector_id={ctx.connector_id}",
+        f"search_space_id={ctx.search_space_id}",
+        f"unique_id={ctx.unique_id}",
+    ]
+    if ctx.doc_id is not None:
+        parts.append(f"doc_id={ctx.doc_id}")
+    return " ".join(parts)
+
+
+def _build_message(msg: str, ctx: PipelineLogContext, **extra) -> str:
+    try:
+        parts = [msg, _format_context(ctx)]
+        for key, val in extra.items():
+            parts.append(f"{key}={val}")
+        return " ".join(parts)
+    except Exception:
+        return msg
+
+
+def _safe_log(level_fn, msg: str, ctx: PipelineLogContext, exc_info=None, **extra) -> None:
+    # Logging must never raise — a broken log call inside an except block would
+    # chain with the original exception and mask it entirely.
+    try:
+        message = _build_message(msg, ctx, **extra)
+        level_fn(message, exc_info=exc_info)
+    except Exception:
+        pass
+
+
+# ── prepare_for_indexing ──────────────────────────────────────────────────────
+
+def log_document_queued(ctx: PipelineLogContext) -> None:
+    _safe_log(logger.info, LogMessages.DOCUMENT_QUEUED, ctx)
+
+
+def log_document_updated(ctx: PipelineLogContext) -> None:
+    _safe_log(logger.info, LogMessages.DOCUMENT_UPDATED, ctx)
+
+
+def log_document_requeued(ctx: PipelineLogContext) -> None:
+    _safe_log(logger.info, LogMessages.DOCUMENT_REQUEUED, ctx)
+
+
+def log_doc_skipped_db(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.warning, LogMessages.DOC_SKIPPED_DB, ctx, error=exc)
+
+
+def log_doc_skipped_unknown(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.warning, LogMessages.DOC_SKIPPED_UNKNOWN, ctx, exc_info=exc, error=exc)
+
+
+def log_batch_aborted(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.critical, LogMessages.BATCH_ABORTED, ctx, error=exc)
+
+
+def log_race_condition(ctx: PipelineLogContext) -> None:
+    _safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx)
+
+
+# ── index ─────────────────────────────────────────────────────────────────────
+
+def log_index_started(ctx: PipelineLogContext) -> None:
+    _safe_log(logger.info, LogMessages.INDEX_STARTED, ctx)
+
+
+def log_index_success(ctx: PipelineLogContext, chunk_count: int) -> None:
+    _safe_log(logger.info, LogMessages.INDEX_SUCCESS, ctx, chunk_count=chunk_count)
+
+
+def log_retryable_llm_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.warning, LogMessages.LLM_RETRYABLE, ctx, error=exc)
+
+
+def log_permanent_llm_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.error, LogMessages.LLM_PERMANENT, ctx, error=exc)
+
+
+def log_embedding_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.error, LogMessages.EMBEDDING_FAILED, ctx, error=exc)
+
+
+def log_chunking_overflow(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.error, LogMessages.CHUNKING_OVERFLOW, ctx, error=exc)
+
+
+def log_db_transient_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.warning, LogMessages.DB_TRANSIENT, ctx, error=exc)
+
+
+def log_db_fatal_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.critical, LogMessages.DB_FATAL, ctx, error=exc)
+
+
+def log_unexpected_error(ctx: PipelineLogContext, exc: Exception) -> None:
+    _safe_log(logger.error, LogMessages.UNEXPECTED, ctx, exc_info=exc, error=exc)
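
Reviewer note: a usage sketch for the new module. The import path assumes the surfsense_backend app package is importable, as in the service file above, and the ctx values are made up; the expected output line follows directly from _build_message() and _format_context():

    import logging

    from app.indexing_pipeline.pipeline_logger import PipelineLogContext, log_index_success

    logging.basicConfig(level=logging.INFO)

    ctx = PipelineLogContext(
        connector_id=7,
        search_space_id=3,
        unique_id="notion-page-abc",
        doc_id=42,  # set here because in the index phase the DB row already exists
    )
    log_index_success(ctx, chunk_count=18)
    # Logs on "app.indexing_pipeline.pipeline_logger":
    # Document indexed successfully. connector_id=7 search_space_id=3 unique_id=notion-page-abc doc_id=42 chunk_count=18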