2026-02-25 17:17:48 +05:30
|
|
|
"""Langfuse / OpenTelemetry tracing helpers for QA analysis."""
|
|
|
|
|
|
2026-03-31 21:42:03 +05:30
|
|
|
import json
|
2026-02-25 17:17:48 +05:30
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
|
|
from api.db.models import WorkflowRunModel
|
2026-05-21 15:20:02 +05:30
|
|
|
from api.services.pipecat.tracing_config import (
|
|
|
|
|
build_remote_parent_context,
|
|
|
|
|
get_trace_url,
|
|
|
|
|
)
|
2026-02-25 17:17:48 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_trace_id(gathered_context: dict) -> str | None:
|
|
|
|
|
"""Extract Langfuse trace_id from gathered_context trace_url.
|
|
|
|
|
|
2026-03-23 11:36:39 +05:30
|
|
|
Supports both URL formats:
|
|
|
|
|
- New: https://langfuse.dograh.com/trace/<trace_id>
|
|
|
|
|
- Legacy: https://langfuse.dograh.com/project/<project_id>/traces/<trace_id>
|
2026-02-25 17:17:48 +05:30
|
|
|
"""
|
|
|
|
|
trace_url = gathered_context.get("trace_url")
|
|
|
|
|
if not trace_url:
|
|
|
|
|
return None
|
|
|
|
|
try:
|
2026-03-23 11:36:39 +05:30
|
|
|
match = re.search(r"/traces?/([a-fA-F0-9]+)$", trace_url)
|
2026-02-25 17:17:48 +05:30
|
|
|
if match:
|
|
|
|
|
return match.group(1)
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setup_langfuse_parent_context(workflow_run: WorkflowRunModel):
|
|
|
|
|
"""Set up OTEL parent context from the workflow run's Langfuse trace.
|
|
|
|
|
|
|
|
|
|
Returns the parent context object, or None if tracing is unavailable.
|
|
|
|
|
"""
|
2026-05-21 15:20:02 +05:30
|
|
|
gathered_context = workflow_run.gathered_context or {}
|
|
|
|
|
trace_id = extract_trace_id(gathered_context)
|
|
|
|
|
if not trace_id:
|
|
|
|
|
logger.debug("No trace_id found, skipping Langfuse tracing")
|
2026-02-25 17:17:48 +05:30
|
|
|
return None
|
2026-05-21 15:20:02 +05:30
|
|
|
return build_remote_parent_context(trace_id)
|
2026-02-25 17:17:48 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_qa_span_to_trace(
|
|
|
|
|
parent_ctx,
|
|
|
|
|
model: str,
|
|
|
|
|
messages: list[dict],
|
|
|
|
|
output: str,
|
|
|
|
|
span_name: str,
|
2026-03-31 21:42:03 +05:30
|
|
|
system_prompt: str = "",
|
2026-02-25 17:17:48 +05:30
|
|
|
) -> None:
|
|
|
|
|
"""Create a child span under the conversation trace."""
|
|
|
|
|
if parent_ctx is None:
|
|
|
|
|
return
|
|
|
|
|
try:
|
|
|
|
|
from opentelemetry import trace as otel_trace
|
|
|
|
|
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
|
|
|
|
|
|
|
|
|
|
tracer = otel_trace.get_tracer("pipecat")
|
|
|
|
|
with tracer.start_as_current_span(
|
|
|
|
|
span_name,
|
|
|
|
|
context=parent_ctx,
|
|
|
|
|
) as span:
|
2026-03-31 21:42:03 +05:30
|
|
|
tracing_messages = (
|
|
|
|
|
[
|
|
|
|
|
{"role": "system", "content": system_prompt},
|
|
|
|
|
*messages,
|
|
|
|
|
]
|
|
|
|
|
if system_prompt
|
|
|
|
|
else messages
|
|
|
|
|
)
|
2026-02-25 17:17:48 +05:30
|
|
|
add_llm_span_attributes(
|
|
|
|
|
span,
|
|
|
|
|
service_name="OpenAILLMService",
|
|
|
|
|
model=model,
|
|
|
|
|
operation_name=span_name,
|
2026-03-31 21:42:03 +05:30
|
|
|
messages=tracing_messages,
|
|
|
|
|
output=json.dumps({"content": output}),
|
2026-02-25 17:17:48 +05:30
|
|
|
stream=False,
|
|
|
|
|
parameters={"temperature": 0},
|
|
|
|
|
)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(f"Failed to trace span '{span_name}' to Langfuse: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_node_summary_trace(
|
|
|
|
|
model: str,
|
|
|
|
|
messages: list[dict],
|
|
|
|
|
output: str,
|
|
|
|
|
node_name: str,
|
2026-03-31 21:42:03 +05:30
|
|
|
system_prompt: str = "",
|
2026-02-25 17:17:48 +05:30
|
|
|
) -> str | None:
|
|
|
|
|
"""Create a standalone Langfuse trace for a node summary generation.
|
|
|
|
|
|
|
|
|
|
Returns the trace URL, or None if tracing is unavailable.
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
from opentelemetry import trace as otel_trace
|
|
|
|
|
from opentelemetry.context import Context
|
2026-05-07 12:23:41 +05:30
|
|
|
from pipecat.utils.tracing.service_attributes import add_llm_span_attributes
|
2026-02-25 17:17:48 +05:30
|
|
|
|
2026-03-23 11:36:39 +05:30
|
|
|
from api.services.pipecat.tracing_config import ensure_tracing
|
2026-02-25 17:17:48 +05:30
|
|
|
|
2026-03-23 11:36:39 +05:30
|
|
|
if not ensure_tracing():
|
2026-02-25 17:17:48 +05:30
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
tracer = otel_trace.get_tracer("pipecat")
|
|
|
|
|
|
|
|
|
|
# Create a root span (new trace) for this node summary generation
|
|
|
|
|
with tracer.start_as_current_span(
|
|
|
|
|
f"node-summary-{node_name}",
|
|
|
|
|
context=Context(),
|
|
|
|
|
) as span:
|
2026-03-31 21:42:03 +05:30
|
|
|
tracing_messages = (
|
|
|
|
|
[
|
|
|
|
|
{"role": "system", "content": system_prompt},
|
|
|
|
|
*messages,
|
|
|
|
|
]
|
|
|
|
|
if system_prompt
|
|
|
|
|
else messages
|
|
|
|
|
)
|
2026-02-25 17:17:48 +05:30
|
|
|
add_llm_span_attributes(
|
|
|
|
|
span,
|
|
|
|
|
service_name="OpenAILLMService",
|
|
|
|
|
model=model,
|
|
|
|
|
operation_name=f"node-summary-{node_name}",
|
2026-03-31 21:42:03 +05:30
|
|
|
messages=tracing_messages,
|
|
|
|
|
output=json.dumps({"content": output}),
|
2026-02-25 17:17:48 +05:30
|
|
|
stream=False,
|
|
|
|
|
parameters={"temperature": 0},
|
|
|
|
|
)
|
|
|
|
|
trace_id = format(span.get_span_context().trace_id, "032x")
|
|
|
|
|
|
2026-03-23 11:36:39 +05:30
|
|
|
return get_trace_url(trace_id)
|
2026-02-25 17:17:48 +05:30
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(f"Failed to create node summary trace for '{node_name}': {e}")
|
|
|
|
|
return None
|