diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 39406fb6b..5b45baca1 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -6,7 +6,12 @@ import time from celery import Celery from celery.schedules import crontab -from celery.signals import before_task_publish, task_prerun, worker_process_init +from celery.signals import ( + before_task_publish, + task_postrun, + task_prerun, + worker_process_init, +) from dotenv import load_dotenv try: @@ -29,7 +34,7 @@ def _stamp_enqueue_time(headers=None, **_kwargs): @task_prerun.connect def _record_queue_latency(task=None, **_kwargs): - """Record queue latency and attach the generic operation to task spans.""" + """Record queue latency and stash task metadata for span enrichment.""" if task is None: return try: @@ -44,10 +49,10 @@ def _record_queue_latency(task=None, **_kwargs): getattr(request, "eta", None) or getattr(request, "expires", None) ) - if trace is not None: - with contextlib.suppress(Exception): - span = trace.get_current_span() - span.set_attribute("celery.task.operation", operation) + with contextlib.suppress(Exception): + request.surfsense_operation = operation + request.surfsense_queue = queue + request.surfsense_scheduled = scheduled headers = getattr(request, "headers", None) or {} enqueued_ns = headers.get("surfsense.enqueued_at_ns") @@ -55,6 +60,9 @@ def _record_queue_latency(task=None, **_kwargs): return elapsed_s = (time.monotonic_ns() - int(enqueued_ns)) / 1e9 + with contextlib.suppress(Exception): + request.surfsense_queue_latency_ms = elapsed_s * 1000 + ot_metrics.record_celery_queue_latency( elapsed_s, task_name=task_name, @@ -62,10 +70,30 @@ def _record_queue_latency(task=None, **_kwargs): scheduled=scheduled, operation=operation, ) - if trace is not None: - with contextlib.suppress(Exception): - span = trace.get_current_span() - span.set_attribute("celery.queue.latency_ms", elapsed_s * 1000) + except Exception: + pass + + +@task_postrun.connect +def _set_celery_span_attributes(task=None, **_kwargs): + """Attach derived queue metadata to the active Celery run span.""" + if task is None or trace is None: + return + + try: + request = getattr(task, "request", None) + if request is None: + return + + span = trace.get_current_span() + + operation = getattr(request, "surfsense_operation", None) + if operation: + span.set_attribute("celery.task.operation", operation) + + latency_ms = getattr(request, "surfsense_queue_latency_ms", None) + if latency_ms is not None: + span.set_attribute("celery.queue.latency_ms", latency_ms) except Exception: pass