dograh/api/services/looptalk/internal_serializer.py
Abhishek 0e12c41fc7
chore: bump pipecat version and fix tests (#263)
* chore: bump pipecat version and fix tests

* chore: add github workflow to run tests

* fix: install reqirements.dev.txt in test script

* fix: fix api-test action

* feat: add integration test

* test: add integration tests

* test: add test for function call mute strategy
2026-05-04 21:35:37 +05:30

75 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Internal frame serializer for agent-to-agent communication."""
from loguru import logger
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
class InternalFrameSerializer(FrameSerializer):
    """Serializer for InternalTransport that filters frames between agents.

    This serializer ensures only audio frames are passed between agents,
    preventing control frames from creating infinite loops.

    Wire format (fixed-size header, integers are big-endian)::

        b"AUDIO" (5 bytes) | sample_rate (4 bytes) | num_channels (2 bytes) | audio
    """

    # Magic prefix that marks a packet as audio.
    _AUDIO_MAGIC = b"AUDIO"
    # Byte offsets derived from the fixed-size header layout described above.
    _SAMPLE_RATE_END = len(_AUDIO_MAGIC) + 4  # 9
    _AUDIO_HEADER_SIZE = _SAMPLE_RATE_END + 2  # 11

    async def serialize(self, frame: Frame) -> bytes | None:
        """Serialize an outgoing audio frame for transmission to the partner agent.

        Args:
            frame: The frame to serialize.

        Returns:
            The packet bytes (header followed by the raw audio) when ``frame``
            is an ``OutputAudioRawFrame``; ``None`` for every other frame type
            so that control frames are never forwarded between agents.
        """
        if not isinstance(frame, OutputAudioRawFrame):
            # Don't pass control frames between agents.
            return None

        # A fixed-size binary header avoids delimiter parsing issues, since the
        # audio payload may contain arbitrary byte values.
        return b"".join(
            (
                self._AUDIO_MAGIC,
                frame.sample_rate.to_bytes(4, byteorder="big"),
                frame.num_channels.to_bytes(2, byteorder="big"),
                frame.audio,
            )
        )

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserialize an audio packet received from the partner agent.

        Args:
            data: The raw payload received from the transport.

        Returns:
            An ``InputAudioRawFrame`` built from the packet, or ``None`` when
            the payload is not a binary audio packet, is shorter than the
            header, or the frame cannot be constructed.
        """
        # Text (or otherwise non-binary) payloads can never be audio packets.
        # Checking the type first also keeps ``startswith`` from raising
        # ``TypeError`` when a ``str`` is compared against a bytes prefix.
        if not isinstance(data, (bytes, bytearray)):
            return None

        if not data.startswith(self._AUDIO_MAGIC):
            return None

        if len(data) < self._AUDIO_HEADER_SIZE:  # Minimum size for header
            logger.error(
                f"InternalSerializer: Data too short for header: {len(data)} bytes"
            )
            return None

        # Extract the fixed-size fields; the magic prefix was validated above.
        magic_end = len(self._AUDIO_MAGIC)
        sample_rate = int.from_bytes(
            data[magic_end : self._SAMPLE_RATE_END], byteorder="big"
        )
        num_channels = int.from_bytes(
            data[self._SAMPLE_RATE_END : self._AUDIO_HEADER_SIZE], byteorder="big"
        )

        # Everything after the header is audio data.
        audio_data = data[self._AUDIO_HEADER_SIZE :]

        # An odd byte count is reported but the data is still forwarded.
        if len(audio_data) % 2 != 0:
            logger.warning(
                f"InternalSerializer: Audio data has odd length: {len(audio_data)}"
            )

        try:
            # Convert to InputAudioRawFrame for the receiving agent.
            return InputAudioRawFrame(
                audio=audio_data, num_channels=num_channels, sample_rate=sample_rate
            )
        except Exception as e:
            # Best-effort boundary: a malformed packet must not take down the
            # transport, so log the failure and drop the packet.
            logger.error(f"Failed to deserialize audio frame: {e}")
            return None