diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 2339647ea..2aa92bd9b 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -57,6 +57,7 @@ from app.indexing_pipeline.pipeline_logger import ( log_retryable_llm_error, log_unexpected_error, ) +from app.observability import metrics as ot_metrics, otel as ot from app.utils.perf import get_perf_logger @@ -362,6 +363,16 @@ class IndexingPipelineService: ) perf = get_perf_logger() t_index = time.perf_counter() + document_type = ( + document.document_type.value + if getattr(document, "document_type", None) + else None + ) + persist_span_cm = ot.kb_persist_span( + document_type=document_type, + ) + persist_span = persist_span_cm.__enter__() + outcome_status = "failed" try: log_index_started(ctx) document.status = DocumentStatus.processing() @@ -429,11 +440,13 @@ class IndexingPipelineService: time.perf_counter() - t_index, ) log_index_success(ctx, chunk_count=len(chunks)) + outcome_status = "success" await self._enqueue_ai_sort_if_enabled(document) except RETRYABLE_LLM_ERRORS as e: log_retryable_llm_error(ctx, e) + outcome_status = "requeued" await rollback_and_persist_failure( self.session, document, llm_retryable_message(e) ) @@ -465,6 +478,17 @@ class IndexingPipelineService: with contextlib.suppress(Exception): await self.session.refresh(document) + with contextlib.suppress(Exception): + persist_span.set_attribute("indexing.status", outcome_status) + ot_metrics.record_indexing_document_duration( + time.perf_counter() - t_index, + document_type=document_type, + ) + ot_metrics.record_indexing_document_outcome( + document_type=document_type, + status=outcome_status, + ) + persist_span_cm.__exit__(None, None, None) return document async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 08d96cfa0..86296c3d9 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -1,14 +1,43 @@ """Celery tasks for connector indexing.""" import logging +import time import traceback +from collections.abc import Awaitable, Callable + +from celery import current_task from app.celery_app import celery_app -from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task +from app.observability import metrics as ot_metrics +from app.tasks.celery_tasks import ( + get_celery_session_maker, + run_async_celery_task as _run_async_celery_task, +) logger = logging.getLogger(__name__) +def run_async_celery_task[T](coro_factory: Callable[[], Awaitable[T]]) -> T: + """Run connector sync work and record aggregate connector metrics.""" + task_name = getattr(current_task, "name", None) or "unknown" + t0 = time.perf_counter() + status = "failed" + try: + result = _run_async_celery_task(coro_factory) + status = "success" + return result + finally: + elapsed_s = time.perf_counter() - t0 + ot_metrics.record_connector_sync_duration( + elapsed_s, + connector_type=task_name, + ) + ot_metrics.record_connector_sync_outcome( + connector_type=task_name, + status=status, + ) + + def _handle_greenlet_error(e: Exception, task_name: str, connector_id: int) -> None: """ Handle greenlet_spawn errors with detailed logging for debugging.