feat(tasks): measure Celery queue latency

This commit is contained in:
Anish Sarkar 2026-05-22 17:49:02 +05:30
parent dc893281ba
commit 6e03ab044a

View file

@ -1,16 +1,75 @@
"""Celery application configuration and setup.""" """Celery application configuration and setup."""
import contextlib
import os import os
import time
from celery import Celery from celery import Celery
from celery.schedules import crontab from celery.schedules import crontab
from celery.signals import worker_process_init from celery.signals import before_task_publish, task_prerun, worker_process_init
from dotenv import load_dotenv from dotenv import load_dotenv
try:
from opentelemetry import trace
except ImportError: # pragma: no cover - optional OTel dependency
trace = None # type: ignore[assignment]
# Load environment variables # Load environment variables
load_dotenv() load_dotenv()
@before_task_publish.connect
def _stamp_enqueue_time(headers=None, **_kwargs):
"""Stamp enqueue time so workers can measure queue wait."""
if headers is None:
return
with contextlib.suppress(Exception):
headers["surfsense.enqueued_at_ns"] = str(time.monotonic_ns())
@task_prerun.connect
def _record_queue_latency(task=None, **_kwargs):
"""Record queue latency and attach the generic operation to task spans."""
if task is None:
return
try:
from app.observability import metrics as ot_metrics
task_name = getattr(task, "name", None) or "unknown"
operation = ot_metrics.parse_celery_task_label(task_name)
request = getattr(task, "request", None)
delivery_info = getattr(request, "delivery_info", None) or {}
queue = delivery_info.get("routing_key") or "unknown"
scheduled = bool(
getattr(request, "eta", None) or getattr(request, "expires", None)
)
if trace is not None:
with contextlib.suppress(Exception):
span = trace.get_current_span()
span.set_attribute("celery.task.operation", operation)
headers = getattr(request, "headers", None) or {}
enqueued_ns = headers.get("surfsense.enqueued_at_ns")
if enqueued_ns is None:
return
elapsed_s = (time.monotonic_ns() - int(enqueued_ns)) / 1e9
ot_metrics.record_celery_queue_latency(
elapsed_s,
task_name=task_name,
queue=queue,
scheduled=scheduled,
operation=operation,
)
if trace is not None:
with contextlib.suppress(Exception):
span = trace.get_current_span()
span.set_attribute("celery.queue.latency_ms", elapsed_s * 1000)
except Exception:
pass
@worker_process_init.connect @worker_process_init.connect
def init_worker(**kwargs): def init_worker(**kwargs):
"""Initialize the LLM Router and Image Gen Router when a Celery worker process starts. """Initialize the LLM Router and Image Gen Router when a Celery worker process starts.