chore: add custom recordings documentation

This commit is contained in:
Abhishek Kumar 2026-03-25 15:44:54 +05:30
parent 2fa4191d9b
commit dc800bdd63
6 changed files with 211 additions and 37 deletions

View file

@ -40,6 +40,38 @@ class PresignedUploadUrlResponse(BaseModel):
router = APIRouter(prefix="/s3", tags=["s3"])
def _extract_org_id_from_key(key: str) -> Optional[int]:
"""Try to extract an organization ID from a storage key.
Matches keys of the form ``{prefix}/{org_id}/...`` where *org_id* is a
positive integer. Returns ``None`` when the pattern does not match.
"""
parts = key.split("/")
if len(parts) >= 3 and parts[1].isdigit():
return int(parts[1])
return None
def _extract_legacy_workflow_run_id(key: str) -> Optional[int]:
"""Extract a workflow_run_id from legacy key formats.
Supports:
- ``transcripts/{run_id}.txt``
- ``recordings/{run_id}.wav``
Returns ``None`` when the key does not match a legacy pattern.
"""
if key.startswith("transcripts/") and key.endswith(".txt"):
run_id_str = key[len("transcripts/") : -4]
elif key.startswith("recordings/") and key.endswith(".wav"):
run_id_str = key[len("recordings/") : -4]
else:
return None
return int(run_id_str) if run_id_str.isdigit() else None
# Keep for backward compat with file-metadata endpoint
async def _validate_and_extract_workflow_run_id(
key: str, allow_special_paths: bool = False
) -> Optional[int]:
@ -118,64 +150,68 @@ async def get_signed_url(
key: Annotated[str, Query(description="S3 object key")],
expires_in: int = 3600,
inline: bool = False,
storage_backend: Annotated[
Optional[str],
Query(
description="Storage backend to use (e.g. 'minio', 's3'). "
"When omitted the backend is inferred from the resource."
),
] = None,
user=Depends(get_user),
):
"""Return a short-lived signed URL for a transcript or recording file stored on S3.
"""Return a short-lived signed URL for a file stored on S3 / MinIO.
Access Control:
* Keys that embed an organization ID (``{prefix}/{org_id}/...``) are
authorized by matching the org_id against the requesting user's
organization.
* Legacy keys (``recordings/{run_id}.wav``, ``transcripts/{run_id}.txt``)
are authorized via the workflow run they belong to.
* Superusers can request any key.
* Regular users can only request resources belonging to **their** workflow runs.
"""
# Validate key and extract workflow_run_id (don't allow special paths for signed URLs)
run_id = await _validate_and_extract_workflow_run_id(key, allow_special_paths=False)
if run_id is None:
raise HTTPException(status_code=400, detail="Invalid key format")
# ------------------------------------------------------------------
# 1. Authorize
# ------------------------------------------------------------------
workflow_run = None
# Authorize and get workflow run
workflow_run = await _authorize_and_get_workflow_run(run_id, user)
org_id = _extract_org_id_from_key(key)
if org_id is not None:
# Generic org-based auth
if not user.is_superuser and org_id != user.selected_organization_id:
raise HTTPException(status_code=403, detail="Access denied")
else:
# Legacy workflow-run-based auth
run_id = _extract_legacy_workflow_run_id(key)
if run_id is None:
raise HTTPException(status_code=400, detail="Invalid key format")
workflow_run = await _authorize_and_get_workflow_run(run_id, user)
# ------------------------------------------------------------------
# 3. Generate the signed URL using the correct storage backend
# 2. Resolve storage backend
# ------------------------------------------------------------------
try:
# Use the storage backend recorded when the file was uploaded
if (
if storage_backend:
storage = get_storage_for_backend(storage_backend)
elif (
workflow_run
and hasattr(workflow_run, "storage_backend")
and workflow_run.storage_backend
):
backend = workflow_run.storage_backend
storage = get_storage_for_backend(backend)
logger.info(
f"DOWNLOAD: Using stored {backend} (value: {backend}) for signed URL generation - workflow_run_id: {run_id}, key: {key}"
)
storage = get_storage_for_backend(workflow_run.storage_backend)
else:
# Fallback to current storage for legacy records without storage_backend
storage = storage_fs
current_backend = StorageBackend.get_current_backend()
logger.warning(
f"DOWNLOAD: No storage_backend found for workflow run {run_id}, falling back to current {current_backend.name} - key: {key}"
)
# ------------------------------------------------------------------
# 3. Generate the signed URL
# ------------------------------------------------------------------
url = await storage.aget_signed_url(
key, expiration=expires_in, force_inline=inline
)
if not url:
raise HTTPException(status_code=500, detail="Failed to generate signed URL")
# Log successful URL generation
backend_info = (
f"stored {backend}"
if workflow_run
and hasattr(workflow_run, "storage_backend")
and workflow_run.storage_backend
else f"current {StorageBackend.get_current_backend().name}"
)
logger.info(
f"Successfully generated signed URL using {backend_info} - expires in {expires_in}s"
)
logger.info(f"Generated signed URL for key={key}, expires_in={expires_in}s")
return {"url": url, "expires_in": expires_in}
except ClientError as exc:
logger.error(f"Error generating signed URL: {exc}")