dograh/evals/stt/audio_streamer.py
Abhishek 911c5ed416
fix: changes to update pipecat version to 0.0.100 (#122)
* feat: add stt evals

* add smart turn as provider

* chore: remove deprecations

* chore: format files

* fix: remove deprecated UserIdleProcessor

* fix: remove deprecated TranscriptProcessor

* chore: update pipecat submodule

* feat: add evals visualisation

* fix: trigger llm generation on client connected and pipeline started

* chore: update pipecat

* chore: update pipecat submodule

* Add tests

* fix: slow loading of workflow page

* chore: update pipecat submodule

* Show version after release

* Fixes #99

* fix: provider check for websocket connection

* Fixes #107

* Fix #96

* chore: fix documentation

* fix: cloudonix campaign call error

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
2026-01-23 18:53:59 +05:30

140 lines
4.1 KiB
Python

"""Audio file streamer - converts audio files to PCM16 streams."""
import asyncio
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncIterator
@dataclass
class AudioConfig:
    """Parameters describing the PCM stream and how it is chunked."""

    # Samples per second of the PCM stream.
    sample_rate: int = 8000
    # Number of audio channels.
    channels: int = 1
    # Bytes per sample (2 bytes = 16-bit audio).
    sample_width: int = 2
    # Amount of audio carried by a single chunk, in milliseconds.
    chunk_duration_ms: int = 80

    @property
    def chunk_size(self) -> int:
        """Return the size in bytes of one chunk of ``chunk_duration_ms``.

        The per-chunk sample count is truncated to a whole number before
        being scaled by the channel count and the sample width.
        """
        bytes_per_frame = self.channels * self.sample_width
        frames_per_chunk = int(self.sample_rate * self.chunk_duration_ms / 1000)
        return frames_per_chunk * bytes_per_frame
class AudioStreamer:
    """Streams audio files as PCM16 chunks.

    Converts any audio format to raw PCM16 using ffmpeg and streams
    in real-time chunks to simulate live audio.
    """

    def __init__(self, config: "AudioConfig | None" = None):
        """Create a streamer.

        Args:
            config: Output PCM format and chunking parameters. A default
                ``AudioConfig()`` is used when omitted.
        """
        self.config = config if config is not None else AudioConfig()

    def convert_to_pcm16(self, audio_path: Path) -> bytes:
        """Convert audio file to raw PCM16 bytes using ffmpeg.

        This call blocks until ffmpeg exits.

        Args:
            audio_path: Path to input audio file

        Returns:
            Raw PCM16 audio bytes

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        """
        cmd = [
            "ffmpeg",
            "-nostdin",  # never read from (or wait on) the parent's stdin
            "-i",
            str(audio_path),
            "-f",
            "s16le",  # signed 16-bit little-endian
            "-acodec",
            "pcm_s16le",
            "-ar",
            str(self.config.sample_rate),
            "-ac",
            str(self.config.channels),
            "-",  # output to stdout
        ]
        result = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            capture_output=True,
            check=True,
        )
        return result.stdout

    async def stream_file(
        self,
        audio_path: Path,
        realtime: bool = True,
        trailing_silence_seconds: float = 0.0,
    ) -> AsyncIterator[bytes]:
        """Stream audio file as PCM16 chunks.

        Args:
            audio_path: Path to audio file
            realtime: If True, add delays to simulate real-time streaming
            trailing_silence_seconds: Seconds of silence to append after audio ends.
                Useful for capturing pending end-of-turn events from STT providers.

        Yields:
            PCM16 audio chunks

        Raises:
            ValueError: If the configured chunk size or chunk duration is not positive.
            subprocess.CalledProcessError: If the ffmpeg conversion fails.
        """
        chunk_size = self.config.chunk_size
        chunk_ms = self.config.chunk_duration_ms
        # Fail early with a clear message instead of an opaque range() error
        # (or a silently empty stream) after the conversion has already run.
        if chunk_size <= 0 or chunk_ms <= 0:
            raise ValueError(
                "chunk size and chunk duration must be positive "
                f"(chunk_size={chunk_size}, chunk_duration_ms={chunk_ms})"
            )

        # The conversion is a blocking subprocess call; run it in a worker
        # thread so the event loop stays responsive while ffmpeg works.
        pcm_data = await asyncio.to_thread(self.convert_to_pcm16, audio_path)

        delay = chunk_ms / 1000.0 if realtime else 0.0
        paced = realtime and delay > 0
        loop = asyncio.get_running_loop()
        started_at = loop.time()
        chunks_sent = 0

        async def wait_for_next_slot() -> None:
            # Pace against absolute deadlines so the time the consumer spends
            # on each chunk does not accumulate into drift. sleep(0) still
            # hands control back to the loop when we are behind schedule.
            remaining = started_at + chunks_sent * delay - loop.time()
            await asyncio.sleep(max(remaining, 0.0))

        # Stream audio chunks
        for offset in range(0, len(pcm_data), chunk_size):
            yield pcm_data[offset : offset + chunk_size]
            chunks_sent += 1
            if paced:
                await wait_for_next_slot()

        # Stream trailing silence if requested
        if trailing_silence_seconds > 0:
            silence_chunk = bytes(chunk_size)  # Zero-filled bytes = silence
            # Work in milliseconds with a small tolerance: a plain
            # int(seconds / (ms / 1000)) drops a chunk when the float quotient
            # lands just below a whole number (e.g. 0.3 / 0.1).
            num_silence_chunks = int(trailing_silence_seconds * 1000.0 / chunk_ms + 1e-9)
            for _ in range(num_silence_chunks):
                yield silence_chunk
                chunks_sent += 1
                if paced:
                    await wait_for_next_slot()

    async def stream_file_fast(self, audio_path: Path) -> AsyncIterator[bytes]:
        """Stream audio file as fast as possible (no real-time delay).

        Args:
            audio_path: Path to audio file

        Yields:
            PCM16 audio chunks
        """
        async for chunk in self.stream_file(audio_path, realtime=False):
            yield chunk

    def get_duration(self, audio_path: Path) -> float:
        """Get audio file duration in seconds.

        Args:
            audio_path: Path to audio file

        Returns:
            Duration in seconds

        Raises:
            subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
            ValueError: If ffprobe's output cannot be parsed as a float.
        """
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(audio_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())