feat: support flux for dograh multi

This commit is contained in:
Abhishek Kumar 2026-06-24 12:45:07 +05:30
parent 0956157029
commit ca598933ef
7 changed files with 176 additions and 11 deletions

View file

@ -61,6 +61,7 @@ from api.services.configuration.check_validity import UserConfigurationValidator
from api.services.configuration.defaults import DEFAULT_SERVICE_PROVIDERS
from api.services.configuration.masking import is_mask_of, mask_key, mask_user_config
from api.services.configuration.registry import (
DOGRAH_MULTILINGUAL_AUTODETECT_LANGUAGES,
DOGRAH_STT_LANGUAGES,
REGISTRY,
DograhTTSService,
@ -274,6 +275,7 @@ async def get_model_configuration_v2_defaults(
"step": DOGRAH_SPEED_STEP,
},
"languages": DOGRAH_STT_LANGUAGES,
"multilingual_languages": DOGRAH_MULTILINGUAL_AUTODETECT_LANGUAGES,
"defaults": {
"voice": DOGRAH_DEFAULT_VOICE,
"speed": 1.0,

View file

@ -15,6 +15,7 @@ from api.services.configuration.options import (
AZURE_SPEECH_TTS_LANGUAGES,
AZURE_SPEECH_TTS_VOICES,
DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGE_OPTIONS,
DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGES,
DEEPGRAM_LANGUAGES,
DEEPGRAM_STT_MODELS,
GLADIA_STT_LANGUAGES,
@ -1397,6 +1398,10 @@ class GoogleSTTConfiguration(BaseSTTConfiguration):
# Dograh STT Service
DOGRAH_STT_MODELS = ["default"]
DOGRAH_STT_LANGUAGES = DEEPGRAM_LANGUAGES
# Languages auto-detected when the Dograh STT language is "multi". Dograh STT runs
# Deepgram Flux multilingual under the hood, which only auto-detects this subset —
# not the full DOGRAH_STT_LANGUAGES list offered for explicit single-language selection.
DOGRAH_MULTILINGUAL_AUTODETECT_LANGUAGES = DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGES
@register_stt

View file

@ -6,7 +6,6 @@ from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.configuration.options import DEEPGRAM_FLUX_MODELS
from api.services.configuration.registry import ServiceProviders
from api.services.integrations import (
IntegrationRuntimeContext,
@ -47,6 +46,7 @@ from api.services.pipecat.service_factory import (
create_realtime_llm_service,
create_stt_service,
create_tts_service,
stt_uses_flux_turns,
)
from api.services.pipecat.tracing_config import (
ensure_tracing,
@ -626,14 +626,10 @@ async def _run_pipeline(
user_config.realtime.provider
)
else:
# Deepgram Flux uses external turn detection (VAD + External start/stop)
# Other models use configurable turn detection strategy
is_deepgram_flux = (
user_config.stt.provider == ServiceProviders.DEEPGRAM.value
and user_config.stt.model in DEEPGRAM_FLUX_MODELS
)
if is_deepgram_flux:
# Deepgram Flux and supported Dograh managed Flux languages emit their
# own turn boundaries, so the aggregator follows those external signals.
# Other models use configurable turn detection.
if stt_uses_flux_turns(user_config):
user_turn_strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),

View file

@ -6,7 +6,10 @@ from fastapi import HTTPException
from loguru import logger
from api.constants import MPS_API_URL
from api.services.configuration.options import DEEPGRAM_FLUX_MODELS
from api.services.configuration.options import (
DEEPGRAM_FLUX_MODELS,
DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGE_OPTIONS,
)
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.minimax_tts import MiniMaxOwnedSessionTTSService
from api.utils.url_security import validate_user_configured_service_url
@ -27,6 +30,7 @@ from pipecat.services.deepgram.flux.stt import (
)
from pipecat.services.deepgram.stt import DeepgramSTTService, DeepgramSTTSettings
from pipecat.services.deepgram.tts import DeepgramTTSService, DeepgramTTSSettings
from pipecat.services.dograh.flux.stt import DograhFluxSTTService
from pipecat.services.dograh.llm import DograhLLMService
from pipecat.services.dograh.stt import DograhSTTService, DograhSTTSettings
from pipecat.services.dograh.tts import DograhTTSService, DograhTTSSettings
@ -94,6 +98,19 @@ DEEPGRAM_FLUX_LANGUAGE_HINTS = {
}
def dograh_stt_uses_flux_language(language: str | None) -> bool:
    """Return whether a Dograh STT language selection maps onto Flux.

    A missing or empty ``language`` is treated as ``"multi"``. The result is
    whether the effective language appears in
    ``DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGE_OPTIONS``.
    """
    effective_language = language if language else "multi"
    return effective_language in DEEPGRAM_FLUX_MULTILINGUAL_LANGUAGE_OPTIONS
def stt_uses_flux_turns(user_config) -> bool:
    """Return whether the configured STT selection is a Flux one.

    - Deepgram: true when the configured model is in ``DEEPGRAM_FLUX_MODELS``.
    - Dograh: decided by ``dograh_stt_uses_flux_language`` on the configured
      language (``None`` is passed when the attribute is absent).
    - Any other provider: false.
    """
    stt_config = user_config.stt
    provider = stt_config.provider

    if provider == ServiceProviders.DEEPGRAM.value:
        return stt_config.model in DEEPGRAM_FLUX_MODELS

    if provider == ServiceProviders.DOGRAH.value:
        configured_language = getattr(stt_config, "language", None)
        return dograh_stt_uses_flux_language(configured_language)

    return False
def _validate_runtime_service_url(url: str, field_name: str) -> None:
try:
validate_user_configured_service_url(
@ -193,6 +210,29 @@ def create_stt_service(
elif user_config.stt.provider == ServiceProviders.DOGRAH.value:
base_url = MPS_API_URL.replace("http://", "ws://").replace("https://", "wss://")
language = getattr(user_config.stt, "language", None) or "multi"
if dograh_stt_uses_flux_language(language):
# Dograh's Flux proxy only supports multilingual auto-detect and the
# same language hint subset as Deepgram Flux multilingual.
settings_kwargs = {
"model": "flux-general-multi",
"eot_timeout_ms": 3000,
"eot_threshold": 0.7,
"eager_eot_threshold": 0.5,
"keyterm": keyterms or [],
}
language_hint = DEEPGRAM_FLUX_LANGUAGE_HINTS.get(language)
if language_hint:
settings_kwargs["language_hints"] = [language_hint]
return DograhFluxSTTService(
base_url=base_url,
api_key=user_config.stt.api_key,
correlation_id=correlation_id,
settings=DeepgramFluxSTTSettings(**settings_kwargs),
should_interrupt=False, # external turn strategies own interruption
sample_rate=audio_config.transport_in_sample_rate,
)
return DograhSTTService(
base_url=base_url,
api_key=user_config.stt.api_key,

View file

@ -0,0 +1,107 @@
from types import SimpleNamespace
from unittest.mock import patch
from pipecat.services.settings import NOT_GIVEN
from pipecat.transcriptions.language import Language
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.service_factory import (
create_stt_service,
dograh_stt_uses_flux_language,
stt_uses_flux_turns,
)
def _audio_config() -> AudioConfig:
    """Build an ``AudioConfig`` with 16000 Hz for both transport directions."""
    sample_rate = 16000
    return AudioConfig(
        transport_in_sample_rate=sample_rate,
        transport_out_sample_rate=sample_rate,
    )
def _dograh_config(language: str | None) -> SimpleNamespace:
    """Build a minimal stand-in user config whose STT provider is Dograh.

    Only the ``stt`` attribute is populated: provider, API key, model, and the
    given ``language`` (which may be ``None``).
    """
    stt_section = SimpleNamespace(
        provider=ServiceProviders.DOGRAH.value,
        api_key="mps-key",
        model="default",
        language=language,
    )
    return SimpleNamespace(stt=stt_section)
def test_dograh_flux_language_predicate_matches_multilingual_support():
    """``None``, ``"multi"`` and ``"es"`` select Flux; ``"ar"`` does not."""
    for flux_language in (None, "multi", "es"):
        assert dograh_stt_uses_flux_language(flux_language)

    assert not dograh_stt_uses_flux_language("ar")
def test_stt_uses_flux_turns_only_for_dograh_flux_supported_languages():
    """Dograh configs report Flux turns for ``"multi"``/``"es"`` but not ``"ar"``."""
    for flux_language in ("multi", "es"):
        assert stt_uses_flux_turns(_dograh_config(flux_language))

    assert not stt_uses_flux_turns(_dograh_config("ar"))
def test_create_dograh_multi_uses_flux_service_without_language_hint():
    """``"multi"`` builds the Flux service with ``language_hints`` left as NOT_GIVEN."""
    # Both service classes are replaced so no real connection is attempted and
    # the constructor arguments can be inspected afterwards.
    flux_patcher = patch("api.services.pipecat.service_factory.DograhFluxSTTService")
    standard_patcher = patch("api.services.pipecat.service_factory.DograhSTTService")

    with flux_patcher as flux_cls, standard_patcher as standard_cls:
        create_stt_service(
            _dograh_config("multi"),
            _audio_config(),
            correlation_id="corr-123",
        )

    flux_cls.assert_called_once()
    standard_cls.assert_not_called()

    call_kwargs = flux_cls.call_args.kwargs
    assert call_kwargs["correlation_id"] == "corr-123"

    flux_settings = call_kwargs["settings"]
    assert flux_settings.model == "flux-general-multi"
    assert flux_settings.language_hints is NOT_GIVEN
def test_create_dograh_supported_language_uses_flux_service_with_hint():
    """``"es"`` builds the Flux service with a single Spanish language hint."""
    flux_patcher = patch("api.services.pipecat.service_factory.DograhFluxSTTService")
    standard_patcher = patch("api.services.pipecat.service_factory.DograhSTTService")

    with flux_patcher as flux_cls, standard_patcher as standard_cls:
        create_stt_service(
            _dograh_config("es"),
            _audio_config(),
            keyterms=["Dograh"],
        )

    flux_cls.assert_called_once()
    standard_cls.assert_not_called()

    flux_settings = flux_cls.call_args.kwargs["settings"]
    assert flux_settings.model == "flux-general-multi"
    assert flux_settings.language_hints == [Language.ES]
    assert flux_settings.keyterm == ["Dograh"]
def test_create_dograh_unsupported_language_falls_back_to_standard_stt_service():
    """``"ar"`` skips the Flux service and builds the standard Dograh STT service."""
    flux_patcher = patch("api.services.pipecat.service_factory.DograhFluxSTTService")
    standard_patcher = patch("api.services.pipecat.service_factory.DograhSTTService")

    with flux_patcher as flux_cls, standard_patcher as standard_cls:
        create_stt_service(
            _dograh_config("ar"),
            _audio_config(),
            keyterms=["Dograh"],
            correlation_id="corr-123",
        )

    flux_cls.assert_not_called()
    standard_cls.assert_called_once()

    call_kwargs = standard_cls.call_args.kwargs
    assert call_kwargs["correlation_id"] == "corr-123"

    standard_settings = call_kwargs["settings"]
    assert standard_settings.model == "default"
    assert standard_settings.language == "ar"

    assert call_kwargs["keyterms"] == ["Dograh"]