From 0345df6fbebb42013221ac48d06f2b1867686ea7 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sat, 20 Sep 2025 14:07:00 +0530 Subject: [PATCH 1/2] Optimise requirements.txt and update pipecat imports --- api/db/workflow_run_client.py | 2 +- api/requirements.txt | 129 +----- api/routes/rtc_offer.py | 2 +- api/services/filesystem/minio.py | 11 +- .../looptalk/core/pipeline_builder.py | 2 +- api/services/looptalk/internal_serializer.py | 85 ++++ api/services/looptalk/internal_transport.py | 405 ++++++++++++++++++ api/services/looptalk/orchestrator.py | 8 +- api/services/mps_service_key_client.py | 15 +- api/services/pipecat/run_pipeline.py | 2 +- api/services/pipecat/service_factory.py | 5 +- api/services/pipecat/transport_setup.py | 10 +- .../smart_turn/websocket_smart_turn.py | 8 +- api/services/workflow/pipecat_engine.py | 4 +- 14 files changed, 536 insertions(+), 152 deletions(-) create mode 100644 api/services/looptalk/internal_serializer.py create mode 100644 api/services/looptalk/internal_transport.py diff --git a/api/db/workflow_run_client.py b/api/db/workflow_run_client.py index db44e04..fcd4f90 100644 --- a/api/db/workflow_run_client.py +++ b/api/db/workflow_run_client.py @@ -70,7 +70,7 @@ class WorkflowRunClient(BaseDBClient): # Get the current storage backend based on ENABLE_AWS_S3 flag current_backend = StorageBackend.get_current_backend() - + new_run = WorkflowRunModel( name=name, workflow=workflow, diff --git a/api/requirements.txt b/api/requirements.txt index 9407c1c..e6abd58 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,121 +1,16 @@ -aenum==3.1.16 -aiofiles==24.1.0 -aiohappyeyeballs==2.6.1 -aiohttp==3.11.16 -aiosignal==1.3.2 -alembic==1.15.2 -amqp==5.3.1 -annotated-types==0.7.0 -anyio==4.9.0 +pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero] @ git+https://github.com/dograh-hq/pipecat.git@0c5091d +langfuse==3.4.0 +fastapi==0.116.2 asyncpg==0.30.0 -attrs==25.3.0 -audioop-lts==0.2.1; python_version>='3.13' -av==14.2.0 -backoff==2.2.1 -billiard==4.2.1 -certifi==2025.1.31 -cffi==1.17.1 -charset-normalizer==3.4.1 -click==8.1.8 -click-didyoumean==0.3.1 -click-plugins==1.1.1 -click-repl==0.3.0 -colorama==0.4.6 -coloredlogs==15.0.1 -dataclasses-json==0.6.7 -deprecation==2.1.0 -distro==1.9.0 -docstring_parser==0.16 -eval_type_backport==0.2.2 -fastapi==0.115.12 -filelock==3.18.0 -flatbuffers==25.2.10 -frozenlist==1.5.0 -fsspec==2025.3.2 -future==1.0.0 -greenlet==3.1.1 -h11==0.14.0 -httpcore==1.0.7 -httpx==0.28.1 -huggingface-hub==0.30.1 -humanfriendly==10.0 -idna==3.10 -Jinja2==3.1.6 -jiter==0.9.0 -jmespath==1.0.1 -kombu==5.5.2 -langfuse==3.2.7 -llvmlite==0.44.0 -loguru==0.7.3 -Mako==1.3.9 -Markdown==3.8 -MarkupSafe==3.0.2 -marshmallow==3.26.1 -mpmath==1.3.0 -multidict==6.2.0 -mypy_extensions==1.1.0 -numba==0.61.2 -numpy==1.26.4 -onnxruntime==1.21.0 -packaging==24.2 -pillow==11.1.0 -pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure] @ git+https://github.com/dograh-hq/pipecat.git@main -prompt_toolkit==3.0.51 -propcache==0.3.1 -protobuf==5.29.4 -psutil==7.0.0 -pycparser==2.22 -pydantic==2.10.6 -pydantic_core==2.27.2 -PyJWT==2.10.1 -pyloudnorm==0.1.1 -python-dateutil==2.9.0.post0 -python-dotenv==1.1.0 -PyYAML==6.0.2 -redis==5.2.1 -regex==2024.11.6 -requests==2.32.3 -resampy==0.4.3 -safetensors==0.5.3 -scipy==1.15.2 -setuptools==75.8.0 -six==1.17.0 -sniffio==1.3.1 -sounddevice==0.5.1 -soxr==0.5.0.post1 -SQLAlchemy==2.0.40 -starlette==0.46.1 -sympy==1.13.3 -tokenizers==0.21.1 -tqdm==4.67.1 -transformers==4.50.3 -types-protobuf==4.25.0.20240417 -typing-inspect==0.9.0 -typing-inspection==0.4.0 -typing_extensions==4.13.0 -tzdata==2025.2 -urllib3==2.3.0 -uvicorn==0.34.0 -vine==5.1.0 -watchfiles==1.0.4 -wcwidth==0.2.13 -websockets==13.1 -wheel==0.45.1 -wrapt==1.17.2 -yarl==1.18.3 -aiortc==1.11.0 -opencv-python-headless==4.11.0.86 -aioboto3==14.3.0 +alembic==1.16.5 +redis==5.3.1 +uvicorn==0.35.0 +aioboto3==15.1.0 arq==0.26.3 -opentelemetry-sdk==1.33.1 -opentelemetry-api==1.33.1 -opentelemetry-exporter-otlp==1.33.1 -torch==2.7.0 -torchaudio==2.7.0 axiom-py==0.9.0 -sentry-sdk[fastapi]==2.30.0 -twilio==9.7.0 +twilio==9.8.0 +minio==7.2.16 +alembic-postgresql-enum==1.8.0 python-multipart==0.0.20 -soundfile==0.13.1 -minio==7.2.7 -alembic-postgresql-enum==1.8.0 \ No newline at end of file +sentry-sdk[fastapi]==2.38.0 +sqlalchemy[asyncio]==2.0.43 diff --git a/api/routes/rtc_offer.py b/api/routes/rtc_offer.py index a8e7973..7ebeaf6 100644 --- a/api/routes/rtc_offer.py +++ b/api/routes/rtc_offer.py @@ -2,7 +2,7 @@ from typing import Dict from fastapi import APIRouter, BackgroundTasks, Depends from loguru import logger -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection from pipecat.utils.context import set_current_run_id from pydantic import BaseModel diff --git a/api/services/filesystem/minio.py b/api/services/filesystem/minio.py index 9051d2d..bbfc555 100644 --- a/api/services/filesystem/minio.py +++ b/api/services/filesystem/minio.py @@ -1,11 +1,10 @@ import asyncio -from datetime import datetime, timedelta, timezone +import json from typing import Any, BinaryIO, Dict, Optional from loguru import logger from minio import Minio from minio.error import S3Error -import json from .base import BaseFileSystem @@ -48,7 +47,7 @@ class MinioFileSystem(BaseFileSystem): try: if not self.client.bucket_exists(self.bucket_name): self.client.make_bucket(self.bucket_name) - + # Set anonymous download policy for local development # This allows unsigned URLs to work policy = { @@ -58,11 +57,11 @@ class MinioFileSystem(BaseFileSystem): "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": ["s3:GetObject"], - "Resource": [f"arn:aws:s3:::{self.bucket_name}/*"] + "Resource": [f"arn:aws:s3:::{self.bucket_name}/*"], } - ] + ], } - + self.client.set_bucket_policy(self.bucket_name, json.dumps(policy)) except Exception as e: # Bucket might already exist or we might be in a restricted environment diff --git a/api/services/looptalk/core/pipeline_builder.py b/api/services/looptalk/core/pipeline_builder.py index d080e45..0d53235 100644 --- a/api/services/looptalk/core/pipeline_builder.py +++ b/api/services/looptalk/core/pipeline_builder.py @@ -9,10 +9,10 @@ from pipecat.processors.filters.stt_mute_filter import ( STTMuteFilter, STTMuteStrategy, ) -from pipecat.transports import InternalTransport from api.db.db_client import DBClient from api.services.looptalk.audio_streamer import get_or_create_audio_streamer +from api.services.looptalk.internal_transport import InternalTransport from api.services.pipecat.audio_config import AudioConfig from api.services.pipecat.pipeline_builder import ( create_pipeline_components, diff --git a/api/services/looptalk/internal_serializer.py b/api/services/looptalk/internal_serializer.py new file mode 100644 index 0000000..ec14ca4 --- /dev/null +++ b/api/services/looptalk/internal_serializer.py @@ -0,0 +1,85 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Internal frame serializer for agent-to-agent communication.""" + +from loguru import logger +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + OutputAudioRawFrame, + StartFrame, +) +from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType + + +class InternalFrameSerializer(FrameSerializer): + """Serializer for InternalTransport that filters frames between agents. + + This serializer ensures only audio frames are passed between agents, + preventing control frames from creating infinite loops. + """ + + @property + def type(self) -> FrameSerializerType: + """Internal transport uses binary frames.""" + return FrameSerializerType.BINARY + + async def setup(self, frame: StartFrame): + """No setup required for internal transport.""" + pass + + async def serialize(self, frame: Frame) -> bytes | None: + """Only serialize audio frames for transmission between agents.""" + # Only pass audio frames between agents + if isinstance(frame, OutputAudioRawFrame): + # Use a fixed-size header to avoid parsing issues with binary data + # Format: "AUDIO" (5 bytes) + sample_rate (4 bytes) + num_channels (2 bytes) + audio data + header = b"AUDIO" + sample_rate_bytes = frame.sample_rate.to_bytes(4, byteorder="big") + num_channels_bytes = frame.num_channels.to_bytes(2, byteorder="big") + + serialized = header + sample_rate_bytes + num_channels_bytes + frame.audio + return serialized + + # Don't pass control frames between agents + return None + + async def deserialize(self, data: bytes) -> Frame | None: + """Deserialize audio frames from partner agent.""" + if data.startswith(b"AUDIO"): + try: + # Fixed-size header parsing + # Header: "AUDIO" (5 bytes) + sample_rate (4 bytes) + num_channels (2 bytes) + if len(data) < 11: # Minimum size for header + logger.error( + f"InternalSerializer: Data too short for header: {len(data)} bytes" + ) + return None + + # Extract fixed-size fields + # Skip header validation - we already checked startswith(b"AUDIO") + sample_rate = int.from_bytes(data[5:9], byteorder="big") + num_channels = int.from_bytes(data[9:11], byteorder="big") + + # Extract audio data - everything after the header + audio_data = data[11:] + + # Check if audio data length is valid + if len(audio_data) % 2 != 0: + logger.warning( + f"InternalSerializer: Audio data has odd length: {len(audio_data)}" + ) + + # Convert to InputAudioRawFrame for the receiving agent + return InputAudioRawFrame( + audio=audio_data, num_channels=num_channels, sample_rate=sample_rate + ) + except Exception as e: + logger.error(f"Failed to deserialize audio frame: {e}") + return None + + return None diff --git a/api/services/looptalk/internal_transport.py b/api/services/looptalk/internal_transport.py new file mode 100644 index 0000000..00bfc93 --- /dev/null +++ b/api/services/looptalk/internal_transport.py @@ -0,0 +1,405 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Internal transport for in-memory agent-to-agent communication.""" + +import asyncio +import time +from typing import Dict, Optional, Tuple + +from loguru import logger +from pipecat.frames.frames import ( + CancelFrame, + EndFrame, + InputAudioRawFrame, + OutputAudioRawFrame, + OutputDTMFFrame, + OutputDTMFUrgentFrame, + OutputImageRawFrame, + StartFrame, + StopFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.transports.base_input import BaseInputTransport +from pipecat.transports.base_output import BaseOutputTransport +from pipecat.transports.base_transport import BaseTransport, TransportParams + +from api.services.looptalk.internal_serializer import InternalFrameSerializer + + +class InternalInputTransport(BaseInputTransport): + """Input side of internal transport for agent-to-agent communication.""" + + def __init__( + self, + transport: Optional["InternalTransport"], + params: TransportParams, + **kwargs, + ): + """Initialize internal input transport. + + Args: + transport: The parent InternalTransport instance. + params: Transport parameters for configuration. + **kwargs: Additional keyword arguments including latency_seconds. + """ + # Extract latency configuration before passing to parent + self._latency_seconds = kwargs.pop("latency_seconds", 0.0) + + super().__init__(params, **kwargs) + self._transport = transport + self._queue: asyncio.Queue[bytes] = asyncio.Queue() + self._partner: Optional["InternalOutputTransport"] = None + self._running = False + self._connected = False + self._serializer = InternalFrameSerializer() + # Queue for delayed packets (timestamp, data) + self._delayed_queue: asyncio.Queue[Tuple[float, bytes]] = asyncio.Queue() + self._latency_task: Optional[asyncio.Task] = None + + def set_partner(self, partner: "InternalOutputTransport"): + """Connect this input transport to an output transport.""" + self._partner = partner + + async def receive_data(self, data: bytes): + """Receive serialized data from the partner output transport.""" + # logger.debug("received data in input transport") + if self._latency_seconds > 0: + # Add to delayed queue with delivery timestamp + delivery_time = time.monotonic() + self._latency_seconds + await self._delayed_queue.put((delivery_time, data)) + else: + # No latency, put directly in the main queue + await self._queue.put(data) + + async def start(self, frame: StartFrame): + """Start the input transport.""" + self._running = True + await super().start(frame) + await self._serializer.setup(frame) + + # Set transport ready to initialize audio task for VAD processing + await self.set_transport_ready(frame) + + # Trigger on_client_connected event for InternalTransport (only once) + if hasattr(self, "_transport") and self._transport and not self._connected: + self._connected = True + await self._transport._call_event_handler( + "on_client_connected", self._transport + ) + + # Start latency processor if latency is configured + if self._latency_seconds > 0: + self._latency_task = asyncio.create_task(self._latency_processor()) + + asyncio.create_task(self._run()) + + async def stop(self, frame: EndFrame | StopFrame | None = None): + """Stop the input transport.""" + self._running = False + + # Stop latency processor + if self._latency_task: + self._latency_task.cancel() + try: + await self._latency_task + except asyncio.CancelledError: + pass + self._latency_task = None + + await super().stop(frame) + + # Trigger on_client_disconnected event for InternalTransport + if hasattr(self, "_transport") and self._transport: + await self._transport._call_event_handler( + "on_client_disconnected", self._transport + ) + + async def _run(self): + """Main loop to process incoming data.""" + while self._running: + try: + data = await asyncio.wait_for(self._queue.get(), timeout=0.1) + + # Deserialize the data + frame = await self._serializer.deserialize(data) + if frame: + if isinstance(frame, InputAudioRawFrame): + # Debug received audio + try: + import numpy as np + + # Check if audio length is valid for int16 + if len(frame.audio) % 2 != 0: + logger.error( + f"InternalInput: Audio buffer has odd length: {len(frame.audio)}" + ) + else: + audio_array = np.frombuffer(frame.audio, dtype=np.int16) + # logger.debug(f"InternalInput: Received audio - size: {len(frame.audio)} bytes, " + # f"samples: {len(audio_array)}, min: {audio_array.min()}, max: {audio_array.max()}, " + # f"sample_rate: {frame.sample_rate}") + except Exception as e: + logger.error(f"InternalInput: Error analyzing audio: {e}") + + # Use the base class's audio processing which includes VAD + await self.push_audio_frame(frame) + else: + # For non-audio frames, push directly + await self.push_frame(frame, FrameDirection.DOWNSTREAM) + + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error(f"Error in internal input transport: {e}") + + async def _latency_processor(self): + """Process delayed packets and deliver them after the configured latency.""" + logger.info( + f"InternalInput: Started latency processor with {self._latency_seconds}s delay" + ) + + # Use a list to maintain order (we'll process in FIFO order) + pending_packets = [] + + while self._running: + try: + # Get all new packets from the delayed queue (non-blocking) + while True: + try: + packet = self._delayed_queue.get_nowait() + pending_packets.append(packet) + except asyncio.QueueEmpty: + break + + # Process packets that are ready + current_time = time.monotonic() + delivered = [] + + for i, (delivery_time, data) in enumerate(pending_packets): + if current_time >= delivery_time: + # Time to deliver this packet + await self._queue.put(data) + delivered.append(i) + + # Remove delivered packets (in reverse order to maintain indices) + for i in reversed(delivered): + pending_packets.pop(i) + + # Sleep briefly before next check + await asyncio.sleep(0.005) # 5ms for more responsive delivery + + except asyncio.CancelledError: + # Deliver any remaining packets immediately on shutdown + for _, data in pending_packets: + await self._queue.put(data) + break + except Exception as e: + logger.error(f"Error in latency processor: {e}") + await asyncio.sleep(0.01) + + logger.info("InternalInput: Stopped latency processor") + + +class InternalOutputTransport(BaseOutputTransport): + """Output side of internal transport for agent-to-agent communication.""" + + def __init__(self, params: TransportParams, **kwargs): + """Initialize internal output transport. + + Args: + params: Transport parameters for configuration. + **kwargs: Additional keyword arguments. + """ + super().__init__(params, **kwargs) + self._partner: Optional[InternalInputTransport] = None + self._serializer = InternalFrameSerializer() + + # Audio timing synchronization (similar to WebsocketServerOutputTransport) + # _send_interval is the time interval between audio chunks in seconds + self._send_interval = 0 + self._next_send_time = 0 + + def set_partner(self, partner: InternalInputTransport): + """Connect this output transport to an input transport.""" + self._partner = partner + + async def start(self, frame: StartFrame): + """Start the output transport.""" + await super().start(frame) + await self._serializer.setup(frame) + # Calculate the send interval based on audio chunk size (like WebsocketServerOutputTransport) + self._send_interval = ( + self._params.audio_out_10ms_chunks * 10 / 1000 + ) # Convert ms to seconds + await self.set_transport_ready(frame) + + async def write_audio_frame(self, frame: OutputAudioRawFrame): + """Write audio frame to partner through serializer with proper timing.""" + # Debug audio characteristics + # import numpy as np + # audio_array = np.frombuffer(frame.audio, dtype=np.int16) + # logger.debug(f"InternalOutput: Sending audio - type: {type(frame).__name__}, size: {len(frame.audio)} bytes, " + # f"samples: {len(audio_array)}, min: {audio_array.min()}, max: {audio_array.max()}, " + # f"sample_rate: {frame.sample_rate}") + + # Serialize and send the audio first + data = await self._serializer.serialize(frame) + if data and self._partner: + await self._partner.receive_data(data) + + # logger.debug(f"InternalOutput: Sent audio frame to partner") + + # Then simulate audio playback timing (following WebsocketServerOutputTransport pattern) + await self._write_audio_sleep() + + async def write_video_frame(self, _frame: OutputImageRawFrame): + """Internal transport doesn't support video.""" + pass + + async def write_dtmf(self, _frame: OutputDTMFFrame | OutputDTMFUrgentFrame): + """Internal transport doesn't support DTMF.""" + pass + + async def stop(self, frame: EndFrame): + """Stop the output transport and reset timing.""" + await super().stop(frame) + self._next_send_time = 0 + + async def cancel(self, frame: CancelFrame): + """Cancel the output transport and reset timing.""" + await super().cancel(frame) + self._next_send_time = 0 + + async def _write_audio_sleep(self): + """Simulate audio playback timing (following WebsocketServerOutputTransport pattern).""" + # Simulate a clock to ensure audio is sent at real-time pace + current_time = time.monotonic() + sleep_duration = max(0, self._next_send_time - current_time) + await asyncio.sleep(sleep_duration) + if sleep_duration == 0: + self._next_send_time = time.monotonic() + self._send_interval + else: + self._next_send_time += self._send_interval + + +class InternalTransport(BaseTransport): + """Internal transport for in-memory agent-to-agent communication.""" + + def __init__(self, params: TransportParams, **kwargs): + """Initialize internal transport. + + Args: + params: Transport parameters for configuration. + **kwargs: Additional keyword arguments including latency_seconds. + """ + # Extract latency configuration before passing to parent + self._latency_seconds = kwargs.pop("latency_seconds", 0.0) + + super().__init__(**kwargs) + self._params = params + + # Create input and output transports + self._input = InternalInputTransport( + self, + params, + name=self._input_name or f"{self.name}#input", + latency_seconds=self._latency_seconds, + ) + self._output = InternalOutputTransport( + params, name=self._output_name or f"{self.name}#output" + ) + + # Register supported event handlers + self._register_event_handler("on_client_connected") + self._register_event_handler("on_client_disconnected") + + def input(self) -> InternalInputTransport: + """Get the input transport.""" + return self._input + + def output(self) -> InternalOutputTransport: + """Get the output transport.""" + return self._output + + def connect_partner(self, partner: "InternalTransport"): + """Connect this transport to another internal transport.""" + # Connect output of this transport to input of partner + self._output.set_partner(partner._input) + # Connect output of partner to input of this transport + partner._output.set_partner(self._input) + + +class InternalTransportManager: + """Manages multiple internal transport pairs for load testing.""" + + def __init__(self): + """Initialize internal transport manager.""" + self._transport_pairs: Dict[ + str, Tuple[InternalTransport, InternalTransport] + ] = {} + + def create_transport_pair( + self, + test_session_id: str, + actor_params: TransportParams, + adversary_params: TransportParams, + latency_seconds: float = 0.0, + ) -> Tuple[InternalTransport, InternalTransport]: + """Create a connected pair of internal transports. + + Args: + test_session_id: Unique identifier for the test session. + actor_params: Transport parameters for the actor. + adversary_params: Transport parameters for the adversary. + latency_seconds: Simulated network latency in seconds (default: 0.0). + + Returns: + Tuple of (actor_transport, adversary_transport). + """ + # Create actor transport with latency + actor_transport = InternalTransport( + params=actor_params, + name=f"actor-{test_session_id}", + latency_seconds=latency_seconds, + ) + + # Create adversary transport with latency + adversary_transport = InternalTransport( + params=adversary_params, + name=f"adversary-{test_session_id}", + latency_seconds=latency_seconds, + ) + + # Connect them + actor_transport.connect_partner(adversary_transport) + + # Store the pair + self._transport_pairs[test_session_id] = (actor_transport, adversary_transport) + + logger.info( + f"Created internal transport pair for test session: {test_session_id} with {latency_seconds}s latency" + ) + + return actor_transport, adversary_transport + + def get_transport_pair( + self, test_session_id: str + ) -> Optional[Tuple[InternalTransport, InternalTransport]]: + """Get an existing transport pair.""" + return self._transport_pairs.get(test_session_id) + + def remove_transport_pair(self, test_session_id: str): + """Remove a transport pair.""" + if test_session_id in self._transport_pairs: + del self._transport_pairs[test_session_id] + logger.info( + f"Removed internal transport pair for test session: {test_session_id}" + ) + + def get_active_test_count(self) -> int: + """Get the number of active test sessions.""" + return len(self._transport_pairs) diff --git a/api/services/looptalk/orchestrator.py b/api/services/looptalk/orchestrator.py index 11ecb4c..aed3852 100644 --- a/api/services/looptalk/orchestrator.py +++ b/api/services/looptalk/orchestrator.py @@ -7,13 +7,13 @@ from typing import Any, Dict, Optional from loguru import logger from pipecat.pipeline.task import PipelineTask -from pipecat.transports import ( - InternalTransport, - InternalTransportManager, -) from pipecat.utils.context import set_current_run_id from api.db.db_client import DBClient +from api.services.looptalk.internal_transport import ( + InternalTransport, + InternalTransportManager, +) from api.services.pipecat.transport_setup import create_internal_transport from .core.pipeline_builder import LoopTalkPipelineBuilder diff --git a/api/services/mps_service_key_client.py b/api/services/mps_service_key_client.py index 8f49ad5..d9f0f7e 100644 --- a/api/services/mps_service_key_client.py +++ b/api/services/mps_service_key_client.py @@ -65,12 +65,15 @@ class MPSServiceKeyClient: data = response.json() # Transform the response to match our expected format return { - "id": data.get("id"), - "name": data.get("name") or name, - "service_key": data.get("service_key"), - "key_prefix": data.get("key_prefix") or (data.get("service_key", "")[:8] - if data.get("service_key") - else ""), + "id": data.get("id"), + "name": data.get("name") or name, + "service_key": data.get("service_key"), + "key_prefix": data.get("key_prefix") + or ( + data.get("service_key", "")[:8] + if data.get("service_key") + else "" + ), "expires_at": data.get("expires_at"), "created_at": data.get("created_at"), "is_active": data.get("is_active", True), diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 564fb3f..2cc4029 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -46,7 +46,7 @@ from pipecat.processors.filters.stt_mute_filter import ( STTMuteStrategy, ) from pipecat.processors.user_idle_processor import UserIdleProcessor -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection from pipecat.utils.context import set_current_run_id from pipecat.utils.tracing.context_registry import ContextProviderRegistry diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py index 69fa579..5dfbe09 100644 --- a/api/services/pipecat/service_factory.py +++ b/api/services/pipecat/service_factory.py @@ -63,9 +63,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): """ if user_config.tts.provider == ServiceProviders.DEEPGRAM.value: return DeepgramTTSService( - api_key=user_config.tts.api_key, - voice=user_config.tts.voice.value, - sample_rate=24000, + api_key=user_config.tts.api_key, voice=user_config.tts.voice.value ) elif user_config.tts.provider == ServiceProviders.OPENAI.value: return OpenAITTSService( @@ -91,7 +89,6 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): api_key=user_config.tts.api_key, model=user_config.tts.model.value, voice=user_config.tts.voice.value, - sample_rate=24000, ) else: raise HTTPException( diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 19ab013..9225163 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -3,6 +3,7 @@ import os from fastapi import WebSocket from api.constants import APP_ROOT_DIR, ENABLE_RNNOISE, ENABLE_SMART_TURN +from api.services.looptalk.internal_transport import InternalTransport from api.services.pipecat.audio_config import AudioConfig from api.services.smart_turn.websocket_smart_turn import ( WebSocketSmartTurnAnalyzer, @@ -14,19 +15,18 @@ from api.services.telephony.stasis_rtp_transport import ( StasisRTPTransportParams, ) from pipecat.audio.filters.rnnoise_filter import RNNoiseFilter -from pipecat.audio.mixers.silence_audio_mixer import SilenceAudioMixer +from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams from pipecat.serializers.twilio import TwilioFrameSerializer -from pipecat.transports import InternalTransport from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.fastapi_websocket import ( +from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection +from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport +from pipecat.transports.websocket.fastapi import ( FastAPIWebsocketParams, FastAPIWebsocketTransport, ) -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection librnnoise_path = os.path.normpath( str(APP_ROOT_DIR / "native" / "rnnoise" / "librnnoise.so") diff --git a/api/services/smart_turn/websocket_smart_turn.py b/api/services/smart_turn/websocket_smart_turn.py index 0383cfa..82a7e6f 100644 --- a/api/services/smart_turn/websocket_smart_turn.py +++ b/api/services/smart_turn/websocket_smart_turn.py @@ -122,9 +122,9 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn): logger.debug("Establishing new WebSocket connection to Smart-Turn service...") # Prepare headers - extra_headers = dict(self._headers) + additional_headers = dict(self._headers) if self._service_context is not None: - extra_headers["X-Service-Context"] = str(self._service_context) + additional_headers["X-Service-Context"] = str(self._service_context) # _init_sample_rate is being set in the constructor, which we should # use in case self._sample_rate is not set yet. The actual _sample_rate @@ -135,7 +135,7 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn): _sample_rate = self._sample_rate or self._init_sample_rate if _sample_rate > 0: - extra_headers["X-Sample-Rate"] = str(_sample_rate) + additional_headers["X-Sample-Rate"] = str(_sample_rate) max_attempts = 3 for attempt in range(max_attempts): @@ -148,7 +148,7 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn): # Connect with websockets library self._ws = await websockets.connect( self._url, - extra_headers=extra_headers, + additional_headers=additional_headers, ping_interval=5.0, # let websockets send pings every 5s ping_timeout=3.0, # fail fast if no pong in 3s close_timeout=10, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 23db1fa..323bf56 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -480,9 +480,9 @@ class PipecatEngine: async def _handle_start_node(self, node: Node) -> None: """Handle start node execution.""" # Handle voicemail detection setup (before any returns) - # Lets check ENABLE_TRACING to make sure we have prompt access from + # Lets check ENABLE_TRACING to make sure we have prompt access from # langfuse - if node.detect_voicemail and DEPLOYMENT_MODE == 'saas' and ENABLE_TRACING: + if node.detect_voicemail and DEPLOYMENT_MODE == "saas" and ENABLE_TRACING: if not self._audio_buffer: logger.warning( "Voicemail detection enabled but no audio buffer available - skipping detection" From d89bb84dd161011bd2d3d050a89db61e60aa6586 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Sat, 20 Sep 2025 15:39:22 +0530 Subject: [PATCH 2/2] fix: pipecat commit hash & add webrtc in requirements.txt --- api/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/requirements.txt b/api/requirements.txt index e6abd58..f85699d 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,4 +1,4 @@ -pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero] @ git+https://github.com/dograh-hq/pipecat.git@0c5091d +pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc] @ git+https://github.com/dograh-hq/pipecat.git@9dbd5eb langfuse==3.4.0 fastapi==0.116.2 asyncpg==0.30.0