chore: upgrade pipecat

This commit is contained in:
Abhishek Kumar 2026-03-06 16:49:14 +05:30
parent 7b77721964
commit e34e4f8f3c
13 changed files with 45 additions and 58 deletions

View file

@ -780,7 +780,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
await circuit_breaker.record_and_evaluate(
workflow_run.campaign_id,
is_failure=status.status == "error",
is_failure=status.status in ("error", "failed"),
)
# Check if retry is needed for campaign calls (busy/no-answer)

View file

@ -1,10 +1,7 @@
from typing import Dict, Optional, TypedDict
import openai
from deepgram import (
DeepgramClient,
LiveOptions,
)
from deepgram import DeepgramClient
from groq import Groq
# try:
@ -105,20 +102,12 @@ class UserConfigurationValidator:
if model in self._provider_api_key_validity_status:
return self._provider_api_key_validity_status[model]
deepgram = DeepgramClient(api_key)
dg_connection = deepgram.listen.websocket.v("1")
try:
options = LiveOptions(
model="nova-2",
language="en-US",
smart_format=True,
)
connected = dg_connection.start(options)
self._provider_api_key_validity_status[model] = connected
finally:
dg_connection.finish()
deepgram = DeepgramClient(api_key=api_key)
deepgram.manage.v1.projects.list()
self._provider_api_key_validity_status[model] = True
except Exception:
self._provider_api_key_validity_status[model] = False
return self._provider_api_key_validity_status[model]
def _check_groq_api_key(self, model: str, api_key: str) -> bool:

View file

@ -14,7 +14,6 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import (
LLMTokenUsage,
LLMUsageMetricsData,
STTUsageMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@ -49,8 +48,6 @@ class PipelineMetricsAggregator(FrameProcessor):
await self._handle_llm_usage_metrics(data)
elif isinstance(data, TTSUsageMetricsData):
await self._handle_tts_usage_metrics(data)
elif isinstance(data, STTUsageMetricsData):
await self._handle_stt_usage_metrics(data)
await self.push_frame(frame, direction)
@ -104,11 +101,6 @@ class PipelineMetricsAggregator(FrameProcessor):
self._tts_usage_metrics[key] += data.value
# logger.debug(f"TTS usage metrics: {self._tts_usage_metrics}")
async def _handle_stt_usage_metrics(self, data: STTUsageMetricsData):
key = f"{data.processor}|||{data.model}"
self._stt_usage_metrics[key] += data.value
logger.debug(f"STT usage metrics: {self._stt_usage_metrics}")
def get_llm_usage_metrics(self) -> Dict[str, LLMTokenUsage]:
"""Get the aggregated LLM usage metrics grouped by processor|||model."""
return self._llm_usage_metrics

View file

@ -77,7 +77,6 @@ from pipecat.turns.user_stop import (
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType
from pipecat.utils.run_context import set_current_run_id
from pipecat.utils.tracing.context_registry import ContextProviderRegistry
# Setup tracing if enabled
setup_tracing_exporter()
@ -766,5 +765,4 @@ async def _run_pipeline(
except asyncio.CancelledError:
logger.warning("Received CancelledError in _run_pipeline")
finally:
ContextProviderRegistry.remove_providers(str(workflow_run_id))
logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}")

View file

@ -10,7 +10,9 @@ from api.services.telephony.providers.ari_call_strategies import (
ARIBridgeSwapStrategy,
ARIHangupStrategy,
)
from api.services.telephony.providers.cloudonix_call_strategies import CloudonixHangupStrategy
from api.services.telephony.providers.cloudonix_call_strategies import (
CloudonixHangupStrategy,
)
from api.services.telephony.providers.twilio_call_strategies import (
TwilioConferenceStrategy,
TwilioHangupStrategy,

View file

@ -49,7 +49,6 @@ from api.services.workflow.tools.timezone import (
get_current_time,
get_time_tools,
)
from pipecat.utils.tracing.context_registry import get_current_turn_context
class PipecatEngine:
@ -116,6 +115,17 @@ class PipecatEngine:
# Fallback for when manager is not yet initialized
return await get_organization_id_from_workflow_run(self._workflow_run_id)
def _get_otel_context(self):
"""Extract the OTel Context from the task's TracingContext.
Returns the turn-level context if available, otherwise the
conversation-level context, or None.
"""
tracing_ctx = getattr(self.task, "_tracing_context", None)
if not tracing_ctx:
return None
return tracing_ctx.get_turn_context() or tracing_ctx.get_conversation_context()
@property
def builtin_function_schemas(self) -> list[dict]:
"""Get built-in function schemas (calculator and timezone tools)."""
@ -356,6 +366,7 @@ class PipecatEngine:
embeddings_api_key=self._embeddings_api_key,
embeddings_model=self._embeddings_model,
embeddings_base_url=self._embeddings_base_url,
tracing_context=self._get_otel_context(),
)
await function_call_params.result_callback(result)
@ -383,8 +394,8 @@ class PipecatEngine:
return
# Capture the current turn context for otel tracing
# before creating the background task
parent_context = get_current_turn_context()
# before creating the background task.
parent_context = self._get_otel_context()
extraction_prompt = self._format_prompt(node.extraction_prompt)
extraction_variables = node.extraction_variables
@ -416,11 +427,11 @@ class PipecatEngine:
async def _setup_llm_context(self, node: Node) -> None:
"""Common method to set up LLM context"""
# Set node name for tracing
# Set OTel span name for tracing
try:
self.context.set_node_name(node.name)
self.context.set_otel_span_name(f"llm-{node.name}")
except AttributeError:
logger.warning(f"context has no set_node_name method")
logger.warning(f"context has no set_otel_span_name method")
# Register transition functions if not an end node
if not node.is_end:

View file

@ -128,14 +128,14 @@ class CustomToolManager:
function_name = schema["function"]["name"]
# Create and register the handler
handler, disable_timeout, cancel_on_interruption = self._create_handler(
handler, timeout_secs, cancel_on_interruption = self._create_handler(
tool, function_name
)
self._engine.llm.register_function(
function_name,
handler,
cancel_on_interruption=cancel_on_interruption,
disable_timeout=disable_timeout,
timeout_secs=timeout_secs,
)
logger.debug(
@ -156,21 +156,20 @@ class CustomToolManager:
Returns:
Async handler function for the tool
"""
# Whether to disable function call timeout
disable_timeout = False
timeout_secs: Optional[float] = None
cancel_on_interruption = True
if tool.category == ToolCategory.END_CALL.value:
cancel_on_interruption = False
handler = self._create_end_call_handler(tool, function_name)
elif tool.category == ToolCategory.TRANSFER_CALL.value:
disable_timeout = True
timeout_secs = 120.0
cancel_on_interruption = False
handler = self._create_transfer_call_handler(tool, function_name)
else:
handler = self._create_http_tool_handler(tool, function_name)
return handler, disable_timeout, cancel_on_interruption
return handler, timeout_secs, cancel_on_interruption
def _create_http_tool_handler(self, tool: Any, function_name: str):
"""Create a handler function for an HTTP API tool.

View file

@ -178,6 +178,7 @@ async def run_per_node_qa_analysis(
model=model,
messages=messages,
temperature=0,
extra_body={"stream": False},
)
raw_response = response.choices[0].message.content
accumulate_token_usage(total_token_usage, response)

View file

@ -15,10 +15,6 @@ from opentelemetry import trace
from api.db import db_client
from api.services.gen_ai import OpenAIEmbeddingService
from api.services.pipecat.tracing_config import is_tracing_enabled
from pipecat.utils.tracing.context_registry import (
get_current_conversation_context,
get_current_turn_context,
)
async def retrieve_from_knowledge_base(
@ -29,6 +25,7 @@ async def retrieve_from_knowledge_base(
embeddings_api_key: Optional[str] = None,
embeddings_model: Optional[str] = None,
embeddings_base_url: Optional[str] = None,
tracing_context=None,
) -> Dict[str, Any]:
"""Retrieve relevant information from the knowledge base using vector similarity search.
@ -45,6 +42,7 @@ async def retrieve_from_knowledge_base(
embeddings_api_key: Optional API key for embedding service
embeddings_model: Optional model ID for embedding service
embeddings_base_url: Optional base URL for embedding service
tracing_context: Optional OpenTelemetry context for tracing
Returns:
Dictionary containing:
@ -55,10 +53,7 @@ async def retrieve_from_knowledge_base(
# Create span for retrieval operation if tracing is enabled
if is_tracing_enabled():
try:
# Get parent context from turn or conversation
turn_context = get_current_turn_context()
conversation_context = get_current_conversation_context()
parent_context = turn_context or conversation_context
parent_context = tracing_context
# Get tracer
tracer = trace.get_tracer("pipecat")

View file

@ -341,12 +341,6 @@ class TestUserTurnStopScenarios:
await injector.inject(BotStoppedSpeakingFrame())
await asyncio.sleep(ASYNC_DELAY)
# TranscriptionFrame arrives AFTER unmute -> reaches stop strategy
await injector.inject(
TranscriptionFrame("hello", "user-1", time_now_iso8601())
)
await asyncio.sleep(ASYNC_DELAY)
# Install spy on trigger_user_turn_stopped to track every call
# and the _user_turn state at the time of each call.
trigger_stop_calls = []
@ -358,6 +352,12 @@ class TestUserTurnStopScenarios:
stop_strategy.trigger_user_turn_stopped = spy_trigger_stop
# TranscriptionFrame arrives AFTER unmute -> reaches stop strategy
await injector.inject(
TranscriptionFrame("hello", "user-1", time_now_iso8601())
)
await asyncio.sleep(ASYNC_DELAY)
# UserStoppedSpeaking arrives AFTER unmute
# Stop strategy: _user_speaking is False (UserStartedSpeaking was suppressed),
# _text is "hello" -> triggers stop via _handle_user_stopped_speaking

@ -1 +1 @@
Subproject commit a1d39061b2829a66d1bdd03b6e0525746d382057
Subproject commit 8bc718e6f136cb19814db5b8ce8412ae3a6ed393

View file

@ -16,7 +16,7 @@ git submodule update --init --recursive
# Install pipecat in editable mode with all extras
echo "Installing pipecat dependencies..."
pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics,openrouter]
pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,speechmatics,openrouter]
# Install other requirements
echo "Installing dograh API requirements..."

View file

@ -142,7 +142,7 @@ export default function WorkflowRunPage() {
returnValue = (
<div className="flex h-screen w-full overflow-hidden">
{/* Main content - 2/3 width */}
<div className="w-2/3 h-full flex items-center justify-center overflow-y-auto">
<div className="w-2/3 h-full overflow-y-auto">
<div className="w-full max-w-4xl space-y-6 p-6">
<Card className="border-border">
<CardHeader className="flex flex-row items-center justify-between">