feat: download campaign report

This commit is contained in:
Abhishek Kumar 2026-03-11 17:57:04 +05:30
parent ff92c6ae5c
commit 4d807266a7
12 changed files with 429 additions and 28 deletions

View file

@ -365,6 +365,29 @@ class CampaignClient(BaseDBClient):
result = await session.execute(query)
return list(result.scalars().all())
async def get_completed_runs_for_report(
self, campaign_id: int
) -> list[WorkflowRunModel]:
"""Get completed workflow runs with call duration for campaign report CSV."""
async with self.async_session() as session:
query = (
select(WorkflowRunModel)
.where(
WorkflowRunModel.campaign_id == campaign_id,
WorkflowRunModel.is_completed.is_(True),
WorkflowRunModel.cost_info["call_duration_seconds"]
.as_string()
.isnot(None),
)
.order_by(
WorkflowRunModel.cost_info["call_duration_seconds"]
.as_float()
.desc()
)
)
result = await session.execute(query)
return list(result.scalars().all())
async def create_queued_run(
self,
campaign_id: int,

View file

@ -1,12 +1,16 @@
import csv
import io
import json
from datetime import datetime
from typing import List, Optional
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field, field_validator, model_validator
from api.constants import (
BACKEND_API_ENDPOINT,
DEFAULT_CAMPAIGN_RETRY_CONFIG,
DEFAULT_ORG_CONCURRENCY_LIMIT,
)
@ -18,6 +22,7 @@ from api.services.campaign.runner import campaign_runner_service
from api.services.campaign.source_sync_factory import get_sync_service
from api.services.quota_service import check_dograh_quota
from api.services.storage import storage_fs
from api.utils.transcript import generate_transcript_text
router = APIRouter(prefix="/campaign")
@ -662,3 +667,76 @@ async def get_campaign_source_download_url(
raise HTTPException(
status_code=500, detail=f"Failed to generate download URL: {str(e)}"
)
def _transcript_from_logs(logs: dict | None) -> str:
"""Extract transcript text from workflow run logs JSON."""
if not logs:
return ""
events = logs.get("realtime_feedback_events", [])
return generate_transcript_text(events).strip()
@router.get("/{campaign_id}/report")
async def download_campaign_report(
campaign_id: int,
user: UserModel = Depends(get_user),
) -> StreamingResponse:
"""Download a CSV report of completed campaign runs."""
campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
runs = await db_client.get_completed_runs_for_report(campaign_id)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(
[
"Run ID",
"Created At",
"Customer Name",
"Phone Number",
"Call Tags",
"Call Duration (s)",
"Transcript",
"Recording URL",
]
)
for run in runs:
initial = run.initial_context or {}
gathered = run.gathered_context or {}
cost = run.cost_info or {}
recording_url = ""
if run.public_access_token:
recording_url = (
f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow"
f"/{run.public_access_token}/recording"
)
call_tags = gathered.get("call_tags", [])
if isinstance(call_tags, list):
call_tags = ", ".join(str(t) for t in call_tags)
writer.writerow(
[
run.id,
run.created_at.isoformat() if run.created_at else "",
initial.get("first_name", ""),
initial.get("phone_number", ""),
call_tags,
cost.get("call_duration_seconds", ""),
_transcript_from_logs(run.logs),
recording_url,
]
)
output.seek(0)
filename = f"campaign_{campaign_id}_report.csv"
return StreamingResponse(
output,
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)

View file

@ -3,7 +3,7 @@ from typing import List, Literal, Optional, TypedDict, Union
from fastapi import APIRouter, Depends, HTTPException, Query
from loguru import logger
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from api.db import db_client
from api.db.models import (
@ -15,7 +15,7 @@ from api.services.configuration.check_validity import (
UserConfigurationValidator,
)
from api.services.configuration.defaults import DEFAULT_SERVICE_PROVIDERS
from api.services.configuration.masking import mask_user_config
from api.services.configuration.masking import check_for_masked_keys, mask_user_config
from api.services.configuration.merge import merge_user_configurations
from api.services.configuration.registry import REGISTRY, ServiceType
from api.services.mps_service_key_client import mps_service_key_client
@ -113,7 +113,15 @@ async def update_user_configurations(
incoming_dict.pop("organization_pricing", None)
# Merge via helper
user_configurations = merge_user_configurations(existing_config, incoming_dict)
try:
user_configurations = merge_user_configurations(existing_config, incoming_dict)
except ValidationError as e:
raise HTTPException(status_code=422, detail=str(e))
try:
check_for_masked_keys(user_configurations)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
try:
validator = UserConfigurationValidator()

View file

@ -16,6 +16,28 @@ from api.services.configuration.registry import ServiceConfig
VISIBLE_CHARS = 4 # number of trailing characters to reveal
MASK_CHAR = "*"
MASK_MARKER = "***" # substring that indicates a masked key
def contains_masked_key(api_key: str | list[str] | None) -> bool:
"""Return True if *api_key* looks like a masked placeholder."""
if api_key is None:
return False
keys = api_key if isinstance(api_key, list) else [api_key]
return any(MASK_MARKER in k for k in keys)
def check_for_masked_keys(config: "UserConfiguration") -> None:
"""Raise ValueError if any service in *config* still has a masked API key."""
for field in ("llm", "tts", "stt", "embeddings"):
service = getattr(config, field, None)
if service is None:
continue
if contains_masked_key(service.get_all_api_keys()):
raise ValueError(
f"The {field} api_key appears to be masked. "
"Please provide the actual API key, not the masked value."
)
def mask_key(real_key: str, visible: int = VISIBLE_CHARS) -> str:

View file

@ -2,7 +2,6 @@ import random
from enum import Enum, auto
from typing import Annotated, Dict, Literal, Type, TypeVar, Union
from loguru import logger
from pydantic import BaseModel, Field, computed_field, field_validator
@ -607,6 +606,7 @@ class SpeechmaticsSTTConfiguration(BaseSTTConfiguration):
STTConfig = Annotated[
Union[
DeepgramSTTConfiguration,
CartesiaSTTConfiguration,
OpenAISTTConfiguration,
DograhSTTService,
SpeechmaticsSTTConfiguration,

View file

@ -6,6 +6,7 @@ from typing import List, Optional
from loguru import logger
from api.utils.transcript import generate_transcript_text as _generate_transcript_text
from pipecat.utils.enums import RealtimeFeedbackType
@ -138,26 +139,9 @@ class InMemoryLogsBuffer:
"""Generate transcript text from logged events.
Filters for rtf-user-transcription (final) and rtf-bot-text events,
formats them as '[timestamp] user/assistant: text\n'.
formats them as '[timestamp] user/assistant: text\\n'.
"""
lines: List[str] = []
for event in self._events:
event_type = event.get("type")
payload = event.get("payload", {})
if (
event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value
and payload.get("final") is True
):
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}user: {payload.get('text', '')}\n")
elif event_type == RealtimeFeedbackType.BOT_TEXT.value:
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}assistant: {payload.get('text', '')}\n")
return "".join(lines)
return _generate_transcript_text(self._events)
def write_transcript_to_temp_file(self) -> Optional[str]:
"""Write transcript to a temporary text file and return the path.

View file

@ -0,0 +1,170 @@
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from api.routes.user import router
from api.schemas.user_configuration import UserConfiguration
from api.services.auth.depends import get_user
from api.services.configuration.masking import mask_key
from api.services.configuration.registry import (
GoogleLLMService,
OpenAILLMService,
)
def _make_test_app():
app = FastAPI()
app.include_router(router)
mock_user = MagicMock()
mock_user.id = 1
mock_user.is_superuser = False
mock_user.selected_organization_id = None
app.dependency_overrides[get_user] = lambda: mock_user
return app
REAL_KEY = "sk-real-key-1234567890abcdef"
MASKED_KEY = mask_key(REAL_KEY) # "**************************cdef"
def _existing_openai_config():
return UserConfiguration(
llm=OpenAILLMService(
provider="openai",
api_key=REAL_KEY,
model="gpt-4.1",
)
)
class TestMaskedKeyRejection:
def test_rejects_masked_api_key_on_provider_change(self):
"""Changing provider with a masked API key should return 400."""
app = _make_test_app()
client = TestClient(app)
with (
patch("api.routes.user.db_client") as mock_db,
patch("api.routes.user.UserConfigurationValidator") as mock_validator,
):
mock_db.get_user_configurations = AsyncMock(
return_value=_existing_openai_config()
)
mock_db.update_user_configuration = AsyncMock(
side_effect=lambda uid, cfg: cfg
)
mock_validator.return_value.validate = AsyncMock()
response = client.put(
"/user/configurations/user",
json={
"llm": {
"provider": "google",
"api_key": MASKED_KEY,
"model": "gemini-2.0-flash",
}
},
)
assert response.status_code == 400
assert "masked" in response.json()["detail"].lower()
def test_rejects_masked_api_key_in_list(self):
"""A list of API keys containing a masked key should return 400."""
app = _make_test_app()
client = TestClient(app)
with (
patch("api.routes.user.db_client") as mock_db,
patch("api.routes.user.UserConfigurationValidator") as mock_validator,
):
mock_db.get_user_configurations = AsyncMock(
return_value=_existing_openai_config()
)
mock_db.update_user_configuration = AsyncMock(
side_effect=lambda uid, cfg: cfg
)
mock_validator.return_value.validate = AsyncMock()
response = client.put(
"/user/configurations/user",
json={
"llm": {
"provider": "google",
"api_key": ["AIzaSyRealKey123456", MASKED_KEY],
"model": "gemini-2.0-flash",
}
},
)
assert response.status_code == 400
assert "masked" in response.json()["detail"].lower()
def test_allows_real_api_key(self):
"""A real (unmasked) API key should be accepted."""
app = _make_test_app()
client = TestClient(app)
new_key = "AIzaSyNewRealKey12345678"
updated = UserConfiguration(
llm=GoogleLLMService(
provider="google",
api_key=new_key,
model="gemini-2.0-flash",
)
)
with (
patch("api.routes.user.db_client") as mock_db,
patch("api.routes.user.UserConfigurationValidator") as mock_validator,
):
mock_db.get_user_configurations = AsyncMock(
return_value=_existing_openai_config()
)
mock_db.update_user_configuration = AsyncMock(return_value=updated)
mock_validator.return_value.validate = AsyncMock()
response = client.put(
"/user/configurations/user",
json={
"llm": {
"provider": "google",
"api_key": new_key,
"model": "gemini-2.0-flash",
}
},
)
assert response.status_code == 200
def test_allows_same_provider_with_masked_key(self):
"""Same provider with masked key should succeed (merge resolves it)."""
app = _make_test_app()
client = TestClient(app)
with (
patch("api.routes.user.db_client") as mock_db,
patch("api.routes.user.UserConfigurationValidator") as mock_validator,
):
existing = _existing_openai_config()
mock_db.get_user_configurations = AsyncMock(return_value=existing)
mock_db.update_user_configuration = AsyncMock(return_value=existing)
mock_validator.return_value.validate = AsyncMock()
response = client.put(
"/user/configurations/user",
json={
"llm": {
"provider": "openai",
"api_key": MASKED_KEY,
"model": "gpt-4.1",
}
},
)
# Merge resolves the masked key back to the real one,
# so check_for_masked_keys should NOT raise.
assert response.status_code == 200

29
api/utils/transcript.py Normal file
View file

@ -0,0 +1,29 @@
from typing import List
from pipecat.utils.enums import RealtimeFeedbackType
def generate_transcript_text(events: List[dict]) -> str:
"""Generate transcript text from realtime feedback events.
Filters for rtf-user-transcription (final) and rtf-bot-text events,
formats them as '[timestamp] user/assistant: text\\n'.
"""
lines: List[str] = []
for event in events:
event_type = event.get("type")
payload = event.get("payload", {})
if (
event_type == RealtimeFeedbackType.USER_TRANSCRIPTION.value
and payload.get("final") is True
):
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}user: {payload.get('text', '')}\n")
elif event_type == RealtimeFeedbackType.BOT_TEXT.value:
timestamp = payload.get("timestamp", "")
prefix = f"[{timestamp}] " if timestamp else ""
lines.append(f"{prefix}assistant: {payload.get('text', '')}\n")
return "".join(lines)

View file

@ -1,11 +1,12 @@
"use client";
import { ArrowLeft, Check, Clock, Pause, Pencil, Play, RefreshCw, X } from 'lucide-react';
import { ArrowLeft, Check, Clock, Download, Pause, Pencil, Play, RefreshCw, X } from 'lucide-react';
import { useParams, useRouter, useSearchParams } from 'next/navigation';
import { useCallback, useEffect, useState } from 'react';
import { toast } from 'sonner';
import {
downloadCampaignReportApiV1CampaignCampaignIdReportGet,
getCampaignApiV1CampaignCampaignIdGet,
getCampaignSourceDownloadUrlApiV1CampaignCampaignIdSourceDownloadUrlGet,
pauseCampaignApiV1CampaignCampaignIdPausePost,
@ -111,6 +112,40 @@ export default function CampaignDetailPage() {
}
};
// Handle download report
const handleDownloadReport = async () => {
if (!user) return;
try {
const accessToken = await getAccessToken();
const response = await downloadCampaignReportApiV1CampaignCampaignIdReportGet({
path: {
campaign_id: campaignId,
},
headers: {
'Authorization': `Bearer ${accessToken}`,
},
parseAs: 'blob',
});
if (response.data) {
const blob = response.data as Blob;
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `campaign_${campaignId}_report.csv`;
document.body.appendChild(a);
a.click();
a.remove();
window.URL.revokeObjectURL(url);
} else {
toast.error('Failed to download report');
}
} catch (error) {
console.error('Failed to download report:', error);
toast.error('Failed to download report');
}
};
// Handle start campaign
const handleStart = async () => {
if (!user) return;
@ -328,7 +363,13 @@ export default function CampaignDetailPage() {
</span>
</div>
</div>
{renderActionButton()}
<div className="flex items-center gap-2">
<Button variant="outline" onClick={handleDownloadReport}>
<Download className="h-4 w-4 mr-2" />
Download Report
</Button>
{renderActionButton()}
</div>
</div>
</div>

File diff suppressed because one or more lines are too long

View file

@ -3034,6 +3034,39 @@ export type GetCampaignSourceDownloadUrlApiV1CampaignCampaignIdSourceDownloadUrl
export type GetCampaignSourceDownloadUrlApiV1CampaignCampaignIdSourceDownloadUrlGetResponse = GetCampaignSourceDownloadUrlApiV1CampaignCampaignIdSourceDownloadUrlGetResponses[keyof GetCampaignSourceDownloadUrlApiV1CampaignCampaignIdSourceDownloadUrlGetResponses];
export type DownloadCampaignReportApiV1CampaignCampaignIdReportGetData = {
body?: never;
headers?: {
authorization?: string | null;
'X-API-Key'?: string | null;
};
path: {
campaign_id: number;
};
query?: never;
url: '/api/v1/campaign/{campaign_id}/report';
};
export type DownloadCampaignReportApiV1CampaignCampaignIdReportGetErrors = {
/**
* Not found
*/
404: unknown;
/**
* Validation Error
*/
422: HttpValidationError;
};
export type DownloadCampaignReportApiV1CampaignCampaignIdReportGetError = DownloadCampaignReportApiV1CampaignCampaignIdReportGetErrors[keyof DownloadCampaignReportApiV1CampaignCampaignIdReportGetErrors];
export type DownloadCampaignReportApiV1CampaignCampaignIdReportGetResponses = {
/**
* Successful Response
*/
200: unknown;
};
export type ListCredentialsApiV1CredentialsGetData = {
body?: never;
headers?: {

View file

@ -131,8 +131,10 @@ export function UserConfigProvider({ children }: { children: ReactNode }) {
});
if (response.error) {
let msg = 'Failed to save user configuration';
const detail = (response.error as unknown as { detail?: { errors: { model: string; message: string }[] } }).detail;
if (Array.isArray(detail)) {
const detail = (response.error as unknown as { detail?: string | { errors: { model: string; message: string }[] } }).detail;
if (typeof detail === 'string') {
msg = detail;
} else if (Array.isArray(detail)) {
msg = detail
.map((e: { model: string; message: string }) => `${e.model}: ${e.message}`)
.join('\n');