From 6e03ab044a7ad10ead96a22ced444bfa5672bf0c Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 22 May 2026 17:49:02 +0530 Subject: [PATCH] feat(tasks): measure Celery queue latency --- surfsense_backend/app/celery_app.py | 61 ++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index cfb24731d..39406fb6b 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -1,16 +1,75 @@ """Celery application configuration and setup.""" +import contextlib import os +import time from celery import Celery from celery.schedules import crontab -from celery.signals import worker_process_init +from celery.signals import before_task_publish, task_prerun, worker_process_init from dotenv import load_dotenv +try: + from opentelemetry import trace +except ImportError: # pragma: no cover - optional OTel dependency + trace = None # type: ignore[assignment] + # Load environment variables load_dotenv() +@before_task_publish.connect +def _stamp_enqueue_time(headers=None, **_kwargs): + """Stamp enqueue time so workers can measure queue wait.""" + if headers is None: + return + with contextlib.suppress(Exception): + headers["surfsense.enqueued_at_ns"] = str(time.monotonic_ns()) + + +@task_prerun.connect +def _record_queue_latency(task=None, **_kwargs): + """Record queue latency and attach the generic operation to task spans.""" + if task is None: + return + try: + from app.observability import metrics as ot_metrics + + task_name = getattr(task, "name", None) or "unknown" + operation = ot_metrics.parse_celery_task_label(task_name) + request = getattr(task, "request", None) + delivery_info = getattr(request, "delivery_info", None) or {} + queue = delivery_info.get("routing_key") or "unknown" + scheduled = bool( + getattr(request, "eta", None) or getattr(request, "expires", None) + ) + + if trace is not None: + with contextlib.suppress(Exception): + span = trace.get_current_span() + span.set_attribute("celery.task.operation", operation) + + headers = getattr(request, "headers", None) or {} + enqueued_ns = headers.get("surfsense.enqueued_at_ns") + if enqueued_ns is None: + return + + elapsed_s = (time.monotonic_ns() - int(enqueued_ns)) / 1e9 + ot_metrics.record_celery_queue_latency( + elapsed_s, + task_name=task_name, + queue=queue, + scheduled=scheduled, + operation=operation, + ) + if trace is not None: + with contextlib.suppress(Exception): + span = trace.get_current_span() + span.set_attribute("celery.queue.latency_ms", elapsed_s * 1000) + except Exception: + pass + + @worker_process_init.connect def init_worker(**kwargs): """Initialize the LLM Router and Image Gen Router when a Celery worker process starts.