Remove test files

This commit is contained in:
Abhishek Kumar 2025-09-09 14:40:09 +05:30
parent 4f2a629340
commit 695b43af28
2 changed files with 0 additions and 386 deletions

View file

@ -1,208 +0,0 @@
"""Tests for audio chunk size calculations to ensure robustness against upstream changes."""
import pytest
from pipecat.audio.audio_utils import (
AudioEncoding,
calculate_audio_bytes_per_sample,
calculate_chunk_size_bytes,
calculate_duration_ms,
get_audio_encoding,
)
class TestAudioEncoding:
"""Test audio encoding detection from metadata."""
def test_get_audio_encoding_pcm_default(self):
"""Test that PCM is the default encoding."""
assert get_audio_encoding({}) == AudioEncoding.PCM
assert get_audio_encoding({"audio_encoding": ""}) == AudioEncoding.PCM
assert get_audio_encoding({"audio_encoding": "unknown"}) == AudioEncoding.PCM
def test_get_audio_encoding_ulaw(self):
"""Test μ-law encoding detection."""
assert get_audio_encoding({"audio_encoding": "ulaw"}) == AudioEncoding.ULAW
assert get_audio_encoding({"audio_encoding": "ULAW"}) == AudioEncoding.ULAW
assert get_audio_encoding({"audio_encoding": "Ulaw"}) == AudioEncoding.ULAW
def test_get_audio_encoding_alaw(self):
"""Test A-law encoding detection."""
assert get_audio_encoding({"audio_encoding": "alaw"}) == AudioEncoding.ALAW
assert get_audio_encoding({"audio_encoding": "ALAW"}) == AudioEncoding.ALAW
class TestAudioBytesPerSample:
"""Test bytes per sample calculation for different encodings."""
def test_pcm_bytes_per_sample(self):
"""Test PCM uses 2 bytes per sample."""
assert calculate_audio_bytes_per_sample(AudioEncoding.PCM) == 2
def test_ulaw_bytes_per_sample(self):
"""Test μ-law uses 1 byte per sample."""
assert calculate_audio_bytes_per_sample(AudioEncoding.ULAW) == 1
def test_alaw_bytes_per_sample(self):
"""Test A-law uses 1 byte per sample."""
assert calculate_audio_bytes_per_sample(AudioEncoding.ALAW) == 1
class TestChunkSizeCalculation:
"""Test chunk size calculations for various configurations."""
def test_pcm_8khz_20ms_mono(self):
"""Test PCM 8kHz 20ms mono chunk size."""
chunk_size = calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.PCM)
assert chunk_size == 320 # 8000/1000 * 20 * 1 * 2
def test_ulaw_8khz_20ms_mono(self):
"""Test μ-law 8kHz 20ms mono chunk size."""
chunk_size = calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.ULAW)
assert chunk_size == 160 # 8000/1000 * 20 * 1 * 1
def test_pcm_16khz_10ms_mono(self):
"""Test PCM 16kHz 10ms mono chunk size."""
chunk_size = calculate_chunk_size_bytes(16000, 10, 1, AudioEncoding.PCM)
assert chunk_size == 320 # 16000/1000 * 10 * 1 * 2
def test_pcm_44100hz_10ms_stereo(self):
"""Test PCM 44.1kHz 10ms stereo chunk size."""
chunk_size = calculate_chunk_size_bytes(44100, 10, 2, AudioEncoding.PCM)
assert chunk_size == 1764 # 44100/1000 * 10 * 2 * 2
def test_different_durations(self):
"""Test various durations for consistency."""
# 10ms chunks
assert calculate_chunk_size_bytes(8000, 10, 1, AudioEncoding.PCM) == 160
assert calculate_chunk_size_bytes(8000, 10, 1, AudioEncoding.ULAW) == 80
# 20ms chunks
assert calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.PCM) == 320
assert calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.ULAW) == 160
# 40ms chunks
assert calculate_chunk_size_bytes(8000, 40, 1, AudioEncoding.PCM) == 640
assert calculate_chunk_size_bytes(8000, 40, 1, AudioEncoding.ULAW) == 320
class TestDurationCalculation:
"""Test duration calculation from byte count."""
def test_pcm_duration_calculation(self):
"""Test duration calculation for PCM audio."""
# 320 bytes of 8kHz mono PCM should be 20ms
duration = calculate_duration_ms(320, 8000, 1, AudioEncoding.PCM)
assert duration == 20.0
# 160 bytes of 8kHz mono PCM should be 10ms
duration = calculate_duration_ms(160, 8000, 1, AudioEncoding.PCM)
assert duration == 10.0
def test_ulaw_duration_calculation(self):
"""Test duration calculation for μ-law audio."""
# 160 bytes of 8kHz mono μ-law should be 20ms
duration = calculate_duration_ms(160, 8000, 1, AudioEncoding.ULAW)
assert duration == 20.0
# 80 bytes of 8kHz mono μ-law should be 10ms
duration = calculate_duration_ms(80, 8000, 1, AudioEncoding.ULAW)
assert duration == 10.0
def test_round_trip_consistency(self):
"""Test that chunk size and duration calculations are consistent."""
test_cases = [
(8000, 20, 1, AudioEncoding.PCM),
(8000, 20, 1, AudioEncoding.ULAW),
(16000, 10, 1, AudioEncoding.PCM),
(44100, 10, 2, AudioEncoding.PCM),
]
for sample_rate, duration_ms, channels, encoding in test_cases:
chunk_size = calculate_chunk_size_bytes(
sample_rate, duration_ms, channels, encoding
)
calculated_duration = calculate_duration_ms(
chunk_size, sample_rate, channels, encoding
)
assert abs(calculated_duration - duration_ms) < 0.1, (
f"Round trip failed for {sample_rate}Hz {duration_ms}ms {channels}ch {encoding}: "
f"expected {duration_ms}ms, got {calculated_duration}ms"
)
class TestRobustnessAgainstUpstreamChanges:
"""Test scenarios that ensure our code is robust against upstream changes."""
def test_chunk_size_independence(self):
"""Test that our calculations don't depend on upstream PCM assumptions."""
# Simulate what upstream calculates for PCM
upstream_sample_rate = 8000
upstream_channels = 1
upstream_10ms_chunks = 2 # 20ms total
# Upstream calculation (assumes PCM with 2 bytes per sample)
upstream_audio_bytes_10ms = (
int(upstream_sample_rate / 100) * upstream_channels * 2
)
upstream_chunk_size = upstream_audio_bytes_10ms * upstream_10ms_chunks
# Our calculation for PCM should match upstream
our_pcm_chunk_size = calculate_chunk_size_bytes(
upstream_sample_rate,
upstream_10ms_chunks * 10,
upstream_channels,
AudioEncoding.PCM,
)
assert our_pcm_chunk_size == upstream_chunk_size
# Our calculation for μ-law should be different
our_ulaw_chunk_size = calculate_chunk_size_bytes(
upstream_sample_rate,
upstream_10ms_chunks * 10,
upstream_channels,
AudioEncoding.ULAW,
)
assert our_ulaw_chunk_size == upstream_chunk_size // 2
def test_various_upstream_configurations(self):
"""Test that our calculations work correctly for various upstream configs."""
configurations = [
# (sample_rate, channels, 10ms_chunks)
(8000, 1, 1), # 10ms chunks
(8000, 1, 2), # 20ms chunks
(8000, 1, 4), # 40ms chunks
(16000, 1, 2), # 16kHz, 20ms
(24000, 1, 2), # 24kHz, 20ms
(44100, 2, 1), # 44.1kHz stereo, 10ms
]
for sample_rate, channels, chunks_10ms in configurations:
# Simulate upstream PCM calculation
upstream_bytes_10ms = int(sample_rate / 100) * channels * 2
upstream_chunk_size = upstream_bytes_10ms * chunks_10ms
# Our calculations
duration_ms = chunks_10ms * 10
# PCM should match upstream
pcm_size = calculate_chunk_size_bytes(
sample_rate, duration_ms, channels, AudioEncoding.PCM
)
assert pcm_size == upstream_chunk_size, (
f"PCM mismatch for {sample_rate}Hz {channels}ch {duration_ms}ms: "
f"expected {upstream_chunk_size}, got {pcm_size}"
)
# μ-law should be half of PCM
ulaw_size = calculate_chunk_size_bytes(
sample_rate, duration_ms, channels, AudioEncoding.ULAW
)
assert ulaw_size == upstream_chunk_size // 2, (
f"μ-law mismatch for {sample_rate}Hz {channels}ch {duration_ms}ms: "
f"expected {upstream_chunk_size // 2}, got {ulaw_size}"
)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View file

@ -1,178 +0,0 @@
"""Tests specific to Stasis RTP audio handling to ensure correct μ-law processing."""
import pytest
from api.services.telephony.stasis_rtp_serializer import StasisRTPFrameSerializer
from pipecat.audio.audio_utils import AudioEncoding, calculate_chunk_size_bytes
from pipecat.frames.frames import TTSAudioRawFrame
class TestStasisAudioFlow:
"""Test the complete audio flow for Stasis RTP transport."""
def test_elevenlabs_ulaw_metadata(self):
"""Test that ElevenLabs μ-law audio frames have correct metadata."""
# Create a frame as ElevenLabs would
audio_data = b"\xff" * 160 # 160 bytes of μ-law silence
frame = TTSAudioRawFrame(audio=audio_data, sample_rate=8000, num_channels=1)
frame.metadata["audio_encoding"] = "ulaw"
# Verify metadata
assert frame.metadata.get("audio_encoding") == "ulaw"
assert len(frame.audio) == 160 # 20ms of 8kHz μ-law
@pytest.mark.asyncio
async def test_serializer_passthrough_for_ulaw(self):
"""Test that StasisRTPFrameSerializer passes through μ-law audio."""
serializer = StasisRTPFrameSerializer()
# Create a μ-law frame
ulaw_data = b"\xff" * 160
frame = TTSAudioRawFrame(audio=ulaw_data, sample_rate=8000, num_channels=1)
frame.metadata["audio_encoding"] = "ulaw"
# Serialize should pass through without conversion
result = await serializer.serialize(frame)
assert result == ulaw_data # Should be unchanged
assert len(result) == 160
def test_chunk_size_for_stasis_configuration(self):
"""Test chunk size calculation for typical Stasis configurations."""
# Stasis typically uses 20ms packets at 8kHz
# PCM calculation (what upstream assumes)
pcm_chunk_size = calculate_chunk_size_bytes(
sample_rate=8000, duration_ms=20, num_channels=1, encoding=AudioEncoding.PCM
)
assert pcm_chunk_size == 320
# μ-law calculation (what we actually need)
ulaw_chunk_size = calculate_chunk_size_bytes(
sample_rate=8000,
duration_ms=20,
num_channels=1,
encoding=AudioEncoding.ULAW,
)
assert ulaw_chunk_size == 160
# The ratio should always be 2:1 for PCM:μ-law
assert pcm_chunk_size == ulaw_chunk_size * 2
def test_rtp_packet_timing(self):
"""Test that RTP packet timing is correct for μ-law audio."""
# For 8kHz μ-law:
# - 20ms = 160 bytes
# - RTP timestamp increments by 160 for each packet
sample_rate = 8000
packet_duration_ms = 20
# Calculate bytes per packet
bytes_per_packet = calculate_chunk_size_bytes(
sample_rate, packet_duration_ms, 1, AudioEncoding.ULAW
)
# RTP timestamp increment should equal samples per packet
samples_per_packet = int(sample_rate * packet_duration_ms / 1000)
rtp_timestamp_increment = samples_per_packet
assert bytes_per_packet == 160
assert rtp_timestamp_increment == 160
def test_audio_speed_scenario(self):
"""Test the scenario that was causing audio to play too fast."""
# Original problem: 320 bytes of μ-law was being sent as one chunk
# This is 40ms of audio, not 20ms, causing 2x playback speed
# Incorrect scenario (what was happening)
incorrect_chunk_size = 320 # PCM assumption
incorrect_duration = incorrect_chunk_size / (
8000 * 1
) # bytes / (samples/sec * bytes/sample)
# For μ-law: 320 bytes = 320 samples = 0.04 seconds = 40ms
# Correct scenario (what should happen)
correct_chunk_size = 160 # μ-law reality
correct_duration = correct_chunk_size / (
8000 * 1
) # 160 samples = 0.02 seconds = 20ms
assert incorrect_duration == 0.04 # 40ms - too much!
assert correct_duration == 0.02 # 20ms - correct!
def test_transport_chunk_calculation(self):
"""Test that transport correctly calculates chunk sizes for different encodings."""
from pipecat.transports.base_transport import TransportParams
# Standard transport params for Stasis
params = TransportParams(
audio_out_enabled=True,
audio_out_sample_rate=8000,
audio_out_channels=1,
audio_out_10ms_chunks=2, # 20ms total
)
# Calculate what the transport would compute for PCM
audio_bytes_10ms_pcm = (
int(params.audio_out_sample_rate / 100) * params.audio_out_channels * 2
)
chunk_size_pcm = audio_bytes_10ms_pcm * params.audio_out_10ms_chunks
assert audio_bytes_10ms_pcm == 160 # 10ms of PCM
assert chunk_size_pcm == 320 # 20ms of PCM
# Our calculation for μ-law
duration_ms = params.audio_out_10ms_chunks * 10
chunk_size_ulaw = calculate_chunk_size_bytes(
params.audio_out_sample_rate,
duration_ms,
params.audio_out_channels,
AudioEncoding.ULAW,
)
assert chunk_size_ulaw == 160 # 20ms of μ-law
assert chunk_size_ulaw == chunk_size_pcm // 2
class TestEdgeCases:
"""Test edge cases and error conditions."""
def test_mixed_encoding_stream(self):
"""Test handling of streams that mix PCM and μ-law frames."""
# This shouldn't happen in practice, but we should handle it gracefully
# PCM frame
pcm_frame = TTSAudioRawFrame(
audio=b"\x00" * 320, sample_rate=8000, num_channels=1
)
pcm_chunk_size = calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.PCM)
assert pcm_chunk_size == 320
# μ-law frame
ulaw_frame = TTSAudioRawFrame(
audio=b"\xff" * 160, sample_rate=8000, num_channels=1
)
ulaw_frame.metadata["audio_encoding"] = "ulaw"
ulaw_chunk_size = calculate_chunk_size_bytes(8000, 20, 1, AudioEncoding.ULAW)
assert ulaw_chunk_size == 160
def test_non_standard_sample_rates(self):
"""Test chunk size calculations for non-standard sample rates."""
# While Stasis typically uses 8kHz, we should handle other rates correctly
test_cases = [
(16000, 20, AudioEncoding.ULAW, 320), # 16kHz μ-law
(24000, 20, AudioEncoding.ULAW, 480), # 24kHz μ-law
(48000, 10, AudioEncoding.ULAW, 480), # 48kHz μ-law, 10ms
]
for sample_rate, duration_ms, encoding, expected_size in test_cases:
chunk_size = calculate_chunk_size_bytes(
sample_rate, duration_ms, 1, encoding
)
assert chunk_size == expected_size
if __name__ == "__main__":
pytest.main([__file__, "-v"])