feat: better interrupt strategies (#479)

* chore: drain active calls before rolling updates

* Use provisional VAD interruption strategy

* feat: wire provisional VAD configuration

* chore: refactor user turn strategies

* chore: bump pipecat
This commit is contained in:
Abhishek 2026-06-30 14:52:17 +05:30 committed by GitHub
parent 962d5afa12
commit 6937e01b49
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 645 additions and 193 deletions

View file

@ -80,7 +80,8 @@ from pipecat.turns.user_mute import (
)
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
MinWordsUserTurnStartStrategy,
ProvisionalVADUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
VADUserTurnStartStrategy,
@ -99,6 +100,10 @@ ensure_tracing()
DEFAULT_USER_TURN_STOP_TIMEOUT = 5.0
EXTERNAL_TURN_USER_STOP_TIMEOUT = 30.0
DEFAULT_TURN_START_STRATEGY = "default"
DEFAULT_TURN_START_MIN_WORDS = 3
DEFAULT_PROVISIONAL_VAD_PAUSE_SECS = 1.5
DEFAULT_SMART_TURN_STOP_SECS = 2.0
def _resolve_user_turn_stop_timeout(
@ -111,6 +116,80 @@ def _resolve_user_turn_stop_timeout(
return DEFAULT_USER_TURN_STOP_TIMEOUT
def _resolve_turn_start_min_words(run_configs: dict) -> int:
return max(
1,
int(run_configs.get("turn_start_min_words", DEFAULT_TURN_START_MIN_WORDS)),
)
def _resolve_provisional_vad_pause_secs(run_configs: dict) -> float:
return max(
0.1,
float(
run_configs.get(
"provisional_vad_pause_secs", DEFAULT_PROVISIONAL_VAD_PAUSE_SECS
)
),
)
def _create_non_realtime_user_turn_start_strategies(
run_configs: dict, *, uses_external_turns: bool
):
"""Return user turn start strategies for non-realtime pipelines."""
turn_start_strategy = run_configs.get(
"turn_start_strategy", DEFAULT_TURN_START_STRATEGY
)
if turn_start_strategy == "min_words":
return [
MinWordsUserTurnStartStrategy(
min_words=_resolve_turn_start_min_words(run_configs)
)
]
if turn_start_strategy == "provisional_vad":
return [
ProvisionalVADUserTurnStartStrategy(
pause_secs=_resolve_provisional_vad_pause_secs(run_configs)
)
]
if uses_external_turns:
# The STT emits its own turn boundaries and owns interruptions. Local
# VAD is deliberately kept out of the default start strategies: it would
# win the race on raw voice activity and start the turn before the STT
# confirms a real turn.
return [ExternalUserTurnStartStrategy(enable_interruptions=True)]
return [VADUserTurnStartStrategy()]
def _create_non_realtime_user_turn_stop_strategies(
run_configs: dict, *, uses_external_turns: bool
):
"""Return user turn stop strategies for non-realtime pipelines."""
if uses_external_turns:
return [ExternalUserTurnStopStrategy()]
if run_configs.get("turn_stop_strategy") == "turn_analyzer":
smart_turn_params = SmartTurnParams(
stop_secs=run_configs.get(
"smart_turn_stop_secs", DEFAULT_SMART_TURN_STOP_SECS
)
)
return [
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3(params=smart_turn_params)
)
]
return [SpeechTimeoutUserTurnStopStrategy()]
def _create_realtime_user_turn_config(provider: str):
"""Return user turn strategies and optional local VAD for realtime providers."""
@ -461,8 +540,6 @@ async def _run_pipeline_impl(
# Extract configurations from the version's workflow_configurations
max_call_duration_seconds = 300 # Default 5 minutes
max_user_idle_timeout = 10.0 # Default 10 seconds
smart_turn_stop_secs = 2.0 # Default 2 seconds for incomplete turn timeout
turn_stop_strategy = "transcription" # Default to transcription-based detection
keyterms = None # Dictionary words for STT boosting
if run_configs:
@ -472,12 +549,6 @@ async def _run_pipeline_impl(
if "max_user_idle_timeout" in run_configs:
max_user_idle_timeout = run_configs["max_user_idle_timeout"]
if "smart_turn_stop_secs" in run_configs:
smart_turn_stop_secs = run_configs["smart_turn_stop_secs"]
if "turn_stop_strategy" in run_configs:
turn_stop_strategy = run_configs["turn_stop_strategy"]
if "dictionary" in run_configs:
dictionary = run_configs["dictionary"]
if dictionary and isinstance(dictionary, str):
@ -734,37 +805,27 @@ async def _run_pipeline_impl(
# follows those external signals. Other models use configurable turn
# detection.
uses_external_turns = stt_uses_external_turns(user_config)
if uses_external_turns:
user_turn_strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
ExternalUserTurnStartStrategy(enable_interruptions=True),
],
stop=[ExternalUserTurnStopStrategy()],
)
elif turn_stop_strategy == "turn_analyzer":
# Smart Turn Analyzer: best for longer responses with natural pauses
smart_turn_params = SmartTurnParams(stop_secs=smart_turn_stop_secs)
user_turn_strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
TranscriptionUserTurnStartStrategy(),
],
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3(params=smart_turn_params)
)
],
)
else:
# Transcription-based (default): best for short 1-2 word responses
user_turn_strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
TranscriptionUserTurnStartStrategy(),
],
stop=[SpeechTimeoutUserTurnStopStrategy()],
)
user_turn_start_strategies = _create_non_realtime_user_turn_start_strategies(
run_configs,
uses_external_turns=uses_external_turns,
)
turn_start_strategy = run_configs.get(
"turn_start_strategy", DEFAULT_TURN_START_STRATEGY
)
logger.info(
f"[run {workflow_run_id}] Non-realtime interrupt strategy "
f"requested={turn_start_strategy} "
f"uses_external_turns={uses_external_turns}"
)
user_turn_stop_strategies = _create_non_realtime_user_turn_stop_strategies(
run_configs,
uses_external_turns=uses_external_turns,
)
user_turn_strategies = UserTurnStrategies(
start=user_turn_start_strategies,
stop=user_turn_stop_strategies,
)
user_turn_stop_timeout = _resolve_user_turn_stop_timeout(
run_configs,

View file

@ -197,6 +197,7 @@ def create_stt_service(
return OpenAISTTService(
api_key=user_config.stt.api_key,
settings=OpenAISTTSettings(model=user_config.stt.model),
should_interrupt=False, # Let UserAggregator own interruption confirmation.
**kwargs,
)
elif user_config.stt.provider == ServiceProviders.GOOGLE.value:

View file

@ -1,6 +1,8 @@
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
MinWordsUserTurnStartStrategy,
ProvisionalVADUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
VADUserTurnStartStrategy,
@ -8,12 +10,18 @@ from pipecat.turns.user_start.vad_user_turn_start_strategy import (
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
SpeechTimeoutUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
)
import api.services.pipecat.run_pipeline as run_pipeline_module
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.run_pipeline import (
DEFAULT_PROVISIONAL_VAD_PAUSE_SECS,
DEFAULT_TURN_START_MIN_WORDS,
DEFAULT_USER_TURN_STOP_TIMEOUT,
EXTERNAL_TURN_USER_STOP_TIMEOUT,
_create_non_realtime_user_turn_start_strategies,
_create_non_realtime_user_turn_stop_strategies,
_create_realtime_user_turn_config,
_resolve_user_turn_stop_timeout,
)
@ -115,6 +123,119 @@ def test_unknown_realtime_providers_keep_local_vad():
assert strategies.stop[0].wait_for_transcript is False
def test_non_realtime_default_uses_external_start_for_external_turn_stt():
strategies = _create_non_realtime_user_turn_start_strategies(
{},
uses_external_turns=True,
)
assert len(strategies) == 1
assert isinstance(strategies[0], ExternalUserTurnStartStrategy)
assert strategies[0]._enable_interruptions is True
def test_non_realtime_default_uses_vad_start_for_standard_stt():
strategies = _create_non_realtime_user_turn_start_strategies(
{},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], VADUserTurnStartStrategy)
def test_non_realtime_can_use_min_words_start_strategy():
strategies = _create_non_realtime_user_turn_start_strategies(
{"turn_start_strategy": "min_words", "turn_start_min_words": 4},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], MinWordsUserTurnStartStrategy)
assert strategies[0]._min_words == 4
def test_non_realtime_explicit_min_words_overrides_external_turn_default():
strategies = _create_non_realtime_user_turn_start_strategies(
{"turn_start_strategy": "min_words", "turn_start_min_words": 4},
uses_external_turns=True,
)
assert len(strategies) == 1
assert isinstance(strategies[0], MinWordsUserTurnStartStrategy)
assert strategies[0]._min_words == 4
def test_non_realtime_min_words_start_strategy_has_default_threshold():
strategies = _create_non_realtime_user_turn_start_strategies(
{"turn_start_strategy": "min_words"},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], MinWordsUserTurnStartStrategy)
assert strategies[0]._min_words == DEFAULT_TURN_START_MIN_WORDS
def test_non_realtime_can_use_provisional_vad_start_strategy():
strategies = _create_non_realtime_user_turn_start_strategies(
{"turn_start_strategy": "provisional_vad"},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], ProvisionalVADUserTurnStartStrategy)
assert strategies[0]._pause_secs == DEFAULT_PROVISIONAL_VAD_PAUSE_SECS
def test_non_realtime_provisional_vad_uses_configured_pause_secs():
strategies = _create_non_realtime_user_turn_start_strategies(
{"turn_start_strategy": "provisional_vad", "provisional_vad_pause_secs": 0.4},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], ProvisionalVADUserTurnStartStrategy)
assert strategies[0]._pause_secs == 0.4
def test_non_realtime_uses_external_stop_for_external_turn_stt():
strategies = _create_non_realtime_user_turn_stop_strategies(
{},
uses_external_turns=True,
)
assert len(strategies) == 1
assert isinstance(strategies[0], ExternalUserTurnStopStrategy)
def test_non_realtime_default_uses_speech_timeout_stop():
strategies = _create_non_realtime_user_turn_stop_strategies(
{},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], SpeechTimeoutUserTurnStopStrategy)
def test_non_realtime_can_use_turn_analyzer_stop_strategy(monkeypatch):
monkeypatch.setattr(
run_pipeline_module,
"LocalSmartTurnAnalyzerV3",
lambda *, params: params,
)
strategies = _create_non_realtime_user_turn_stop_strategies(
{"turn_stop_strategy": "turn_analyzer", "smart_turn_stop_secs": 1.5},
uses_external_turns=False,
)
assert len(strategies) == 1
assert isinstance(strategies[0], TurnAnalyzerUserTurnStopStrategy)
assert strategies[0]._turn_analyzer.stop_secs == 1.5
def test_external_turn_stt_uses_longer_stop_timeout():
assert (
_resolve_user_turn_stop_timeout({}, uses_external_turns=True)