2025-09-09 14:37:32 +05:30
|
|
|
|
"""Low-level RTP transport for Asterisk externalMedia sessions.
|
|
|
|
|
|
|
|
|
|
|
|
stasis_rtp_client.py
|
|
|
|
|
|
~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
|
|
|
|
|
|
* Sends and receives **proper RTP/UDP** (PT 0 PCMU/μ-law).
|
|
|
|
|
|
* Uses 20 ms frames (160 bytes payload) by default; automatically
|
|
|
|
|
|
chunks or concatenates data so timestamps stay correct.
|
|
|
|
|
|
* Verifies the RTP header on the receive path (SSRC and PT).
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
import secrets
|
|
|
|
|
|
import socket
|
|
|
|
|
|
import struct
|
|
|
|
|
|
from typing import TYPE_CHECKING, AsyncIterator, Optional
|
|
|
|
|
|
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
|
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
|
|
|
|
|
|
from api.services.telephony.stasis_rtp_transport import StasisRTPCallbacks
|
|
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────── helpers
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_RTP_HDR = struct.Struct("!BBHII") # v/p/x/cc, m/pt, seq, ts, ssrc
|
|
|
|
|
|
_PT_PCMU = 0 # static payload type for μ-law
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _RTPEncoder:
|
|
|
|
|
|
"""Builds PCMU RTP headers for the packets we SEND to Asterisk."""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self.ssrc = secrets.randbits(32)
|
|
|
|
|
|
self.seq = secrets.randbits(16)
|
|
|
|
|
|
self.ts = 0 # incremented by #payload bytes
|
|
|
|
|
|
|
|
|
|
|
|
def pack(self, payload: bytes, mark=False) -> bytes:
|
|
|
|
|
|
b0 = 0x80 # V=2
|
|
|
|
|
|
b1 = (0x80 if mark else 0x00) | _PT_PCMU
|
|
|
|
|
|
hdr = _RTP_HDR.pack(b0, b1, self.seq, self.ts, self.ssrc)
|
|
|
|
|
|
self.seq = (self.seq + 1) & 0xFFFF
|
|
|
|
|
|
self.ts += len(payload) # 1 sample/byte @ 8 kHz
|
|
|
|
|
|
return hdr + payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _RTPDecoder:
|
|
|
|
|
|
"""Very forgiving RTP decoder.
|
|
|
|
|
|
|
|
|
|
|
|
Latches on the first valid packet and then insists
|
|
|
|
|
|
that SSRC & PT match afterwards. Returns *None* if the packet
|
|
|
|
|
|
should be ignored.
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self.peer_ssrc: int | None = None # learned from first packet
|
|
|
|
|
|
|
|
|
|
|
|
def unpack(self, packet: bytes) -> bytes | None:
|
|
|
|
|
|
if len(packet) < _RTP_HDR.size:
|
|
|
|
|
|
return None
|
|
|
|
|
|
b0, b1, seq, ts, ssrc = _RTP_HDR.unpack_from(packet)
|
|
|
|
|
|
if (b0 & 0xC0) != 0x80: # RTP v2?
|
|
|
|
|
|
return None
|
|
|
|
|
|
if (b1 & 0x7F) != _PT_PCMU: # payload-type 0?
|
|
|
|
|
|
return None
|
|
|
|
|
|
if self.peer_ssrc is None:
|
|
|
|
|
|
self.peer_ssrc = ssrc # latch on first good packet
|
|
|
|
|
|
elif ssrc != self.peer_ssrc:
|
|
|
|
|
|
return None # stray stream – drop
|
|
|
|
|
|
return packet[_RTP_HDR.size :]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────── client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StasisRTPClient:
|
|
|
|
|
|
"""Low-level wrapper around StasisRTPConnection.
|
|
|
|
|
|
|
|
|
|
|
|
Public API
|
|
|
|
|
|
──────────
|
|
|
|
|
|
• await setup(start_frame) kept for parity (does nothing)
|
|
|
|
|
|
• await connect()
|
|
|
|
|
|
• async for payload in receive(): # μ-law bytes (20 ms each)
|
|
|
|
|
|
…
|
|
|
|
|
|
• await send(data) # any length; will be chunked
|
|
|
|
|
|
• await disconnect()
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
_FRAME_SIZE = 160 # 20 ms @ 8 kHz PCMU
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
|
self,
|
|
|
|
|
|
connection: "StasisRTPConnection",
|
|
|
|
|
|
callbacks: "StasisRTPCallbacks",
|
|
|
|
|
|
):
|
|
|
|
|
|
"""Initialize Stasis RTP client.
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
connection: RTP connection parameters.
|
|
|
|
|
|
callbacks: Callback handlers for transport events.
|
|
|
|
|
|
"""
|
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
|
|
self._connection = connection
|
|
|
|
|
|
self._callbacks = callbacks
|
|
|
|
|
|
self._encoder = _RTPEncoder()
|
|
|
|
|
|
self._decoder = _RTPDecoder()
|
|
|
|
|
|
|
|
|
|
|
|
self._recv_sock: Optional[socket.socket] = None
|
|
|
|
|
|
self._send_sock: Optional[socket.socket] = None
|
|
|
|
|
|
self._closing = False
|
|
|
|
|
|
self._recv_sock_ready = asyncio.Event() # Signal when recv socket is ready
|
|
|
|
|
|
self._leave_counter = 0 # Track input/output transport usage
|
|
|
|
|
|
|
|
|
|
|
|
# ── wire event handlers to the connection ────────────────
|
|
|
|
|
|
@self._connection.event_handler("connected")
|
|
|
|
|
|
async def _on_connected(_: Any):
|
|
|
|
|
|
await self._setup_sockets()
|
|
|
|
|
|
await self._callbacks.on_client_connected(
|
|
|
|
|
|
self._connection.caller_channel_id
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@self._connection.event_handler("disconnected")
|
2025-09-22 09:31:13 +00:00
|
|
|
|
async def _on_disconnected(_: Any):
|
|
|
|
|
|
logger.debug("In _on_disconnected of StasisRTPClient")
|
2025-09-09 14:37:32 +05:30
|
|
|
|
await self._callbacks.on_client_disconnected(
|
2025-09-22 09:31:13 +00:00
|
|
|
|
self._connection.caller_channel_id
|
2025-09-09 14:37:32 +05:30
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# ─── public helpers ──────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
async def setup(self, _):
|
|
|
|
|
|
"""Setup method for compatibility."""
|
|
|
|
|
|
self._leave_counter += 1
|
|
|
|
|
|
|
|
|
|
|
|
async def connect(self):
|
|
|
|
|
|
"""Connect to the RTP socket."""
|
|
|
|
|
|
if self._connection.is_connected():
|
|
|
|
|
|
return
|
|
|
|
|
|
await self._connection.connect()
|
|
|
|
|
|
|
2025-09-22 09:31:13 +00:00
|
|
|
|
async def disconnect(self):
|
2025-09-09 14:37:32 +05:30
|
|
|
|
"""Disconnect from the RTP socket."""
|
|
|
|
|
|
# Decrement leave counter when disconnect is called
|
|
|
|
|
|
logger.debug(f"StasisRTPClient.disconnect leave_counter: {self._leave_counter}")
|
|
|
|
|
|
self._leave_counter -= 1
|
|
|
|
|
|
if self._leave_counter > 0:
|
|
|
|
|
|
# Early return - InputTransport called first, OutputTransport will call later
|
2025-09-22 09:31:13 +00:00
|
|
|
|
# Only proceed when counter reaches 0 (OutputTransport's call)
|
2025-09-09 14:37:32 +05:30
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# Close sockets
|
|
|
|
|
|
logger.debug("Going to close sockets")
|
|
|
|
|
|
await self._close_sockets()
|
|
|
|
|
|
|
|
|
|
|
|
if self._closing:
|
|
|
|
|
|
# We might have received the disconnected callback from the StasisRTPConnection
|
|
|
|
|
|
# due to user hangup. We will just return. We have already closed the sockets
|
|
|
|
|
|
# in disconnected callback handler.
|
|
|
|
|
|
return
|
|
|
|
|
|
self._closing = True
|
|
|
|
|
|
|
2025-09-22 09:31:13 +00:00
|
|
|
|
# If we have initiated transfer before, we would ignore _connection.disconnect()
|
|
|
|
|
|
# in the connection. (since is_closing would be set by transfer)
|
|
|
|
|
|
try:
|
|
|
|
|
|
await self._connection.disconnect()
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
logger.error(f"Failed to disconnect RTP connection: {exc}")
|
2025-09-09 14:37:32 +05:30
|
|
|
|
|
|
|
|
|
|
# ─── socket management ──────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
async def _setup_sockets(self):
|
|
|
|
|
|
if self._recv_sock and self._send_sock:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug(
|
|
|
|
|
|
f"Setting up Sockets - local {self._connection.local_addr}, remote: {self._connection.remote_addr}"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# receive socket – bind to local address provided by connection
|
|
|
|
|
|
if not self._recv_sock:
|
|
|
|
|
|
rs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
|
rs.setblocking(False)
|
|
|
|
|
|
rs.bind(self._connection.local_addr)
|
|
|
|
|
|
self._recv_sock = rs
|
|
|
|
|
|
self._recv_sock_ready.set() # Signal that recv socket is ready
|
|
|
|
|
|
|
|
|
|
|
|
# send socket – connect to remote (Asterisk) address
|
|
|
|
|
|
if not self._send_sock:
|
|
|
|
|
|
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
|
ss.setblocking(False)
|
|
|
|
|
|
ss.connect(self._connection.remote_addr)
|
|
|
|
|
|
self._send_sock = ss
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug(
|
|
|
|
|
|
f"Socket setup complete - recv_fd: {self._recv_sock.fileno()}, send_fd: {self._send_sock.fileno()}"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
async def _close_sockets(self):
|
|
|
|
|
|
"""Safely close sockets with proper error handling."""
|
|
|
|
|
|
for sock_name, sock in [("recv", self._recv_sock), ("send", self._send_sock)]:
|
|
|
|
|
|
if sock:
|
|
|
|
|
|
try:
|
|
|
|
|
|
# Shutdown the socket first to break any pending operations
|
|
|
|
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
|
|
|
|
except OSError:
|
|
|
|
|
|
# Socket might already be closed or in a bad state
|
|
|
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
|
|
|
sock.close()
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
logger.debug(f"Error closing {sock_name} socket: {exc}")
|
|
|
|
|
|
|
|
|
|
|
|
self._recv_sock = None
|
|
|
|
|
|
self._send_sock = None
|
|
|
|
|
|
self._recv_sock_ready.clear() # Reset the event for potential reconnection
|
|
|
|
|
|
|
|
|
|
|
|
# Notify the connection that sockets are closed so ARI Manager can clean up ports
|
|
|
|
|
|
await self._connection.notify_sockets_closed()
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Closed sockets in StasisRTPClient")
|
|
|
|
|
|
|
|
|
|
|
|
# ─── receive path ────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
async def receive(self) -> AsyncIterator[bytes]:
|
|
|
|
|
|
"""Async generator yielding μ-law frames (exactly 160 bytes each).
|
|
|
|
|
|
|
|
|
|
|
|
Silently drops any packet whose RTP header does not match our SSRC/PT.
|
|
|
|
|
|
"""
|
|
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
|
|
|
|
|
|
|
|
# Wait for recv socket to be created
|
|
|
|
|
|
try:
|
|
|
|
|
|
await self._recv_sock_ready.wait()
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Going to receive from the socket now")
|
|
|
|
|
|
|
|
|
|
|
|
while not self._closing:
|
|
|
|
|
|
try:
|
|
|
|
|
|
# each loop gets 172 bytes UDP packet, which is 160 bytes of
|
|
|
|
|
|
# audio data (Asterisk sends 20ms audio chunks with 8k sample rate)
|
|
|
|
|
|
# and 12 bytes of RTP header
|
|
|
|
|
|
data = await loop.sock_recv(self._recv_sock, 2048)
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
|
logger.debug("RTP receive task cancelled")
|
|
|
|
|
|
break
|
|
|
|
|
|
except (OSError, socket.error) as exc:
|
|
|
|
|
|
logger.warning(f"RTP receive failed (socket closed): {exc}")
|
|
|
|
|
|
break
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
logger.debug(f"Unexpected error in receive: {exc}")
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
payload = self._decoder.unpack(data)
|
|
|
|
|
|
if payload is None:
|
|
|
|
|
|
continue # header failed validation
|
|
|
|
|
|
|
|
|
|
|
|
# In practice Asterisk sends 20 ms frames – assert just in case.
|
|
|
|
|
|
if len(payload) != self._FRAME_SIZE:
|
|
|
|
|
|
logger.warning(f"Dropping non-20 ms packet len={len(payload)}")
|
|
|
|
|
|
continue
|
|
|
|
|
|
yield payload
|
|
|
|
|
|
|
|
|
|
|
|
# ─── send path ───────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
async def send(self, data: bytes):
|
|
|
|
|
|
"""Send μ-law data of arbitrary length.
|
|
|
|
|
|
|
|
|
|
|
|
Splits/aggregates into 160-byte chunks before RTP-wrapping.
|
|
|
|
|
|
"""
|
|
|
|
|
|
if self._closing or not self._send_sock:
|
|
|
|
|
|
return
|
|
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
|
|
|
|
|
|
|
|
# chunk/concat to 160-byte frames
|
|
|
|
|
|
chunks = self._chunk_ulaw(data, self._FRAME_SIZE)
|
|
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
|
|
|
|
mark = i == 0 # set marker on the first packet of talk-spurt
|
|
|
|
|
|
packet = self._encoder.pack(chunk, mark=mark)
|
|
|
|
|
|
try:
|
|
|
|
|
|
await loop.sock_sendall(self._send_sock, packet)
|
|
|
|
|
|
except (OSError, socket.error) as exc:
|
|
|
|
|
|
logger.warning(f"RTP send failed (socket closed): {exc}")
|
|
|
|
|
|
break
|
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
|
logger.error(f"RTP send failed: {exc}")
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
def _chunk_ulaw(self, buf: bytes, size: int) -> list[bytes]:
|
|
|
|
|
|
"""Split / aggregate μ-law bytes to exact *size* multiples.
|
|
|
|
|
|
|
|
|
|
|
|
• If buf length is not a multiple of *size*, pad the last chunk with 0xFF
|
|
|
|
|
|
(silence). That keeps timestamps monotonic.
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not buf:
|
|
|
|
|
|
return []
|
|
|
|
|
|
if len(buf) % size:
|
|
|
|
|
|
pad = size - (len(buf) % size)
|
|
|
|
|
|
buf += b"\xff" * pad
|
|
|
|
|
|
return [buf[i : i + size] for i in range(0, len(buf), size)]
|
|
|
|
|
|
|
|
|
|
|
|
# ─── properties ──────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def is_connected(self) -> bool:
|
|
|
|
|
|
"""Check if client is connected."""
|
|
|
|
|
|
return self._connection.is_connected() and not self._closing
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def is_closing(self) -> bool:
|
|
|
|
|
|
"""Check if client is closing."""
|
|
|
|
|
|
return self._closing
|