feat(tasks): enhance Celery task telemetry with queue metadata and latency tracking

This commit is contained in:
Anish Sarkar 2026-05-22 18:19:38 +05:30
parent 7a3b278b75
commit 51e4d8b489

View file

@ -6,7 +6,12 @@ import time
from celery import Celery from celery import Celery
from celery.schedules import crontab from celery.schedules import crontab
from celery.signals import before_task_publish, task_prerun, worker_process_init from celery.signals import (
before_task_publish,
task_postrun,
task_prerun,
worker_process_init,
)
from dotenv import load_dotenv from dotenv import load_dotenv
try: try:
@ -29,7 +34,7 @@ def _stamp_enqueue_time(headers=None, **_kwargs):
@task_prerun.connect @task_prerun.connect
def _record_queue_latency(task=None, **_kwargs): def _record_queue_latency(task=None, **_kwargs):
"""Record queue latency and attach the generic operation to task spans.""" """Record queue latency and stash task metadata for span enrichment."""
if task is None: if task is None:
return return
try: try:
@ -44,10 +49,10 @@ def _record_queue_latency(task=None, **_kwargs):
getattr(request, "eta", None) or getattr(request, "expires", None) getattr(request, "eta", None) or getattr(request, "expires", None)
) )
if trace is not None: with contextlib.suppress(Exception):
with contextlib.suppress(Exception): request.surfsense_operation = operation
span = trace.get_current_span() request.surfsense_queue = queue
span.set_attribute("celery.task.operation", operation) request.surfsense_scheduled = scheduled
headers = getattr(request, "headers", None) or {} headers = getattr(request, "headers", None) or {}
enqueued_ns = headers.get("surfsense.enqueued_at_ns") enqueued_ns = headers.get("surfsense.enqueued_at_ns")
@ -55,6 +60,9 @@ def _record_queue_latency(task=None, **_kwargs):
return return
elapsed_s = (time.monotonic_ns() - int(enqueued_ns)) / 1e9 elapsed_s = (time.monotonic_ns() - int(enqueued_ns)) / 1e9
with contextlib.suppress(Exception):
request.surfsense_queue_latency_ms = elapsed_s * 1000
ot_metrics.record_celery_queue_latency( ot_metrics.record_celery_queue_latency(
elapsed_s, elapsed_s,
task_name=task_name, task_name=task_name,
@ -62,10 +70,30 @@ def _record_queue_latency(task=None, **_kwargs):
scheduled=scheduled, scheduled=scheduled,
operation=operation, operation=operation,
) )
if trace is not None: except Exception:
with contextlib.suppress(Exception): pass
span = trace.get_current_span()
span.set_attribute("celery.queue.latency_ms", elapsed_s * 1000)
@task_postrun.connect
def _set_celery_span_attributes(task=None, **_kwargs):
"""Attach derived queue metadata to the active Celery run span."""
if task is None or trace is None:
return
try:
request = getattr(task, "request", None)
if request is None:
return
span = trace.get_current_span()
operation = getattr(request, "surfsense_operation", None)
if operation:
span.set_attribute("celery.task.operation", operation)
latency_ms = getattr(request, "surfsense_queue_latency_ms", None)
if latency_ms is not None:
span.set_attribute("celery.queue.latency_ms", latency_ms)
except Exception: except Exception:
pass pass