2026-06-12 14:55:30 +05:30
|
|
|
import os
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
from pipecat.utils.run_context import set_current_run_id
|
|
|
|
|
|
|
|
|
|
from api.db import db_client
|
|
|
|
|
from api.services.storage import get_current_storage_backend, storage_fs
|
|
|
|
|
from api.services.workflow_run_billing import (
|
|
|
|
|
report_completed_workflow_run_platform_usage,
|
|
|
|
|
)
|
|
|
|
|
from api.tasks.run_integrations import run_integrations_post_workflow_run
|
|
|
|
|
|
|
|
|
|
|
2026-06-16 15:19:49 +05:30
|
|
|
def _recording_metadata(storage_key: str, storage_backend: str, track: str) -> dict:
|
|
|
|
|
return {
|
|
|
|
|
"storage_key": storage_key,
|
|
|
|
|
"storage_backend": storage_backend,
|
|
|
|
|
"format": "wav",
|
|
|
|
|
"track": track,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _upload_temp_file(
|
|
|
|
|
workflow_run_id: int,
|
|
|
|
|
temp_file_path: str,
|
|
|
|
|
storage_key: str,
|
|
|
|
|
label: str,
|
|
|
|
|
) -> bool:
|
|
|
|
|
try:
|
|
|
|
|
if not os.path.exists(temp_file_path):
|
|
|
|
|
logger.warning(f"{label} temp file not found: {temp_file_path}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
file_size = os.path.getsize(temp_file_path)
|
|
|
|
|
logger.debug(f"{label} file size: {file_size} bytes")
|
|
|
|
|
|
|
|
|
|
await storage_fs.aupload_file(temp_file_path, storage_key)
|
|
|
|
|
logger.info(f"Successfully uploaded {label}: {storage_key}")
|
|
|
|
|
return True
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Error uploading {label} for workflow {workflow_run_id}: {e}")
|
|
|
|
|
return False
|
|
|
|
|
finally:
|
|
|
|
|
if os.path.exists(temp_file_path):
|
|
|
|
|
try:
|
|
|
|
|
os.remove(temp_file_path)
|
|
|
|
|
logger.debug(f"Cleaned up temp {label} file: {temp_file_path}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(f"Failed to clean up temp {label} file: {e}")
|
|
|
|
|
|
|
|
|
|
|
2026-06-12 14:55:30 +05:30
|
|
|
async def process_workflow_completion(
|
|
|
|
|
_ctx,
|
|
|
|
|
workflow_run_id: int,
|
|
|
|
|
audio_temp_path: Optional[str] = None,
|
|
|
|
|
transcript_temp_path: Optional[str] = None,
|
2026-06-16 15:19:49 +05:30
|
|
|
user_audio_temp_path: Optional[str] = None,
|
|
|
|
|
bot_audio_temp_path: Optional[str] = None,
|
2026-06-12 14:55:30 +05:30
|
|
|
):
|
|
|
|
|
"""Process workflow completion: upload artifacts and run integrations.
|
|
|
|
|
|
|
|
|
|
This task combines audio upload, transcript upload, and webhook integrations
|
|
|
|
|
into a single sequential task to ensure integrations run after uploads complete.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
_ctx: ARQ context (unused)
|
|
|
|
|
workflow_run_id: The workflow run ID
|
|
|
|
|
audio_temp_path: Optional path to temp audio file
|
|
|
|
|
transcript_temp_path: Optional path to temp transcript file
|
2026-06-16 15:19:49 +05:30
|
|
|
user_audio_temp_path: Optional path to temp user-track audio file
|
|
|
|
|
bot_audio_temp_path: Optional path to temp bot-track audio file
|
2026-06-12 14:55:30 +05:30
|
|
|
"""
|
|
|
|
|
run_id = str(workflow_run_id)
|
|
|
|
|
set_current_run_id(run_id)
|
|
|
|
|
|
|
|
|
|
logger.info(f"Processing workflow completion for run {workflow_run_id}")
|
|
|
|
|
|
|
|
|
|
storage_backend = get_current_storage_backend()
|
|
|
|
|
|
|
|
|
|
# Step 1: Upload audio if provided
|
2026-06-16 15:19:49 +05:30
|
|
|
recordings_metadata: dict[str, dict] = {}
|
|
|
|
|
|
2026-06-12 14:55:30 +05:30
|
|
|
if audio_temp_path:
|
2026-06-16 15:19:49 +05:30
|
|
|
recording_url = f"recordings/{workflow_run_id}.wav"
|
|
|
|
|
logger.info(
|
|
|
|
|
f"Uploading mixed audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
|
|
|
|
|
)
|
|
|
|
|
if await _upload_temp_file(
|
|
|
|
|
workflow_run_id, audio_temp_path, recording_url, "mixed audio"
|
|
|
|
|
):
|
|
|
|
|
recordings_metadata["mixed"] = _recording_metadata(
|
|
|
|
|
recording_url, storage_backend.value, "mixed"
|
|
|
|
|
)
|
|
|
|
|
await db_client.update_workflow_run(
|
|
|
|
|
run_id=workflow_run_id,
|
|
|
|
|
recording_url=recording_url,
|
|
|
|
|
storage_backend=storage_backend.value,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if user_audio_temp_path:
|
|
|
|
|
user_recording_url = f"recordings/{workflow_run_id}/user.wav"
|
|
|
|
|
logger.info(
|
|
|
|
|
f"Uploading user audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
|
|
|
|
|
)
|
|
|
|
|
if await _upload_temp_file(
|
|
|
|
|
workflow_run_id, user_audio_temp_path, user_recording_url, "user audio"
|
|
|
|
|
):
|
|
|
|
|
recordings_metadata["user"] = _recording_metadata(
|
|
|
|
|
user_recording_url, storage_backend.value, "user"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if bot_audio_temp_path:
|
|
|
|
|
bot_recording_url = f"recordings/{workflow_run_id}/bot.wav"
|
|
|
|
|
logger.info(
|
|
|
|
|
f"Uploading bot audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
|
|
|
|
|
)
|
|
|
|
|
if await _upload_temp_file(
|
|
|
|
|
workflow_run_id, bot_audio_temp_path, bot_recording_url, "bot audio"
|
|
|
|
|
):
|
|
|
|
|
recordings_metadata["bot"] = _recording_metadata(
|
|
|
|
|
bot_recording_url, storage_backend.value, "bot"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if recordings_metadata:
|
|
|
|
|
await db_client.update_workflow_run(
|
|
|
|
|
run_id=workflow_run_id,
|
|
|
|
|
storage_backend=storage_backend.value,
|
|
|
|
|
extra={"recordings": recordings_metadata},
|
|
|
|
|
)
|
2026-06-12 14:55:30 +05:30
|
|
|
|
|
|
|
|
# Step 2: Upload transcript if provided
|
|
|
|
|
if transcript_temp_path:
|
|
|
|
|
try:
|
|
|
|
|
if os.path.exists(transcript_temp_path):
|
|
|
|
|
file_size = os.path.getsize(transcript_temp_path)
|
|
|
|
|
logger.debug(f"Transcript file size: {file_size} bytes")
|
|
|
|
|
|
|
|
|
|
transcript_url = f"transcripts/{workflow_run_id}.txt"
|
|
|
|
|
logger.info(
|
|
|
|
|
f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
await storage_fs.aupload_file(transcript_temp_path, transcript_url)
|
|
|
|
|
await db_client.update_workflow_run(
|
|
|
|
|
run_id=workflow_run_id,
|
|
|
|
|
transcript_url=transcript_url,
|
|
|
|
|
storage_backend=storage_backend.value,
|
|
|
|
|
)
|
|
|
|
|
logger.info(f"Successfully uploaded transcript: {transcript_url}")
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(
|
|
|
|
|
f"Transcript temp file not found: {transcript_temp_path}"
|
|
|
|
|
)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
f"Error uploading transcript for workflow {workflow_run_id}: {e}"
|
|
|
|
|
)
|
|
|
|
|
finally:
|
|
|
|
|
if transcript_temp_path and os.path.exists(transcript_temp_path):
|
|
|
|
|
try:
|
|
|
|
|
os.remove(transcript_temp_path)
|
|
|
|
|
logger.debug(
|
|
|
|
|
f"Cleaned up temp transcript file: {transcript_temp_path}"
|
|
|
|
|
)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(f"Failed to clean up temp transcript file: {e}")
|
|
|
|
|
|
|
|
|
|
# Step 3: Run integrations including QA analysis (after uploads are complete)
|
|
|
|
|
try:
|
|
|
|
|
await run_integrations_post_workflow_run(_ctx, workflow_run_id)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Error running integrations for workflow {workflow_run_id}: {e}")
|
|
|
|
|
|
|
|
|
|
# Step 4: Notify MPS after completion. MPS owns credit accounting.
|
|
|
|
|
try:
|
|
|
|
|
await report_completed_workflow_run_platform_usage(workflow_run_id)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
f"Error reporting platform usage for workflow {workflow_run_id}: {e}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logger.info(f"Completed workflow completion processing for run {workflow_run_id}")
|