From a54ab519b8e22097f8105dc766b6b9aa4c473cf8 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Fri, 3 Jul 2026 20:01:52 +0530 Subject: [PATCH 1/2] chore: refactor file upload mechanism to avoid NFS dependency (#496) * chore: refactor file upload mechanism to avoid NFS dependency * add regression test for deregistration of calls * fix: fix minio upload issue * fix: make transcript upload async --- api/services/filesystem/base.py | 34 ++- api/services/filesystem/local.py | 6 +- api/services/filesystem/minio.py | 10 +- api/services/filesystem/null.py | 6 +- api/services/filesystem/s3.py | 6 +- api/services/pipecat/event_handlers.py | 48 ++-- api/services/pipecat/in_memory_buffers.py | 53 ++--- api/services/workflow_run_artifacts.py | 126 +++++++++++ api/tasks/arq.py | 2 - api/tasks/function_names.py | 1 - api/tasks/s3_upload.py | 67 ------ api/tasks/workflow_completion.py | 212 +++++++----------- api/tests/integrations/test_run_pipeline.py | 72 ++++++ deploy/helm/dograh/README.md | 38 +--- deploy/helm/dograh/templates/NOTES.txt | 12 - deploy/helm/dograh/templates/_helpers.tpl | 18 -- .../templates/arq-worker-deployment.yaml | 4 - .../helm/dograh/templates/shared-tmp-pvc.yaml | 18 -- .../helm/dograh/templates/web-deployment.yaml | 4 - deploy/helm/dograh/values.yaml | 22 -- deploy/hostinger/docker-compose.yaml | 4 - docker-compose.yaml | 4 - .../src/dograh_sdk/_generated_models.py | 4 +- 23 files changed, 370 insertions(+), 401 deletions(-) create mode 100644 api/services/workflow_run_artifacts.py delete mode 100644 api/tasks/s3_upload.py delete mode 100644 deploy/helm/dograh/templates/shared-tmp-pvc.yaml diff --git a/api/services/filesystem/base.py b/api/services/filesystem/base.py index d840d2bd..d1c54ca7 100644 --- a/api/services/filesystem/base.py +++ b/api/services/filesystem/base.py @@ -1,23 +1,51 @@ from abc import ABC, abstractmethod -from typing import Any, BinaryIO, Dict, Optional +from typing import Any, Dict, Optional, Protocol + + +class AsyncReadable(Protocol): + """Anything exposing ``await .read() -> bytes`` (aiofiles handles, in-memory wrappers).""" + + async def read(self) -> bytes: ... + + +class _AsyncBytesReader: + """Async file-like wrapper over in-memory bytes for acreate_file().""" + + def __init__(self, data: bytes): + self._data = data + + async def read(self) -> bytes: + return self._data class BaseFileSystem(ABC): """Abstract base class for filesystem operations.""" @abstractmethod - async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: + async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool: """Create a new file with the given content. Args: file_path: Path where the file should be created - content: File content as a binary stream + content: File content readable via ``await content.read()`` Returns: bool: True if file was created successfully, False otherwise """ pass + async def acreate_file_from_bytes(self, file_path: str, data: bytes) -> bool: + """Create a file directly from in-memory bytes (no local file needed). + + Args: + file_path: Path where the file should be created + data: File content as bytes + + Returns: + bool: True if file was created successfully, False otherwise + """ + return await self.acreate_file(file_path, _AsyncBytesReader(data)) + @abstractmethod async def aupload_file(self, local_path: str, destination_path: str) -> bool: """Upload a file from local path to destination. diff --git a/api/services/filesystem/local.py b/api/services/filesystem/local.py index 7a9d9756..a73a8a2f 100644 --- a/api/services/filesystem/local.py +++ b/api/services/filesystem/local.py @@ -1,11 +1,11 @@ import asyncio import os from datetime import datetime -from typing import BinaryIO, Optional +from typing import Optional import aiofiles -from .base import BaseFileSystem +from .base import AsyncReadable, BaseFileSystem class LocalFileSystem(BaseFileSystem): @@ -24,7 +24,7 @@ class LocalFileSystem(BaseFileSystem): """Get the full path by joining with base path.""" return os.path.join(self.base_path, file_path) - async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: + async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool: try: full_path = self._get_full_path(file_path) os.makedirs(os.path.dirname(full_path), exist_ok=True) diff --git a/api/services/filesystem/minio.py b/api/services/filesystem/minio.py index f26bf1cc..e81c6095 100644 --- a/api/services/filesystem/minio.py +++ b/api/services/filesystem/minio.py @@ -1,12 +1,13 @@ import asyncio +import io import json -from typing import Any, BinaryIO, Dict, Optional +from typing import Any, Dict, Optional from loguru import logger from minio import Minio from minio.error import S3Error -from .base import BaseFileSystem +from .base import AsyncReadable, BaseFileSystem class MinioFileSystem(BaseFileSystem): @@ -89,15 +90,16 @@ class MinioFileSystem(BaseFileSystem): logger.debug(f"Bucket setup note: {e}") pass - async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: + async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool: try: data = await content.read() def _put(): + # The MinIO SDK requires a stream with .read(), not raw bytes. self.client.put_object( self.bucket_name, file_path, - data=bytes(data), + data=io.BytesIO(data), length=len(data), ) diff --git a/api/services/filesystem/null.py b/api/services/filesystem/null.py index e01c72b1..90b4a090 100644 --- a/api/services/filesystem/null.py +++ b/api/services/filesystem/null.py @@ -1,6 +1,6 @@ -from typing import Any, BinaryIO, Dict, NoReturn, Optional +from typing import Any, Dict, NoReturn, Optional -from .base import BaseFileSystem +from .base import AsyncReadable, BaseFileSystem class NullFileSystem(BaseFileSystem): @@ -16,7 +16,7 @@ class NullFileSystem(BaseFileSystem): "Set ENVIRONMENT to a non-test value or inject a real filesystem fixture." ) - async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: + async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool: self._fail("acreate_file") async def aupload_file(self, local_path: str, destination_path: str) -> bool: diff --git a/api/services/filesystem/s3.py b/api/services/filesystem/s3.py index e6b99fa3..68d633bb 100644 --- a/api/services/filesystem/s3.py +++ b/api/services/filesystem/s3.py @@ -1,10 +1,10 @@ -from typing import Any, BinaryIO, Dict, Optional +from typing import Any, Dict, Optional import aioboto3 from botocore.config import Config from botocore.exceptions import ClientError -from .base import BaseFileSystem +from .base import AsyncReadable, BaseFileSystem class S3FileSystem(BaseFileSystem): @@ -57,7 +57,7 @@ class S3FileSystem(BaseFileSystem): kwargs["config"] = self._config return kwargs - async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: + async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool: try: async with self.session.client("s3", **self._client_kwargs()) as s3_client: await s3_client.put_object( diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index bb66d19f..dbd27998 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -16,6 +16,7 @@ from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggr from api.services.pipecat.tracing_config import get_trace_url from api.services.posthog_client import capture_event from api.services.workflow.pipecat_engine import PipecatEngine +from api.services.workflow_run_artifacts import upload_workflow_run_artifacts from api.tasks.arq import enqueue_job from api.tasks.function_names import FunctionNames from pipecat.frames.frames import ( @@ -361,50 +362,49 @@ def register_event_handlers( except Exception as e: logger.error(f"Error saving workflow run logs: {e}", exc_info=True) - # Write buffers to temp files and enqueue combined processing task - audio_temp_path = None - user_audio_temp_path = None - bot_audio_temp_path = None - transcript_temp_path = None - + # Upload artifacts straight from the in-memory buffers so nothing has + # to cross a process/host boundary via temp files. Must complete + # before the completion job is enqueued so QA and webhooks see the + # artifacts in storage. try: + mixed_audio_wav = None + user_audio_wav = None + bot_audio_wav = None + if not in_memory_audio_buffers.mixed.is_empty: - audio_temp_path = ( - await in_memory_audio_buffers.mixed.write_to_temp_file() - ) + mixed_audio_wav = await in_memory_audio_buffers.mixed.to_wav_bytes() else: logger.debug("Audio buffer is empty, skipping upload") if not in_memory_audio_buffers.user.is_empty: - user_audio_temp_path = ( - await in_memory_audio_buffers.user.write_to_temp_file() - ) + user_audio_wav = await in_memory_audio_buffers.user.to_wav_bytes() else: logger.debug("User audio buffer is empty, skipping upload") if not in_memory_audio_buffers.bot.is_empty: - bot_audio_temp_path = ( - await in_memory_audio_buffers.bot.write_to_temp_file() - ) + bot_audio_wav = await in_memory_audio_buffers.bot.to_wav_bytes() else: logger.debug("Bot audio buffer is empty, skipping upload") - transcript_temp_path = in_memory_logs_buffer.write_transcript_to_temp_file() - if not transcript_temp_path: + transcript_text = in_memory_logs_buffer.generate_transcript_text() + if not transcript_text: logger.debug("No transcript events in logs buffer, skipping upload") + await upload_workflow_run_artifacts( + workflow_run_id, + mixed_audio_wav=mixed_audio_wav, + user_audio_wav=user_audio_wav, + bot_audio_wav=bot_audio_wav, + transcript_text=transcript_text, + ) except Exception as e: - logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True) + logger.error(f"Error uploading call artifacts: {e}", exc_info=True) - # Combined task: uploads artifacts, runs integrations (including QA), - # then calculates cost (so QA token usage is captured in usage_info) + # Combined task: runs integrations (including QA), then calculates + # cost (so QA token usage is captured in usage_info) await enqueue_job( FunctionNames.PROCESS_WORKFLOW_COMPLETION, workflow_run_id, - audio_temp_path, - transcript_temp_path, - user_audio_temp_path, - bot_audio_temp_path, ) # Return the buffer so it can be passed to other handlers diff --git a/api/services/pipecat/in_memory_buffers.py b/api/services/pipecat/in_memory_buffers.py index 5c7f3030..12d29018 100644 --- a/api/services/pipecat/in_memory_buffers.py +++ b/api/services/pipecat/in_memory_buffers.py @@ -1,5 +1,5 @@ import asyncio -import tempfile +import io import wave from datetime import UTC, datetime from typing import List, Optional @@ -15,7 +15,7 @@ from pipecat.utils.enums import RealtimeFeedbackType class InMemoryAudioBuffer: - """Buffer audio data in memory during a call, then write to temp file on disconnect.""" + """Buffer audio data in memory during a call, then encode to WAV bytes on disconnect.""" def __init__(self, workflow_run_id: int, sample_rate: int, num_channels: int = 1): self._workflow_run_id = workflow_run_id @@ -41,28 +41,30 @@ class InMemoryAudioBuffer: f"Appended {len(pcm_data)} bytes to audio buffer. Total size: {self._total_size}" ) - async def write_to_temp_file(self) -> str: - """Write audio data to a temporary WAV file and return the path.""" + async def to_wav_bytes(self) -> bytes: + """Encode the buffered PCM data as an in-memory WAV file.""" async with self._lock: - temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) - logger.debug( - f"Writing audio buffer to temp file {temp_file.name} for workflow {self._workflow_run_id}" - ) + chunks = list(self._chunks) - # Write WAV header and PCM data - with wave.open(temp_file.name, "wb") as wf: + def _encode() -> bytes: + wav_io = io.BytesIO() + with wave.open(wav_io, "wb") as wf: wf.setnchannels(self._num_channels) wf.setsampwidth(2) # 16-bit audio wf.setframerate(self._sample_rate) # Concatenate all chunks - for chunk in self._chunks: + for chunk in chunks: wf.writeframes(chunk) + return wav_io.getvalue() - logger.info( - f"Successfully wrote {self._total_size} bytes of audio to {temp_file.name}" - ) - return temp_file.name + # Encoding is mostly memcpy but can touch ~100MB; keep it off the event loop + data = await asyncio.to_thread(_encode) + logger.info( + f"Encoded {self._total_size} bytes of audio to {len(data)} WAV bytes " + f"for workflow {self._workflow_run_id}" + ) + return data @property def is_empty(self) -> bool: @@ -172,27 +174,6 @@ class InMemoryLogsBuffer: """ return _generate_transcript_text(self._sorted_events()) - def write_transcript_to_temp_file(self) -> Optional[str]: - """Write transcript to a temporary text file and return the path. - - Returns None if there are no transcript events. - """ - content = self.generate_transcript_text() - if not content: - return None - - temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) - logger.debug( - f"Writing transcript to temp file {temp_file.name} for workflow {self._workflow_run_id}" - ) - temp_file.write(content) - temp_file.close() - - logger.info( - f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}" - ) - return temp_file.name - @property def is_empty(self) -> bool: """Check if the buffer is empty.""" diff --git a/api/services/workflow_run_artifacts.py b/api/services/workflow_run_artifacts.py new file mode 100644 index 00000000..e61c47e8 --- /dev/null +++ b/api/services/workflow_run_artifacts.py @@ -0,0 +1,126 @@ +"""Upload end-of-call artifacts (recordings, transcript) to object storage. + +Called from the pipeline process itself, straight from the in-memory call +buffers, so no local file ever has to cross a process/host boundary (no +shared /tmp between web and ARQ workers). Uploads happen before the +workflow-completion job is enqueued so QA and webhooks see the artifacts +in storage. +""" + +from loguru import logger + +from api.db import db_client +from api.services.storage import get_current_storage_backend, storage_fs + + +def _recording_metadata(storage_key: str, storage_backend: str, track: str) -> dict: + return { + "storage_key": storage_key, + "storage_backend": storage_backend, + "format": "wav", + "track": track, + } + + +async def _upload_bytes( + workflow_run_id: int, + data: bytes, + storage_key: str, + label: str, +) -> bool: + try: + logger.debug(f"{label} size: {len(data)} bytes") + if await storage_fs.acreate_file_from_bytes(storage_key, data): + logger.info(f"Successfully uploaded {label}: {storage_key}") + return True + logger.error( + f"Storage backend rejected {label} upload for workflow " + f"{workflow_run_id}: {storage_key}" + ) + return False + except Exception as e: + logger.error(f"Error uploading {label} for workflow {workflow_run_id}: {e}") + return False + + +async def upload_workflow_run_artifacts( + workflow_run_id: int, + *, + mixed_audio_wav: bytes | None = None, + user_audio_wav: bytes | None = None, + bot_audio_wav: bytes | None = None, + transcript_text: str | None = None, +) -> None: + """Upload call artifacts to object storage and persist their metadata. + + Each artifact is uploaded independently; a failure is logged and the + remaining artifacts are still attempted. + """ + storage_backend = get_current_storage_backend() + + recordings_metadata: dict[str, dict] = {} + + if mixed_audio_wav: + recording_url = f"recordings/{workflow_run_id}.wav" + logger.info( + f"Uploading mixed audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" + ) + if await _upload_bytes( + workflow_run_id, mixed_audio_wav, recording_url, "mixed audio" + ): + recordings_metadata["mixed"] = _recording_metadata( + recording_url, storage_backend.value, "mixed" + ) + await db_client.update_workflow_run( + run_id=workflow_run_id, + recording_url=recording_url, + storage_backend=storage_backend.value, + ) + + if user_audio_wav: + user_recording_url = f"recordings/{workflow_run_id}/user.wav" + logger.info( + f"Uploading user audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" + ) + if await _upload_bytes( + workflow_run_id, user_audio_wav, user_recording_url, "user audio" + ): + recordings_metadata["user"] = _recording_metadata( + user_recording_url, storage_backend.value, "user" + ) + + if bot_audio_wav: + bot_recording_url = f"recordings/{workflow_run_id}/bot.wav" + logger.info( + f"Uploading bot audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" + ) + if await _upload_bytes( + workflow_run_id, bot_audio_wav, bot_recording_url, "bot audio" + ): + recordings_metadata["bot"] = _recording_metadata( + bot_recording_url, storage_backend.value, "bot" + ) + + if recordings_metadata: + await db_client.update_workflow_run( + run_id=workflow_run_id, + storage_backend=storage_backend.value, + extra={"recordings": recordings_metadata}, + ) + + if transcript_text: + transcript_url = f"transcripts/{workflow_run_id}.txt" + logger.info( + f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}" + ) + if await _upload_bytes( + workflow_run_id, + transcript_text.encode("utf-8"), + transcript_url, + "transcript", + ): + await db_client.update_workflow_run( + run_id=workflow_run_id, + transcript_url=transcript_url, + storage_backend=storage_backend.value, + ) diff --git a/api/tasks/arq.py b/api/tasks/arq.py index 01815b7b..189a32a3 100644 --- a/api/tasks/arq.py +++ b/api/tasks/arq.py @@ -45,7 +45,6 @@ from api.tasks.campaign_tasks import ( ) from api.tasks.knowledge_base_processing import process_knowledge_base_document from api.tasks.run_integrations import run_integrations_post_workflow_run -from api.tasks.s3_upload import upload_voicemail_audio_to_s3 from api.tasks.webhook_delivery import deliver_webhook, sweep_webhook_deliveries from api.tasks.workflow_completion import process_workflow_completion @@ -53,7 +52,6 @@ from api.tasks.workflow_completion import process_workflow_completion class WorkerSettings: functions = [ run_integrations_post_workflow_run, - upload_voicemail_audio_to_s3, process_workflow_completion, sync_campaign_source, process_campaign_batch, diff --git a/api/tasks/function_names.py b/api/tasks/function_names.py index 599555b5..bd41131d 100644 --- a/api/tasks/function_names.py +++ b/api/tasks/function_names.py @@ -1,7 +1,6 @@ class FunctionNames: RUN_INTEGRATIONS_POST_WORKFLOW_RUN = "run_integrations_post_workflow_run" PROCESS_WORKFLOW_COMPLETION = "process_workflow_completion" - UPLOAD_VOICEMAIL_AUDIO_TO_S3 = "upload_voicemail_audio_to_s3" SYNC_CAMPAIGN_SOURCE = "sync_campaign_source" PROCESS_CAMPAIGN_BATCH = "process_campaign_batch" PROCESS_KNOWLEDGE_BASE_DOCUMENT = "process_knowledge_base_document" diff --git a/api/tasks/s3_upload.py b/api/tasks/s3_upload.py deleted file mode 100644 index bbbc8bf4..00000000 --- a/api/tasks/s3_upload.py +++ /dev/null @@ -1,67 +0,0 @@ -import os - -from loguru import logger -from pipecat.utils.run_context import set_current_run_id - -from api.services.storage import storage_fs - - -async def upload_voicemail_audio_to_s3( - _ctx, - workflow_run_id: int, - temp_file_path: str, - s3_key: str, -): - """Upload voicemail detection audio from temp file to S3. - - Handles voicemail-specific paths and doesn't update the workflow run's - recording_url field. - - Args: - _ctx: ARQ context (unused) - workflow_run_id: The workflow run ID - temp_file_path: Path to the temporary WAV file - s3_key: The S3 key where the file should be uploaded - """ - run_id = str(workflow_run_id) - set_current_run_id(run_id) - - logger.info(f"Starting voicemail audio upload to S3 from {temp_file_path}") - - try: - # Verify temp file exists - if not os.path.exists(temp_file_path): - logger.error(f"Temp voicemail audio file not found: {temp_file_path}") - raise FileNotFoundError( - f"Temp voicemail audio file not found: {temp_file_path}" - ) - - file_size = os.path.getsize(temp_file_path) - logger.debug(f"Voicemail audio file size: {file_size} bytes") - - # Upload to S3 - upload_ok = await storage_fs.aupload_file(temp_file_path, s3_key) - - if upload_ok: - logger.info(f"Successfully uploaded voicemail audio to S3: {s3_key}") - else: - logger.error( - f"Failed to upload voicemail audio to S3 for workflow {workflow_run_id}" - ) - raise Exception(f"S3 upload failed for {s3_key}") - - except Exception as e: - logger.error( - f"Error uploading voicemail audio to S3 for workflow {workflow_run_id}: {e}" - ) - raise - finally: - # Clean up temp file - if os.path.exists(temp_file_path): - try: - os.remove(temp_file_path) - logger.debug(f"Cleaned up temp voicemail audio file: {temp_file_path}") - except Exception as e: - logger.warning( - f"Failed to clean up temp voicemail audio file {temp_file_path}: {e}" - ) diff --git a/api/tasks/workflow_completion.py b/api/tasks/workflow_completion.py index 6943c0d6..294182a2 100644 --- a/api/tasks/workflow_completion.py +++ b/api/tasks/workflow_completion.py @@ -1,178 +1,118 @@ +import asyncio import os -from typing import Optional from loguru import logger from pipecat.utils.run_context import set_current_run_id -from api.db import db_client -from api.services.storage import get_current_storage_backend, storage_fs +from api.services.workflow_run_artifacts import upload_workflow_run_artifacts from api.services.workflow_run_billing import ( report_completed_workflow_run_platform_usage, ) from api.tasks.run_integrations import run_integrations_post_workflow_run -def _recording_metadata(storage_key: str, storage_backend: str, track: str) -> dict: - return { - "storage_key": storage_key, - "storage_backend": storage_backend, - "format": "wav", - "track": track, - } - - -async def _upload_temp_file( - workflow_run_id: int, - temp_file_path: str, - storage_key: str, - label: str, -) -> bool: +def _read_and_remove_temp_file(temp_file_path: str | None, label: str) -> bytes | None: + if not temp_file_path: + return None try: if not os.path.exists(temp_file_path): logger.warning(f"{label} temp file not found: {temp_file_path}") - return False - - file_size = os.path.getsize(temp_file_path) - logger.debug(f"{label} file size: {file_size} bytes") - - await storage_fs.aupload_file(temp_file_path, storage_key) - logger.info(f"Successfully uploaded {label}: {storage_key}") - return True + return None + with open(temp_file_path, "rb") as f: + data = f.read() + os.remove(temp_file_path) + return data except Exception as e: - logger.error(f"Error uploading {label} for workflow {workflow_run_id}: {e}") - return False - finally: - if os.path.exists(temp_file_path): - try: - os.remove(temp_file_path) - logger.debug(f"Cleaned up temp {label} file: {temp_file_path}") - except Exception as e: - logger.warning(f"Failed to clean up temp {label} file: {e}") + logger.error(f"Error reading legacy {label} temp file {temp_file_path}: {e}") + return None + + +async def _upload_legacy_temp_artifacts( + workflow_run_id: int, + audio_temp_path: str | None, + transcript_temp_path: str | None, + user_audio_temp_path: str | None, + bot_audio_temp_path: str | None, +) -> None: + """Handle jobs enqueued before uploads moved into the pipeline process. + + Pre-refactor web workers passed local temp-file paths; upload them if this + worker can still see the files (same host / shared volume). + + Deprecated: remove once no pre-refactor jobs can remain in the queue. + """ + logger.info( + f"Processing legacy temp-file artifacts for workflow run {workflow_run_id}" + ) + transcript_bytes = await asyncio.to_thread( + _read_and_remove_temp_file, transcript_temp_path, "transcript" + ) + await upload_workflow_run_artifacts( + workflow_run_id, + mixed_audio_wav=await asyncio.to_thread( + _read_and_remove_temp_file, audio_temp_path, "mixed audio" + ), + user_audio_wav=await asyncio.to_thread( + _read_and_remove_temp_file, user_audio_temp_path, "user audio" + ), + bot_audio_wav=await asyncio.to_thread( + _read_and_remove_temp_file, bot_audio_temp_path, "bot audio" + ), + transcript_text=( + transcript_bytes.decode("utf-8") if transcript_bytes else None + ), + ) async def process_workflow_completion( _ctx, workflow_run_id: int, - audio_temp_path: Optional[str] = None, - transcript_temp_path: Optional[str] = None, - user_audio_temp_path: Optional[str] = None, - bot_audio_temp_path: Optional[str] = None, + audio_temp_path: str | None = None, + transcript_temp_path: str | None = None, + user_audio_temp_path: str | None = None, + bot_audio_temp_path: str | None = None, ): - """Process workflow completion: upload artifacts and run integrations. + """Process workflow completion: run integrations and report billing. - This task combines audio upload, transcript upload, and webhook integrations - into a single sequential task to ensure integrations run after uploads complete. + Recording/transcript uploads happen in the pipeline process itself + (api/services/workflow_run_artifacts.py) before this job is enqueued, + so this task needs no shared filesystem with the web tier. The temp-path + arguments only exist for jobs enqueued by pre-refactor web workers. Args: _ctx: ARQ context (unused) workflow_run_id: The workflow run ID - audio_temp_path: Optional path to temp audio file - transcript_temp_path: Optional path to temp transcript file - user_audio_temp_path: Optional path to temp user-track audio file - bot_audio_temp_path: Optional path to temp bot-track audio file + audio_temp_path: Deprecated, pre-refactor jobs only + transcript_temp_path: Deprecated, pre-refactor jobs only + user_audio_temp_path: Deprecated, pre-refactor jobs only + bot_audio_temp_path: Deprecated, pre-refactor jobs only """ run_id = str(workflow_run_id) set_current_run_id(run_id) logger.info(f"Processing workflow completion for run {workflow_run_id}") - storage_backend = get_current_storage_backend() - - # Step 1: Upload audio if provided - recordings_metadata: dict[str, dict] = {} - - if audio_temp_path: - recording_url = f"recordings/{workflow_run_id}.wav" - logger.info( - f"Uploading mixed audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" - ) - if await _upload_temp_file( - workflow_run_id, audio_temp_path, recording_url, "mixed audio" - ): - recordings_metadata["mixed"] = _recording_metadata( - recording_url, storage_backend.value, "mixed" - ) - await db_client.update_workflow_run( - run_id=workflow_run_id, - recording_url=recording_url, - storage_backend=storage_backend.value, - ) - - if user_audio_temp_path: - user_recording_url = f"recordings/{workflow_run_id}/user.wav" - logger.info( - f"Uploading user audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" - ) - if await _upload_temp_file( - workflow_run_id, user_audio_temp_path, user_recording_url, "user audio" - ): - recordings_metadata["user"] = _recording_metadata( - user_recording_url, storage_backend.value, "user" - ) - - if bot_audio_temp_path: - bot_recording_url = f"recordings/{workflow_run_id}/bot.wav" - logger.info( - f"Uploading bot audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}" - ) - if await _upload_temp_file( - workflow_run_id, bot_audio_temp_path, bot_recording_url, "bot audio" - ): - recordings_metadata["bot"] = _recording_metadata( - bot_recording_url, storage_backend.value, "bot" - ) - - if recordings_metadata: - await db_client.update_workflow_run( - run_id=workflow_run_id, - storage_backend=storage_backend.value, - extra={"recordings": recordings_metadata}, + if ( + audio_temp_path + or transcript_temp_path + or user_audio_temp_path + or bot_audio_temp_path + ): + await _upload_legacy_temp_artifacts( + workflow_run_id, + audio_temp_path, + transcript_temp_path, + user_audio_temp_path, + bot_audio_temp_path, ) - # Step 2: Upload transcript if provided - if transcript_temp_path: - try: - if os.path.exists(transcript_temp_path): - file_size = os.path.getsize(transcript_temp_path) - logger.debug(f"Transcript file size: {file_size} bytes") - - transcript_url = f"transcripts/{workflow_run_id}.txt" - logger.info( - f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}" - ) - - await storage_fs.aupload_file(transcript_temp_path, transcript_url) - await db_client.update_workflow_run( - run_id=workflow_run_id, - transcript_url=transcript_url, - storage_backend=storage_backend.value, - ) - logger.info(f"Successfully uploaded transcript: {transcript_url}") - else: - logger.warning( - f"Transcript temp file not found: {transcript_temp_path}" - ) - except Exception as e: - logger.error( - f"Error uploading transcript for workflow {workflow_run_id}: {e}" - ) - finally: - if transcript_temp_path and os.path.exists(transcript_temp_path): - try: - os.remove(transcript_temp_path) - logger.debug( - f"Cleaned up temp transcript file: {transcript_temp_path}" - ) - except Exception as e: - logger.warning(f"Failed to clean up temp transcript file: {e}") - - # Step 3: Run integrations including QA analysis (after uploads are complete) + # Run integrations including QA analysis (after uploads are complete) try: await run_integrations_post_workflow_run(_ctx, workflow_run_id) except Exception as e: logger.error(f"Error running integrations for workflow {workflow_run_id}: {e}") - # Step 4: Notify MPS after completion. MPS owns credit accounting. + # Notify MPS after completion. MPS owns credit accounting. try: await report_completed_workflow_run_platform_usage(workflow_run_id) except Exception as e: diff --git a/api/tests/integrations/test_run_pipeline.py b/api/tests/integrations/test_run_pipeline.py index 9806c509..baa3b061 100644 --- a/api/tests/integrations/test_run_pipeline.py +++ b/api/tests/integrations/test_run_pipeline.py @@ -21,6 +21,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.enums import WorkflowRunMode, WorkflowRunState +from api.services.pipecat import active_calls from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started @@ -135,3 +136,74 @@ async def test_run_pipeline_fires_initial_response_and_completes_run( # on_pipeline_finished merges call_tags into gathered_context. assert "Start" in refreshed.gathered_context.get("nodes_visited", []) assert "call_tags" in refreshed.gathered_context + + +@pytest.mark.asyncio +async def test_call_stays_registered_for_drain_until_artifacts_uploaded( + workflow_run_setup, monkeypatch +): + """The active-call registry must not drop a run while its artifacts are + still uploading: deploy draining polls the registry and SIGTERMs the + worker at zero, which would kill in-flight uploads. + + The ordering rests on pipecat waiting for spawned ``on_pipeline_finished`` + handler tasks before ``PipelineWorker.run()`` — and therefore + ``run_pipeline_worker()`` — returns, with ``unregister_active_call`` in + the ``finally`` after that. This test pins the guarantee: a slow upload + samples the registry mid-flight and must still see the call registered. + """ + workflow_run, user, workflow = workflow_run_setup + active_calls._active_run_ids.clear() + + run_task_ref: list[asyncio.Task] = [] + observed: dict = {} + + async def slow_upload(workflow_run_id, **_kwargs): + # Give _run_pipeline a chance to (incorrectly) return and unregister + # while the upload is still in flight, then sample the registry. + await asyncio.sleep(0.2) + observed["count_during_upload"] = active_calls.active_call_count() + observed["run_task_done_during_upload"] = run_task_ref[0].done() + + monkeypatch.setattr( + "api.services.pipecat.event_handlers.upload_workflow_run_artifacts", + slow_upload, + ) + + transport = MockTransport( + TransportParams(audio_in_enabled=True, audio_out_enabled=True) + ) + captured_task: list = [] + audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value) + with patch_run_pipeline_externals(captured_task): + run_task = asyncio.create_task( + _run_pipeline( + transport=transport, + workflow_id=workflow.id, + workflow_run_id=workflow_run.id, + user_id=user.id, + audio_config=audio_config, + user_provider_id=user.provider_id, + ) + ) + run_task_ref.append(run_task) + + for _ in range(60): + if captured_task or run_task.done(): + break + await asyncio.sleep(0.05) + if run_task.done() and not captured_task: + run_task.result() # re-raise the failure + assert captured_task, "create_pipeline_task was never invoked" + pipeline_task = captured_task[0] + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task + ) + assert active_calls.active_call_count() == 1 + await pipeline_task.cancel() + await asyncio.wait_for(run_task, timeout=5.0) + + assert observed, "upload_workflow_run_artifacts was never called" + assert observed["count_during_upload"] == 1 + assert observed["run_task_done_during_upload"] is False + assert active_calls.active_call_count() == 0 diff --git a/deploy/helm/dograh/README.md b/deploy/helm/dograh/README.md index 31308333..a0dba4aa 100644 --- a/deploy/helm/dograh/README.md +++ b/deploy/helm/dograh/README.md @@ -71,39 +71,16 @@ silent. Each is exposed in `values.yaml` for operator override. ## `/tmp` audit (review fix #6) -The current docker-compose mounts a `shared-tmp` volume across all -logical services so file handoffs between processes Just Work. In -Kubernetes with separated pods this is broken by default. - -**Findings:** - -| File | Process | Behavior | Cross-pod? | -|------|---------|----------|------------| -| `api/services/pipecat/event_handlers.py` (lines 364–383) | **web** | Writes WAV + transcript via `NamedTemporaryFile`, then `enqueue_job(...)` to ARQ with the local path | **YES — broken** | -| `api/tasks/s3_upload.py` | **arq-worker** | Receives `temp_file_path`, `os.path.exists`, uploads, deletes | **reads from web's path** | -| `api/services/pipecat/in_memory_buffers.py` | web | Writes tempfiles consumed in the same process | No | -| `api/services/pipecat/audio_file_cache.py` | web | Per-process cache | No | -| `api/tasks/knowledge_base_processing.py` | arq-worker | Writes + reads in the same task | No | - -**Mitigation in this chart:** `sharedTmp.enabled` flag in `values.yaml`. -When enabled, the chart creates a `ReadWriteMany` PVC mounted into -both `dograh-web` and `dograh-arq-worker` at -`/tmp/dograh-shared/`. Default is `enabled: false` because most -cloud-default storage classes are RWO; enabling it on RWO will fail -PVC binding. - -**If your cluster lacks an RWX storage class** (most cloud defaults are -RWO), you MUST either: -- provision an RWX class (EFS, Azure Files, Longhorn-RWX, Rook-Ceph) and - set `sharedTmp.storageClassName`, or -- complete the long-term fix in TODOs below before splitting web/worker. +Resolved. End-of-call artifacts (recordings, transcript) are uploaded to +object storage directly from the web process +(`api/services/workflow_run_artifacts.py`) before the ARQ completion job +is enqueued; the job carries only the workflow run id. No file handoff +crosses a pod boundary, so web and arq-worker pods need no shared +volume. The remaining `/tmp` uses (`audio_file_cache.py`, +`knowledge_base_processing.py`) write and read within a single process. ## Open TODOs (deferred from v1) -- **Refactor `event_handlers.py` to handle uploads in-web.** Upload to - object storage from the web process and pass the resulting storage - key (not a local path) to the ARQ job. This removes the need for a - shared `/tmp` PVC entirely. - **Leader election for singletons.** Adopt Kubernetes lease-based leader election so `ari-manager` / `campaign-orchestrator` can run HA. Until then, replicas remain hard-coded to 1. @@ -162,7 +139,6 @@ deploy/helm/dograh/ ├── configmap.yaml ├── secret.yaml ├── migrate-job.yaml - ├── shared-tmp-pvc.yaml ├── web-deployment.yaml ├── web-service.yaml ├── web-hpa.yaml diff --git a/deploy/helm/dograh/templates/NOTES.txt b/deploy/helm/dograh/templates/NOTES.txt index ab74de81..74780de6 100644 --- a/deploy/helm/dograh/templates/NOTES.txt +++ b/deploy/helm/dograh/templates/NOTES.txt @@ -57,18 +57,6 @@ Alembic migrations run as a post-install / pre-upgrade hook. Inspect with: kubectl logs job/{{ include "dograh.migrate.fullname" . }} -n {{ .Release.Namespace }} {{- end }} -=== /tmp shared volume === -{{- if .Values.sharedTmp.enabled }} -sharedTmp.enabled=true — web and arq-worker pods mount a ReadWriteMany PVC -at {{ .Values.sharedTmp.mountPath }} so end-of-call uploads survive pod boundaries. -{{- else }} -WARNING: sharedTmp.enabled=false. End-of-call uploads (event_handlers.py → -ARQ s3_upload) hand off via /tmp paths. With separated web and worker pods -this WILL fail unless you have an RWX storage class configured. - -See deploy/helm/dograh/README.md "/tmp audit" section. -{{- end }} - === Singletons === ari-manager and campaign-orchestrator run with replicas=1 and strategy=Recreate by design. Do NOT scale these via kubectl scale — diff --git a/deploy/helm/dograh/templates/_helpers.tpl b/deploy/helm/dograh/templates/_helpers.tpl index d8469848..9c32f151 100644 --- a/deploy/helm/dograh/templates/_helpers.tpl +++ b/deploy/helm/dograh/templates/_helpers.tpl @@ -227,21 +227,3 @@ are added inline because they may need composition from subchart secrets. - secretRef: name: {{ include "dograh.secretName" . }} {{- end }} - -{{/* -Volume mounts for the shared-tmp PVC when enabled. -*/}} -{{- define "dograh.sharedTmpVolumeMounts" -}} -{{- if .Values.sharedTmp.enabled }} -- name: shared-tmp - mountPath: {{ .Values.sharedTmp.mountPath }} -{{- end }} -{{- end }} - -{{- define "dograh.sharedTmpVolumes" -}} -{{- if .Values.sharedTmp.enabled }} -- name: shared-tmp - persistentVolumeClaim: - claimName: {{ include "dograh.fullname" . }}-shared-tmp -{{- end }} -{{- end }} diff --git a/deploy/helm/dograh/templates/arq-worker-deployment.yaml b/deploy/helm/dograh/templates/arq-worker-deployment.yaml index 4af2fc5a..d52f1100 100644 --- a/deploy/helm/dograh/templates/arq-worker-deployment.yaml +++ b/deploy/helm/dograh/templates/arq-worker-deployment.yaml @@ -45,10 +45,6 @@ spec: {{- toYaml .Values.workers.livenessProbe | nindent 12 }} resources: {{- toYaml .Values.workers.resources | nindent 12 }} - volumeMounts: - {{- include "dograh.sharedTmpVolumeMounts" . | nindent 12 }} - volumes: - {{- include "dograh.sharedTmpVolumes" . | nindent 8 }} {{- with .Values.workers.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/deploy/helm/dograh/templates/shared-tmp-pvc.yaml b/deploy/helm/dograh/templates/shared-tmp-pvc.yaml deleted file mode 100644 index 5a3546f0..00000000 --- a/deploy/helm/dograh/templates/shared-tmp-pvc.yaml +++ /dev/null @@ -1,18 +0,0 @@ -{{- if .Values.sharedTmp.enabled }} -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: {{ include "dograh.fullname" . }}-shared-tmp - namespace: {{ .Release.Namespace }} - labels: - {{- include "dograh.labels" . | nindent 4 }} -spec: - accessModes: - - ReadWriteMany - {{- if .Values.sharedTmp.storageClassName }} - storageClassName: {{ .Values.sharedTmp.storageClassName | quote }} - {{- end }} - resources: - requests: - storage: {{ .Values.sharedTmp.size }} -{{- end }} diff --git a/deploy/helm/dograh/templates/web-deployment.yaml b/deploy/helm/dograh/templates/web-deployment.yaml index c4829d56..91b5d78e 100644 --- a/deploy/helm/dograh/templates/web-deployment.yaml +++ b/deploy/helm/dograh/templates/web-deployment.yaml @@ -68,10 +68,6 @@ spec: command: ["sh", "-c", "sleep {{ .Values.web.preStopSleepSeconds }}"] resources: {{- toYaml .Values.web.resources | nindent 12 }} - volumeMounts: - {{- include "dograh.sharedTmpVolumeMounts" . | nindent 12 }} - volumes: - {{- include "dograh.sharedTmpVolumes" . | nindent 8 }} {{- with .Values.web.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/deploy/helm/dograh/values.yaml b/deploy/helm/dograh/values.yaml index 2cb09b9c..ed9c1c33 100644 --- a/deploy/helm/dograh/values.yaml +++ b/deploy/helm/dograh/values.yaml @@ -150,28 +150,6 @@ secrets: langfusePublicKey: "" langfuseHost: "" -# ----------------------------------------------------------------------------- -# Shared /tmp PVC. -# -# AUDIT FINDING: api/services/pipecat/event_handlers.py writes WAV/transcript -# tempfiles in the web process and enqueues an ARQ job that reads those exact -# paths in the worker (api/tasks/s3_upload.py). In compose this works because -# all processes share the `shared-tmp` volume. In Kubernetes web and worker run -# in separate pods. Options: -# 1. Enable this PVC (ReadWriteMany required) to mount /tmp/dograh-shared -# into both web and arq-worker pods. Use this for v1. -# 2. Refactor event_handlers.py to upload from the web process and pass a -# storage key (not a local path) to the ARQ job. Preferred long-term; -# see deploy/helm/dograh/README.md "Open TODOs". -# If your cluster lacks RWX (most cloud default storage classes are RWO), -# you MUST take option (2) before splitting web and worker pods, or end-of- -# call uploads will fail silently. -sharedTmp: - enabled: false - storageClassName: "" # must be an RWX-capable class (e.g. efs-sc, azurefile, longhorn-rwx) - size: 10Gi - mountPath: /tmp/dograh-shared - # ----------------------------------------------------------------------------- # Web tier (FastAPI + WebSocket signaling) # ----------------------------------------------------------------------------- diff --git a/deploy/hostinger/docker-compose.yaml b/deploy/hostinger/docker-compose.yaml index 9504de45..efce0d12 100644 --- a/deploy/hostinger/docker-compose.yaml +++ b/deploy/hostinger/docker-compose.yaml @@ -138,8 +138,6 @@ services: api: image: ${REGISTRY:-dograhai}/dograh-api:${DOGRAH_VERSION:-latest} restart: unless-stopped - volumes: - - shared-tmp:/tmp environment: # production => drop private-IP host ICE candidates on a public VPS and # order TURN URIs UDP-first. Required for correct remote WebRTC. @@ -266,8 +264,6 @@ volumes: redis_data: minio-data: driver: local - shared-tmp: - driver: local networks: # Internal network for service-to-service traffic (db, redis, minio, coturn). diff --git a/docker-compose.yaml b/docker-compose.yaml index 6e07c396..9a6d7f7a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -142,8 +142,6 @@ services: api: image: ${REGISTRY:-dograhai}/dograh-api:latest - volumes: - - shared-tmp:/tmp environment: # Core application config ENVIRONMENT: "${ENVIRONMENT:-local}" @@ -330,8 +328,6 @@ volumes: redis_data: minio-data: driver: local - shared-tmp: - driver: local nginx-generated: driver: local coturn-generated: diff --git a/sdk/python/src/dograh_sdk/_generated_models.py b/sdk/python/src/dograh_sdk/_generated_models.py index ac441c34..5039f26e 100644 --- a/sdk/python/src/dograh_sdk/_generated_models.py +++ b/sdk/python/src/dograh_sdk/_generated_models.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: -# filename: dograh-openapi-XXXXXX.json.5rayRuwwwc -# timestamp: 2026-07-03T07:27:55+00:00 +# filename: dograh-openapi-XXXXXX.json.d7UvbEKHrl +# timestamp: 2026-07-03T12:48:12+00:00 from __future__ import annotations From 820727f25f7e93f226e81f8e22c34b15ec81199a Mon Sep 17 00:00:00 2001 From: Sabiha Khan <87858386+chewwbaka@users.noreply.github.com> Date: Fri, 3 Jul 2026 20:02:07 +0530 Subject: [PATCH 2/2] chore(main): release dograh 1.40.0 (#473) --- .release-please-manifest.json | 2 +- CHANGELOG.md | 30 ++++++++++++++++++++++++++++++ api/pyproject.toml | 2 +- ui/package.json | 2 +- 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index aefc8e17..d0538f75 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "1.39.0" + ".": "1.40.0" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index d1d34b3e..899876a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,35 @@ # Changelog +## 1.40.0 (2026-07-03) + + + +## What's Changed +### Features +* feat: support inbound vonage calls by @chewwbaka in https://github.com/dograh-hq/dograh/pull/480 +* feat: better interrupt strategies by @a6kme in https://github.com/dograh-hq/dograh/pull/479 +* feat(webhooks): durable retrying delivery for final webhooks by @xTararAisx in https://github.com/dograh-hq/dograh/pull/478 +* feat: add Helm chart for Kubernetes deployment by @a6kme in https://github.com/dograh-hq/dograh/pull/365 +### Bug Fixes +* fix: fix initial greeting for realtime models by @a6kme in https://github.com/dograh-hq/dograh/pull/481 +* fix: guard Chatwoot bubble toggle until holder is in the DOM by @pk-198 in https://github.com/dograh-hq/dograh/pull/485 +### Documentation +* docs: fix dead entry points, add first-agent tutorial, explain unexplained features by @rushilbh27 in https://github.com/dograh-hq/dograh/pull/489 +* docs: add missing cross-links for machine and human readability by @rushilbh27 in https://github.com/dograh-hq/dograh/pull/492 +* docs: clarify Asterisk ARI websocket_client.conf URI and why /ws/ari 403s when tested directly by @mvanhorn in https://github.com/dograh-hq/dograh/pull/490 +### Other Changes +* embed cal and chatwoot bubble missing fix by @pk-198 in https://github.com/dograh-hq/dograh/pull/483 +* Docs/add japanese readme by @sscodeai in https://github.com/dograh-hq/dograh/pull/477 +* Implement cost calculator for Tuber by @Mohamed-Mamdouh in https://github.com/dograh-hq/dograh/pull/471 + +## New Contributors +* @pk-198 made their first contribution in https://github.com/dograh-hq/dograh/pull/483 +* @sscodeai made their first contribution in https://github.com/dograh-hq/dograh/pull/477 +* @rushilbh27 made their first contribution in https://github.com/dograh-hq/dograh/pull/489 +* @xTararAisx made their first contribution in https://github.com/dograh-hq/dograh/pull/478 + +**Full Changelog**: https://github.com/dograh-hq/dograh/compare/dograh-v1.39.0...dograh-v1.40.0 + ## 1.39.0 (2026-06-27) diff --git a/api/pyproject.toml b/api/pyproject.toml index 1f26cc2c..a258f8a9 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -1,5 +1,5 @@ [project] name = "dograh-api" -version = "1.39.0" +version = "1.40.0" description = "Backend API for Dograh voice AI platform" requires-python = ">=3.13,<3.14" diff --git a/ui/package.json b/ui/package.json index 39c4fcae..cffbc477 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "ui", - "version": "1.39.0", + "version": "1.40.0", "private": true, "scripts": { "dev": "cross-env NODE_OPTIONS=--enable-source-maps next dev --turbopack",