mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
Merge pull request #6 from dograh-hq/optimise-webrtc
Optimise requirements.txt and update pipecat imports
This commit is contained in:
commit
70c58e1a40
14 changed files with 536 additions and 152 deletions
|
|
@ -70,7 +70,7 @@ class WorkflowRunClient(BaseDBClient):
|
|||
|
||||
# Get the current storage backend based on ENABLE_AWS_S3 flag
|
||||
current_backend = StorageBackend.get_current_backend()
|
||||
|
||||
|
||||
new_run = WorkflowRunModel(
|
||||
name=name,
|
||||
workflow=workflow,
|
||||
|
|
|
|||
|
|
@ -1,121 +1,16 @@
|
|||
aenum==3.1.16
|
||||
aiofiles==24.1.0
|
||||
aiohappyeyeballs==2.6.1
|
||||
aiohttp==3.11.16
|
||||
aiosignal==1.3.2
|
||||
alembic==1.15.2
|
||||
amqp==5.3.1
|
||||
annotated-types==0.7.0
|
||||
anyio==4.9.0
|
||||
pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure,soundfile,silero,webrtc] @ git+https://github.com/dograh-hq/pipecat.git@9dbd5eb
|
||||
langfuse==3.4.0
|
||||
fastapi==0.116.2
|
||||
asyncpg==0.30.0
|
||||
attrs==25.3.0
|
||||
audioop-lts==0.2.1; python_version>='3.13'
|
||||
av==14.2.0
|
||||
backoff==2.2.1
|
||||
billiard==4.2.1
|
||||
certifi==2025.1.31
|
||||
cffi==1.17.1
|
||||
charset-normalizer==3.4.1
|
||||
click==8.1.8
|
||||
click-didyoumean==0.3.1
|
||||
click-plugins==1.1.1
|
||||
click-repl==0.3.0
|
||||
colorama==0.4.6
|
||||
coloredlogs==15.0.1
|
||||
dataclasses-json==0.6.7
|
||||
deprecation==2.1.0
|
||||
distro==1.9.0
|
||||
docstring_parser==0.16
|
||||
eval_type_backport==0.2.2
|
||||
fastapi==0.115.12
|
||||
filelock==3.18.0
|
||||
flatbuffers==25.2.10
|
||||
frozenlist==1.5.0
|
||||
fsspec==2025.3.2
|
||||
future==1.0.0
|
||||
greenlet==3.1.1
|
||||
h11==0.14.0
|
||||
httpcore==1.0.7
|
||||
httpx==0.28.1
|
||||
huggingface-hub==0.30.1
|
||||
humanfriendly==10.0
|
||||
idna==3.10
|
||||
Jinja2==3.1.6
|
||||
jiter==0.9.0
|
||||
jmespath==1.0.1
|
||||
kombu==5.5.2
|
||||
langfuse==3.2.7
|
||||
llvmlite==0.44.0
|
||||
loguru==0.7.3
|
||||
Mako==1.3.9
|
||||
Markdown==3.8
|
||||
MarkupSafe==3.0.2
|
||||
marshmallow==3.26.1
|
||||
mpmath==1.3.0
|
||||
multidict==6.2.0
|
||||
mypy_extensions==1.1.0
|
||||
numba==0.61.2
|
||||
numpy==1.26.4
|
||||
onnxruntime==1.21.0
|
||||
packaging==24.2
|
||||
pillow==11.1.0
|
||||
pipecat-ai[cartesia,deepgram,openai,elevenlabs,groq,google,azure] @ git+https://github.com/dograh-hq/pipecat.git@main
|
||||
prompt_toolkit==3.0.51
|
||||
propcache==0.3.1
|
||||
protobuf==5.29.4
|
||||
psutil==7.0.0
|
||||
pycparser==2.22
|
||||
pydantic==2.10.6
|
||||
pydantic_core==2.27.2
|
||||
PyJWT==2.10.1
|
||||
pyloudnorm==0.1.1
|
||||
python-dateutil==2.9.0.post0
|
||||
python-dotenv==1.1.0
|
||||
PyYAML==6.0.2
|
||||
redis==5.2.1
|
||||
regex==2024.11.6
|
||||
requests==2.32.3
|
||||
resampy==0.4.3
|
||||
safetensors==0.5.3
|
||||
scipy==1.15.2
|
||||
setuptools==75.8.0
|
||||
six==1.17.0
|
||||
sniffio==1.3.1
|
||||
sounddevice==0.5.1
|
||||
soxr==0.5.0.post1
|
||||
SQLAlchemy==2.0.40
|
||||
starlette==0.46.1
|
||||
sympy==1.13.3
|
||||
tokenizers==0.21.1
|
||||
tqdm==4.67.1
|
||||
transformers==4.50.3
|
||||
types-protobuf==4.25.0.20240417
|
||||
typing-inspect==0.9.0
|
||||
typing-inspection==0.4.0
|
||||
typing_extensions==4.13.0
|
||||
tzdata==2025.2
|
||||
urllib3==2.3.0
|
||||
uvicorn==0.34.0
|
||||
vine==5.1.0
|
||||
watchfiles==1.0.4
|
||||
wcwidth==0.2.13
|
||||
websockets==13.1
|
||||
wheel==0.45.1
|
||||
wrapt==1.17.2
|
||||
yarl==1.18.3
|
||||
aiortc==1.11.0
|
||||
opencv-python-headless==4.11.0.86
|
||||
aioboto3==14.3.0
|
||||
alembic==1.16.5
|
||||
redis==5.3.1
|
||||
uvicorn==0.35.0
|
||||
aioboto3==15.1.0
|
||||
arq==0.26.3
|
||||
opentelemetry-sdk==1.33.1
|
||||
opentelemetry-api==1.33.1
|
||||
opentelemetry-exporter-otlp==1.33.1
|
||||
torch==2.7.0
|
||||
torchaudio==2.7.0
|
||||
axiom-py==0.9.0
|
||||
sentry-sdk[fastapi]==2.30.0
|
||||
twilio==9.7.0
|
||||
twilio==9.8.0
|
||||
minio==7.2.16
|
||||
alembic-postgresql-enum==1.8.0
|
||||
python-multipart==0.0.20
|
||||
soundfile==0.13.1
|
||||
minio==7.2.7
|
||||
alembic-postgresql-enum==1.8.0
|
||||
sentry-sdk[fastapi]==2.38.0
|
||||
sqlalchemy[asyncio]==2.0.43
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ from typing import Dict
|
|||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends
|
||||
from loguru import logger
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
|
|
|||
|
|
@ -1,11 +1,10 @@
|
|||
import asyncio
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import json
|
||||
from typing import Any, BinaryIO, Dict, Optional
|
||||
|
||||
from loguru import logger
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
import json
|
||||
|
||||
from .base import BaseFileSystem
|
||||
|
||||
|
|
@ -48,7 +47,7 @@ class MinioFileSystem(BaseFileSystem):
|
|||
try:
|
||||
if not self.client.bucket_exists(self.bucket_name):
|
||||
self.client.make_bucket(self.bucket_name)
|
||||
|
||||
|
||||
# Set anonymous download policy for local development
|
||||
# This allows unsigned URLs to work
|
||||
policy = {
|
||||
|
|
@ -58,11 +57,11 @@ class MinioFileSystem(BaseFileSystem):
|
|||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": ["s3:GetObject"],
|
||||
"Resource": [f"arn:aws:s3:::{self.bucket_name}/*"]
|
||||
"Resource": [f"arn:aws:s3:::{self.bucket_name}/*"],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
|
||||
except Exception as e:
|
||||
# Bucket might already exist or we might be in a restricted environment
|
||||
|
|
|
|||
|
|
@ -9,10 +9,10 @@ from pipecat.processors.filters.stt_mute_filter import (
|
|||
STTMuteFilter,
|
||||
STTMuteStrategy,
|
||||
)
|
||||
from pipecat.transports import InternalTransport
|
||||
|
||||
from api.db.db_client import DBClient
|
||||
from api.services.looptalk.audio_streamer import get_or_create_audio_streamer
|
||||
from api.services.looptalk.internal_transport import InternalTransport
|
||||
from api.services.pipecat.audio_config import AudioConfig
|
||||
from api.services.pipecat.pipeline_builder import (
|
||||
create_pipeline_components,
|
||||
|
|
|
|||
85
api/services/looptalk/internal_serializer.py
Normal file
85
api/services/looptalk/internal_serializer.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Internal frame serializer for agent-to-agent communication."""
|
||||
|
||||
from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
StartFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
|
||||
class InternalFrameSerializer(FrameSerializer):
|
||||
"""Serializer for InternalTransport that filters frames between agents.
|
||||
|
||||
This serializer ensures only audio frames are passed between agents,
|
||||
preventing control frames from creating infinite loops.
|
||||
"""
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
"""Internal transport uses binary frames."""
|
||||
return FrameSerializerType.BINARY
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
"""No setup required for internal transport."""
|
||||
pass
|
||||
|
||||
async def serialize(self, frame: Frame) -> bytes | None:
|
||||
"""Only serialize audio frames for transmission between agents."""
|
||||
# Only pass audio frames between agents
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
# Use a fixed-size header to avoid parsing issues with binary data
|
||||
# Format: "AUDIO" (5 bytes) + sample_rate (4 bytes) + num_channels (2 bytes) + audio data
|
||||
header = b"AUDIO"
|
||||
sample_rate_bytes = frame.sample_rate.to_bytes(4, byteorder="big")
|
||||
num_channels_bytes = frame.num_channels.to_bytes(2, byteorder="big")
|
||||
|
||||
serialized = header + sample_rate_bytes + num_channels_bytes + frame.audio
|
||||
return serialized
|
||||
|
||||
# Don't pass control frames between agents
|
||||
return None
|
||||
|
||||
async def deserialize(self, data: bytes) -> Frame | None:
|
||||
"""Deserialize audio frames from partner agent."""
|
||||
if data.startswith(b"AUDIO"):
|
||||
try:
|
||||
# Fixed-size header parsing
|
||||
# Header: "AUDIO" (5 bytes) + sample_rate (4 bytes) + num_channels (2 bytes)
|
||||
if len(data) < 11: # Minimum size for header
|
||||
logger.error(
|
||||
f"InternalSerializer: Data too short for header: {len(data)} bytes"
|
||||
)
|
||||
return None
|
||||
|
||||
# Extract fixed-size fields
|
||||
# Skip header validation - we already checked startswith(b"AUDIO")
|
||||
sample_rate = int.from_bytes(data[5:9], byteorder="big")
|
||||
num_channels = int.from_bytes(data[9:11], byteorder="big")
|
||||
|
||||
# Extract audio data - everything after the header
|
||||
audio_data = data[11:]
|
||||
|
||||
# Check if audio data length is valid
|
||||
if len(audio_data) % 2 != 0:
|
||||
logger.warning(
|
||||
f"InternalSerializer: Audio data has odd length: {len(audio_data)}"
|
||||
)
|
||||
|
||||
# Convert to InputAudioRawFrame for the receiving agent
|
||||
return InputAudioRawFrame(
|
||||
audio=audio_data, num_channels=num_channels, sample_rate=sample_rate
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to deserialize audio frame: {e}")
|
||||
return None
|
||||
|
||||
return None
|
||||
405
api/services/looptalk/internal_transport.py
Normal file
405
api/services/looptalk/internal_transport.py
Normal file
|
|
@ -0,0 +1,405 @@
|
|||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Internal transport for in-memory agent-to-agent communication."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputDTMFFrame,
|
||||
OutputDTMFUrgentFrame,
|
||||
OutputImageRawFrame,
|
||||
StartFrame,
|
||||
StopFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
|
||||
from api.services.looptalk.internal_serializer import InternalFrameSerializer
|
||||
|
||||
|
||||
class InternalInputTransport(BaseInputTransport):
|
||||
"""Input side of internal transport for agent-to-agent communication."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
transport: Optional["InternalTransport"],
|
||||
params: TransportParams,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize internal input transport.
|
||||
|
||||
Args:
|
||||
transport: The parent InternalTransport instance.
|
||||
params: Transport parameters for configuration.
|
||||
**kwargs: Additional keyword arguments including latency_seconds.
|
||||
"""
|
||||
# Extract latency configuration before passing to parent
|
||||
self._latency_seconds = kwargs.pop("latency_seconds", 0.0)
|
||||
|
||||
super().__init__(params, **kwargs)
|
||||
self._transport = transport
|
||||
self._queue: asyncio.Queue[bytes] = asyncio.Queue()
|
||||
self._partner: Optional["InternalOutputTransport"] = None
|
||||
self._running = False
|
||||
self._connected = False
|
||||
self._serializer = InternalFrameSerializer()
|
||||
# Queue for delayed packets (timestamp, data)
|
||||
self._delayed_queue: asyncio.Queue[Tuple[float, bytes]] = asyncio.Queue()
|
||||
self._latency_task: Optional[asyncio.Task] = None
|
||||
|
||||
def set_partner(self, partner: "InternalOutputTransport"):
|
||||
"""Connect this input transport to an output transport."""
|
||||
self._partner = partner
|
||||
|
||||
async def receive_data(self, data: bytes):
|
||||
"""Receive serialized data from the partner output transport."""
|
||||
# logger.debug("received data in input transport")
|
||||
if self._latency_seconds > 0:
|
||||
# Add to delayed queue with delivery timestamp
|
||||
delivery_time = time.monotonic() + self._latency_seconds
|
||||
await self._delayed_queue.put((delivery_time, data))
|
||||
else:
|
||||
# No latency, put directly in the main queue
|
||||
await self._queue.put(data)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the input transport."""
|
||||
self._running = True
|
||||
await super().start(frame)
|
||||
await self._serializer.setup(frame)
|
||||
|
||||
# Set transport ready to initialize audio task for VAD processing
|
||||
await self.set_transport_ready(frame)
|
||||
|
||||
# Trigger on_client_connected event for InternalTransport (only once)
|
||||
if hasattr(self, "_transport") and self._transport and not self._connected:
|
||||
self._connected = True
|
||||
await self._transport._call_event_handler(
|
||||
"on_client_connected", self._transport
|
||||
)
|
||||
|
||||
# Start latency processor if latency is configured
|
||||
if self._latency_seconds > 0:
|
||||
self._latency_task = asyncio.create_task(self._latency_processor())
|
||||
|
||||
asyncio.create_task(self._run())
|
||||
|
||||
async def stop(self, frame: EndFrame | StopFrame | None = None):
|
||||
"""Stop the input transport."""
|
||||
self._running = False
|
||||
|
||||
# Stop latency processor
|
||||
if self._latency_task:
|
||||
self._latency_task.cancel()
|
||||
try:
|
||||
await self._latency_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._latency_task = None
|
||||
|
||||
await super().stop(frame)
|
||||
|
||||
# Trigger on_client_disconnected event for InternalTransport
|
||||
if hasattr(self, "_transport") and self._transport:
|
||||
await self._transport._call_event_handler(
|
||||
"on_client_disconnected", self._transport
|
||||
)
|
||||
|
||||
async def _run(self):
|
||||
"""Main loop to process incoming data."""
|
||||
while self._running:
|
||||
try:
|
||||
data = await asyncio.wait_for(self._queue.get(), timeout=0.1)
|
||||
|
||||
# Deserialize the data
|
||||
frame = await self._serializer.deserialize(data)
|
||||
if frame:
|
||||
if isinstance(frame, InputAudioRawFrame):
|
||||
# Debug received audio
|
||||
try:
|
||||
import numpy as np
|
||||
|
||||
# Check if audio length is valid for int16
|
||||
if len(frame.audio) % 2 != 0:
|
||||
logger.error(
|
||||
f"InternalInput: Audio buffer has odd length: {len(frame.audio)}"
|
||||
)
|
||||
else:
|
||||
audio_array = np.frombuffer(frame.audio, dtype=np.int16)
|
||||
# logger.debug(f"InternalInput: Received audio - size: {len(frame.audio)} bytes, "
|
||||
# f"samples: {len(audio_array)}, min: {audio_array.min()}, max: {audio_array.max()}, "
|
||||
# f"sample_rate: {frame.sample_rate}")
|
||||
except Exception as e:
|
||||
logger.error(f"InternalInput: Error analyzing audio: {e}")
|
||||
|
||||
# Use the base class's audio processing which includes VAD
|
||||
await self.push_audio_frame(frame)
|
||||
else:
|
||||
# For non-audio frames, push directly
|
||||
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Error in internal input transport: {e}")
|
||||
|
||||
async def _latency_processor(self):
|
||||
"""Process delayed packets and deliver them after the configured latency."""
|
||||
logger.info(
|
||||
f"InternalInput: Started latency processor with {self._latency_seconds}s delay"
|
||||
)
|
||||
|
||||
# Use a list to maintain order (we'll process in FIFO order)
|
||||
pending_packets = []
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
# Get all new packets from the delayed queue (non-blocking)
|
||||
while True:
|
||||
try:
|
||||
packet = self._delayed_queue.get_nowait()
|
||||
pending_packets.append(packet)
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
# Process packets that are ready
|
||||
current_time = time.monotonic()
|
||||
delivered = []
|
||||
|
||||
for i, (delivery_time, data) in enumerate(pending_packets):
|
||||
if current_time >= delivery_time:
|
||||
# Time to deliver this packet
|
||||
await self._queue.put(data)
|
||||
delivered.append(i)
|
||||
|
||||
# Remove delivered packets (in reverse order to maintain indices)
|
||||
for i in reversed(delivered):
|
||||
pending_packets.pop(i)
|
||||
|
||||
# Sleep briefly before next check
|
||||
await asyncio.sleep(0.005) # 5ms for more responsive delivery
|
||||
|
||||
except asyncio.CancelledError:
|
||||
# Deliver any remaining packets immediately on shutdown
|
||||
for _, data in pending_packets:
|
||||
await self._queue.put(data)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in latency processor: {e}")
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
logger.info("InternalInput: Stopped latency processor")
|
||||
|
||||
|
||||
class InternalOutputTransport(BaseOutputTransport):
|
||||
"""Output side of internal transport for agent-to-agent communication."""
|
||||
|
||||
def __init__(self, params: TransportParams, **kwargs):
|
||||
"""Initialize internal output transport.
|
||||
|
||||
Args:
|
||||
params: Transport parameters for configuration.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(params, **kwargs)
|
||||
self._partner: Optional[InternalInputTransport] = None
|
||||
self._serializer = InternalFrameSerializer()
|
||||
|
||||
# Audio timing synchronization (similar to WebsocketServerOutputTransport)
|
||||
# _send_interval is the time interval between audio chunks in seconds
|
||||
self._send_interval = 0
|
||||
self._next_send_time = 0
|
||||
|
||||
def set_partner(self, partner: InternalInputTransport):
|
||||
"""Connect this output transport to an input transport."""
|
||||
self._partner = partner
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the output transport."""
|
||||
await super().start(frame)
|
||||
await self._serializer.setup(frame)
|
||||
# Calculate the send interval based on audio chunk size (like WebsocketServerOutputTransport)
|
||||
self._send_interval = (
|
||||
self._params.audio_out_10ms_chunks * 10 / 1000
|
||||
) # Convert ms to seconds
|
||||
await self.set_transport_ready(frame)
|
||||
|
||||
async def write_audio_frame(self, frame: OutputAudioRawFrame):
|
||||
"""Write audio frame to partner through serializer with proper timing."""
|
||||
# Debug audio characteristics
|
||||
# import numpy as np
|
||||
# audio_array = np.frombuffer(frame.audio, dtype=np.int16)
|
||||
# logger.debug(f"InternalOutput: Sending audio - type: {type(frame).__name__}, size: {len(frame.audio)} bytes, "
|
||||
# f"samples: {len(audio_array)}, min: {audio_array.min()}, max: {audio_array.max()}, "
|
||||
# f"sample_rate: {frame.sample_rate}")
|
||||
|
||||
# Serialize and send the audio first
|
||||
data = await self._serializer.serialize(frame)
|
||||
if data and self._partner:
|
||||
await self._partner.receive_data(data)
|
||||
|
||||
# logger.debug(f"InternalOutput: Sent audio frame to partner")
|
||||
|
||||
# Then simulate audio playback timing (following WebsocketServerOutputTransport pattern)
|
||||
await self._write_audio_sleep()
|
||||
|
||||
async def write_video_frame(self, _frame: OutputImageRawFrame):
|
||||
"""Internal transport doesn't support video."""
|
||||
pass
|
||||
|
||||
async def write_dtmf(self, _frame: OutputDTMFFrame | OutputDTMFUrgentFrame):
|
||||
"""Internal transport doesn't support DTMF."""
|
||||
pass
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the output transport and reset timing."""
|
||||
await super().stop(frame)
|
||||
self._next_send_time = 0
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
"""Cancel the output transport and reset timing."""
|
||||
await super().cancel(frame)
|
||||
self._next_send_time = 0
|
||||
|
||||
async def _write_audio_sleep(self):
|
||||
"""Simulate audio playback timing (following WebsocketServerOutputTransport pattern)."""
|
||||
# Simulate a clock to ensure audio is sent at real-time pace
|
||||
current_time = time.monotonic()
|
||||
sleep_duration = max(0, self._next_send_time - current_time)
|
||||
await asyncio.sleep(sleep_duration)
|
||||
if sleep_duration == 0:
|
||||
self._next_send_time = time.monotonic() + self._send_interval
|
||||
else:
|
||||
self._next_send_time += self._send_interval
|
||||
|
||||
|
||||
class InternalTransport(BaseTransport):
|
||||
"""Internal transport for in-memory agent-to-agent communication."""
|
||||
|
||||
def __init__(self, params: TransportParams, **kwargs):
|
||||
"""Initialize internal transport.
|
||||
|
||||
Args:
|
||||
params: Transport parameters for configuration.
|
||||
**kwargs: Additional keyword arguments including latency_seconds.
|
||||
"""
|
||||
# Extract latency configuration before passing to parent
|
||||
self._latency_seconds = kwargs.pop("latency_seconds", 0.0)
|
||||
|
||||
super().__init__(**kwargs)
|
||||
self._params = params
|
||||
|
||||
# Create input and output transports
|
||||
self._input = InternalInputTransport(
|
||||
self,
|
||||
params,
|
||||
name=self._input_name or f"{self.name}#input",
|
||||
latency_seconds=self._latency_seconds,
|
||||
)
|
||||
self._output = InternalOutputTransport(
|
||||
params, name=self._output_name or f"{self.name}#output"
|
||||
)
|
||||
|
||||
# Register supported event handlers
|
||||
self._register_event_handler("on_client_connected")
|
||||
self._register_event_handler("on_client_disconnected")
|
||||
|
||||
def input(self) -> InternalInputTransport:
|
||||
"""Get the input transport."""
|
||||
return self._input
|
||||
|
||||
def output(self) -> InternalOutputTransport:
|
||||
"""Get the output transport."""
|
||||
return self._output
|
||||
|
||||
def connect_partner(self, partner: "InternalTransport"):
|
||||
"""Connect this transport to another internal transport."""
|
||||
# Connect output of this transport to input of partner
|
||||
self._output.set_partner(partner._input)
|
||||
# Connect output of partner to input of this transport
|
||||
partner._output.set_partner(self._input)
|
||||
|
||||
|
||||
class InternalTransportManager:
|
||||
"""Manages multiple internal transport pairs for load testing."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize internal transport manager."""
|
||||
self._transport_pairs: Dict[
|
||||
str, Tuple[InternalTransport, InternalTransport]
|
||||
] = {}
|
||||
|
||||
def create_transport_pair(
|
||||
self,
|
||||
test_session_id: str,
|
||||
actor_params: TransportParams,
|
||||
adversary_params: TransportParams,
|
||||
latency_seconds: float = 0.0,
|
||||
) -> Tuple[InternalTransport, InternalTransport]:
|
||||
"""Create a connected pair of internal transports.
|
||||
|
||||
Args:
|
||||
test_session_id: Unique identifier for the test session.
|
||||
actor_params: Transport parameters for the actor.
|
||||
adversary_params: Transport parameters for the adversary.
|
||||
latency_seconds: Simulated network latency in seconds (default: 0.0).
|
||||
|
||||
Returns:
|
||||
Tuple of (actor_transport, adversary_transport).
|
||||
"""
|
||||
# Create actor transport with latency
|
||||
actor_transport = InternalTransport(
|
||||
params=actor_params,
|
||||
name=f"actor-{test_session_id}",
|
||||
latency_seconds=latency_seconds,
|
||||
)
|
||||
|
||||
# Create adversary transport with latency
|
||||
adversary_transport = InternalTransport(
|
||||
params=adversary_params,
|
||||
name=f"adversary-{test_session_id}",
|
||||
latency_seconds=latency_seconds,
|
||||
)
|
||||
|
||||
# Connect them
|
||||
actor_transport.connect_partner(adversary_transport)
|
||||
|
||||
# Store the pair
|
||||
self._transport_pairs[test_session_id] = (actor_transport, adversary_transport)
|
||||
|
||||
logger.info(
|
||||
f"Created internal transport pair for test session: {test_session_id} with {latency_seconds}s latency"
|
||||
)
|
||||
|
||||
return actor_transport, adversary_transport
|
||||
|
||||
def get_transport_pair(
|
||||
self, test_session_id: str
|
||||
) -> Optional[Tuple[InternalTransport, InternalTransport]]:
|
||||
"""Get an existing transport pair."""
|
||||
return self._transport_pairs.get(test_session_id)
|
||||
|
||||
def remove_transport_pair(self, test_session_id: str):
|
||||
"""Remove a transport pair."""
|
||||
if test_session_id in self._transport_pairs:
|
||||
del self._transport_pairs[test_session_id]
|
||||
logger.info(
|
||||
f"Removed internal transport pair for test session: {test_session_id}"
|
||||
)
|
||||
|
||||
def get_active_test_count(self) -> int:
|
||||
"""Get the number of active test sessions."""
|
||||
return len(self._transport_pairs)
|
||||
|
|
@ -7,13 +7,13 @@ from typing import Any, Dict, Optional
|
|||
|
||||
from loguru import logger
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.transports import (
|
||||
InternalTransport,
|
||||
InternalTransportManager,
|
||||
)
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
||||
from api.db.db_client import DBClient
|
||||
from api.services.looptalk.internal_transport import (
|
||||
InternalTransport,
|
||||
InternalTransportManager,
|
||||
)
|
||||
from api.services.pipecat.transport_setup import create_internal_transport
|
||||
|
||||
from .core.pipeline_builder import LoopTalkPipelineBuilder
|
||||
|
|
|
|||
|
|
@ -65,12 +65,15 @@ class MPSServiceKeyClient:
|
|||
data = response.json()
|
||||
# Transform the response to match our expected format
|
||||
return {
|
||||
"id": data.get("id"),
|
||||
"name": data.get("name") or name,
|
||||
"service_key": data.get("service_key"),
|
||||
"key_prefix": data.get("key_prefix") or (data.get("service_key", "")[:8]
|
||||
if data.get("service_key")
|
||||
else ""),
|
||||
"id": data.get("id"),
|
||||
"name": data.get("name") or name,
|
||||
"service_key": data.get("service_key"),
|
||||
"key_prefix": data.get("key_prefix")
|
||||
or (
|
||||
data.get("service_key", "")[:8]
|
||||
if data.get("service_key")
|
||||
else ""
|
||||
),
|
||||
"expires_at": data.get("expires_at"),
|
||||
"created_at": data.get("created_at"),
|
||||
"is_active": data.get("is_active", True),
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ from pipecat.processors.filters.stt_mute_filter import (
|
|||
STTMuteStrategy,
|
||||
)
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
from pipecat.utils.tracing.context_registry import ContextProviderRegistry
|
||||
|
||||
|
|
|
|||
|
|
@ -63,9 +63,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
|
|||
"""
|
||||
if user_config.tts.provider == ServiceProviders.DEEPGRAM.value:
|
||||
return DeepgramTTSService(
|
||||
api_key=user_config.tts.api_key,
|
||||
voice=user_config.tts.voice.value,
|
||||
sample_rate=24000,
|
||||
api_key=user_config.tts.api_key, voice=user_config.tts.voice.value
|
||||
)
|
||||
elif user_config.tts.provider == ServiceProviders.OPENAI.value:
|
||||
return OpenAITTSService(
|
||||
|
|
@ -91,7 +89,6 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
|
|||
api_key=user_config.tts.api_key,
|
||||
model=user_config.tts.model.value,
|
||||
voice=user_config.tts.voice.value,
|
||||
sample_rate=24000,
|
||||
)
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import os
|
|||
from fastapi import WebSocket
|
||||
|
||||
from api.constants import APP_ROOT_DIR, ENABLE_RNNOISE, ENABLE_SMART_TURN
|
||||
from api.services.looptalk.internal_transport import InternalTransport
|
||||
from api.services.pipecat.audio_config import AudioConfig
|
||||
from api.services.smart_turn.websocket_smart_turn import (
|
||||
WebSocketSmartTurnAnalyzer,
|
||||
|
|
@ -14,19 +15,18 @@ from api.services.telephony.stasis_rtp_transport import (
|
|||
StasisRTPTransportParams,
|
||||
)
|
||||
from pipecat.audio.filters.rnnoise_filter import RNNoiseFilter
|
||||
from pipecat.audio.mixers.silence_audio_mixer import SilenceAudioMixer
|
||||
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
|
||||
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.transports import InternalTransport
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
|
||||
from pipecat.transports.websocket.fastapi import (
|
||||
FastAPIWebsocketParams,
|
||||
FastAPIWebsocketTransport,
|
||||
)
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
librnnoise_path = os.path.normpath(
|
||||
str(APP_ROOT_DIR / "native" / "rnnoise" / "librnnoise.so")
|
||||
|
|
|
|||
|
|
@ -122,9 +122,9 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn):
|
|||
logger.debug("Establishing new WebSocket connection to Smart-Turn service...")
|
||||
|
||||
# Prepare headers
|
||||
extra_headers = dict(self._headers)
|
||||
additional_headers = dict(self._headers)
|
||||
if self._service_context is not None:
|
||||
extra_headers["X-Service-Context"] = str(self._service_context)
|
||||
additional_headers["X-Service-Context"] = str(self._service_context)
|
||||
|
||||
# _init_sample_rate is being set in the constructor, which we should
|
||||
# use in case self._sample_rate is not set yet. The actual _sample_rate
|
||||
|
|
@ -135,7 +135,7 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn):
|
|||
_sample_rate = self._sample_rate or self._init_sample_rate
|
||||
|
||||
if _sample_rate > 0:
|
||||
extra_headers["X-Sample-Rate"] = str(_sample_rate)
|
||||
additional_headers["X-Sample-Rate"] = str(_sample_rate)
|
||||
|
||||
max_attempts = 3
|
||||
for attempt in range(max_attempts):
|
||||
|
|
@ -148,7 +148,7 @@ class WebSocketSmartTurnAnalyzer(BaseSmartTurn):
|
|||
# Connect with websockets library
|
||||
self._ws = await websockets.connect(
|
||||
self._url,
|
||||
extra_headers=extra_headers,
|
||||
additional_headers=additional_headers,
|
||||
ping_interval=5.0, # let websockets send pings every 5s
|
||||
ping_timeout=3.0, # fail fast if no pong in 3s
|
||||
close_timeout=10,
|
||||
|
|
|
|||
|
|
@ -480,9 +480,9 @@ class PipecatEngine:
|
|||
async def _handle_start_node(self, node: Node) -> None:
|
||||
"""Handle start node execution."""
|
||||
# Handle voicemail detection setup (before any returns)
|
||||
# Lets check ENABLE_TRACING to make sure we have prompt access from
|
||||
# Lets check ENABLE_TRACING to make sure we have prompt access from
|
||||
# langfuse
|
||||
if node.detect_voicemail and DEPLOYMENT_MODE == 'saas' and ENABLE_TRACING:
|
||||
if node.detect_voicemail and DEPLOYMENT_MODE == "saas" and ENABLE_TRACING:
|
||||
if not self._audio_buffer:
|
||||
logger.warning(
|
||||
"Voicemail detection enabled but no audio buffer available - skipping detection"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue