dograh/evals/stt/event_capture.py
Abhishek 911c5ed416
fix: changes to update pipecat version to 0.0.100 (#122)
* feat: add stt evals

* add smart turn as provider

* chore: remove deprecations

* chore: format files

* fix: remove deprecated UserIdleProcessor

* fix: remove deprecated TranscriptProcessor

* chore: update pipecat submodule

* feat: add evals visualisation

* fix: trigger llm generation on client connected and pipeline started

* chore: update pipecat

* chore: update pipecat submodule

* Add tests

* fix: slow loading of workflow page

* chore: update pipecat submodule

* Show version after release

* Fixes #99

* fix: provider check for websocket connection

* Fixes #107

* Fix #96

* chore: fix documentation

* fix: cloudonix campaign call error

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
2026-01-23 18:53:59 +05:30

251 lines
7 KiB
Python

#!/usr/bin/env python3
"""STT Event Capture Runner.
Streams audio to STT providers and captures raw WebSocket events with timestamps
for visualization in the web UI.
Usage:
python -m evals.stt.event_capture audio/multi_speaker.m4a --provider deepgram
python -m evals.stt.event_capture audio/multi_speaker.m4a --provider speechmatics
"""
import argparse
import asyncio
import json
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable
from evals.stt.audio_streamer import AudioStreamer
from evals.stt.providers import (
DeepgramFluxProvider,
DeepgramProvider,
SpeechmaticsProvider,
STTProvider,
)
@dataclass
class CapturedEvent:
"""A captured WebSocket event with timestamp."""
timestamp: float # Time since stream start (seconds)
event_type: str # e.g., "Results", "TurnInfo", "AddTranscript"
data: dict[str, Any] # Raw event payload
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"event_type": self.event_type,
"data": self.data,
}
@dataclass
class EventCaptureResult:
"""Result from event capture session."""
audio_file: str
audio_path: str # Relative path to audio from results dir
provider: str
duration: float
created_at: str
events: list[CapturedEvent] = field(default_factory=list)
transcript: str = "" # Final transcript for reference
def to_dict(self) -> dict[str, Any]:
return {
"audio_file": self.audio_file,
"audio_path": self.audio_path,
"provider": self.provider,
"duration": self.duration,
"created_at": self.created_at,
"events": [e.to_dict() for e in self.events],
"transcript": self.transcript,
}
EventCallback = Callable[[str, dict[str, Any]], None]
def get_provider(name: str) -> STTProvider:
"""Get provider instance by name."""
providers = {
"deepgram": DeepgramProvider,
"deepgram-flux": DeepgramFluxProvider,
"speechmatics": SpeechmaticsProvider,
}
if name not in providers:
raise ValueError(f"Unknown provider: {name}. Available: {list(providers.keys())}")
return providers[name]()
async def capture_events(
provider: STTProvider,
audio_path: Path,
sample_rate: int = 8000,
**kwargs: Any,
) -> EventCaptureResult:
"""Capture WebSocket events from a provider.
Args:
provider: The STT provider to use
audio_path: Path to the audio file
sample_rate: Audio sample rate
**kwargs: Additional provider parameters
Returns:
EventCaptureResult with all captured events
"""
# Get audio duration
streamer = AudioStreamer()
duration = streamer.get_duration(audio_path)
# Event list and start time
events: list[CapturedEvent] = []
start_time: float | None = None
def on_event(event_type: str, data: dict[str, Any]) -> None:
"""Callback for capturing events."""
nonlocal start_time
if start_time is None:
start_time = asyncio.get_event_loop().time()
timestamp = asyncio.get_event_loop().time() - start_time
events.append(CapturedEvent(timestamp=timestamp, event_type=event_type, data=data))
# Run transcription with event callback
result = await provider.transcribe(
audio_path,
sample_rate=sample_rate,
on_event=on_event,
**kwargs,
)
return EventCaptureResult(
audio_file=audio_path.name,
audio_path=f"../audio/{audio_path.name}",
provider=provider.name,
duration=duration,
created_at=datetime.now().isoformat(),
events=events,
transcript=result.transcript,
)
def save_result(result: EventCaptureResult, output_dir: Path) -> Path:
"""Save capture result to JSON file.
Args:
result: The capture result to save
output_dir: Directory to save results
Returns:
Path to the saved file
"""
output_dir.mkdir(parents=True, exist_ok=True)
# Format: {audio_name}-{provider}.json
audio_name = Path(result.audio_file).stem
output_file = output_dir / f"{audio_name}-{result.provider}.json"
with open(output_file, "w") as f:
json.dump(result.to_dict(), f, indent=2)
return output_file
async def main() -> int:
parser = argparse.ArgumentParser(
description="STT Event Capture - Capture WebSocket events for visualization",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python -m evals.stt.event_capture audio/multi_speaker.m4a --provider deepgram
python -m evals.stt.event_capture audio/multi_speaker.m4a --provider speechmatics --diarize
""",
)
parser.add_argument(
"audio_file",
type=str,
help="Path to audio file (relative to evals/stt/ or absolute)",
)
parser.add_argument(
"--provider",
required=True,
choices=["deepgram", "deepgram-flux", "speechmatics"],
help="STT provider to use",
)
parser.add_argument(
"--sample-rate",
type=int,
default=8000,
help="Audio sample rate for streaming (default: 8000)",
)
parser.add_argument(
"--diarize",
action="store_true",
help="Enable speaker diarization",
)
parser.add_argument(
"--output-dir",
type=str,
default="results",
help="Output directory for results (default: results)",
)
args = parser.parse_args()
# Resolve audio path
script_dir = Path(__file__).parent
audio_path = Path(args.audio_file)
if not audio_path.is_absolute():
audio_path = script_dir / audio_path
if not audio_path.exists():
print(f"Error: Audio file not found: {audio_path}")
return 1
print(f"Audio file: {audio_path}")
print(f"Provider: {args.provider}")
print(f"Sample rate: {args.sample_rate} Hz")
print(f"Diarization: {args.diarize}")
try:
provider = get_provider(args.provider)
print(f"\nCapturing events from {provider.name}...")
result = await capture_events(
provider,
audio_path,
sample_rate=args.sample_rate,
diarize=args.diarize,
)
output_dir = script_dir / args.output_dir
output_file = save_result(result, output_dir)
print(f"\nCapture complete!")
print(f" Duration: {result.duration:.2f}s")
print(f" Events: {len(result.events)}")
print(f" Saved to: {output_file}")
# Show first few events
print(f"\nFirst 5 events:")
for event in result.events[:5]:
print(f" [{event.timestamp:.2f}s] {event.event_type}")
return 0
except Exception as e:
print(f"\nError: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))