diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 71571bd..33ecdce 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -780,7 +780,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq await campaign_call_dispatcher.release_call_slot(workflow_run_id) await circuit_breaker.record_and_evaluate( workflow_run.campaign_id, - is_failure=status.status == "error", + is_failure=status.status in ("error", "failed"), ) # Check if retry is needed for campaign calls (busy/no-answer) diff --git a/api/services/configuration/check_validity.py b/api/services/configuration/check_validity.py index 3f25ef2..0a6fa7a 100644 --- a/api/services/configuration/check_validity.py +++ b/api/services/configuration/check_validity.py @@ -1,10 +1,7 @@ from typing import Dict, Optional, TypedDict import openai -from deepgram import ( - DeepgramClient, - LiveOptions, -) +from deepgram import DeepgramClient from groq import Groq # try: @@ -105,20 +102,12 @@ class UserConfigurationValidator: if model in self._provider_api_key_validity_status: return self._provider_api_key_validity_status[model] - deepgram = DeepgramClient(api_key) - dg_connection = deepgram.listen.websocket.v("1") - try: - options = LiveOptions( - model="nova-2", - language="en-US", - smart_format=True, - ) - - connected = dg_connection.start(options) - self._provider_api_key_validity_status[model] = connected - finally: - dg_connection.finish() + deepgram = DeepgramClient(api_key=api_key) + deepgram.manage.v1.projects.list() + self._provider_api_key_validity_status[model] = True + except Exception: + self._provider_api_key_validity_status[model] = False return self._provider_api_key_validity_status[model] def _check_groq_api_key(self, model: str, api_key: str) -> bool: diff --git a/api/services/pipecat/pipeline_metrics_aggregator.py b/api/services/pipecat/pipeline_metrics_aggregator.py index 7d947bd..361f4b9 100644 --- a/api/services/pipecat/pipeline_metrics_aggregator.py +++ b/api/services/pipecat/pipeline_metrics_aggregator.py @@ -14,7 +14,6 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import ( LLMTokenUsage, LLMUsageMetricsData, - STTUsageMetricsData, TTSUsageMetricsData, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -49,8 +48,6 @@ class PipelineMetricsAggregator(FrameProcessor): await self._handle_llm_usage_metrics(data) elif isinstance(data, TTSUsageMetricsData): await self._handle_tts_usage_metrics(data) - elif isinstance(data, STTUsageMetricsData): - await self._handle_stt_usage_metrics(data) await self.push_frame(frame, direction) @@ -104,11 +101,6 @@ class PipelineMetricsAggregator(FrameProcessor): self._tts_usage_metrics[key] += data.value # logger.debug(f"TTS usage metrics: {self._tts_usage_metrics}") - async def _handle_stt_usage_metrics(self, data: STTUsageMetricsData): - key = f"{data.processor}|||{data.model}" - self._stt_usage_metrics[key] += data.value - logger.debug(f"STT usage metrics: {self._stt_usage_metrics}") - def get_llm_usage_metrics(self) -> Dict[str, LLMTokenUsage]: """Get the aggregated LLM usage metrics grouped by processor|||model.""" return self._llm_usage_metrics diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 8d90b1a..65f3c93 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -77,7 +77,6 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType from pipecat.utils.run_context import set_current_run_id -from pipecat.utils.tracing.context_registry import ContextProviderRegistry # Setup tracing if enabled setup_tracing_exporter() @@ -766,5 +765,4 @@ async def _run_pipeline( except asyncio.CancelledError: logger.warning("Received CancelledError in _run_pipeline") finally: - ContextProviderRegistry.remove_providers(str(workflow_run_id)) logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}") diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 77023ef..7caf5b2 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -10,7 +10,9 @@ from api.services.telephony.providers.ari_call_strategies import ( ARIBridgeSwapStrategy, ARIHangupStrategy, ) -from api.services.telephony.providers.cloudonix_call_strategies import CloudonixHangupStrategy +from api.services.telephony.providers.cloudonix_call_strategies import ( + CloudonixHangupStrategy, +) from api.services.telephony.providers.twilio_call_strategies import ( TwilioConferenceStrategy, TwilioHangupStrategy, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index f0398a6..f16d253 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -49,7 +49,6 @@ from api.services.workflow.tools.timezone import ( get_current_time, get_time_tools, ) -from pipecat.utils.tracing.context_registry import get_current_turn_context class PipecatEngine: @@ -116,6 +115,17 @@ class PipecatEngine: # Fallback for when manager is not yet initialized return await get_organization_id_from_workflow_run(self._workflow_run_id) + def _get_otel_context(self): + """Extract the OTel Context from the task's TracingContext. + + Returns the turn-level context if available, otherwise the + conversation-level context, or None. + """ + tracing_ctx = getattr(self.task, "_tracing_context", None) + if not tracing_ctx: + return None + return tracing_ctx.get_turn_context() or tracing_ctx.get_conversation_context() + @property def builtin_function_schemas(self) -> list[dict]: """Get built-in function schemas (calculator and timezone tools).""" @@ -356,6 +366,7 @@ class PipecatEngine: embeddings_api_key=self._embeddings_api_key, embeddings_model=self._embeddings_model, embeddings_base_url=self._embeddings_base_url, + tracing_context=self._get_otel_context(), ) await function_call_params.result_callback(result) @@ -383,8 +394,8 @@ class PipecatEngine: return # Capture the current turn context for otel tracing - # before creating the background task - parent_context = get_current_turn_context() + # before creating the background task. + parent_context = self._get_otel_context() extraction_prompt = self._format_prompt(node.extraction_prompt) extraction_variables = node.extraction_variables @@ -416,11 +427,11 @@ class PipecatEngine: async def _setup_llm_context(self, node: Node) -> None: """Common method to set up LLM context""" - # Set node name for tracing + # Set OTel span name for tracing try: - self.context.set_node_name(node.name) + self.context.set_otel_span_name(f"llm-{node.name}") except AttributeError: - logger.warning(f"context has no set_node_name method") + logger.warning(f"context has no set_otel_span_name method") # Register transition functions if not an end node if not node.is_end: diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index c915fb8..c81ad6b 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -128,14 +128,14 @@ class CustomToolManager: function_name = schema["function"]["name"] # Create and register the handler - handler, disable_timeout, cancel_on_interruption = self._create_handler( + handler, timeout_secs, cancel_on_interruption = self._create_handler( tool, function_name ) self._engine.llm.register_function( function_name, handler, cancel_on_interruption=cancel_on_interruption, - disable_timeout=disable_timeout, + timeout_secs=timeout_secs, ) logger.debug( @@ -156,21 +156,20 @@ class CustomToolManager: Returns: Async handler function for the tool """ - # Whether to disable function call timeout - disable_timeout = False + timeout_secs: Optional[float] = None cancel_on_interruption = True if tool.category == ToolCategory.END_CALL.value: cancel_on_interruption = False handler = self._create_end_call_handler(tool, function_name) elif tool.category == ToolCategory.TRANSFER_CALL.value: - disable_timeout = True + timeout_secs = 120.0 cancel_on_interruption = False handler = self._create_transfer_call_handler(tool, function_name) else: handler = self._create_http_tool_handler(tool, function_name) - return handler, disable_timeout, cancel_on_interruption + return handler, timeout_secs, cancel_on_interruption def _create_http_tool_handler(self, tool: Any, function_name: str): """Create a handler function for an HTTP API tool. diff --git a/api/services/workflow/qa/analysis.py b/api/services/workflow/qa/analysis.py index b2fe335..3e4768b 100644 --- a/api/services/workflow/qa/analysis.py +++ b/api/services/workflow/qa/analysis.py @@ -178,6 +178,7 @@ async def run_per_node_qa_analysis( model=model, messages=messages, temperature=0, + extra_body={"stream": False}, ) raw_response = response.choices[0].message.content accumulate_token_usage(total_token_usage, response) diff --git a/api/services/workflow/tools/knowledge_base.py b/api/services/workflow/tools/knowledge_base.py index ca79efc..cd4c5b2 100644 --- a/api/services/workflow/tools/knowledge_base.py +++ b/api/services/workflow/tools/knowledge_base.py @@ -15,10 +15,6 @@ from opentelemetry import trace from api.db import db_client from api.services.gen_ai import OpenAIEmbeddingService from api.services.pipecat.tracing_config import is_tracing_enabled -from pipecat.utils.tracing.context_registry import ( - get_current_conversation_context, - get_current_turn_context, -) async def retrieve_from_knowledge_base( @@ -29,6 +25,7 @@ async def retrieve_from_knowledge_base( embeddings_api_key: Optional[str] = None, embeddings_model: Optional[str] = None, embeddings_base_url: Optional[str] = None, + tracing_context=None, ) -> Dict[str, Any]: """Retrieve relevant information from the knowledge base using vector similarity search. @@ -45,6 +42,7 @@ async def retrieve_from_knowledge_base( embeddings_api_key: Optional API key for embedding service embeddings_model: Optional model ID for embedding service embeddings_base_url: Optional base URL for embedding service + tracing_context: Optional OpenTelemetry context for tracing Returns: Dictionary containing: @@ -55,10 +53,7 @@ async def retrieve_from_knowledge_base( # Create span for retrieval operation if tracing is enabled if is_tracing_enabled(): try: - # Get parent context from turn or conversation - turn_context = get_current_turn_context() - conversation_context = get_current_conversation_context() - parent_context = turn_context or conversation_context + parent_context = tracing_context # Get tracer tracer = trace.get_tracer("pipecat") diff --git a/api/tests/test_user_turn_stop_scenarios.py b/api/tests/test_user_turn_stop_scenarios.py index 8cafcdf..594dcff 100644 --- a/api/tests/test_user_turn_stop_scenarios.py +++ b/api/tests/test_user_turn_stop_scenarios.py @@ -341,12 +341,6 @@ class TestUserTurnStopScenarios: await injector.inject(BotStoppedSpeakingFrame()) await asyncio.sleep(ASYNC_DELAY) - # TranscriptionFrame arrives AFTER unmute -> reaches stop strategy - await injector.inject( - TranscriptionFrame("hello", "user-1", time_now_iso8601()) - ) - await asyncio.sleep(ASYNC_DELAY) - # Install spy on trigger_user_turn_stopped to track every call # and the _user_turn state at the time of each call. trigger_stop_calls = [] @@ -358,6 +352,12 @@ class TestUserTurnStopScenarios: stop_strategy.trigger_user_turn_stopped = spy_trigger_stop + # TranscriptionFrame arrives AFTER unmute -> reaches stop strategy + await injector.inject( + TranscriptionFrame("hello", "user-1", time_now_iso8601()) + ) + await asyncio.sleep(ASYNC_DELAY) + # UserStoppedSpeaking arrives AFTER unmute # Stop strategy: _user_speaking is False (UserStartedSpeaking was suppressed), # _text is "hello" -> triggers stop via _handle_user_stopped_speaking diff --git a/pipecat b/pipecat index a1d3906..8bc718e 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit a1d39061b2829a66d1bdd03b6e0525746d382057 +Subproject commit 8bc718e6f136cb19814db5b8ce8412ae3a6ed393 diff --git a/scripts/setup_pipecat.sh b/scripts/setup_pipecat.sh index 70b3663..546684e 100755 --- a/scripts/setup_pipecat.sh +++ b/scripts/setup_pipecat.sh @@ -16,7 +16,7 @@ git submodule update --init --recursive # Install pipecat in editable mode with all extras echo "Installing pipecat dependencies..." -pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics,openrouter] +pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,speechmatics,openrouter] # Install other requirements echo "Installing dograh API requirements..." diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx index 498e3c8..72b496a 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx @@ -142,7 +142,7 @@ export default function WorkflowRunPage() { returnValue = (