Merge branch 'main' into feat/call-tags

This commit is contained in:
Abhishek Kumar 2026-02-16 13:07:23 +05:30
commit ea0967fd9c
41 changed files with 480 additions and 317 deletions

View file

@ -7,7 +7,7 @@ import loguru
from api.constants import SERIALIZE_LOG_OUTPUT
from api.enums import Environment
from api.utils.worker import get_worker_id, is_worker_process
from pipecat.utils.context import run_id_var, turn_var
from pipecat.utils.run_context import run_id_var, turn_var
ENVIRONMENT = os.getenv("ENVIRONMENT", Environment.LOCAL.value)
ENABLE_TURN_LOGGING = os.getenv("ENABLE_TURN_LOGGING", "false").lower() == "true"

View file

@ -1,5 +1,5 @@
[project]
name = "dograh-api"
version = "1.12.0"
version = "1.13.0"
description = "Backend API for Dograh voice AI platform"
requires-python = ">=3.12"

View file

@ -1,3 +1,7 @@
"""
Route for 3rd party integrations. Currently being backed by nango.
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict

View file

@ -1,12 +1,12 @@
import random
from loguru import logger
from pipecat.utils.context import set_current_run_id
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.pipecat.run_pipeline import run_pipeline_ari_stasis
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.utils.run_context import set_current_run_id
async def on_stasis_call(call: StasisRTPConnection, call_context_vars: dict):

View file

@ -37,7 +37,7 @@ from api.utils.telephony_helper import (
numbers_match,
parse_webhook_request,
)
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
router = APIRouter(prefix="/telephony")

View file

@ -22,7 +22,7 @@ from typing import Dict, List, Optional
from aiortc import RTCIceServer
from aiortc.sdp import candidate_from_sdp
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from loguru import logger
from starlette.websockets import WebSocketState
@ -44,7 +44,7 @@ from api.services.pipecat.ws_sender_registry import (
)
from api.services.quota_service import check_dograh_quota
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
router = APIRouter(prefix="/ws")
@ -390,6 +390,11 @@ async def signaling_websocket(
user: UserModel = Depends(get_user_ws),
):
"""WebSocket endpoint for WebRTC signaling with ICE trickling."""
workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
if not workflow_run:
logger.warning(f"workflow run {workflow_run_id} not found for user {user.id}")
raise HTTPException(status_code=400, detail="Bad workflow_run_id")
await signaling_manager.handle_websocket(
websocket, workflow_id, workflow_run_id, user
)

View file

@ -278,7 +278,48 @@ class DograhTTSService(BaseTTSConfiguration):
SARVAM_TTS_MODELS = ["bulbul:v2", "bulbul:v3"]
SARVAM_VOICES = ["anushka", "manisha", "vidya", "arya", "abhilash", "karun", "hitesh"]
SARVAM_V2_VOICES = ["anushka", "manisha", "vidya", "arya", "abhilash", "karun", "hitesh"]
SARVAM_V3_VOICES = [
"shubh",
"aditya",
"ritu",
"priya",
"neha",
"rahul",
"pooja",
"rohan",
"simran",
"kavya",
"amit",
"dev",
"ishita",
"shreya",
"ratan",
"varun",
"manan",
"sumit",
"roopa",
"kabir",
"aayan",
"ashutosh",
"advait",
"amelia",
"sophia",
"anand",
"tanya",
"tarun",
"sunny",
"mani",
"gokul",
"vijay",
"shruti",
"suhani",
"mohit",
"kavitha",
"rehan",
"soham",
"rupali",
]
SARVAM_LANGUAGES = [
"bn-IN",
"en-IN",
@ -301,7 +342,16 @@ class SarvamTTSConfiguration(BaseTTSConfiguration):
model: str = Field(
default="bulbul:v2", json_schema_extra={"examples": SARVAM_TTS_MODELS}
)
voice: str = Field(default="anushka", json_schema_extra={"examples": SARVAM_VOICES})
voice: str = Field(
default="anushka",
json_schema_extra={
"examples": SARVAM_V2_VOICES,
"model_options": {
"bulbul:v2": SARVAM_V2_VOICES,
"bulbul:v3": SARVAM_V3_VOICES,
},
},
)
language: str = Field(
default="hi-IN", json_schema_extra={"examples": SARVAM_LANGUAGES}
)
@ -322,39 +372,89 @@ TTSConfig = Annotated[
###################################################### STT ########################################################################
DEEPGRAM_STT_MODELS = ["nova-2", "nova-3-general", "flux-general-en"]
DEEPGRAM_STT_MODELS = ["nova-3-general", "flux-general-en"]
DEEPGRAM_LANGUAGES = [
"multi",
"ar",
"ar-AE",
"ar-SA",
"ar-QA",
"ar-KW",
"ar-SY",
"ar-LB",
"ar-PS",
"ar-JO",
"ar-EG",
"ar-SD",
"ar-TD",
"ar-MA",
"ar-DZ",
"ar-TN",
"ar-IQ",
"ar-IR",
"be",
"bn",
"bs",
"bg",
"ca",
"cs",
"da",
"da-DK",
"de",
"de-CH",
"el",
"en",
"en-US",
"en-GB",
"en-AU",
"en-GB",
"en-IN",
"en-NZ",
"es",
"es-419",
"et",
"fa",
"fi",
"fr",
"fr-CA",
"de",
"he",
"hi",
"hr",
"hu",
"id",
"it",
"ja",
"kn",
"ko",
"ko-KR",
"lt",
"lv",
"mk",
"mr",
"ms",
"nl",
"nl-BE",
"no",
"pl",
"pt",
"pt-BR",
"nl",
"hi",
"ja",
"ko",
"zh-CN",
"zh-TW",
"pt-PT",
"ro",
"ru",
"pl",
"sk",
"sl",
"sr",
"sv",
"sv-SE",
"ta",
"te",
"th",
"tl",
"tr",
"uk",
"ur",
"vi",
"sv",
"da",
"no",
"fi",
"id",
"th",
"zh-CN",
"zh-TW",
]
@ -365,7 +465,14 @@ class DeepgramSTTConfiguration(BaseSTTConfiguration):
default="nova-3-general", json_schema_extra={"examples": DEEPGRAM_STT_MODELS}
)
language: str = Field(
default="multi", json_schema_extra={"examples": DEEPGRAM_LANGUAGES}
default="multi",
json_schema_extra={
"examples": DEEPGRAM_LANGUAGES,
"model_options": {
"nova-3-general": DEEPGRAM_LANGUAGES,
"flux-general-en": ["en"],
},
},
)
api_key: str
@ -390,39 +497,7 @@ class OpenAISTTConfiguration(BaseSTTConfiguration):
# Dograh STT Service
DOGRAH_STT_MODELS = ["default"]
DOGRAH_STT_LANGUAGES = [
"multi",
"en",
"en-US",
"en-GB",
"en-AU",
"en-IN",
"es",
"es-419",
"fr",
"fr-CA",
"de",
"it",
"pt",
"pt-BR",
"nl",
"hi",
"ja",
"ko",
"zh-CN",
"zh-TW",
"ru",
"pl",
"tr",
"uk",
"vi",
"sv",
"da",
"no",
"fi",
"id",
"th",
]
DOGRAH_STT_LANGUAGES = DEEPGRAM_LANGUAGES
@register_stt

View file

@ -14,7 +14,7 @@ from api.services.looptalk.internal_transport import (
)
from api.services.pipecat.transport_setup import create_internal_transport
from pipecat.pipeline.task import PipelineTask
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
from .core.pipeline_builder import LoopTalkPipelineBuilder
from .core.recording_manager import RecordingManager

View file

@ -10,7 +10,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.utils.context import turn_var
from pipecat.utils.run_context import turn_var
def create_pipeline_components(audio_config: AudioConfig):

View file

@ -62,6 +62,7 @@ from pipecat.turns.user_mute import (
MuteUntilFirstBotCompleteUserMuteStrategy,
)
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
@ -69,12 +70,12 @@ from pipecat.turns.user_start.vad_user_turn_start_strategy import (
)
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
TranscriptionUserTurnStopStrategy,
SpeechTimeoutUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.context import set_current_run_id
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.run_context import set_current_run_id
from pipecat.utils.tracing.context_registry import ContextProviderRegistry
# Setup tracing if enabled
@ -265,7 +266,6 @@ async def run_pipeline_vobiz(
async def run_pipeline_cloudonix(
websocket_client: WebSocket,
stream_sid: str,
call_sid: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
@ -274,10 +274,15 @@ async def run_pipeline_cloudonix(
logger.debug(
f"Running pipeline for Cloudonix connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}"
)
set_current_run_id(workflow_run_id)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
call_id = workflow_run.gathered_context.get("call_id")
if not call_id:
logger.warning("call_id not found in gathered_context")
raise Exception()
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_sid}
cost_info = {"call_id": call_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract all pipeline configurations
@ -292,26 +297,18 @@ async def run_pipeline_cloudonix(
"ambient_noise_configuration"
]
# Retrieve session_token from workflow_run gathered_context
workflow_run = await db_client.get_workflow_run(workflow_run_id)
session_token = None
if workflow_run and workflow_run.gathered_context:
session_token = workflow_run.gathered_context.get("session_token")
logger.debug(f"Retrieved session_token from workflow_run: {session_token}")
# Create audio configuration for Cloudonix
audio_config = create_audio_config(WorkflowRunMode.CLOUDONIX.value)
transport = await create_cloudonix_transport(
websocket_client,
call_id,
stream_sid,
call_sid,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
session_token,
)
await _run_pipeline(
transport,
@ -580,7 +577,10 @@ async def _run_pipeline(
if is_deepgram_flux:
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
start=[
VADUserTurnStartStrategy(),
ExternalUserTurnStartStrategy(enable_interruptions=True),
],
stop=[ExternalUserTurnStopStrategy()],
)
elif turn_stop_strategy == "turn_analyzer":
@ -598,7 +598,7 @@ async def _run_pipeline(
# Transcription-based (default): best for short 1-2 word responses
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy()],
)
# Create user mute strategies

View file

@ -30,7 +30,9 @@ if TYPE_CHECKING:
from api.services.pipecat.audio_config import AudioConfig
def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[str] | None = None):
def create_stt_service(
user_config, audio_config: "AudioConfig", keyterms: list[str] | None = None
):
"""Create and return appropriate STT service based on user configuration
Args:
@ -53,7 +55,7 @@ def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[
keyterm=keyterms or [],
),
should_interrupt=False, # Let UserAggregator take care of sending InterruptionFrame
sample_rate=audio_config.transport_in_sample_rate
sample_rate=audio_config.transport_in_sample_rate,
)
# Other models than flux
@ -64,21 +66,24 @@ def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[
profanity_filter=False,
endpointing=100,
model=user_config.stt.model,
keyterm=keyterms or []
keyterm=keyterms or [],
)
logger.debug(f"Using DeepGram Model - {user_config.stt.model}")
return DeepgramSTTService(
live_options=live_options,
api_key=user_config.stt.api_key,
should_interrupt=False, # Let UserAggregator take care of sending InterruptionFrame
sample_rate=audio_config.transport_in_sample_rate
sample_rate=audio_config.transport_in_sample_rate,
)
elif user_config.stt.provider == ServiceProviders.OPENAI.value:
return OpenAISTTService(
api_key=user_config.stt.api_key, model=user_config.stt.model
)
elif user_config.stt.provider == ServiceProviders.CARTESIA.value:
return CartesiaSTTService(api_key=user_config.stt.api_key, sample_rate=audio_config.transport_in_sample_rate)
return CartesiaSTTService(
api_key=user_config.stt.api_key,
sample_rate=audio_config.transport_in_sample_rate,
)
elif user_config.stt.provider == ServiceProviders.DOGRAH.value:
base_url = MPS_API_URL.replace("http://", "ws://").replace("https://", "wss://")
language = getattr(user_config.stt, "language", None) or "multi"
@ -88,7 +93,7 @@ def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[
model=user_config.stt.model,
language=language,
keyterms=keyterms,
sample_rate=audio_config.transport_in_sample_rate
sample_rate=audio_config.transport_in_sample_rate,
)
elif user_config.stt.provider == ServiceProviders.SARVAM.value:
# Map Sarvam language code to pipecat Language enum
@ -112,7 +117,7 @@ def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[
api_key=user_config.stt.api_key,
model=user_config.stt.model,
params=SarvamSTTService.InputParams(language=pipecat_language),
sample_rate=audio_config.transport_in_sample_rate
sample_rate=audio_config.transport_in_sample_rate,
)
elif user_config.stt.provider == ServiceProviders.SPEECHMATICS.value:
from pipecat.services.speechmatics.stt import (
@ -138,7 +143,7 @@ def create_stt_service(user_config, audio_config: "AudioConfig", keyterms: list[
operating_point=operating_point,
additional_vocab=additional_vocab,
),
sample_rate=audio_config.transport_in_sample_rate
sample_rate=audio_config.transport_in_sample_rate,
)
else:
raise HTTPException(

View file

@ -94,14 +94,13 @@ async def create_twilio_transport(
async def create_cloudonix_transport(
websocket_client: WebSocket,
call_id: str,
stream_sid: str,
call_sid: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
session_token: str | None = None,
):
"""Create a transport for Cloudonix connections"""
@ -125,11 +124,10 @@ async def create_cloudonix_transport(
from pipecat.serializers.cloudonix import CloudonixFrameSerializer
serializer = CloudonixFrameSerializer(
call_id=call_id,
stream_sid=stream_sid,
call_sid=call_sid,
domain_id=domain_id,
bearer_token=bearer_token,
session_token=session_token,
)
return FastAPIWebsocketTransport(

View file

@ -8,7 +8,7 @@ propagate through asyncio.create_task() calls.
import asyncio
from typing import Dict, Optional
from pipecat.utils.context import turn_var
from pipecat.utils.run_context import turn_var
class TurnContextManager:

View file

@ -395,10 +395,6 @@ class CloudonixProvider(TelephonyProvider):
await websocket.close(code=4400, reason="Expected connected event")
return
logger.debug(
f"Cloudonix WebSocket connected for workflow_run {workflow_run_id}"
)
# Wait for "start" event with stream details
start_msg = await websocket.receive_text()
logger.debug(f"Received start message: {start_msg}")
@ -418,9 +414,14 @@ class CloudonixProvider(TelephonyProvider):
await websocket.close(code=4400, reason="Missing stream identifiers")
return
logger.debug(
f"Cloudonix WebSocket connected for workflow_run {workflow_run_id} "
f"stream_sid: {stream_sid} call_sid: {call_sid}"
)
# Run the Cloudonix pipeline
await run_pipeline_cloudonix(
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
websocket, stream_sid, workflow_id, workflow_run_id, user_id
)
except Exception as e:

View file

@ -110,7 +110,7 @@ class TwilioProvider(TelephonyProvider):
return CallInitiationResult(
call_id=response_data["sid"],
status=response_data.get("status", "queued"),
provider_metadata={}, # Twilio doesn't need to persist extra data
provider_metadata={"call_id": response_data["sid"]},
raw_response=response_data,
)

View file

@ -150,7 +150,7 @@ class VobizProvider(TelephonyProvider):
return CallInitiationResult(
call_id=call_id,
status="queued", # Vobiz returns "message": "call fired"
provider_metadata={},
provider_metadata={"call_id": call_id},
raw_response=response_data,
)

View file

@ -138,10 +138,8 @@ class VonageProvider(TelephonyProvider):
call_id=response_data["uuid"],
status=response_data.get("status", "started"),
provider_metadata={
"call_uuid": response_data[
"uuid"
] # Vonage needs UUID persisted for WebSocket
},
"call_uuid": response_data["uuid"]
}, # Vonage needs UUID persisted for WebSocket
raw_response=response_data,
)

View file

@ -23,7 +23,7 @@ from api.services.telephony.stasis_event_protocol import (
parse_event,
)
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
class WorkerEventSubscriber:

View file

@ -116,6 +116,10 @@ def create_aggregation_correction_callback(engine: "PipecatEngine"):
if corrupted in ref or len(alnum_ref) < len(alnum_corr) or len(alnum_corr) < 10:
return corrupted
logger.debug(
f"In correct_corrupted_aggregation: ref: {ref} corrupted: {corrupted}"
)
# 2) Find where in `ref` we should start aligning.
# We take the first N (N=10) characters of `corrupted`
# and look for all their occurrences in `ref`.

View file

@ -10,7 +10,7 @@ from api.db import db_client
from api.db.models import WorkflowRunModel
from api.utils.credential_auth import build_auth_header
from api.utils.template_renderer import render_template
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):

View file

@ -6,7 +6,7 @@ from loguru import logger
from api.db import db_client
from api.services.storage import get_current_storage_backend, storage_fs
from api.tasks.run_integrations import run_integrations_post_workflow_run
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
async def upload_voicemail_audio_to_s3(

View file

@ -4,7 +4,62 @@ from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.pricing.cost_calculator import cost_calculator
from api.services.telephony.factory import get_telephony_provider
from pipecat.utils.context import set_current_run_id
from pipecat.utils.run_context import set_current_run_id
async def _fetch_telephony_cost(workflow_run) -> dict | None:
"""Fetch telephony call cost. Returns a dict with cost_usd and provider_name, or None."""
if (
workflow_run.mode
not in [WorkflowRunMode.TWILIO.value, WorkflowRunMode.VONAGE.value]
or not workflow_run.cost_info
):
return None
call_id = workflow_run.cost_info.get("call_id")
if not call_id:
logger.warning(f"call_id not found in cost_info")
return None
provider_name = workflow_run.mode.lower() if workflow_run.mode else ""
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning("Workflow not found for workflow run")
raise Exception("Workflow not found")
provider = await get_telephony_provider(workflow.organization_id)
call_cost_info = await provider.get_call_cost(call_id)
if call_cost_info.get("status") == "error":
logger.error(
f"Failed to fetch {provider_name} call cost: {call_cost_info.get('error')}"
)
return None
cost_usd = call_cost_info.get("cost_usd", 0.0)
logger.info(
f"{provider_name.title()} call cost: ${cost_usd:.6f} USD for call {call_id}"
)
return {"cost_usd": cost_usd, "provider_name": provider_name}
async def _update_organization_usage(
org, dograh_tokens: float, duration_seconds: float, charge_usd: float | None
) -> None:
"""Update organization usage after a workflow run."""
org_id = org.id
await db_client.update_usage_after_run(
org_id, dograh_tokens, duration_seconds, charge_usd
)
if charge_usd is not None:
logger.info(
f"Updated organization usage with ${charge_usd:.2f} USD ({dograh_tokens} Dograh Tokens) and {duration_seconds}s duration for org {org_id}"
)
else:
logger.info(
f"Updated organization usage with {dograh_tokens} Dograh Tokens and {duration_seconds}s duration for org {org_id}"
)
async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
@ -26,62 +81,20 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
# Calculate cost breakdown
cost_breakdown = cost_calculator.calculate_total_cost(workflow_usage_info)
# Fetch telephony call cost for both Twilio and Vonage
telephony_cost_usd = 0.0
if (
workflow_run.mode
in [WorkflowRunMode.TWILIO.value, WorkflowRunMode.VONAGE.value]
and workflow_run.cost_info
):
# Get the call ID (provider-agnostic approach with backward compatibility)
call_id = workflow_run.cost_info.get("call_id")
# Fallback to legacy provider-specific fields if needed
if not call_id:
if workflow_run.mode == WorkflowRunMode.TWILIO.value:
call_id = workflow_run.cost_info.get("twilio_call_sid")
elif workflow_run.mode == WorkflowRunMode.VONAGE.value:
call_id = workflow_run.cost_info.get("vonage_call_uuid")
# Provider name is derived from workflow run mode
provider_name = workflow_run.mode.lower() if workflow_run.mode else ""
if call_id:
try:
# Get workflow to access organization_id
workflow = await db_client.get_workflow_by_id(
workflow_run.workflow_id
)
if not workflow:
logger.warning("Workflow not found for workflow run")
raise Exception("Workflow not found")
# Use telephony provider abstraction
provider = await get_telephony_provider(workflow.organization_id)
call_cost_info = await provider.get_call_cost(call_id)
if call_cost_info.get("status") != "error":
telephony_cost_usd = call_cost_info.get("cost_usd", 0.0)
cost_breakdown["telephony_call"] = telephony_cost_usd
cost_breakdown[f"{provider_name}_call"] = (
telephony_cost_usd # Keep backward compatibility
)
# Add telephony cost to the total
cost_breakdown["total"] = (
float(cost_breakdown["total"]) + telephony_cost_usd
)
logger.info(
f"{provider_name.title()} call cost: ${telephony_cost_usd:.6f} USD for call {call_id}"
)
else:
logger.error(
f"Failed to fetch {provider_name} call cost: {call_cost_info.get('error')}"
)
except Exception as e:
logger.error(f"Failed to fetch telephony call cost: {e}")
# Don't fail the whole cost calculation if telephony API fails
# Fetch telephony call cost
try:
telephony_cost = await _fetch_telephony_cost(workflow_run)
if telephony_cost:
telephony_cost_usd = telephony_cost["cost_usd"]
provider_name = telephony_cost["provider_name"]
cost_breakdown["telephony_call"] = telephony_cost_usd
cost_breakdown[f"{provider_name}_call"] = telephony_cost_usd
cost_breakdown["total"] = (
float(cost_breakdown["total"]) + telephony_cost_usd
)
except Exception as e:
logger.error(f"Failed to fetch telephony call cost: {e}")
# Don't fail the whole cost calculation if telephony API fails
# Store cost information back to the workflow run
# We'll add the cost breakdown to the workflow run
@ -106,6 +119,7 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
charge_usd = duration_seconds * org.price_per_second_usd
cost_info = {
**workflow_run.cost_info,
"cost_breakdown": cost_breakdown,
"total_cost_usd": float(cost_breakdown["total"]),
"dograh_token_usage": dograh_tokens,
@ -118,42 +132,19 @@ async def calculate_workflow_run_cost(ctx, workflow_run_id: int):
cost_info["charge_usd"] = charge_usd
cost_info["price_per_second_usd"] = org.price_per_second_usd
# Preserve call ID (provider-agnostic with backward compatibility)
if workflow_run.cost_info:
# Preserve generic call_id if it exists
if "call_id" in workflow_run.cost_info:
cost_info["call_id"] = workflow_run.cost_info["call_id"]
# Also preserve legacy fields for backward compatibility
elif "twilio_call_sid" in workflow_run.cost_info:
cost_info["twilio_call_sid"] = workflow_run.cost_info["twilio_call_sid"]
elif "vonage_call_uuid" in workflow_run.cost_info:
cost_info["vonage_call_uuid"] = workflow_run.cost_info[
"vonage_call_uuid"
]
# Update workflow run with cost information
await db_client.update_workflow_run(run_id=workflow_run_id, cost_info=cost_info)
# Update organization usage if applicable
if org:
org_id = org.id
try:
duration_seconds = workflow_usage_info.get("call_duration_seconds", 0)
# Pass USD amount if organization has pricing
await db_client.update_usage_after_run(
org_id, dograh_tokens, duration_seconds, charge_usd
await _update_organization_usage(
org, dograh_tokens, duration_seconds, charge_usd
)
if charge_usd is not None:
logger.info(
f"Updated organization usage with ${charge_usd:.2f} USD ({dograh_tokens} Dograh Tokens) and {duration_seconds}s duration for org {org_id}"
)
else:
logger.info(
f"Updated organization usage with {dograh_tokens} Dograh Tokens and {duration_seconds}s duration for org {org_id}"
)
except Exception as e:
logger.error(
f"Failed to update organization usage for org {org_id}: {e}"
f"Failed to update organization usage for org {org.id}: {e}"
)
# Don't fail the whole task if usage update fails

View file

@ -31,8 +31,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
)
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -46,8 +46,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)

View file

@ -30,8 +30,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)

View file

@ -18,8 +18,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
)
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -27,8 +27,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
)
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -43,8 +43,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)

View file

@ -29,8 +29,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)

View file

@ -34,8 +34,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregator,
LLMUserAggregatorParams,

View file

@ -21,8 +21,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)