dograh/evals/stt/providers/deepgram_provider.py
Abhishek 911c5ed416
fix: changes to update pipecat version to 0.0.100 (#122)
* feat: add stt evals

* add smart turn as provider

* chore: remove deprecations

* chore: format files

* fix: remove deprecated UserIdleProcessor

* fix: remove deprecated TranscriptProcessor

* chore: update pipecat submodule

* feat: add evals visualisation

* fix: trigger llm generation on client connected and pipeline started

* chore: update pipecat

* chore: update pipecat submodule

* Add tests

* fix: slow loading of workflow page

* chore: update pipecat submodule

* Show version after release

* Fixes #99

* fix: provider check for websocket connection

* Fixes #107

* Fix #96

* chore: fix documentation

* fix: cloudonix campaign call error

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
2026-01-23 18:53:59 +05:30

236 lines
8.4 KiB
Python

"""Deepgram STT provider with WebSocket streaming."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
from ..audio_streamer import AudioConfig, AudioStreamer
from .base import EventCallback, STTProvider, TranscriptionResult, Word
from loguru import logger
try:
from websockets.asyncio.client import connect as websocket_connect
except ImportError:
raise ImportError("websockets required: pip install websockets")
class DeepgramProvider(STTProvider):
    """Deepgram Nova Speech-to-Text provider with WebSocket streaming.

    API Docs: https://developers.deepgram.com/docs/

    Supports:
    - Speaker diarization via `diarize=true`
    - Keyterm boosting via `keyterm` parameter
    - Real-time streaming via WebSocket
    - Multiple languages
    - Punctuation

    For Flux models, use DeepgramFluxProvider instead.
    """

    WS_URL = "wss://api.deepgram.com/v1/listen"

    def __init__(self, api_key: str | None = None):
        """Store the API key, falling back to the DEEPGRAM_API_KEY env var.

        Args:
            api_key: Deepgram API key. When omitted (or empty) the value of
                the ``DEEPGRAM_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no key is passed and the env var is unset/empty.
        """
        self.api_key = api_key or os.getenv("DEEPGRAM_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Deepgram API key required. Set DEEPGRAM_API_KEY env var or pass api_key."
            )

    @property
    def name(self) -> str:
        """Provider identifier recorded on every TranscriptionResult."""
        return "deepgram"

    def _build_ws_url(
        self,
        params: dict[str, Any],
        keyterms: list[str] | None,
        extra: dict[str, Any],
    ) -> str:
        """Return the listen URL with a fully percent-encoded query string.

        Args:
            params: Base query parameters.
            keyterms: Terms to boost; each becomes its own ``keyterm`` param.
            extra: Caller-supplied extra parameters, appended last.

        Returns:
            ``WS_URL`` followed by ``?`` and the encoded query string.
        """
        query_pairs: list[tuple[str, Any]] = list(params.items())
        query_pairs.extend(("keyterm", term) for term in keyterms or [])
        query_pairs.extend(extra.items())
        # urlencode escapes reserved characters in every value (not only in
        # keyterms); doseq=True expands list/tuple values into repeated
        # parameters instead of emitting their Python repr.
        return f"{self.WS_URL}?{urlencode(query_pairs, doseq=True)}"

    async def transcribe(
        self,
        audio_path: Path,
        diarize: bool = False,
        keyterms: list[str] | None = None,
        on_event: EventCallback | None = None,
        model: str = "nova-3-general",
        language: str = "en",
        sample_rate: int = 8000,
        punctuate: bool = True,
        trailing_silence_seconds: float = 3.0,
        **kwargs: Any,
    ) -> TranscriptionResult:
        """Transcribe audio using Deepgram Nova WebSocket streaming.

        Audio is streamed chunk by chunk while a concurrent task consumes the
        server messages. Any failure (connection error, Deepgram ``Error``
        message, ...) is logged and whatever was collected up to that point is
        still returned, so callers always get a TranscriptionResult.

        Args:
            audio_path: Path to audio file
            diarize: Enable speaker diarization
            keyterms: List of keywords to boost recognition
            on_event: Optional callback for raw WebSocket events; called with
                the message type and the decoded JSON payload
            model: Deepgram Nova model (nova-3, nova-2, etc.)
            language: Language code
            sample_rate: Audio sample rate for streaming
            punctuate: Add punctuation
            trailing_silence_seconds: Seconds of silence after audio to capture pending events
            **kwargs: Additional Deepgram query parameters, appended to the URL

        Returns:
            TranscriptionResult with transcript and speaker info
        """
        # Build query params
        params: dict[str, Any] = {
            "model": model,
            "language": language,
            "punctuate": str(punctuate).lower(),
            "encoding": "linear16",
            "sample_rate": sample_rate,
            "channels": 1,
            "interim_results": "true",
            "smart_format": "true",
            "profanity_filter": "true",
            "vad_events": "true",
            "utterance_end_ms": "1000",
        }
        if diarize:
            params["diarize"] = "true"

        ws_url = self._build_ws_url(params, keyterms, kwargs)
        logger.debug(f"Deepgram WebSocket URL: {ws_url}")

        # Setup audio streamer
        streamer = AudioStreamer(AudioConfig(sample_rate=sample_rate))

        # Collect results
        all_words: list[dict[str, Any]] = []
        final_segments: list[str] = []
        duration = 0.0

        try:
            async with websocket_connect(
                ws_url,
                additional_headers={"Authorization": f"Token {self.api_key}"},
            ) as ws:

                async def send_audio() -> None:
                    """Send audio chunks to Deepgram, then request stream close."""
                    chunk_no = 0
                    async for chunk in streamer.stream_file(
                        audio_path, trailing_silence_seconds=trailing_silence_seconds
                    ):
                        await ws.send(chunk)
                        logger.trace(f"[deepgram] Sent audio chunk {chunk_no}")
                        chunk_no += 1
                    # Send close message
                    logger.debug(f"[deepgram] Sending CloseStream after {chunk_no} chunks")
                    await ws.send(json.dumps({"type": "CloseStream"}))

                async def receive_transcripts() -> None:
                    """Receive and collect transcription results."""
                    nonlocal duration
                    async for message in ws:
                        # Only text frames carry JSON events.
                        if not isinstance(message, str):
                            continue
                        data = json.loads(message)
                        msg_type = data.get("type")
                        logger.debug(f"[deepgram] Received {msg_type}: {data}")

                        # Emit event via callback if provided
                        if on_event and msg_type:
                            on_event(msg_type, data)

                        if msg_type == "Results":
                            # Nova-style response
                            alternatives = data.get("channel", {}).get("alternatives", [])
                            # interim_results is enabled, so the same audio is
                            # reported several times; keep only finalized
                            # segments, otherwise words would be duplicated.
                            if alternatives and data.get("is_final"):
                                best = alternatives[0]
                                all_words.extend(best.get("words", []))
                                text = best.get("transcript", "")
                                if text:
                                    final_segments.append(text)
                            duration = max(
                                duration, data.get("duration", 0) + data.get("start", 0)
                            )
                        elif msg_type == "Metadata":
                            # Get duration from metadata
                            duration = data.get("duration", duration)
                        elif msg_type == "Error":
                            raise RuntimeError(f"Deepgram error: {data}")

                # Run send and receive concurrently
                send_task = asyncio.create_task(send_audio())
                receive_task = asyncio.create_task(receive_transcripts())
                try:
                    # Wait for send to complete, then wait a bit for final results
                    await send_task
                    try:
                        await asyncio.wait_for(receive_task, timeout=5.0)
                    except asyncio.TimeoutError:
                        pass  # Normal - websocket closes after final results
                finally:
                    # If sending failed or we were cancelled, do not leave the
                    # receiver running against a socket that is being closed.
                    if not receive_task.done():
                        receive_task.cancel()
        except Exception as e:
            # Best effort: log the failure and fall through so that partial
            # results collected so far are still returned.
            logger.exception(e)

        return self._parse_results(
            all_words, " ".join(final_segments).strip(), duration, params, keyterms
        )

    def _parse_results(
        self,
        raw_words: list[dict[str, Any]],
        transcript: str,
        duration: float,
        params: dict[str, Any],
        keyterms: list[str] | None,
    ) -> TranscriptionResult:
        """Parse collected results into TranscriptionResult.

        Args:
            raw_words: Word dicts as received from Deepgram.
            transcript: Full transcript text.
            duration: Audio duration reported by the service.
            params: Query parameters used for the request.
            keyterms: Keyterms used for the request, if any; stored alongside
                ``params`` under the ``keyterms`` key.

        Returns:
            TranscriptionResult with one Word per raw word, the sorted list of
            distinct speaker labels, and the raw words under ``raw_response``.
        """
        words = []
        speakers_set: set[str] = set()
        for raw in raw_words:
            # Speaker labels are stringified; words without one get None.
            speaker = str(raw["speaker"]) if "speaker" in raw else None
            if speaker:
                speakers_set.add(speaker)
            words.append(
                Word(
                    word=raw.get("word", ""),
                    start=raw.get("start", 0.0),
                    end=raw.get("end", 0.0),
                    confidence=raw.get("confidence", 0.0),
                    speaker=speaker,
                    speaker_confidence=raw.get("speaker_confidence"),
                )
            )

        # Copy so the caller's params dict is not mutated.
        stored_params = dict(params)
        if keyterms:
            stored_params["keyterms"] = keyterms

        return TranscriptionResult(
            provider=self.name,
            transcript=transcript,
            words=words,
            speakers=sorted(speakers_set),
            duration=duration,
            raw_response={"words": raw_words},
            params=stored_params,
        )