fix: fix circuit breaker failure recording

fix: fix circuit breaker failure recording
chore: provide advanced configuration option in UI for campaigns
This commit is contained in:
Abhishek Kumar 2026-03-05 13:43:13 +05:30
parent 628132f29b
commit 3ea235a666
17 changed files with 448 additions and 58 deletions

View file

@ -22,6 +22,7 @@ class CampaignClient(BaseDBClient):
retry_config: Optional[dict] = None,
max_concurrency: Optional[int] = None,
schedule_config: Optional[dict] = None,
circuit_breaker: Optional[dict] = None,
) -> CampaignModel:
"""Create a new campaign"""
async with self.async_session() as session:
@ -31,6 +32,8 @@ class CampaignClient(BaseDBClient):
orchestrator_metadata["max_concurrency"] = max_concurrency
if schedule_config is not None:
orchestrator_metadata["schedule_config"] = schedule_config
if circuit_breaker is not None:
orchestrator_metadata["circuit_breaker"] = circuit_breaker
campaign = CampaignModel(
name=name,
@ -68,6 +71,21 @@ class CampaignClient(BaseDBClient):
result = await session.execute(query)
return list(result.scalars().all())
async def get_latest_campaign(
self,
organization_id: int,
) -> Optional[CampaignModel]:
"""Get the most recently created campaign for an organization"""
async with self.async_session() as session:
query = (
select(CampaignModel)
.where(CampaignModel.organization_id == organization_id)
.order_by(CampaignModel.created_at.desc())
.limit(1)
)
result = await session.execute(query)
return result.scalars().first()
async def get_campaign(
self,
campaign_id: int,

View file

@ -6,7 +6,10 @@ from zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, field_validator, model_validator
from api.constants import DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT
from api.constants import (
DEFAULT_CAMPAIGN_RETRY_CONFIG,
DEFAULT_ORG_CONCURRENCY_LIMIT,
)
from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
@ -126,6 +129,20 @@ class ScheduleConfigResponse(BaseModel):
slots: List[TimeSlotResponse]
class CircuitBreakerConfigRequest(BaseModel):
enabled: bool = True
failure_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
window_seconds: int = Field(default=120, ge=30, le=600)
min_calls_in_window: int = Field(default=5, ge=1, le=100)
class CircuitBreakerConfigResponse(BaseModel):
enabled: bool
failure_threshold: float
window_seconds: int
min_calls_in_window: int
class CreateCampaignRequest(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
workflow_id: int
@ -134,6 +151,7 @@ class CreateCampaignRequest(BaseModel):
retry_config: Optional[RetryConfigRequest] = None
max_concurrency: Optional[int] = Field(default=None, ge=1, le=100)
schedule_config: Optional[ScheduleConfigRequest] = None
circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
class UpdateCampaignRequest(BaseModel):
@ -141,6 +159,7 @@ class UpdateCampaignRequest(BaseModel):
retry_config: Optional[RetryConfigRequest] = None
max_concurrency: Optional[int] = Field(default=None, ge=1, le=100)
schedule_config: Optional[ScheduleConfigRequest] = None
circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
class CampaignResponse(BaseModel):
@ -160,6 +179,7 @@ class CampaignResponse(BaseModel):
retry_config: RetryConfigResponse
max_concurrency: Optional[int] = None
schedule_config: Optional[ScheduleConfigResponse] = None
circuit_breaker: Optional[CircuitBreakerConfigResponse] = None
class CampaignsResponse(BaseModel):
@ -209,9 +229,10 @@ def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse:
else DEFAULT_CAMPAIGN_RETRY_CONFIG
)
# Get max_concurrency and schedule_config from orchestrator_metadata
# Get max_concurrency, schedule_config, circuit_breaker from orchestrator_metadata
max_concurrency = None
schedule_config = None
circuit_breaker_config = None
if campaign.orchestrator_metadata:
max_concurrency = campaign.orchestrator_metadata.get("max_concurrency")
sc = campaign.orchestrator_metadata.get("schedule_config")
@ -221,6 +242,9 @@ def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse:
timezone=sc.get("timezone", "UTC"),
slots=[TimeSlotResponse(**slot) for slot in sc.get("slots", [])],
)
cb = campaign.orchestrator_metadata.get("circuit_breaker")
if cb:
circuit_breaker_config = CircuitBreakerConfigResponse(**cb)
return CampaignResponse(
id=campaign.id,
@ -239,6 +263,7 @@ def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse:
retry_config=RetryConfigResponse(**retry_config),
max_concurrency=max_concurrency,
schedule_config=schedule_config,
circuit_breaker=circuit_breaker_config,
)
@ -276,6 +301,11 @@ async def create_campaign(
if request.schedule_config:
schedule_config = request.schedule_config.model_dump()
# Build circuit_breaker dict if provided
circuit_breaker_config = None
if request.circuit_breaker:
circuit_breaker_config = request.circuit_breaker.model_dump()
campaign = await db_client.create_campaign(
name=request.name,
workflow_id=request.workflow_id,
@ -286,6 +316,7 @@ async def create_campaign(
retry_config=retry_config,
max_concurrency=request.max_concurrency,
schedule_config=schedule_config,
circuit_breaker=circuit_breaker_config,
)
return _build_campaign_response(campaign, workflow_name)
@ -436,6 +467,10 @@ async def update_campaign(
metadata["schedule_config"] = request.schedule_config.model_dump()
metadata_changed = True
if request.circuit_breaker is not None:
metadata["circuit_breaker"] = request.circuit_breaker.model_dump()
metadata_changed = True
if metadata_changed:
update_kwargs["orchestrator_metadata"] = metadata

View file

@ -1,4 +1,4 @@
from typing import Union
from typing import List, Optional, Union
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
@ -257,14 +257,41 @@ class RetryConfigResponse(BaseModel):
retry_on_voicemail: bool
class CampaignLimitsResponse(BaseModel):
class TimeSlotResponse(BaseModel):
day_of_week: int
start_time: str
end_time: str
class ScheduleConfigResponse(BaseModel):
enabled: bool
timezone: str
slots: List[TimeSlotResponse]
class CircuitBreakerConfigResponse(BaseModel):
enabled: bool
failure_threshold: float
window_seconds: int
min_calls_in_window: int
class LastCampaignSettingsResponse(BaseModel):
retry_config: Optional[RetryConfigResponse] = None
max_concurrency: Optional[int] = None
schedule_config: Optional[ScheduleConfigResponse] = None
circuit_breaker: Optional[CircuitBreakerConfigResponse] = None
class CampaignDefaultsResponse(BaseModel):
concurrent_call_limit: int
from_numbers_count: int
default_retry_config: RetryConfigResponse
last_campaign_settings: Optional[LastCampaignSettingsResponse] = None
@router.get("/campaign-limits", response_model=CampaignLimitsResponse)
async def get_campaign_limits(user: UserModel = Depends(get_user)):
@router.get("/campaign-defaults", response_model=CampaignDefaultsResponse)
async def get_campaign_defaults(user: UserModel = Depends(get_user)):
"""Get campaign limits for the user's organization.
Returns the organization's concurrent call limit and default retry configuration.
@ -299,8 +326,47 @@ async def get_campaign_limits(user: UserModel = Depends(get_user)):
except Exception:
pass
return CampaignLimitsResponse(
# Get last campaign settings for pre-population
last_campaign_settings = None
try:
last_campaign = await db_client.get_latest_campaign(
user.selected_organization_id
)
if last_campaign:
retry = None
if last_campaign.retry_config:
retry = RetryConfigResponse(**last_campaign.retry_config)
max_conc = None
sched = None
cb = None
if last_campaign.orchestrator_metadata:
max_conc = last_campaign.orchestrator_metadata.get("max_concurrency")
sc = last_campaign.orchestrator_metadata.get("schedule_config")
if sc:
sched = ScheduleConfigResponse(
enabled=sc.get("enabled", False),
timezone=sc.get("timezone", "UTC"),
slots=[
TimeSlotResponse(**slot) for slot in sc.get("slots", [])
],
)
cb_data = last_campaign.orchestrator_metadata.get("circuit_breaker")
if cb_data:
cb = CircuitBreakerConfigResponse(**cb_data)
last_campaign_settings = LastCampaignSettingsResponse(
retry_config=retry,
max_concurrency=max_conc,
schedule_config=sched,
circuit_breaker=cb,
)
except Exception:
pass
return CampaignDefaultsResponse(
concurrent_call_limit=concurrent_limit,
from_numbers_count=from_numbers_count,
default_retry_config=RetryConfigResponse(**DEFAULT_CAMPAIGN_RETRY_CONFIG),
last_campaign_settings=last_campaign_settings,
)

View file

@ -783,7 +783,8 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq
if workflow_run.campaign_id:
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
await circuit_breaker.record_and_evaluate(
workflow_run.campaign_id, is_failure=True
workflow_run.campaign_id,
is_failure=status.status == "error",
)
# Check if retry is needed for campaign calls (busy/no-answer)
@ -1209,6 +1210,7 @@ async def handle_cloudonix_status_callback(
return {"status": "success"}
@router.post("/cloudonix/amd-callback/{workflow_run_id}")
async def handle_cloudonix_amd_callback(
workflow_run_id: int,

View file

@ -9,6 +9,7 @@ from api.constants import DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import QueuedRunModel, WorkflowRunModel
from api.enums import OrganizationConfigurationKey, WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.campaign.errors import (
ConcurrentSlotAcquisitionError,
PhoneNumberPoolExhaustedError,
@ -315,6 +316,9 @@ class CampaignCallDispatcher:
},
)
# Record call initiation failure in circuit breaker
await circuit_breaker.record_and_evaluate(campaign.id, is_failure=True)
# Release concurrent slot on failure
mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run.id)
if mapping:

View file

@ -14,7 +14,6 @@ setup_logging()
import asyncio
import json
import signal
import time
from typing import Dict, Optional, Set
from urllib.parse import urlparse
@ -628,7 +627,7 @@ class ARIConnection:
bridge_id = ctx.get("bridge_id")
transfer_state = ctx.get("transfer_state")
# Check if this is a call transfer scenario external channel. Skip full teardown if
# Check if this is a call transfer scenario external channel. Skip full teardown if
# transfer is in progress and this is the external media channel
# During call transfer, we preserve the caller-destination bridge
if (

View file

@ -45,13 +45,16 @@ class ARIBridgeSwapStrategy(TransferStrategy):
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
auth = BasicAuth(app_name, app_password)
# Get call transfer manager instance
call_transfer_manager = await get_call_transfer_manager()
# 1. Find active transfer context for this caller channel
transfer_context = await call_transfer_manager.find_transfer_context_for_call(channel_id)
transfer_context = (
await call_transfer_manager.find_transfer_context_for_call(channel_id)
)
if not transfer_context:
logger.error(
f"[ARI Transfer] No active transfer context found for caller {channel_id}"
@ -178,6 +181,7 @@ class ARIBridgeSwapStrategy(TransferStrategy):
logger.exception(f"Failed to execute ARI transfer: {e}")
return False
class ARIHangupStrategy(HangupStrategy):
"""Implements hangup for Asterisk ARI channels."""
@ -223,4 +227,4 @@ class ARIHangupStrategy(HangupStrategy):
except Exception as e:
logger.exception(f"Failed to hang up Asterisk channel: {e}")
return False
return False

View file

@ -455,7 +455,9 @@ class ARIProvider(TelephonyProvider):
}
except Exception as e:
logger.error(f"[ARI Transfer] Failed to originate call transfer destination channel: {e}")
logger.error(
f"[ARI Transfer] Failed to originate call transfer destination channel: {e}"
)
await call_transfer_manager.remove_transfer_context(transfer_id)
raise

View file

@ -107,9 +107,10 @@ class CloudonixProvider(TelephonyProvider):
}
data["machineDetection"] = "DetectMessageEnd"
data["asyncAmd"] = True
data["asyncAmdStatusCallback"] = f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}"
data["asyncAmdStatusCallbackMethod"]= "POST"
data["asyncAmdStatusCallback"] = (
f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}"
)
data["asyncAmdStatusCallbackMethod"] = "POST"
# TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to
# some persistent logging system instead of transcational database.

View file

@ -76,20 +76,26 @@ class TwilioConferenceStrategy(TransferStrategy):
)
# 3. Clean up transfer context after successful transfer
await self._cleanup_transfer_context(transfer_context.transfer_id)
await self._cleanup_transfer_context(
transfer_context.transfer_id
)
return True
elif response.status == 404:
logger.error(
f"Failed to transfer Twilio call {call_sid}: Call not found (404)"
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
await self._cleanup_transfer_context(
transfer_context.transfer_id
)
return False
else:
logger.error(
f"Failed to transfer Twilio call {call_sid} to conference {conference_name}: "
f"Status {response.status}, Response: {response_text}"
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
await self._cleanup_transfer_context(
transfer_context.transfer_id
)
return False
except Exception as e:
@ -132,7 +138,7 @@ class TwilioConferenceStrategy(TransferStrategy):
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
call_transfer_manager = await get_call_transfer_manager()
await call_transfer_manager.remove_transfer_context(transfer_id)
except Exception as e:
@ -183,4 +189,4 @@ class TwilioHangupStrategy(HangupStrategy):
except Exception as e:
logger.exception(f"Failed to hang up Twilio call: {e}")
return False
return False

View file

@ -69,6 +69,16 @@ async def process_knowledge_base_document(
file_size = os.path.getsize(temp_file_path)
logger.info(f"Downloaded file size: {file_size} bytes")
# Validate file size (max 5MB)
max_file_size = 5 * 1024 * 1024
if file_size > max_file_size:
error_message = f"File size ({file_size / (1024 * 1024):.1f}MB) exceeds the maximum allowed size of 5MB."
logger.warning(f"Document {document_id}: {error_message}")
await db_client.update_document_status(
document_id, "failed", error_message=error_message
)
return
# Compute file hash and get mime type
file_hash = db_client.compute_file_hash(temp_file_path)
mime_type = db_client.get_mime_type(temp_file_path)