From 7a3b278b7567041f48c06cfe09e23ca7af9a69f5 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 22 May 2026 17:50:02 +0530 Subject: [PATCH] feat(connectors): add retry and auth telemetry events --- .../routes/search_source_connectors_routes.py | 24 ++++++++++++++++++- .../app/tasks/celery_tasks/connector_tasks.py | 14 +++++++---- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 4d7e6b2ef..3060fdf4a 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -43,7 +43,7 @@ from app.db import ( async_session_maker, get_async_session, ) -from app.observability import metrics as ot_metrics +from app.observability import metrics as ot_metrics, otel as ot from app.schemas import ( GoogleDriveIndexRequest, MCPConnectorCreate, @@ -1246,6 +1246,12 @@ async def _persist_auth_expired(session: AsyncSession, connector_id: int) -> Non """Flag a connector as auth_expired so the frontend shows a re-auth prompt.""" from sqlalchemy.orm.attributes import flag_modified + ot.add_event( + "connector.auth.expired", + { + "error.category": "auth_failed", + }, + ) try: result = await session.execute( select(SearchSourceConnector).where( @@ -1305,6 +1311,13 @@ async def _run_indexing_with_notifications( try: connector_lock_acquired = acquire_connector_indexing_lock(connector_id) if not connector_lock_acquired: + ot.add_event( + "connector.sync.skipped", + { + "skip.reason": "lock_contention", + "error.category": "lock_contention", + }, + ) logger.info( f"Skipping indexing for connector {connector_id} " "(another worker already holds Redis connector lock)" @@ -1375,6 +1388,15 @@ async def _run_indexing_with_notifications( ) -> None: """Callback to update notification during API retries (rate limits, etc.)""" nonlocal notification + ot.add_event( + "connector.retry.scheduled", + { + "retry.reason": retry_reason, + "retry.attempt": attempt, + "retry.max": max_attempts, + "retry.delay_ms": int(wait_seconds * 1000), + }, + ) if notification: try: await session.refresh(notification) diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index e0f0f09c9..50f757473 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -22,15 +22,18 @@ def run_async_celery_task[T](coro_factory: Callable[[], Awaitable[T]]) -> T: task_name = getattr(current_task, "name", None) or "unknown" t0 = time.perf_counter() status = "failed" + error_category: str | None = None try: with ot.connector_sync_span(connector_type=task_name) as sp: - result = _run_async_celery_task(coro_factory) - sp.set_attribute("connector.status", "success") + try: + result = _run_async_celery_task(coro_factory) + sp.set_attribute("connector.status", "success") + except Exception as exc: + error_category = ot_metrics.categorize_exception(exc) + sp.set_attribute("connector.error.category", error_category) + raise status = "success" return result - except Exception: - status = "failed" - raise finally: elapsed_s = time.perf_counter() - t0 ot_metrics.record_connector_sync_duration( @@ -40,6 +43,7 @@ def run_async_celery_task[T](coro_factory: Callable[[], Awaitable[T]]) -> T: ot_metrics.record_connector_sync_outcome( connector_type=task_name, status=status, + error_category=error_category, )