chore: refactor file upload mechanism to avoid NFS dependency (#496)

* chore: refactor file upload mechanism to avoid NFS dependency

* add regression test for deregistration of calls

* fix: fix minio upload issue

* fix: make transcript upload async
This commit is contained in:
Abhishek 2026-07-03 20:01:52 +05:30 committed by GitHub
parent 79a4a3c9f1
commit a54ab519b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 370 additions and 401 deletions

View file

@ -1,23 +1,51 @@
from abc import ABC, abstractmethod
from typing import Any, BinaryIO, Dict, Optional
from typing import Any, Dict, Optional, Protocol
class AsyncReadable(Protocol):
"""Anything exposing ``await .read() -> bytes`` (aiofiles handles, in-memory wrappers)."""
async def read(self) -> bytes: ...
class _AsyncBytesReader:
"""Async file-like wrapper over in-memory bytes for acreate_file()."""
def __init__(self, data: bytes):
self._data = data
async def read(self) -> bytes:
return self._data
class BaseFileSystem(ABC):
"""Abstract base class for filesystem operations."""
@abstractmethod
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool:
"""Create a new file with the given content.
Args:
file_path: Path where the file should be created
content: File content as a binary stream
content: File content readable via ``await content.read()``
Returns:
bool: True if file was created successfully, False otherwise
"""
pass
async def acreate_file_from_bytes(self, file_path: str, data: bytes) -> bool:
"""Create a file directly from in-memory bytes (no local file needed).
Args:
file_path: Path where the file should be created
data: File content as bytes
Returns:
bool: True if file was created successfully, False otherwise
"""
return await self.acreate_file(file_path, _AsyncBytesReader(data))
@abstractmethod
async def aupload_file(self, local_path: str, destination_path: str) -> bool:
"""Upload a file from local path to destination.

View file

@ -1,11 +1,11 @@
import asyncio
import os
from datetime import datetime
from typing import BinaryIO, Optional
from typing import Optional
import aiofiles
from .base import BaseFileSystem
from .base import AsyncReadable, BaseFileSystem
class LocalFileSystem(BaseFileSystem):
@ -24,7 +24,7 @@ class LocalFileSystem(BaseFileSystem):
"""Get the full path by joining with base path."""
return os.path.join(self.base_path, file_path)
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool:
try:
full_path = self._get_full_path(file_path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)

View file

@ -1,12 +1,13 @@
import asyncio
import io
import json
from typing import Any, BinaryIO, Dict, Optional
from typing import Any, Dict, Optional
from loguru import logger
from minio import Minio
from minio.error import S3Error
from .base import BaseFileSystem
from .base import AsyncReadable, BaseFileSystem
class MinioFileSystem(BaseFileSystem):
@ -89,15 +90,16 @@ class MinioFileSystem(BaseFileSystem):
logger.debug(f"Bucket setup note: {e}")
pass
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool:
try:
data = await content.read()
def _put():
# The MinIO SDK requires a stream with .read(), not raw bytes.
self.client.put_object(
self.bucket_name,
file_path,
data=bytes(data),
data=io.BytesIO(data),
length=len(data),
)

View file

@ -1,6 +1,6 @@
from typing import Any, BinaryIO, Dict, NoReturn, Optional
from typing import Any, Dict, NoReturn, Optional
from .base import BaseFileSystem
from .base import AsyncReadable, BaseFileSystem
class NullFileSystem(BaseFileSystem):
@ -16,7 +16,7 @@ class NullFileSystem(BaseFileSystem):
"Set ENVIRONMENT to a non-test value or inject a real filesystem fixture."
)
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool:
self._fail("acreate_file")
async def aupload_file(self, local_path: str, destination_path: str) -> bool:

View file

@ -1,10 +1,10 @@
from typing import Any, BinaryIO, Dict, Optional
from typing import Any, Dict, Optional
import aioboto3
from botocore.config import Config
from botocore.exceptions import ClientError
from .base import BaseFileSystem
from .base import AsyncReadable, BaseFileSystem
class S3FileSystem(BaseFileSystem):
@ -57,7 +57,7 @@ class S3FileSystem(BaseFileSystem):
kwargs["config"] = self._config
return kwargs
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
async def acreate_file(self, file_path: str, content: AsyncReadable) -> bool:
try:
async with self.session.client("s3", **self._client_kwargs()) as s3_client:
await s3_client.put_object(

View file

@ -16,6 +16,7 @@ from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggr
from api.services.pipecat.tracing_config import get_trace_url
from api.services.posthog_client import capture_event
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow_run_artifacts import upload_workflow_run_artifacts
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
from pipecat.frames.frames import (
@ -361,50 +362,49 @@ def register_event_handlers(
except Exception as e:
logger.error(f"Error saving workflow run logs: {e}", exc_info=True)
# Write buffers to temp files and enqueue combined processing task
audio_temp_path = None
user_audio_temp_path = None
bot_audio_temp_path = None
transcript_temp_path = None
# Upload artifacts straight from the in-memory buffers so nothing has
# to cross a process/host boundary via temp files. Must complete
# before the completion job is enqueued so QA and webhooks see the
# artifacts in storage.
try:
mixed_audio_wav = None
user_audio_wav = None
bot_audio_wav = None
if not in_memory_audio_buffers.mixed.is_empty:
audio_temp_path = (
await in_memory_audio_buffers.mixed.write_to_temp_file()
)
mixed_audio_wav = await in_memory_audio_buffers.mixed.to_wav_bytes()
else:
logger.debug("Audio buffer is empty, skipping upload")
if not in_memory_audio_buffers.user.is_empty:
user_audio_temp_path = (
await in_memory_audio_buffers.user.write_to_temp_file()
)
user_audio_wav = await in_memory_audio_buffers.user.to_wav_bytes()
else:
logger.debug("User audio buffer is empty, skipping upload")
if not in_memory_audio_buffers.bot.is_empty:
bot_audio_temp_path = (
await in_memory_audio_buffers.bot.write_to_temp_file()
)
bot_audio_wav = await in_memory_audio_buffers.bot.to_wav_bytes()
else:
logger.debug("Bot audio buffer is empty, skipping upload")
transcript_temp_path = in_memory_logs_buffer.write_transcript_to_temp_file()
if not transcript_temp_path:
transcript_text = in_memory_logs_buffer.generate_transcript_text()
if not transcript_text:
logger.debug("No transcript events in logs buffer, skipping upload")
await upload_workflow_run_artifacts(
workflow_run_id,
mixed_audio_wav=mixed_audio_wav,
user_audio_wav=user_audio_wav,
bot_audio_wav=bot_audio_wav,
transcript_text=transcript_text,
)
except Exception as e:
logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True)
logger.error(f"Error uploading call artifacts: {e}", exc_info=True)
# Combined task: uploads artifacts, runs integrations (including QA),
# then calculates cost (so QA token usage is captured in usage_info)
# Combined task: runs integrations (including QA), then calculates
# cost (so QA token usage is captured in usage_info)
await enqueue_job(
FunctionNames.PROCESS_WORKFLOW_COMPLETION,
workflow_run_id,
audio_temp_path,
transcript_temp_path,
user_audio_temp_path,
bot_audio_temp_path,
)
# Return the buffer so it can be passed to other handlers

View file

@ -1,5 +1,5 @@
import asyncio
import tempfile
import io
import wave
from datetime import UTC, datetime
from typing import List, Optional
@ -15,7 +15,7 @@ from pipecat.utils.enums import RealtimeFeedbackType
class InMemoryAudioBuffer:
"""Buffer audio data in memory during a call, then write to temp file on disconnect."""
"""Buffer audio data in memory during a call, then encode to WAV bytes on disconnect."""
def __init__(self, workflow_run_id: int, sample_rate: int, num_channels: int = 1):
self._workflow_run_id = workflow_run_id
@ -41,28 +41,30 @@ class InMemoryAudioBuffer:
f"Appended {len(pcm_data)} bytes to audio buffer. Total size: {self._total_size}"
)
async def write_to_temp_file(self) -> str:
"""Write audio data to a temporary WAV file and return the path."""
async def to_wav_bytes(self) -> bytes:
"""Encode the buffered PCM data as an in-memory WAV file."""
async with self._lock:
temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
logger.debug(
f"Writing audio buffer to temp file {temp_file.name} for workflow {self._workflow_run_id}"
)
chunks = list(self._chunks)
# Write WAV header and PCM data
with wave.open(temp_file.name, "wb") as wf:
def _encode() -> bytes:
wav_io = io.BytesIO()
with wave.open(wav_io, "wb") as wf:
wf.setnchannels(self._num_channels)
wf.setsampwidth(2) # 16-bit audio
wf.setframerate(self._sample_rate)
# Concatenate all chunks
for chunk in self._chunks:
for chunk in chunks:
wf.writeframes(chunk)
return wav_io.getvalue()
logger.info(
f"Successfully wrote {self._total_size} bytes of audio to {temp_file.name}"
)
return temp_file.name
# Encoding is mostly memcpy but can touch ~100MB; keep it off the event loop
data = await asyncio.to_thread(_encode)
logger.info(
f"Encoded {self._total_size} bytes of audio to {len(data)} WAV bytes "
f"for workflow {self._workflow_run_id}"
)
return data
@property
def is_empty(self) -> bool:
@ -172,27 +174,6 @@ class InMemoryLogsBuffer:
"""
return _generate_transcript_text(self._sorted_events())
def write_transcript_to_temp_file(self) -> Optional[str]:
"""Write transcript to a temporary text file and return the path.
Returns None if there are no transcript events.
"""
content = self.generate_transcript_text()
if not content:
return None
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
logger.debug(
f"Writing transcript to temp file {temp_file.name} for workflow {self._workflow_run_id}"
)
temp_file.write(content)
temp_file.close()
logger.info(
f"Successfully wrote {len(content)} chars of transcript to {temp_file.name}"
)
return temp_file.name
@property
def is_empty(self) -> bool:
"""Check if the buffer is empty."""

View file

@ -0,0 +1,126 @@
"""Upload end-of-call artifacts (recordings, transcript) to object storage.
Called from the pipeline process itself, straight from the in-memory call
buffers, so no local file ever has to cross a process/host boundary (no
shared /tmp between web and ARQ workers). Uploads happen before the
workflow-completion job is enqueued so QA and webhooks see the artifacts
in storage.
"""
from loguru import logger
from api.db import db_client
from api.services.storage import get_current_storage_backend, storage_fs
def _recording_metadata(storage_key: str, storage_backend: str, track: str) -> dict:
return {
"storage_key": storage_key,
"storage_backend": storage_backend,
"format": "wav",
"track": track,
}
async def _upload_bytes(
workflow_run_id: int,
data: bytes,
storage_key: str,
label: str,
) -> bool:
try:
logger.debug(f"{label} size: {len(data)} bytes")
if await storage_fs.acreate_file_from_bytes(storage_key, data):
logger.info(f"Successfully uploaded {label}: {storage_key}")
return True
logger.error(
f"Storage backend rejected {label} upload for workflow "
f"{workflow_run_id}: {storage_key}"
)
return False
except Exception as e:
logger.error(f"Error uploading {label} for workflow {workflow_run_id}: {e}")
return False
async def upload_workflow_run_artifacts(
workflow_run_id: int,
*,
mixed_audio_wav: bytes | None = None,
user_audio_wav: bytes | None = None,
bot_audio_wav: bytes | None = None,
transcript_text: str | None = None,
) -> None:
"""Upload call artifacts to object storage and persist their metadata.
Each artifact is uploaded independently; a failure is logged and the
remaining artifacts are still attempted.
"""
storage_backend = get_current_storage_backend()
recordings_metadata: dict[str, dict] = {}
if mixed_audio_wav:
recording_url = f"recordings/{workflow_run_id}.wav"
logger.info(
f"Uploading mixed audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_bytes(
workflow_run_id, mixed_audio_wav, recording_url, "mixed audio"
):
recordings_metadata["mixed"] = _recording_metadata(
recording_url, storage_backend.value, "mixed"
)
await db_client.update_workflow_run(
run_id=workflow_run_id,
recording_url=recording_url,
storage_backend=storage_backend.value,
)
if user_audio_wav:
user_recording_url = f"recordings/{workflow_run_id}/user.wav"
logger.info(
f"Uploading user audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_bytes(
workflow_run_id, user_audio_wav, user_recording_url, "user audio"
):
recordings_metadata["user"] = _recording_metadata(
user_recording_url, storage_backend.value, "user"
)
if bot_audio_wav:
bot_recording_url = f"recordings/{workflow_run_id}/bot.wav"
logger.info(
f"Uploading bot audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_bytes(
workflow_run_id, bot_audio_wav, bot_recording_url, "bot audio"
):
recordings_metadata["bot"] = _recording_metadata(
bot_recording_url, storage_backend.value, "bot"
)
if recordings_metadata:
await db_client.update_workflow_run(
run_id=workflow_run_id,
storage_backend=storage_backend.value,
extra={"recordings": recordings_metadata},
)
if transcript_text:
transcript_url = f"transcripts/{workflow_run_id}.txt"
logger.info(
f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_bytes(
workflow_run_id,
transcript_text.encode("utf-8"),
transcript_url,
"transcript",
):
await db_client.update_workflow_run(
run_id=workflow_run_id,
transcript_url=transcript_url,
storage_backend=storage_backend.value,
)

View file

@ -45,7 +45,6 @@ from api.tasks.campaign_tasks import (
)
from api.tasks.knowledge_base_processing import process_knowledge_base_document
from api.tasks.run_integrations import run_integrations_post_workflow_run
from api.tasks.s3_upload import upload_voicemail_audio_to_s3
from api.tasks.webhook_delivery import deliver_webhook, sweep_webhook_deliveries
from api.tasks.workflow_completion import process_workflow_completion
@ -53,7 +52,6 @@ from api.tasks.workflow_completion import process_workflow_completion
class WorkerSettings:
functions = [
run_integrations_post_workflow_run,
upload_voicemail_audio_to_s3,
process_workflow_completion,
sync_campaign_source,
process_campaign_batch,

View file

@ -1,7 +1,6 @@
class FunctionNames:
RUN_INTEGRATIONS_POST_WORKFLOW_RUN = "run_integrations_post_workflow_run"
PROCESS_WORKFLOW_COMPLETION = "process_workflow_completion"
UPLOAD_VOICEMAIL_AUDIO_TO_S3 = "upload_voicemail_audio_to_s3"
SYNC_CAMPAIGN_SOURCE = "sync_campaign_source"
PROCESS_CAMPAIGN_BATCH = "process_campaign_batch"
PROCESS_KNOWLEDGE_BASE_DOCUMENT = "process_knowledge_base_document"

View file

@ -1,67 +0,0 @@
import os
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from api.services.storage import storage_fs
async def upload_voicemail_audio_to_s3(
_ctx,
workflow_run_id: int,
temp_file_path: str,
s3_key: str,
):
"""Upload voicemail detection audio from temp file to S3.
Handles voicemail-specific paths and doesn't update the workflow run's
recording_url field.
Args:
_ctx: ARQ context (unused)
workflow_run_id: The workflow run ID
temp_file_path: Path to the temporary WAV file
s3_key: The S3 key where the file should be uploaded
"""
run_id = str(workflow_run_id)
set_current_run_id(run_id)
logger.info(f"Starting voicemail audio upload to S3 from {temp_file_path}")
try:
# Verify temp file exists
if not os.path.exists(temp_file_path):
logger.error(f"Temp voicemail audio file not found: {temp_file_path}")
raise FileNotFoundError(
f"Temp voicemail audio file not found: {temp_file_path}"
)
file_size = os.path.getsize(temp_file_path)
logger.debug(f"Voicemail audio file size: {file_size} bytes")
# Upload to S3
upload_ok = await storage_fs.aupload_file(temp_file_path, s3_key)
if upload_ok:
logger.info(f"Successfully uploaded voicemail audio to S3: {s3_key}")
else:
logger.error(
f"Failed to upload voicemail audio to S3 for workflow {workflow_run_id}"
)
raise Exception(f"S3 upload failed for {s3_key}")
except Exception as e:
logger.error(
f"Error uploading voicemail audio to S3 for workflow {workflow_run_id}: {e}"
)
raise
finally:
# Clean up temp file
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.debug(f"Cleaned up temp voicemail audio file: {temp_file_path}")
except Exception as e:
logger.warning(
f"Failed to clean up temp voicemail audio file {temp_file_path}: {e}"
)

View file

@ -1,178 +1,118 @@
import asyncio
import os
from typing import Optional
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from api.db import db_client
from api.services.storage import get_current_storage_backend, storage_fs
from api.services.workflow_run_artifacts import upload_workflow_run_artifacts
from api.services.workflow_run_billing import (
report_completed_workflow_run_platform_usage,
)
from api.tasks.run_integrations import run_integrations_post_workflow_run
def _recording_metadata(storage_key: str, storage_backend: str, track: str) -> dict:
return {
"storage_key": storage_key,
"storage_backend": storage_backend,
"format": "wav",
"track": track,
}
async def _upload_temp_file(
workflow_run_id: int,
temp_file_path: str,
storage_key: str,
label: str,
) -> bool:
def _read_and_remove_temp_file(temp_file_path: str | None, label: str) -> bytes | None:
if not temp_file_path:
return None
try:
if not os.path.exists(temp_file_path):
logger.warning(f"{label} temp file not found: {temp_file_path}")
return False
file_size = os.path.getsize(temp_file_path)
logger.debug(f"{label} file size: {file_size} bytes")
await storage_fs.aupload_file(temp_file_path, storage_key)
logger.info(f"Successfully uploaded {label}: {storage_key}")
return True
return None
with open(temp_file_path, "rb") as f:
data = f.read()
os.remove(temp_file_path)
return data
except Exception as e:
logger.error(f"Error uploading {label} for workflow {workflow_run_id}: {e}")
return False
finally:
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.debug(f"Cleaned up temp {label} file: {temp_file_path}")
except Exception as e:
logger.warning(f"Failed to clean up temp {label} file: {e}")
logger.error(f"Error reading legacy {label} temp file {temp_file_path}: {e}")
return None
async def _upload_legacy_temp_artifacts(
workflow_run_id: int,
audio_temp_path: str | None,
transcript_temp_path: str | None,
user_audio_temp_path: str | None,
bot_audio_temp_path: str | None,
) -> None:
"""Handle jobs enqueued before uploads moved into the pipeline process.
Pre-refactor web workers passed local temp-file paths; upload them if this
worker can still see the files (same host / shared volume).
Deprecated: remove once no pre-refactor jobs can remain in the queue.
"""
logger.info(
f"Processing legacy temp-file artifacts for workflow run {workflow_run_id}"
)
transcript_bytes = await asyncio.to_thread(
_read_and_remove_temp_file, transcript_temp_path, "transcript"
)
await upload_workflow_run_artifacts(
workflow_run_id,
mixed_audio_wav=await asyncio.to_thread(
_read_and_remove_temp_file, audio_temp_path, "mixed audio"
),
user_audio_wav=await asyncio.to_thread(
_read_and_remove_temp_file, user_audio_temp_path, "user audio"
),
bot_audio_wav=await asyncio.to_thread(
_read_and_remove_temp_file, bot_audio_temp_path, "bot audio"
),
transcript_text=(
transcript_bytes.decode("utf-8") if transcript_bytes else None
),
)
async def process_workflow_completion(
_ctx,
workflow_run_id: int,
audio_temp_path: Optional[str] = None,
transcript_temp_path: Optional[str] = None,
user_audio_temp_path: Optional[str] = None,
bot_audio_temp_path: Optional[str] = None,
audio_temp_path: str | None = None,
transcript_temp_path: str | None = None,
user_audio_temp_path: str | None = None,
bot_audio_temp_path: str | None = None,
):
"""Process workflow completion: upload artifacts and run integrations.
"""Process workflow completion: run integrations and report billing.
This task combines audio upload, transcript upload, and webhook integrations
into a single sequential task to ensure integrations run after uploads complete.
Recording/transcript uploads happen in the pipeline process itself
(api/services/workflow_run_artifacts.py) before this job is enqueued,
so this task needs no shared filesystem with the web tier. The temp-path
arguments only exist for jobs enqueued by pre-refactor web workers.
Args:
_ctx: ARQ context (unused)
workflow_run_id: The workflow run ID
audio_temp_path: Optional path to temp audio file
transcript_temp_path: Optional path to temp transcript file
user_audio_temp_path: Optional path to temp user-track audio file
bot_audio_temp_path: Optional path to temp bot-track audio file
audio_temp_path: Deprecated, pre-refactor jobs only
transcript_temp_path: Deprecated, pre-refactor jobs only
user_audio_temp_path: Deprecated, pre-refactor jobs only
bot_audio_temp_path: Deprecated, pre-refactor jobs only
"""
run_id = str(workflow_run_id)
set_current_run_id(run_id)
logger.info(f"Processing workflow completion for run {workflow_run_id}")
storage_backend = get_current_storage_backend()
# Step 1: Upload audio if provided
recordings_metadata: dict[str, dict] = {}
if audio_temp_path:
recording_url = f"recordings/{workflow_run_id}.wav"
logger.info(
f"Uploading mixed audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_temp_file(
workflow_run_id, audio_temp_path, recording_url, "mixed audio"
):
recordings_metadata["mixed"] = _recording_metadata(
recording_url, storage_backend.value, "mixed"
)
await db_client.update_workflow_run(
run_id=workflow_run_id,
recording_url=recording_url,
storage_backend=storage_backend.value,
)
if user_audio_temp_path:
user_recording_url = f"recordings/{workflow_run_id}/user.wav"
logger.info(
f"Uploading user audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_temp_file(
workflow_run_id, user_audio_temp_path, user_recording_url, "user audio"
):
recordings_metadata["user"] = _recording_metadata(
user_recording_url, storage_backend.value, "user"
)
if bot_audio_temp_path:
bot_recording_url = f"recordings/{workflow_run_id}/bot.wav"
logger.info(
f"Uploading bot audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
if await _upload_temp_file(
workflow_run_id, bot_audio_temp_path, bot_recording_url, "bot audio"
):
recordings_metadata["bot"] = _recording_metadata(
bot_recording_url, storage_backend.value, "bot"
)
if recordings_metadata:
await db_client.update_workflow_run(
run_id=workflow_run_id,
storage_backend=storage_backend.value,
extra={"recordings": recordings_metadata},
if (
audio_temp_path
or transcript_temp_path
or user_audio_temp_path
or bot_audio_temp_path
):
await _upload_legacy_temp_artifacts(
workflow_run_id,
audio_temp_path,
transcript_temp_path,
user_audio_temp_path,
bot_audio_temp_path,
)
# Step 2: Upload transcript if provided
if transcript_temp_path:
try:
if os.path.exists(transcript_temp_path):
file_size = os.path.getsize(transcript_temp_path)
logger.debug(f"Transcript file size: {file_size} bytes")
transcript_url = f"transcripts/{workflow_run_id}.txt"
logger.info(
f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
await storage_fs.aupload_file(transcript_temp_path, transcript_url)
await db_client.update_workflow_run(
run_id=workflow_run_id,
transcript_url=transcript_url,
storage_backend=storage_backend.value,
)
logger.info(f"Successfully uploaded transcript: {transcript_url}")
else:
logger.warning(
f"Transcript temp file not found: {transcript_temp_path}"
)
except Exception as e:
logger.error(
f"Error uploading transcript for workflow {workflow_run_id}: {e}"
)
finally:
if transcript_temp_path and os.path.exists(transcript_temp_path):
try:
os.remove(transcript_temp_path)
logger.debug(
f"Cleaned up temp transcript file: {transcript_temp_path}"
)
except Exception as e:
logger.warning(f"Failed to clean up temp transcript file: {e}")
# Step 3: Run integrations including QA analysis (after uploads are complete)
# Run integrations including QA analysis (after uploads are complete)
try:
await run_integrations_post_workflow_run(_ctx, workflow_run_id)
except Exception as e:
logger.error(f"Error running integrations for workflow {workflow_run_id}: {e}")
# Step 4: Notify MPS after completion. MPS owns credit accounting.
# Notify MPS after completion. MPS owns credit accounting.
try:
await report_completed_workflow_run_platform_usage(workflow_run_id)
except Exception as e:

View file

@ -21,6 +21,7 @@ from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.enums import WorkflowRunMode, WorkflowRunState
from api.services.pipecat import active_calls
from api.services.pipecat.audio_config import create_audio_config
from api.services.pipecat.run_pipeline import _run_pipeline
from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started
@ -135,3 +136,74 @@ async def test_run_pipeline_fires_initial_response_and_completes_run(
# on_pipeline_finished merges call_tags into gathered_context.
assert "Start" in refreshed.gathered_context.get("nodes_visited", [])
assert "call_tags" in refreshed.gathered_context
@pytest.mark.asyncio
async def test_call_stays_registered_for_drain_until_artifacts_uploaded(
workflow_run_setup, monkeypatch
):
"""The active-call registry must not drop a run while its artifacts are
still uploading: deploy draining polls the registry and SIGTERMs the
worker at zero, which would kill in-flight uploads.
The ordering rests on pipecat waiting for spawned ``on_pipeline_finished``
handler tasks before ``PipelineWorker.run()`` and therefore
``run_pipeline_worker()`` returns, with ``unregister_active_call`` in
the ``finally`` after that. This test pins the guarantee: a slow upload
samples the registry mid-flight and must still see the call registered.
"""
workflow_run, user, workflow = workflow_run_setup
active_calls._active_run_ids.clear()
run_task_ref: list[asyncio.Task] = []
observed: dict = {}
async def slow_upload(workflow_run_id, **_kwargs):
# Give _run_pipeline a chance to (incorrectly) return and unregister
# while the upload is still in flight, then sample the registry.
await asyncio.sleep(0.2)
observed["count_during_upload"] = active_calls.active_call_count()
observed["run_task_done_during_upload"] = run_task_ref[0].done()
monkeypatch.setattr(
"api.services.pipecat.event_handlers.upload_workflow_run_artifacts",
slow_upload,
)
transport = MockTransport(
TransportParams(audio_in_enabled=True, audio_out_enabled=True)
)
captured_task: list = []
audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value)
with patch_run_pipeline_externals(captured_task):
run_task = asyncio.create_task(
_run_pipeline(
transport=transport,
workflow_id=workflow.id,
workflow_run_id=workflow_run.id,
user_id=user.id,
audio_config=audio_config,
user_provider_id=user.provider_id,
)
)
run_task_ref.append(run_task)
for _ in range(60):
if captured_task or run_task.done():
break
await asyncio.sleep(0.05)
if run_task.done() and not captured_task:
run_task.result() # re-raise the failure
assert captured_task, "create_pipeline_task was never invoked"
pipeline_task = captured_task[0]
await wait_for_pipeline_worker_started(
pipeline_task, timeout=3.0, run_task=run_task
)
assert active_calls.active_call_count() == 1
await pipeline_task.cancel()
await asyncio.wait_for(run_task, timeout=5.0)
assert observed, "upload_workflow_run_artifacts was never called"
assert observed["count_during_upload"] == 1
assert observed["run_task_done_during_upload"] is False
assert active_calls.active_call_count() == 0

View file

@ -71,39 +71,16 @@ silent. Each is exposed in `values.yaml` for operator override.
## `/tmp` audit (review fix #6)
The current docker-compose mounts a `shared-tmp` volume across all
logical services so file handoffs between processes Just Work. In
Kubernetes with separated pods this is broken by default.
**Findings:**
| File | Process | Behavior | Cross-pod? |
|------|---------|----------|------------|
| `api/services/pipecat/event_handlers.py` (lines 364383) | **web** | Writes WAV + transcript via `NamedTemporaryFile`, then `enqueue_job(...)` to ARQ with the local path | **YES — broken** |
| `api/tasks/s3_upload.py` | **arq-worker** | Receives `temp_file_path`, `os.path.exists`, uploads, deletes | **reads from web's path** |
| `api/services/pipecat/in_memory_buffers.py` | web | Writes tempfiles consumed in the same process | No |
| `api/services/pipecat/audio_file_cache.py` | web | Per-process cache | No |
| `api/tasks/knowledge_base_processing.py` | arq-worker | Writes + reads in the same task | No |
**Mitigation in this chart:** `sharedTmp.enabled` flag in `values.yaml`.
When enabled, the chart creates a `ReadWriteMany` PVC mounted into
both `dograh-web` and `dograh-arq-worker` at
`/tmp/dograh-shared/`. Default is `enabled: false` because most
cloud-default storage classes are RWO; enabling it on RWO will fail
PVC binding.
**If your cluster lacks an RWX storage class** (most cloud defaults are
RWO), you MUST either:
- provision an RWX class (EFS, Azure Files, Longhorn-RWX, Rook-Ceph) and
set `sharedTmp.storageClassName`, or
- complete the long-term fix in TODOs below before splitting web/worker.
Resolved. End-of-call artifacts (recordings, transcript) are uploaded to
object storage directly from the web process
(`api/services/workflow_run_artifacts.py`) before the ARQ completion job
is enqueued; the job carries only the workflow run id. No file handoff
crosses a pod boundary, so web and arq-worker pods need no shared
volume. The remaining `/tmp` uses (`audio_file_cache.py`,
`knowledge_base_processing.py`) write and read within a single process.
## Open TODOs (deferred from v1)
- **Refactor `event_handlers.py` to handle uploads in-web.** Upload to
object storage from the web process and pass the resulting storage
key (not a local path) to the ARQ job. This removes the need for a
shared `/tmp` PVC entirely.
- **Leader election for singletons.** Adopt Kubernetes lease-based
leader election so `ari-manager` / `campaign-orchestrator` can run
HA. Until then, replicas remain hard-coded to 1.
@ -162,7 +139,6 @@ deploy/helm/dograh/
├── configmap.yaml
├── secret.yaml
├── migrate-job.yaml
├── shared-tmp-pvc.yaml
├── web-deployment.yaml
├── web-service.yaml
├── web-hpa.yaml

View file

@ -57,18 +57,6 @@ Alembic migrations run as a post-install / pre-upgrade hook. Inspect with:
kubectl logs job/{{ include "dograh.migrate.fullname" . }} -n {{ .Release.Namespace }}
{{- end }}
=== /tmp shared volume ===
{{- if .Values.sharedTmp.enabled }}
sharedTmp.enabled=true — web and arq-worker pods mount a ReadWriteMany PVC
at {{ .Values.sharedTmp.mountPath }} so end-of-call uploads survive pod boundaries.
{{- else }}
WARNING: sharedTmp.enabled=false. End-of-call uploads (event_handlers.py →
ARQ s3_upload) hand off via /tmp paths. With separated web and worker pods
this WILL fail unless you have an RWX storage class configured.
See deploy/helm/dograh/README.md "/tmp audit" section.
{{- end }}
=== Singletons ===
ari-manager and campaign-orchestrator run with replicas=1 and
strategy=Recreate by design. Do NOT scale these via kubectl scale —

View file

@ -227,21 +227,3 @@ are added inline because they may need composition from subchart secrets.
- secretRef:
name: {{ include "dograh.secretName" . }}
{{- end }}
{{/*
Volume mounts for the shared-tmp PVC when enabled.
*/}}
{{- define "dograh.sharedTmpVolumeMounts" -}}
{{- if .Values.sharedTmp.enabled }}
- name: shared-tmp
mountPath: {{ .Values.sharedTmp.mountPath }}
{{- end }}
{{- end }}
{{- define "dograh.sharedTmpVolumes" -}}
{{- if .Values.sharedTmp.enabled }}
- name: shared-tmp
persistentVolumeClaim:
claimName: {{ include "dograh.fullname" . }}-shared-tmp
{{- end }}
{{- end }}

View file

@ -45,10 +45,6 @@ spec:
{{- toYaml .Values.workers.livenessProbe | nindent 12 }}
resources:
{{- toYaml .Values.workers.resources | nindent 12 }}
volumeMounts:
{{- include "dograh.sharedTmpVolumeMounts" . | nindent 12 }}
volumes:
{{- include "dograh.sharedTmpVolumes" . | nindent 8 }}
{{- with .Values.workers.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View file

@ -1,18 +0,0 @@
{{- if .Values.sharedTmp.enabled }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ include "dograh.fullname" . }}-shared-tmp
namespace: {{ .Release.Namespace }}
labels:
{{- include "dograh.labels" . | nindent 4 }}
spec:
accessModes:
- ReadWriteMany
{{- if .Values.sharedTmp.storageClassName }}
storageClassName: {{ .Values.sharedTmp.storageClassName | quote }}
{{- end }}
resources:
requests:
storage: {{ .Values.sharedTmp.size }}
{{- end }}

View file

@ -68,10 +68,6 @@ spec:
command: ["sh", "-c", "sleep {{ .Values.web.preStopSleepSeconds }}"]
resources:
{{- toYaml .Values.web.resources | nindent 12 }}
volumeMounts:
{{- include "dograh.sharedTmpVolumeMounts" . | nindent 12 }}
volumes:
{{- include "dograh.sharedTmpVolumes" . | nindent 8 }}
{{- with .Values.web.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View file

@ -150,28 +150,6 @@ secrets:
langfusePublicKey: ""
langfuseHost: ""
# -----------------------------------------------------------------------------
# Shared /tmp PVC.
#
# AUDIT FINDING: api/services/pipecat/event_handlers.py writes WAV/transcript
# tempfiles in the web process and enqueues an ARQ job that reads those exact
# paths in the worker (api/tasks/s3_upload.py). In compose this works because
# all processes share the `shared-tmp` volume. In Kubernetes web and worker run
# in separate pods. Options:
# 1. Enable this PVC (ReadWriteMany required) to mount /tmp/dograh-shared
# into both web and arq-worker pods. Use this for v1.
# 2. Refactor event_handlers.py to upload from the web process and pass a
# storage key (not a local path) to the ARQ job. Preferred long-term;
# see deploy/helm/dograh/README.md "Open TODOs".
# If your cluster lacks RWX (most cloud default storage classes are RWO),
# you MUST take option (2) before splitting web and worker pods, or end-of-
# call uploads will fail silently.
sharedTmp:
enabled: false
storageClassName: "" # must be an RWX-capable class (e.g. efs-sc, azurefile, longhorn-rwx)
size: 10Gi
mountPath: /tmp/dograh-shared
# -----------------------------------------------------------------------------
# Web tier (FastAPI + WebSocket signaling)
# -----------------------------------------------------------------------------

View file

@ -138,8 +138,6 @@ services:
api:
image: ${REGISTRY:-dograhai}/dograh-api:${DOGRAH_VERSION:-latest}
restart: unless-stopped
volumes:
- shared-tmp:/tmp
environment:
# production => drop private-IP host ICE candidates on a public VPS and
# order TURN URIs UDP-first. Required for correct remote WebRTC.
@ -266,8 +264,6 @@ volumes:
redis_data:
minio-data:
driver: local
shared-tmp:
driver: local
networks:
# Internal network for service-to-service traffic (db, redis, minio, coturn).

View file

@ -142,8 +142,6 @@ services:
api:
image: ${REGISTRY:-dograhai}/dograh-api:latest
volumes:
- shared-tmp:/tmp
environment:
# Core application config
ENVIRONMENT: "${ENVIRONMENT:-local}"
@ -330,8 +328,6 @@ volumes:
redis_data:
minio-data:
driver: local
shared-tmp:
driver: local
nginx-generated:
driver: local
coturn-generated:

View file

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-XXXXXX.json.5rayRuwwwc
# timestamp: 2026-07-03T07:27:55+00:00
# filename: dograh-openapi-XXXXXX.json.d7UvbEKHrl
# timestamp: 2026-07-03T12:48:12+00:00
from __future__ import annotations