chore: return formatted transcript url

- Return formatted transcript and recording URL
- Harden campaign dispatcher logic
This commit is contained in:
Abhishek Kumar 2026-05-26 13:24:12 +05:30
parent 0716582aa7
commit 7810923bca
30 changed files with 525 additions and 136 deletions

View file

@ -2,7 +2,7 @@ import json
from datetime import UTC, datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import func, text
from sqlalchemy import func, text, update
from sqlalchemy.future import select
from api.db.base_client import BaseDBClient
@ -466,6 +466,63 @@ class CampaignClient(BaseDBClient):
await session.rollback()
raise
async def increment_campaign_metadata_counter(
self, campaign_id: int, key: str
) -> int:
"""Atomically increment an integer field in campaign orchestrator_metadata."""
async with self.async_session() as session:
result = await session.execute(
text(
"UPDATE campaigns "
"SET orchestrator_metadata = ("
" COALESCE(orchestrator_metadata::jsonb, '{}'::jsonb) "
" || jsonb_build_object("
" :key, "
" COALESCE((orchestrator_metadata::jsonb ->> :key)::int, 0) + 1"
" )"
" )::json, "
" updated_at = :now "
"WHERE id = :campaign_id "
"RETURNING (orchestrator_metadata::jsonb ->> :key)::int"
),
{
"campaign_id": campaign_id,
"key": key,
"now": datetime.now(UTC),
},
)
attempt = result.scalar_one()
try:
await session.commit()
except Exception:
await session.rollback()
raise
return attempt
async def reset_campaign_metadata_counter(self, campaign_id: int, key: str) -> None:
"""Remove a counter field from campaign orchestrator_metadata."""
async with self.async_session() as session:
await session.execute(
text(
"UPDATE campaigns "
"SET orchestrator_metadata = ("
" COALESCE(orchestrator_metadata::jsonb, '{}'::jsonb) - :key"
" )::json, "
" updated_at = :now "
"WHERE id = :campaign_id"
),
{
"campaign_id": campaign_id,
"key": key,
"now": datetime.now(UTC),
},
)
try:
await session.commit()
except Exception:
await session.rollback()
raise
# QueuedRun methods
async def bulk_create_queued_runs(self, queued_runs_data: list[dict]) -> None:
"""Bulk create queued runs"""
@ -501,6 +558,35 @@ class CampaignClient(BaseDBClient):
await session.refresh(queued_run)
return queued_run
async def return_processing_queued_runs_without_workflow(
self, queued_run_ids: list[int]
) -> int:
"""Return claimed queued_runs to queued if no workflow was created for them."""
if not queued_run_ids:
return 0
workflow_exists = (
select(WorkflowRunModel.id)
.where(WorkflowRunModel.queued_run_id == QueuedRunModel.id)
.exists()
)
async with self.async_session() as session:
result = await session.execute(
update(QueuedRunModel)
.where(
QueuedRunModel.id.in_(queued_run_ids),
QueuedRunModel.state == "processing",
~workflow_exists,
)
.values(state="queued")
)
try:
await session.commit()
except Exception:
await session.rollback()
raise
return result.rowcount or 0
async def count_queued_runs(
self, campaign_id: int, state: Optional[str] = None
) -> int:

View file

@ -350,6 +350,7 @@ class OrganizationUsageClient(BaseDBClient):
"call_duration_seconds": int(round(call_duration)),
"recording_url": run.recording_url,
"transcript_url": run.transcript_url,
"public_access_token": run.public_access_token,
"phone_number": phone_number,
"caller_number": caller_number,
"called_number": called_number,

View file

@ -13,6 +13,7 @@ from api.db.models import UserModel
from api.services.auth.depends import get_user
from api.services.mps_service_key_client import mps_service_key_client
from api.services.reports import generate_usage_runs_report_csv
from api.utils.artifacts import artifact_url
router = APIRouter(prefix="/organizations")
@ -49,6 +50,7 @@ class WorkflowRunUsageResponse(BaseModel):
call_duration_seconds: int
recording_url: Optional[str] = None
transcript_url: Optional[str] = None
public_access_token: Optional[str] = None
phone_number: Optional[str] = Field(
default=None,
deprecated=True,
@ -223,6 +225,15 @@ async def get_usage_history(
total_pages = (total_count + limit - 1) // limit
for run in runs:
public_access_token = run.get("public_access_token")
run["transcript_url"] = artifact_url(
public_access_token, "transcript", fallback=run.get("transcript_url")
)
run["recording_url"] = artifact_url(
public_access_token, "recording", fallback=run.get("recording_url")
)
return {
"runs": runs,
"total_dograh_tokens": total_tokens,

View file

@ -41,6 +41,7 @@ from api.services.workflow.trigger_paths import (
validate_trigger_paths,
)
from api.services.workflow.workflow_graph import WorkflowGraph
from api.utils.artifacts import artifact_url
router = APIRouter(prefix="/workflow")
@ -1113,14 +1114,24 @@ async def get_workflow_run(
)
if not run:
raise HTTPException(status_code=404, detail="Workflow run not found")
public_access_token = run.public_access_token
if (run.transcript_url or run.recording_url) and not public_access_token:
public_access_token = await db_client.ensure_public_access_token(run.id)
return {
"id": run.id,
"workflow_id": run.workflow_id,
"name": run.name,
"mode": run.mode,
"is_completed": run.is_completed,
"transcript_url": run.transcript_url,
"recording_url": run.recording_url,
"transcript_url": artifact_url(
public_access_token, "transcript", fallback=run.transcript_url
),
"recording_url": artifact_url(
public_access_token, "recording", fallback=run.recording_url
),
"public_access_token": public_access_token,
"cost_info": {
"dograh_token_usage": (
run.cost_info.get("dograh_token_usage")

View file

@ -15,6 +15,7 @@ class WorkflowRunResponseSchema(BaseModel):
is_completed: bool
transcript_url: str | None
recording_url: str | None
public_access_token: str | None = None
cost_info: Dict[str, Any] | None
definition_id: int | None # This is for backward compatibility
initial_context: dict | None = None

View file

@ -108,6 +108,7 @@ class CampaignCallDispatcher:
logger.warning(f"Failed to initialize from_number pool: {e}")
processed_count = 0
processed_run_ids: set[int] = set()
for i, queued_run in enumerate(queued_runs):
try:
# Apply rate limiting, i.e lets not initiate more than rate_limit_per_second
@ -133,28 +134,48 @@ class CampaignCallDispatcher:
)
processed_count += 1
processed_run_ids.add(queued_run.id)
# Update campaign processed count
await db_client.update_campaign(
campaign_id=campaign_id, processed_rows=campaign.processed_rows + 1
)
except (ConcurrentSlotAcquisitionError, PhoneNumberPoolExhaustedError):
# Revert all unprocessed runs (current and remaining) back to queued
# so they can be picked up again when campaign is resumed
for unprocessed_run in queued_runs[i:]:
try:
await db_client.update_queued_run(
queued_run_id=unprocessed_run.id,
state="queued",
)
logger.info(
f"Reverted queued run {unprocessed_run.id} back to queued state"
)
except Exception as revert_error:
logger.error(
f"Failed to revert queued run {unprocessed_run.id}: {revert_error}"
)
except asyncio.CancelledError:
logger.warning(
f"Campaign {campaign_id} batch cancelled; returning claimed "
"queued runs that were not dispatched"
)
await self._return_unprocessed_claims(
queued_runs, processed_run_ids, reason="task_cancelled"
)
raise
except PhoneNumberPoolExhaustedError as e:
logger.warning(
f"Phone number pool exhausted for campaign {campaign_id}; "
"returning claimed queued runs that were not dispatched: "
f"{e}"
)
await self._return_unprocessed_claims(
queued_runs,
processed_run_ids,
reason="phone_number_pool_exhausted",
)
# Re-raise to propagate to process_campaign_batch
raise
except ConcurrentSlotAcquisitionError as e:
logger.warning(
f"Concurrent slot acquisition failed for campaign {campaign_id}; "
"returning claimed queued runs that were not dispatched: "
f"{e}"
)
await self._return_unprocessed_claims(
queued_runs,
processed_run_ids,
reason="concurrent_slot_acquisition_failed",
)
# Re-raise to propagate to process_campaign_batch
raise
@ -178,6 +199,38 @@ class CampaignCallDispatcher:
return processed_count
async def _return_unprocessed_claims(
self,
queued_runs: list[QueuedRunModel],
processed_run_ids: set[int],
*,
reason: str,
) -> None:
queued_run_ids = [
queued_run.id
for queued_run in queued_runs
if queued_run.id not in processed_run_ids
]
if not queued_run_ids:
return
try:
returned_count = (
await db_client.return_processing_queued_runs_without_workflow(
queued_run_ids
)
)
logger.info(
f"Returned {returned_count}/{len(queued_run_ids)} claimed queued runs "
f"back to queued state; reason={reason}; "
f"queued_run_ids={queued_run_ids}"
)
except Exception as revert_error:
logger.error(
f"Failed to return claimed queued runs; reason={reason}; "
f"queued_run_ids={queued_run_ids}; error={revert_error}"
)
async def dispatch_call(
self, queued_run: QueuedRunModel, campaign: any, slot_id: str
) -> Optional[WorkflowRunModel]:

View file

@ -10,14 +10,8 @@ import io
from datetime import UTC, datetime
from typing import Any, List, Optional
from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
def _artifact_url(token: str | None, artifact: str) -> str:
if not token:
return ""
return f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow/{token}/{artifact}"
from api.utils.artifacts import artifact_url
def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
@ -83,8 +77,8 @@ def build_run_report_csv(runs: List[Any]) -> io.StringIO:
post_values = [
call_tags,
_artifact_url(run.public_access_token, "transcript"),
_artifact_url(run.public_access_token, "recording"),
artifact_url(run.public_access_token, "transcript") or "",
artifact_url(run.public_access_token, "recording") or "",
]
writer.writerow(pre_values + extracted_values + post_values)

View file

@ -14,6 +14,9 @@ from api.services.campaign.errors import (
)
from api.services.campaign.source_sync_factory import get_sync_service
PHONE_NUMBER_POOL_EXHAUSTED_COUNTER_KEY = "phone_number_pool_exhausted_attempts"
MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS = 3
async def sync_campaign_source(ctx: Dict, campaign_id: int) -> None:
"""
@ -118,6 +121,12 @@ async def process_campaign_batch(
campaign_id=campaign_id, batch_size=batch_size
)
if processed_count > 0:
await db_client.reset_campaign_metadata_counter(
campaign_id=campaign_id,
key=PHONE_NUMBER_POOL_EXHAUSTED_COUNTER_KEY,
)
# Publish batch completed event - orchestrator will handle next batch scheduling
publisher = await get_campaign_event_publisher()
await publisher.publish_batch_completed(
@ -157,9 +166,43 @@ async def process_campaign_batch(
raise
except PhoneNumberPoolExhaustedError as e:
logger.warning(f"Phone number pool exhausted for campaign {campaign_id}: {e}")
attempt = await db_client.increment_campaign_metadata_counter(
campaign_id=campaign_id,
key=PHONE_NUMBER_POOL_EXHAUSTED_COUNTER_KEY,
)
logger.warning(
f"Phone number pool exhausted for campaign {campaign_id}: {e}; "
f"attempt={attempt}/{MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS}"
)
publisher = await get_campaign_event_publisher()
if attempt < MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS:
await db_client.append_campaign_log(
campaign_id=campaign_id,
level="warning",
event="phone_number_pool_exhausted_retry",
message=(
f"Phone number pool exhausted for org {e.organization_id}: "
"no free from_number available to dispatch outbound calls; "
f"retry attempt {attempt}/"
f"{MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS}"
),
details={
"error": str(e),
"organization_id": e.organization_id,
"attempt": attempt,
"max_attempts": MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS,
},
)
await publisher.publish_batch_completed(
campaign_id=campaign_id,
processed_count=0,
failed_count=0,
batch_size=batch_size,
)
return
await publisher.publish_batch_failed(
campaign_id=campaign_id,
error=f"Phone number pool exhausted: {e}",
@ -172,12 +215,15 @@ async def process_campaign_batch(
level="error",
event="phone_number_pool_exhausted",
message=(
f"Phone number pool exhausted for org {e.organization_id}: "
"no free from_number available to dispatch outbound calls"
f"Phone number pool exhausted for org {e.organization_id} after "
f"{attempt} consecutive attempts: no free from_number available "
"to dispatch outbound calls"
),
details={
"error": str(e),
"organization_id": e.organization_id,
"attempt": attempt,
"max_attempts": MAX_PHONE_NUMBER_POOL_EXHAUSTED_ATTEMPTS,
},
)
raise

View file

@ -681,6 +681,54 @@ class TestProcessBatchConcurrency:
assert states.get("processing", 0) == 0
class TestProcessBatchCancellation:
"""Cancellation cleanup for claimed queued runs."""
@pytest.mark.asyncio
async def test_cancelled_batch_returns_claimed_runs_without_workflows(self):
dispatcher = CampaignCallDispatcher()
campaign = MagicMock()
campaign.id = 42
campaign.state = "running"
campaign.organization_id = 7
campaign.rate_limit_per_second = 1
campaign.telephony_configuration_id = 170
queued_runs = [MagicMock(id=101), MagicMock(id=102), MagicMock(id=103)]
provider = MagicMock()
provider.from_numbers = []
with (
patch(
"api.services.campaign.campaign_call_dispatcher.db_client"
) as mock_db,
patch.object(
dispatcher,
"get_provider_for_campaign",
AsyncMock(return_value=provider),
),
patch.object(
dispatcher,
"apply_rate_limit",
AsyncMock(side_effect=asyncio.CancelledError),
),
):
mock_db.get_campaign_by_id = AsyncMock(return_value=campaign)
mock_db.claim_queued_runs_for_processing = AsyncMock(
return_value=queued_runs
)
mock_db.return_processing_queued_runs_without_workflow = AsyncMock(
return_value=3
)
with pytest.raises(asyncio.CancelledError):
await dispatcher.process_batch(campaign_id=42, batch_size=3)
mock_db.return_processing_queued_runs_without_workflow.assert_awaited_once_with(
[101, 102, 103]
)
class TestProcessBatchEdgeCases:
"""Edge case tests for process_batch."""

View file

@ -23,10 +23,9 @@ class TestProcessCampaignBatchFailureLogs:
``batch_failed`` entry."""
@pytest.mark.asyncio
async def test_phone_number_pool_exhausted_logs_specific_event(self):
"""When PhoneNumberPoolExhaustedError propagates from process_batch,
the campaign log entry should use event='phone_number_pool_exhausted'
with a clear message not the generic 'batch_failed' bucket."""
async def test_phone_number_pool_exhausted_retries_before_final_failure(self):
"""The first two consecutive pool exhaustion attempts keep the
campaign running and schedule another batch."""
with (
patch("api.tasks.campaign_tasks.campaign_call_dispatcher") as mock_disp,
patch("api.tasks.campaign_tasks.db_client") as mock_db,
@ -37,6 +36,46 @@ class TestProcessCampaignBatchFailureLogs:
mock_disp.process_batch = AsyncMock(
side_effect=PhoneNumberPoolExhaustedError(organization_id=7)
)
mock_db.increment_campaign_metadata_counter = AsyncMock(return_value=2)
mock_db.update_campaign = AsyncMock()
mock_db.append_campaign_log = AsyncMock()
mock_pub = AsyncMock()
mock_get_pub.return_value = mock_pub
await process_campaign_batch({}, campaign_id=42)
mock_db.update_campaign.assert_not_awaited()
mock_pub.publish_batch_failed.assert_not_awaited()
mock_pub.publish_batch_completed.assert_awaited_once_with(
campaign_id=42,
processed_count=0,
failed_count=0,
batch_size=10,
)
mock_db.append_campaign_log.assert_called_once()
kwargs = mock_db.append_campaign_log.call_args.kwargs
assert kwargs["campaign_id"] == 42
assert kwargs["event"] == "phone_number_pool_exhausted_retry"
assert kwargs["level"] == "warning"
assert kwargs["details"]["organization_id"] == 7
assert kwargs["details"]["attempt"] == 2
@pytest.mark.asyncio
async def test_phone_number_pool_exhausted_fails_on_third_attempt(self):
"""The third consecutive pool exhaustion attempt marks the campaign
failed with a specific operator-facing log entry."""
with (
patch("api.tasks.campaign_tasks.campaign_call_dispatcher") as mock_disp,
patch("api.tasks.campaign_tasks.db_client") as mock_db,
patch(
"api.tasks.campaign_tasks.get_campaign_event_publisher"
) as mock_get_pub,
):
mock_disp.process_batch = AsyncMock(
side_effect=PhoneNumberPoolExhaustedError(organization_id=7)
)
mock_db.increment_campaign_metadata_counter = AsyncMock(return_value=3)
mock_db.update_campaign = AsyncMock()
mock_db.append_campaign_log = AsyncMock()
mock_pub = AsyncMock()
@ -48,6 +87,7 @@ class TestProcessCampaignBatchFailureLogs:
mock_db.update_campaign.assert_called_once_with(
campaign_id=42, state="failed"
)
mock_pub.publish_batch_failed.assert_awaited_once()
mock_db.append_campaign_log.assert_called_once()
kwargs = mock_db.append_campaign_log.call_args.kwargs
@ -56,6 +96,7 @@ class TestProcessCampaignBatchFailureLogs:
assert kwargs["level"] == "error"
assert "phone number" in kwargs["message"].lower()
assert kwargs["details"]["organization_id"] == 7
assert kwargs["details"]["attempt"] == 3
@pytest.mark.asyncio
async def test_concurrent_slot_timeout_still_logs_specific_event(self):

11
api/utils/artifacts.py Normal file
View file

@ -0,0 +1,11 @@
"""Helpers for workflow run artifact access."""
from api.constants import BACKEND_API_ENDPOINT
def artifact_url(
token: str | None, artifact: str, fallback: str | None = None
) -> str | None:
if not token:
return fallback
return f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow/{token}/{artifact}"