feat: enable FORCE_TURN_RELAY to diagnose turn connectivity for local deployment setups (#272)

* filter out local SDP candidates in non-local environments

* feat: add FORCE_TURN_RELAY variable

* add FORCE_TURN_RELAY option in docker-compose

* fix: fix github workflow
This commit is contained in:
Abhishek 2026-05-11 17:13:01 +05:30 committed by GitHub
parent 01c201bf09
commit e2fe1f3cd4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 410 additions and 37 deletions

View file

@ -133,6 +133,11 @@ TURN_HOST = os.getenv("TURN_HOST", "localhost")
TURN_PORT = int(os.getenv("TURN_PORT", "3478"))
TURN_TLS_PORT = int(os.getenv("TURN_TLS_PORT", "5349"))
TURN_CREDENTIAL_TTL = int(os.getenv("TURN_CREDENTIAL_TTL", "86400"))
# Diagnostic flag: when true, strip all non-relay ICE candidates from the
# answer SDP so every media path must traverse the TURN server. Use for
# verifying TURN connectivity end-to-end; expect connection failures if
# TURN is misconfigured or unreachable.
FORCE_TURN_RELAY = os.getenv("FORCE_TURN_RELAY", "false").lower() == "true"
# OSS Email/Password Auth
OSS_JWT_SECRET = os.getenv("OSS_JWT_SECRET", "change-me-in-production")

View file

@ -66,11 +66,19 @@ class HealthResponse(BaseModel):
backend_api_endpoint: str
deployment_mode: str
auth_provider: str
turn_enabled: bool
force_turn_relay: bool
@router.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
from api.constants import APP_VERSION, AUTH_PROVIDER, DEPLOYMENT_MODE
from api.constants import (
APP_VERSION,
AUTH_PROVIDER,
DEPLOYMENT_MODE,
FORCE_TURN_RELAY,
TURN_SECRET,
)
from api.utils.common import get_backend_endpoints
logger.debug("Health endpoint called")
@ -81,4 +89,6 @@ async def health() -> HealthResponse:
backend_api_endpoint=backend_endpoint,
deployment_mode=DEPLOYMENT_MODE,
auth_provider=AUTH_PROVIDER,
turn_enabled=bool(TURN_SECRET),
force_turn_relay=FORCE_TURN_RELAY,
)

View file

@ -28,7 +28,7 @@ from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
from starlette.websockets import WebSocketState
from api.constants import ENVIRONMENT
from api.constants import ENVIRONMENT, FORCE_TURN_RELAY
from api.db import db_client
from api.db.models import UserModel
from api.enums import Environment
@ -77,6 +77,58 @@ def is_private_ip_candidate(candidate_str: str) -> bool:
return False
def filter_outbound_sdp(sdp: str) -> str:
    """Remove ICE candidate lines from an outbound answer SDP per env config.

    The SDP is split on CRLF and only ``a=candidate:`` lines are ever
    removed; every other line is passed through untouched. Two independent
    filters are applied to candidate lines:

    1. Outside the LOCAL environment, any candidate for which
       ``is_private_ip_candidate`` returns true is dropped. Rationale:
       aiortc gathers host candidates from every interface on the box,
       including Docker bridges (172.17.0.1, 172.18.0.1), and advertising
       those to the browser causes coturn "peer IP X denied" errors when
       the browser asks TURN to permit them.
    2. When FORCE_TURN_RELAY is set, every candidate whose text does not
       contain " typ relay" is dropped, so the only path the browser can
       use is via TURN. This lets you verify TURN connectivity end-to-end:
       if TURN is broken, the call simply fails.

    Args:
        sdp: The answer SDP text, with lines separated by CRLF.

    Returns:
        The SDP with the filtered candidate lines removed, re-joined with
        CRLF. In the LOCAL environment with FORCE_TURN_RELAY off, the input
        is returned unchanged.
    """
    is_local = ENVIRONMENT == Environment.LOCAL.value

    # Nothing to filter: local runs keep every candidate unless the relay
    # diagnostic is explicitly switched on.
    if is_local and not FORCE_TURN_RELAY:
        return sdp

    kept_lines: List[str] = []
    relay_kept = 0
    non_relay_dropped = 0

    for sdp_line in sdp.split("\r\n"):
        # Non-candidate lines are always preserved verbatim.
        if not sdp_line.startswith("a=candidate:"):
            kept_lines.append(sdp_line)
            continue

        # Strip the leading "a=" so helpers see "candidate:...".
        candidate = sdp_line[2:]

        if FORCE_TURN_RELAY and " typ relay" not in candidate:
            non_relay_dropped += 1
            continue

        # Private-IP candidates are dropped silently (not counted in the
        # relay diagnostics below).
        if not is_local and is_private_ip_candidate(candidate):
            continue

        if FORCE_TURN_RELAY:
            relay_kept += 1
        kept_lines.append(sdp_line)

    if FORCE_TURN_RELAY and relay_kept == 0:
        logger.warning(
            "FORCE_TURN_RELAY is on but the answer SDP has no relay candidates "
            f"(dropped {non_relay_dropped} non-relay). TURN may be unreachable; "
            "the connection will fail."
        )
    elif FORCE_TURN_RELAY:
        logger.info(
            f"FORCE_TURN_RELAY: kept {relay_kept} relay candidates, "
            f"dropped {non_relay_dropped} non-relay"
        )

    return "\r\n".join(kept_lines)
def get_ice_servers(user_id: Optional[str] = None) -> List[RTCIceServer]:
"""Build ICE servers configuration including TURN if configured.
@ -247,7 +299,11 @@ class SignalingManager:
await ws.send_json(
{
"type": "answer",
"payload": {"sdp": answer["sdp"], "type": "answer", "pc_id": pc_id},
"payload": {
"sdp": filter_outbound_sdp(answer["sdp"]),
"type": "answer",
"pc_id": pc_id,
},
}
)
else:
@ -299,7 +355,7 @@ class SignalingManager:
{
"type": "answer",
"payload": {
"sdp": answer["sdp"],
"sdp": filter_outbound_sdp(answer["sdp"]),
"type": answer["type"],
"pc_id": answer["pc_id"],
},
@ -380,7 +436,7 @@ class SignalingManager:
{
"type": "answer",
"payload": {
"sdp": answer["sdp"],
"sdp": filter_outbound_sdp(answer["sdp"]),
"type": "answer",
"pc_id": pc_id, # Use the client's pc_id
},

View file

@ -241,12 +241,10 @@ async def _run_pipeline(
raise HTTPException(status_code=400, detail="Workflow run already completed")
merged_call_context_vars = workflow_run.initial_context
# If there is some extra call_context_vars, update them
# If there is some extra call_context_vars, fold them in. Persistence
# happens once below, after runtime_configuration is also resolved.
if call_context_vars:
merged_call_context_vars = {**merged_call_context_vars, **call_context_vars}
await db_client.update_workflow_run(
workflow_run_id, initial_context=merged_call_context_vars
)
# Get user configuration
user_config = await db_client.get_user_configurations(user_id)
@ -312,6 +310,36 @@ async def _run_pipeline(
llm = create_llm_service(user_config)
inference_llm = None
# Stamp the providers/models actually resolved for this run onto
# initial_context so they're available for post-call analytics
# (model_overrides may have shifted them away from the org-level
# user_config).
if is_realtime:
# llm_* refers to the side-channel text LLM (variable extraction,
# voicemail detection); realtime_* is the speech-to-speech service.
runtime_configuration = {
"realtime_provider": user_config.realtime.provider,
"realtime_model": user_config.realtime.model,
"llm_provider": user_config.llm.provider,
"llm_model": user_config.llm.model,
}
else:
runtime_configuration = {
"stt_provider": user_config.stt.provider,
"stt_model": user_config.stt.model,
"tts_provider": user_config.tts.provider,
"tts_model": user_config.tts.model,
"llm_provider": user_config.llm.provider,
"llm_model": user_config.llm.model,
}
merged_call_context_vars = {
**merged_call_context_vars,
"runtime_configuration": runtime_configuration,
}
await db_client.update_workflow_run(
workflow_run_id, initial_context=merged_call_context_vars
)
workflow_graph = WorkflowGraph(ReactFlowDTO.model_validate(run_workflow_json))
# Pre-call fetch: fire early so it runs concurrently with remaining setup

View file

@ -106,7 +106,7 @@ async def check_dograh_quota(
logger.info(
f"Dograh quota check passed for key ...{api_key[-8:]}: "
f"${remaining:.2f} remaining"
f"{remaining:.2f} credits remaining"
)
except Exception as e:
logger.error(f"Failed to check quota for Dograh key: {str(e)}")