dograh/api/services/workflow/qa/tracing.py

142 lines
4.3 KiB
Python
Raw Permalink Normal View History

2026-02-25 17:17:48 +05:30
"""Langfuse / OpenTelemetry tracing helpers for QA analysis."""
import json
2026-02-25 17:17:48 +05:30
import re
from loguru import logger
from api.db.models import WorkflowRunModel
from api.services.pipecat.tracing_config import (
build_remote_parent_context,
get_trace_url,
)
2026-02-25 17:17:48 +05:30
def extract_trace_id(gathered_context: dict) -> str | None:
"""Extract Langfuse trace_id from gathered_context trace_url.
2026-03-23 11:36:39 +05:30
Supports both URL formats:
- New: https://langfuse.dograh.com/trace/<trace_id>
- Legacy: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
2026-02-25 17:17:48 +05:30
"""
trace_url = gathered_context.get("trace_url")
if not trace_url:
return None
try:
2026-03-23 11:36:39 +05:30
match = re.search(r"/traces?/([a-fA-F0-9]+)$", trace_url)
2026-02-25 17:17:48 +05:30
if match:
return match.group(1)
except Exception:
pass
return None
def setup_langfuse_parent_context(workflow_run: WorkflowRunModel):
"""Set up OTEL parent context from the workflow run's Langfuse trace.
Returns the parent context object, or None if tracing is unavailable.
"""
gathered_context = workflow_run.gathered_context or {}
trace_id = extract_trace_id(gathered_context)
if not trace_id:
logger.debug("No trace_id found, skipping Langfuse tracing")
2026-02-25 17:17:48 +05:30
return None
return build_remote_parent_context(trace_id)
2026-02-25 17:17:48 +05:30
def add_qa_span_to_trace(
parent_ctx,
model: str,
messages: list[dict],
output: str,
span_name: str,
system_prompt: str = "",
2026-02-25 17:17:48 +05:30
) -> None:
"""Create a child span under the conversation trace."""
if parent_ctx is None:
return
try:
from opentelemetry import trace as otel_trace
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
tracer = otel_trace.get_tracer("pipecat")
with tracer.start_as_current_span(
span_name,
context=parent_ctx,
) as span:
tracing_messages = (
[
{"role": "system", "content": system_prompt},
*messages,
]
if system_prompt
else messages
)
2026-02-25 17:17:48 +05:30
add_llm_span_attributes(
span,
service_name="OpenAILLMService",
model=model,
operation_name=span_name,
messages=tracing_messages,
output=json.dumps({"content": output}),
2026-02-25 17:17:48 +05:30
stream=False,
parameters={"temperature": 0},
)
except Exception as e:
logger.warning(f"Failed to trace span '{span_name}' to Langfuse: {e}")
def create_node_summary_trace(
model: str,
messages: list[dict],
output: str,
node_name: str,
system_prompt: str = "",
2026-02-25 17:17:48 +05:30
) -> str | None:
"""Create a standalone Langfuse trace for a node summary generation.
Returns the trace URL, or None if tracing is unavailable.
"""
try:
from opentelemetry import trace as otel_trace
from opentelemetry.context import Context
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
2026-02-25 17:17:48 +05:30
2026-03-23 11:36:39 +05:30
from api.services.pipecat.tracing_config import ensure_tracing
2026-02-25 17:17:48 +05:30
2026-03-23 11:36:39 +05:30
if not ensure_tracing():
2026-02-25 17:17:48 +05:30
return None
tracer = otel_trace.get_tracer("pipecat")
# Create a root span (new trace) for this node summary generation
with tracer.start_as_current_span(
f"node-summary-{node_name}",
context=Context(),
) as span:
tracing_messages = (
[
{"role": "system", "content": system_prompt},
*messages,
]
if system_prompt
else messages
)
2026-02-25 17:17:48 +05:30
add_llm_span_attributes(
span,
service_name="OpenAILLMService",
model=model,
operation_name=f"node-summary-{node_name}",
messages=tracing_messages,
output=json.dumps({"content": output}),
2026-02-25 17:17:48 +05:30
stream=False,
parameters={"temperature": 0},
)
trace_id = format(span.get_span_context().trace_id, "032x")
2026-03-23 11:36:39 +05:30
return get_trace_url(trace_id)
2026-02-25 17:17:48 +05:30
except Exception as e:
logger.warning(f"Failed to create node summary trace for '{node_name}': {e}")
return None