feat: custom telemetry configuration

This commit is contained in:
Abhishek Kumar 2026-03-23 11:36:39 +05:30
parent 1967a71935
commit affb39e57f
23 changed files with 927 additions and 139 deletions

View file

@ -0,0 +1,96 @@
import csv
import io
from typing import Any, List
from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
from api.utils.transcript import generate_transcript_text
def _transcript_from_logs(logs: dict | None) -> str:
"""Extract transcript text from workflow run logs JSON."""
if not logs:
return ""
events = logs.get("realtime_feedback_events", [])
return generate_transcript_text(events).strip()
def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
    """Return every distinct extracted-variable name in first-seen order.

    Each run's ``gathered_context`` (treated as empty when falsy) is checked
    for an ``extracted_variables`` entry; entries that are not dicts are
    ignored.
    """
    # A dict doubles as an ordered set: re-adding a key keeps its position.
    seen: dict[str, None] = {}
    for run in runs:
        context = run.gathered_context or {}
        variables = context.get("extracted_variables", {})
        if not isinstance(variables, dict):
            continue
        seen.update(dict.fromkeys(variables))
    return list(seen)
async def generate_campaign_report_csv(campaign_id: int) -> tuple[io.StringIO, str]:
    """Build the CSV export for the completed runs of one campaign.

    The sheet has five fixed leading columns, one column per extracted
    variable name found in any run, and three fixed trailing columns.

    Returns:
        ``(buffer, filename)`` where ``buffer`` is an ``io.StringIO`` rewound
        to position 0 and ``filename`` is ``campaign_<id>_report.csv``.
    """
    completed_runs = await db_client.get_completed_runs_for_report(campaign_id)

    # Dynamic columns: one per extracted-variable name seen across all runs.
    variable_columns = _collect_extracted_variable_keys(completed_runs)

    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(
        [
            "Run ID",
            "Created At",
            "Phone Number",
            "Call Disposition",
            "Call Duration (s)",
            *variable_columns,
            "Call Tags",
            "Transcript",
            "Recording URL",
        ]
    )

    for run in completed_runs:
        initial_ctx = run.initial_context or {}
        gathered_ctx = run.gathered_context or {}
        cost_details = run.cost_info or {}

        # Only runs with a public access token get a download link.
        if run.public_access_token:
            download_link = (
                f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow"
                f"/{run.public_access_token}/recording"
            )
        else:
            download_link = ""

        # Lists are flattened to a comma-separated cell; anything else is
        # written as-is.
        tags_cell = gathered_ctx.get("call_tags", [])
        if isinstance(tags_cell, list):
            tags_cell = ", ".join(map(str, tags_cell))

        if run.created_at:
            created_cell = run.created_at.isoformat()
        else:
            created_cell = ""

        variables = gathered_ctx.get("extracted_variables", {})
        if isinstance(variables, dict):
            variable_cells = [variables.get(name, "") for name in variable_columns]
        else:
            # Non-dict payloads contribute blank cells for every dynamic column.
            variable_cells = [""] * len(variable_columns)

        csv_writer.writerow(
            [
                run.id,
                created_cell,
                initial_ctx.get("phone_number", ""),
                gathered_ctx.get("mapped_call_disposition", ""),
                cost_details.get("call_duration_seconds", ""),
                *variable_cells,
                tags_cell,
                _transcript_from_logs(run.logs),
                download_link,
            ]
        )

    buffer.seek(0)
    return buffer, f"campaign_{campaign_id}_report.csv"

View file

@ -9,6 +9,7 @@ from api.services.pipecat.in_memory_buffers import (
InMemoryLogsBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.tracing_config import get_trace_url
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
@ -139,10 +140,12 @@ def register_event_handlers(
# Add trace URL if available (must be done before conversation tracing ends)
if task.turn_trace_observer:
trace_url = task.turn_trace_observer.get_trace_url()
if trace_url:
gathered_context["trace_url"] = trace_url
logger.debug(f"Added trace URL to gathered_context: {trace_url}")
trace_id = task.turn_trace_observer.get_trace_id()
if trace_id:
trace_url = get_trace_url(trace_id)
if trace_url:
gathered_context["trace_url"] = trace_url
logger.debug(f"Added trace URL to gathered_context: {trace_url}")
# also consider existing gathered context in workflow_run
gathered_context = {**gathered_context, **workflow_run.gathered_context}
@ -165,6 +168,19 @@ def register_event_handlers(
gathered_context["call_tags"] = call_tags
# Store disposition code in workflow for dynamic filtering
disposition_code = gathered_context.get("mapped_call_disposition")
if disposition_code and workflow_run:
try:
await db_client.add_call_disposition_code(
workflow_run.workflow_id, disposition_code
)
except Exception as e:
logger.error(
f"Error storing disposition code in workflow: {e}",
exc_info=True,
)
# Clean up engine resources (including voicemail detector)
await engine.cleanup()

View file

@ -38,7 +38,9 @@ from api.services.pipecat.service_factory import (
create_stt_service,
create_tts_service,
)
from api.services.pipecat.tracing_config import setup_tracing_exporter
from api.services.pipecat.tracing_config import (
ensure_tracing,
)
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
@ -82,10 +84,10 @@ from pipecat.turns.user_stop import (
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType
from pipecat.utils.run_context import set_current_run_id
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
# Setup tracing if enabled
setup_tracing_exporter()
ensure_tracing()
async def run_pipeline_twilio(
@ -108,6 +110,11 @@ async def run_pipeline_twilio(
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
# Set org context early so tasks created by the transport inherit it
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
@ -156,6 +163,7 @@ async def run_pipeline_vonage(
"""
logger.info(f"Starting Vonage pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
set_current_org_id(organization_id)
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_uuid}
@ -226,6 +234,11 @@ async def run_pipeline_ari(
# Get workflow to extract configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
# Set org context early so tasks created by the transport inherit it
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
@ -281,6 +294,11 @@ async def run_pipeline_vobiz(
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
workflow = await db_client.get_workflow(workflow_id, user_id)
# Set org context early so tasks created by the transport inherit it
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
@ -350,6 +368,11 @@ async def run_pipeline_cloudonix(
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
# Set org context early so tasks created by the transport inherit it
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
@ -397,6 +420,11 @@ async def run_pipeline_smallwebrtc(
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
# Set org context early so tasks created by the transport inherit it
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:

View file

@ -1,8 +1,9 @@
import base64
import os
from loguru import logger
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from api.constants import (
ENABLE_TRACING,
@ -10,43 +11,229 @@ from api.constants import (
LANGFUSE_PUBLIC_KEY,
LANGFUSE_SECRET_KEY,
)
from pipecat.utils.run_context import get_current_org_id
from pipecat.utils.tracing.setup import setup_tracing
# Set to True by ensure_tracing() once setup has completed; later calls then
# return early without repeating the setup.
_tracing_initialized = False
# The _OrgRoutingExporter instance created by ensure_tracing(); stays None
# until that setup has run.
_org_routing_exporter = None
def is_tracing_enabled():
    """Report whether tracing is switched on via the ENABLE_TRACING constant.

    Tracing stays off unless that flag is set, so a default install needs no
    external tracing backend.
    """
    enabled = ENABLE_TRACING
    return enabled
class _OrgAttributeSpanProcessor(SpanProcessor):
    """Span processor that tags each new span with the current org id.

    The id is read from ``get_current_org_id()`` when the span starts and
    written to the ``dograh.org_id`` attribute as a string. Spans started
    while no org id is set are left untouched.
    """

    def on_start(self, span, parent_context=None):
        """Set ``dograh.org_id`` on *span* when an org id is available."""
        from pipecat.utils.run_context import get_current_org_id

        current_org = get_current_org_id()
        if not current_org:
            return
        span.set_attribute("dograh.org_id", str(current_org))

    def on_end(self, span):
        """Nothing to do when a span ends."""
        return None

    def shutdown(self):
        """Nothing to release; this processor holds no resources."""
        return None

    def force_flush(self, timeout_millis=30000):
        """Nothing is buffered, so flushing always succeeds."""
        return True
def setup_tracing_exporter():
"""Setup the OTEL tracing exporter for Langfuse if enabled.
class _OrgRoutingExporter(SpanExporter):
    """Routes spans to an org-specific or the default Langfuse exporter.

    Spans with a ``dograh.org_id`` attribute whose org has registered
    credentials are forwarded to that org's exporter. All other spans go to
    the default exporter; when no default exporter was supplied those spans
    are not exported.
    """

    def __init__(self, default_exporter):
        """Create the router.

        Args:
            default_exporter: Exporter for spans without a registered org,
                or ``None`` to export only org-routed spans.
        """
        self._default_exporter = default_exporter
        self._org_exporters = {}
        self._org_hosts = {}
        # Authorization header each org exporter was built with. Kept here so
        # change detection does not depend on OTLPSpanExporter internals.
        self._org_auth_headers = {}

    def get_org_host(self, org_id):
        """Return the normalized host registered for *org_id*, or ``None``."""
        return self._org_hosts.get(str(org_id))

    def register_org(self, org_id, host, public_key, secret_key):
        """Register or update the exporter used for *org_id*.

        A call with the same host and credentials as the current registration
        is a no-op. Otherwise a new exporter is installed and any previous
        exporter for the org is shut down.

        Args:
            org_id: Organization identifier; stored under ``str(org_id)``.
            host: Langfuse base URL; trailing slashes are stripped.
            public_key: Langfuse public key (basic-auth user).
            secret_key: Langfuse secret key (basic-auth password).
        """
        key = str(org_id)
        normalized_host = host.rstrip("/")
        auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
        auth_header = f"Basic {auth}"
        endpoint = f"{normalized_host}/api/public/otel/v1/traces"

        previous = self._org_exporters.get(key)
        if previous is not None:
            # The endpoint is derived from the host, so host + auth header
            # fully identify the exporter configuration.
            if (
                self._org_hosts.get(key) == normalized_host
                and self._org_auth_headers.get(key) == auth_header
            ):
                return
            logger.info(f"Updating OTEL exporter for org {org_id}")

        # Build the replacement before touching the old exporter so a failed
        # construction leaves the existing registration working.
        exporter = OTLPSpanExporter(
            endpoint=endpoint,
            headers={"Authorization": auth_header},
        )
        self._org_exporters[key] = exporter
        self._org_hosts[key] = normalized_host
        self._org_auth_headers[key] = auth_header

        if previous is not None:
            previous.shutdown()
        logger.info(f"Registered OTEL exporter for org {org_id}")

    def unregister_org(self, org_id):
        """Remove and shut down the exporter registered for *org_id*, if any."""
        key = str(org_id)
        exporter = self._org_exporters.pop(key, None)
        self._org_hosts.pop(key, None)
        self._org_auth_headers.pop(key, None)
        if exporter:
            exporter.shutdown()
            logger.info(f"Unregistered OTEL exporter for org {org_id}")

    @staticmethod
    def _export_batch(exporter, batch):
        """Export *batch*, turning a raised exception into ``FAILURE``.

        This keeps one misbehaving exporter from discarding the batches that
        belong to the other exporters.
        """
        try:
            return exporter.export(batch)
        except Exception:
            logger.exception("OTEL span export raised; reporting batch as failed")
            return SpanExportResult.FAILURE

    def export(self, spans):
        """Split *spans* by org and forward each group to its exporter.

        Returns:
            ``SpanExportResult.SUCCESS`` when every forwarded batch succeeded,
            otherwise the result of the last batch that did not succeed.
        """
        default_spans = []
        # key -> (exporter, spans). The exporter is captured while bucketing
        # so a registration change in between cannot cause a failed lookup.
        # NOTE(review): presumably export runs on the span processor's worker
        # thread while (un)registration happens elsewhere; confirm.
        org_batches = {}

        for span in spans:
            org_id = span.attributes.get("dograh.org_id") if span.attributes else None
            exporter = self._org_exporters.get(str(org_id)) if org_id else None
            if exporter is None:
                default_spans.append(span)
            else:
                org_batches.setdefault(str(org_id), (exporter, []))[1].append(span)

        result = SpanExportResult.SUCCESS

        if default_spans and self._default_exporter:
            outcome = self._export_batch(self._default_exporter, default_spans)
            if outcome != SpanExportResult.SUCCESS:
                result = outcome

        for exporter, batch in org_batches.values():
            outcome = self._export_batch(exporter, batch)
            if outcome != SpanExportResult.SUCCESS:
                result = outcome

        return result

    def shutdown(self):
        """Shut down the default exporter and every org exporter."""
        if self._default_exporter:
            self._default_exporter.shutdown()
        # Iterate over a snapshot so the registry may change meanwhile.
        for exporter in list(self._org_exporters.values()):
            exporter.shutdown()

    def force_flush(self, timeout_millis=30000):
        """Flush all exporters; return True only if every flush succeeded."""
        ok = True
        if self._default_exporter:
            ok = self._default_exporter.force_flush(timeout_millis) and ok
        for exporter in list(self._org_exporters.values()):
            ok = exporter.force_flush(timeout_millis) and ok
        return ok
def ensure_tracing() -> bool:
    """Initialize OTEL tracing if enabled and report whether it is available.

    Installs an ``_OrgRoutingExporter`` so that spans can be routed to
    org-specific Langfuse projects at export time, and adds an
    ``_OrgAttributeSpanProcessor`` to the tracer provider when the provider
    supports ``add_span_processor``.

    Idempotent: after the first successful setup, later calls return True
    without repeating it.

    Returns:
        True when tracing has been set up (now or by an earlier call), False
        when ``ENABLE_TRACING`` is falsy.
    """
    global _tracing_initialized, _org_routing_exporter

    if _tracing_initialized:
        return True

    if not ENABLE_TRACING:
        return False

    # Build the default exporter from env-var credentials (may be None).
    default_exporter = None
    if all([LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY]):
        langfuse_auth = base64.b64encode(
            f"{LANGFUSE_PUBLIC_KEY}:{LANGFUSE_SECRET_KEY}".encode()
        ).decode()
        # Strip trailing slashes, as is done for org hosts and trace URLs, so
        # a host configured as "https://host/" does not yield a "//" path.
        default_host = LANGFUSE_HOST.rstrip("/")
        default_exporter = OTLPSpanExporter(
            endpoint=f"{default_host}/api/public/otel/v1/traces",
            headers={"Authorization": f"Basic {langfuse_auth}"},
        )
    else:
        logger.warning(
            "ENABLE_TRACING is true but default Langfuse credentials are not configured. "
            "Only org-level credentials will be used."
        )

    _org_routing_exporter = _OrgRoutingExporter(default_exporter)
    setup_tracing(service_name="dograh-pipeline", exporter=_org_routing_exporter)

    # Add processor that stamps every span with the current org_id context var.
    from opentelemetry import trace as otel_trace

    provider = otel_trace.get_tracer_provider()
    if hasattr(provider, "add_span_processor"):
        provider.add_span_processor(_OrgAttributeSpanProcessor())

    # Flag is set last so a failure above lets the next call retry the setup.
    _tracing_initialized = True
    return True
def register_org_langfuse_credentials(org_id, host, public_key, secret_key):
    """Register or refresh the Langfuse credentials used to route an org's spans.

    Does nothing when tracing is unavailable. When any of ``host``,
    ``public_key`` or ``secret_key`` is falsy, a warning is logged and the
    registration is skipped. Repeated calls are fine; the routing exporter
    handles updates.
    """
    tracing_ready = ensure_tracing()
    if not tracing_ready:
        return

    if all([host, public_key, secret_key]):
        _org_routing_exporter.register_org(org_id, host, public_key, secret_key)
    else:
        logger.warning(
            f"Incomplete Langfuse credentials for org {org_id}, skipping registration"
        )
def unregister_org_langfuse_credentials(org_id):
    """Drop an org's Langfuse credentials from the routing exporter.

    Does nothing when tracing is unavailable. After removal the org's spans
    are handled like those of any unregistered org.
    """
    if ensure_tracing():
        _org_routing_exporter.unregister_org(org_id)
async def load_all_org_langfuse_credentials():
    """Register the stored Langfuse credentials of every organization.

    Does nothing when tracing is unavailable. Otherwise all configurations
    stored under the ``LANGFUSE_CREDENTIALS`` key are fetched in one query and
    each one is passed to ``register_org_langfuse_credentials``.
    """
    if not ensure_tracing():
        return

    from api.db import db_client
    from api.enums import OrganizationConfigurationKey

    credentials_key = OrganizationConfigurationKey.LANGFUSE_CREDENTIALS.value
    configs = await db_client.get_all_configurations_by_key(credentials_key)

    for entry in configs:
        organization = entry["organization_id"]
        credentials = entry["value"]
        register_org_langfuse_credentials(
            organization,
            credentials.get("host"),
            credentials.get("public_key"),
            credentials.get("secret_key"),
        )

    logger.info(f"Loaded Langfuse credentials for {len(configs)} org(s)")
def get_trace_url(trace_id: str, org_id=None) -> str | None:
    """Return the Langfuse URL for *trace_id*, or None when no host is known.

    The org's registered host is preferred; ``LANGFUSE_HOST`` is the fallback.
    When ``org_id`` is None the id is taken from ``get_current_org_id()``.
    """
    effective_org = get_current_org_id() if org_id is None else org_id

    org_host = None
    if effective_org and _org_routing_exporter:
        org_host = _org_routing_exporter.get_org_host(str(effective_org))

    host = org_host or LANGFUSE_HOST
    if not host:
        return None
    return f"{host.rstrip('/')}/trace/{trace_id}"

View file

@ -25,6 +25,7 @@ if TYPE_CHECKING:
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.utils.tracing.tracing_context import TracingContext
LLMService = Union[OpenAILLMService, AnthropicLLMService, GoogleLLMService]
@ -135,7 +136,9 @@ class PipecatEngine:
Returns the turn-level context if available, otherwise the
conversation-level context, or None.
"""
tracing_ctx = getattr(self.task, "_tracing_context", None)
tracing_ctx: TracingContext | None = getattr(
self.task, "_tracing_context", None
)
if not tracing_ctx:
return None
return tracing_ctx.get_turn_context() or tracing_ctx.get_conversation_context()
@ -439,6 +442,10 @@ class PipecatEngine:
)
)
self._gathered_context.update(extracted_data)
extracted_variables = self._gathered_context.setdefault(
"extracted_variables", {}
)
extracted_variables.update(extracted_data)
logger.debug(
f"Variable extraction completed. Extracted: {extracted_data}"
)

View file

@ -7,7 +7,7 @@ from loguru import logger
from opentelemetry import trace
from api.services.gen_ai.json_parser import parse_llm_json
from api.services.pipecat.tracing_config import is_tracing_enabled
from api.services.pipecat.tracing_config import ensure_tracing
from api.services.workflow.dto import ExtractionVariableDTO
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
@ -206,7 +206,7 @@ class VariableExtractionManager:
# Get model name for tracing
model_name = getattr(self._engine.llm, "model_name", "unknown")
if is_tracing_enabled():
if ensure_tracing():
tracer = trace.get_tracer("pipecat")
with tracer.start_as_current_span(
"llm-variable-extraction", context=parent_ctx

View file

@ -5,18 +5,21 @@ import re
from loguru import logger
from api.db.models import WorkflowRunModel
from api.services.pipecat.tracing_config import get_trace_url
def extract_trace_id(gathered_context: dict) -> str | None:
"""Extract Langfuse trace_id from gathered_context trace_url.
URL format: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
Supports both URL formats:
- New: https://langfuse.dograh.com/trace/<trace_id>
- Legacy: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
"""
trace_url = gathered_context.get("trace_url")
if not trace_url:
return None
try:
match = re.search(r"/traces/([a-fA-F0-9]+)$", trace_url)
match = re.search(r"/traces?/([a-fA-F0-9]+)$", trace_url)
if match:
return match.group(1)
except Exception:
@ -37,16 +40,11 @@ def setup_langfuse_parent_context(workflow_run: WorkflowRunModel):
set_span_in_context,
)
from api.services.pipecat.tracing_config import (
is_tracing_enabled,
setup_tracing_exporter,
)
from api.services.pipecat.tracing_config import ensure_tracing
if not is_tracing_enabled():
if not ensure_tracing():
return None
setup_tracing_exporter()
gathered_context = workflow_run.gathered_context or {}
trace_id = extract_trace_id(gathered_context)
if not trace_id:
@ -114,17 +112,12 @@ def create_node_summary_trace(
from opentelemetry import trace as otel_trace
from opentelemetry.context import Context
from api.services.pipecat.tracing_config import (
is_tracing_enabled,
setup_tracing_exporter,
)
from api.services.pipecat.tracing_config import ensure_tracing
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
if not is_tracing_enabled():
if not ensure_tracing():
return None
setup_tracing_exporter()
tracer = otel_trace.get_tracer("pipecat")
# Create a root span (new trace) for this node summary generation
@ -144,10 +137,7 @@ def create_node_summary_trace(
)
trace_id = format(span.get_span_context().trace_id, "032x")
from langfuse import get_client
langfuse = get_client()
return langfuse.get_trace_url(trace_id=trace_id)
return get_trace_url(trace_id)
except Exception as e:
logger.warning(f"Failed to create node summary trace for '{node_name}': {e}")

View file

@ -14,7 +14,7 @@ from opentelemetry import trace
from api.db import db_client
from api.services.gen_ai import OpenAIEmbeddingService
from api.services.pipecat.tracing_config import is_tracing_enabled
from api.services.pipecat.tracing_config import ensure_tracing
async def retrieve_from_knowledge_base(
@ -51,7 +51,7 @@ async def retrieve_from_knowledge_base(
- total_results: Number of results returned
"""
# Create span for retrieval operation if tracing is enabled
if is_tracing_enabled():
if ensure_tracing():
try:
parent_context = tracing_context