feat: add pre call fetch configuration (#222)

* feat: add pre call fetch configuration

* docs: add NEW tags for pages about new features

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
This commit is contained in:
Abhishek 2026-04-06 12:30:37 +05:30 committed by GitHub
parent c4c4b591db
commit ec2f322486
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 646 additions and 66 deletions

View file

@ -1,3 +1,5 @@
import asyncio
from loguru import logger
from api.db import db_client
@ -13,6 +15,7 @@ from api.services.pipecat.tracing_config import get_trace_url
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
from api.utils.hold_audio import play_hold_audio_loop
from pipecat.frames.frames import Frame, LLMContextFrame, TTSSpeakFrame
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
@ -28,6 +31,7 @@ def register_event_handlers(
in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
audio_config=AudioConfig,
pre_call_fetch_task: asyncio.Task | None = None,
):
"""Register all event handlers for transport and task events.
@ -58,6 +62,9 @@ def register_event_handlers(
async def maybe_trigger_initial_response():
"""Start the conversation after both pipeline_started and client_connected events.
If a pre-call fetch is in progress, plays a ringer while waiting for the
response, then merges the result into the call context before proceeding.
If the start node has a greeting configured, play it directly via TTS.
Otherwise, trigger an LLM generation for the opening message.
"""
@ -68,6 +75,43 @@ def register_event_handlers(
):
ready_state["initial_response_triggered"] = True
# Wait for pre-call fetch if in progress, playing ringer meanwhile
if pre_call_fetch_task is not None:
if not pre_call_fetch_task.done():
logger.info(
"Pre-call fetch still in progress, playing ringer while waiting"
)
stop_ringer = asyncio.Event()
sample_rate = audio_config.pipeline_sample_rate or 16000
ringer_task = asyncio.create_task(
play_hold_audio_loop(task, stop_ringer, sample_rate)
)
try:
fetch_result = await pre_call_fetch_task
finally:
stop_ringer.set()
await ringer_task
else:
fetch_result = pre_call_fetch_task.result()
if fetch_result:
engine._call_context_vars.update(fetch_result)
try:
await db_client.update_workflow_run(
workflow_run_id,
initial_context={**engine._call_context_vars},
)
except Exception as e:
logger.error(f"Failed to persist pre-call fetch context: {e}")
logger.info(
f"Pre-call fetch complete, merged keys: "
f"{list(fetch_result.keys())}"
)
# Set the start node now (after pre-call fetch data is merged)
# so that render_template() has the complete _call_context_vars.
await engine.set_node(engine.workflow.start_node_id)
greeting = engine.get_start_greeting()
if greeting:
logger.debug(

View file

@ -0,0 +1,115 @@
"""Pre-call HTTP data fetch for StartCall node.
Executes an HTTP request before a voice call starts to enrich the
call context with data from external systems (CRM, ERP, etc.).
"""
from typing import Any, Dict, Optional
import httpx
from loguru import logger
from api.db import db_client
from api.utils.credential_auth import build_auth_header
PRE_CALL_FETCH_TIMEOUT_SECONDS = 10
async def execute_pre_call_fetch(
*,
url: str,
credential_uuid: Optional[str],
call_context_vars: Dict[str, Any],
workflow_id: int,
organization_id: int,
) -> Dict[str, Any]:
"""Execute a POST request to fetch data before a call starts.
Sends a standardized payload with call metadata (agent_id, from/to numbers).
The response JSON is returned as a dict to be merged into initial_context.
Returns:
Response JSON dict on success, empty dict on any failure.
Never raises.
"""
# Build standardized payload
payload = {
"event": "call_inbound",
"call_inbound": {
"agent_id": workflow_id,
"from_number": call_context_vars.get("caller_number", ""),
"to_number": call_context_vars.get("called_number", ""),
},
}
# Build headers
headers: Dict[str, str] = {"Content-Type": "application/json"}
if credential_uuid:
try:
credential = await db_client.get_credential_by_uuid(
credential_uuid, organization_id
)
if credential:
headers.update(build_auth_header(credential))
else:
logger.warning(
f"Pre-call fetch: credential {credential_uuid} not found"
)
except Exception as e:
logger.error(f"Pre-call fetch: failed to resolve credential: {e}")
logger.info(f"Pre-call fetch: POST {url}")
try:
async with httpx.AsyncClient(timeout=PRE_CALL_FETCH_TIMEOUT_SECONDS) as client:
response = await client.post(url, headers=headers, json=payload)
try:
response_data = response.json()
except Exception:
response_data = {}
if response.is_success:
if not isinstance(response_data, dict):
logger.warning(
"Pre-call fetch: response is not a JSON object, skipping"
)
return {}
# Extract dynamic_variables from Retell-compatible response
# Supports: {call_inbound: {dynamic_variables: {...}}}
# or: {dynamic_variables: {...}}
dynamic_vars = {}
call_inbound = response_data.get("call_inbound")
if isinstance(call_inbound, dict):
dynamic_vars = call_inbound.get("dynamic_variables", {})
elif "dynamic_variables" in response_data:
dynamic_vars = response_data["dynamic_variables"]
if not isinstance(dynamic_vars, dict):
dynamic_vars = {}
logger.info(
f"Pre-call fetch: success ({response.status_code}), "
f"dynamic_variables keys: {list(dynamic_vars.keys())}"
)
return dynamic_vars
else:
logger.warning(
f"Pre-call fetch: HTTP {response.status_code} - "
f"{response.text[:200]}"
)
return {}
except httpx.TimeoutException:
logger.error(
f"Pre-call fetch: timed out after {PRE_CALL_FETCH_TIMEOUT_SECONDS}s"
)
return {}
except httpx.RequestError as e:
logger.error(f"Pre-call fetch: request failed: {e}")
return {}
except Exception as e:
logger.error(f"Pre-call fetch: unexpected error: {e}")
return {}

View file

@ -24,6 +24,7 @@ from api.services.pipecat.pipeline_engine_callbacks_processor import (
PipelineEngineCallbacksProcessor,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.pre_call_fetch import execute_pre_call_fetch
from api.services.pipecat.realtime_feedback_observer import (
RealtimeFeedbackObserver,
register_turn_log_handlers,
@ -622,6 +623,28 @@ async def _run_pipeline(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
# Pre-call fetch: fire early so it runs concurrently with remaining setup
pre_call_fetch_task = None
start_node = workflow_graph.nodes.get(workflow_graph.start_node_id)
if (
start_node
and start_node.pre_call_fetch_enabled
and start_node.pre_call_fetch_url
):
logger.info(
f"Pre-call fetch enabled for workflow run {workflow_run_id}, "
f"firing request to {start_node.pre_call_fetch_url}"
)
pre_call_fetch_task = asyncio.create_task(
execute_pre_call_fetch(
url=start_node.pre_call_fetch_url,
credential_uuid=start_node.pre_call_fetch_credential_uuid,
call_context_vars=merged_call_context_vars,
workflow_id=workflow_id,
organization_id=workflow.organization_id,
)
)
# Create in-memory logs buffer early so it can be used by engine callbacks
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
@ -952,6 +975,7 @@ async def _run_pipeline(
in_memory_logs_buffer=in_memory_logs_buffer,
pipeline_metrics_aggregator=pipeline_metrics_aggregator,
audio_config=audio_config,
pre_call_fetch_task=pre_call_fetch_task,
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)

View file

@ -59,6 +59,10 @@ class NodeDataDTO(BaseModel):
detect_voicemail: bool = False
delayed_start: bool = False
delayed_start_duration: Optional[float] = None
# Pre-call fetch (start node only)
pre_call_fetch_enabled: bool = False
pre_call_fetch_url: Optional[str] = None
pre_call_fetch_credential_uuid: Optional[str] = None
tool_uuids: Optional[List[str]] = None
document_uuids: Optional[List[str]] = None
trigger_path: Optional[str] = None

View file

@ -162,8 +162,6 @@ class PipecatEngine:
if self._context_compaction_enabled:
self._context_summarization_manager = ContextSummarizationManager(self)
await self.set_node(self.workflow.start_node_id)
logger.debug(f"{self.__class__.__name__} initialized")
except Exception as e:
logger.error(f"Error initializing {self.__class__.__name__}: {e}")

View file

@ -14,7 +14,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional
from loguru import logger
from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import ToolCategory, WorkflowRunMode
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
@ -28,11 +27,10 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.utils.hold_audio import load_hold_audio
from api.utils.hold_audio import play_hold_audio_loop
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import (
FunctionCallResultProperties,
OutputAudioRawFrame,
TTSSpeakFrame,
)
from pipecat.services.llm_service import FunctionCallParams
@ -539,7 +537,11 @@ class CustomToolManager:
# Start hold music as background task
hold_music_task = asyncio.create_task(
self.play_hold_music_loop(hold_music_stop_event, sample_rate)
play_hold_audio_loop(
self._engine.task,
hold_music_stop_event,
sample_rate,
)
)
# Wait for transfer completion using Redis pub/sub
@ -666,44 +668,3 @@ class CustomToolManager:
# Unknown action, treat as generic success
logger.warning(f"Unknown transfer action: {action}, treating as success")
await function_call_params.result_callback(result)
async def play_hold_music_loop(
self, stop_event: asyncio.Event, sample_rate: int = 8000
):
"""Play hold music in a loop until stop event is triggered.
Args:
stop_event: Event to stop the hold music loop
sample_rate: Sample rate for the hold music (default 8000Hz for Twilio)
"""
try:
# Path to hold music file based on sample rate
hold_music_file = (
APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
)
hold_audio_data = load_hold_audio(hold_music_file, sample_rate)
num_samples = len(hold_audio_data) // 2
duration = int(num_samples / sample_rate)
logger.info(f"Starting hold music loop with file: {hold_music_file}")
while not stop_event.is_set():
# Queue the hold audio frame
frame = OutputAudioRawFrame(
audio=hold_audio_data,
sample_rate=sample_rate,
num_channels=1,
)
await self._engine.task.queue_frame(frame)
# Wait for the audio to play or until stopped
try:
await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5)
break # Stop event was set
except asyncio.TimeoutError:
pass # Continue looping
logger.info("Hold music loop stopped")
except Exception as e:
logger.error(f"Error in hold music loop: {e}")

View file

@ -82,6 +82,9 @@ class Node:
self.delayed_start_duration = data.delayed_start_duration
self.tool_uuids = data.tool_uuids
self.document_uuids = data.document_uuids
self.pre_call_fetch_enabled = data.pre_call_fetch_enabled
self.pre_call_fetch_url = data.pre_call_fetch_url
self.pre_call_fetch_credential_uuid = data.pre_call_fetch_credential_uuid
self.data = data