# dograh/api/services/campaign/report.py
# 2026-04-13 23:25:43 +05:30
#
# 113 lines
# 3.6 KiB
# Python

import csv
import io
import json
from datetime import datetime
from typing import Any, List, Optional

from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
def _artifact_url(token: str | None, artifact: str) -> str:
if not token:
return ""
return f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow/{token}/{artifact}"
def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
"""Collect all unique extracted variable keys across runs, preserving insertion order."""
keys: dict[str, None] = {}
for run in runs:
gathered = run.gathered_context or {}
extracted = gathered.get("extracted_variables", {})
if isinstance(extracted, dict):
for key in extracted:
keys.setdefault(key, None)
return list(keys)
def _format_report_cell(value: Any) -> Any:
    """Normalize one extracted-variable value for a CSV cell.

    Dicts and lists are serialized as JSON so the cell is machine-readable.
    Without this, csv.writer would emit their Python ``repr`` (single quotes,
    ``True``/``None``), which cannot be reliably parsed back. Any other value
    is returned unchanged and left to csv.writer's default conversion.
    """
    if isinstance(value, (dict, list)):
        # default=str: fall back to str() for non-JSON types, matching what
        # csv.writer would have done, rather than failing the whole report.
        return json.dumps(value, ensure_ascii=False, default=str)
    return value


def _build_run_report_csv(runs: List[Any]) -> io.StringIO:
    """Build a CSV report from completed workflow runs.

    Shared between campaign-scoped and workflow-scoped reports.

    Column layout:
      * fixed run metadata (IDs, creation time, phone number, disposition,
        call duration),
      * one column per extracted-variable key seen across ``runs``, in
        first-seen order (structured values are JSON-encoded),
      * call tags and the public transcript/recording URLs.

    Args:
        runs: Run records exposing ``id``, ``campaign_id``, ``workflow_id``,
            ``definition_id``, ``created_at``, ``initial_context``,
            ``gathered_context``, ``cost_info`` and ``public_access_token``.

    Returns:
        An ``io.StringIO`` containing the CSV text, rewound to position 0.
    """
    extracted_var_keys = _collect_extracted_variable_keys(runs)
    output = io.StringIO()
    writer = csv.writer(output)

    pre_headers = [
        "Run ID",
        "Campaign ID",
        "Agent ID",
        "Agent Definition ID",
        "Created At",
        "Phone Number",
        "Call Disposition",
        "Call Duration (s)",
    ]
    post_headers = [
        "Call Tags",
        "Transcript URL",
        "Recording URL",
    ]
    writer.writerow(pre_headers + extracted_var_keys + post_headers)

    for run in runs:
        initial = run.initial_context or {}
        gathered = run.gathered_context or {}
        cost = run.cost_info or {}

        # Flatten a tag list into a single human-readable cell.
        call_tags = gathered.get("call_tags", [])
        if isinstance(call_tags, list):
            call_tags = ", ".join(str(t) for t in call_tags)

        pre_values = [
            run.id,
            run.campaign_id if run.campaign_id is not None else "",
            run.workflow_id,
            run.definition_id if run.definition_id is not None else "",
            run.created_at.isoformat() if run.created_at else "",
            initial.get("phone_number", ""),
            gathered.get("mapped_call_disposition", ""),
            cost.get("call_duration_seconds", ""),
        ]

        extracted = gathered.get("extracted_variables", {})
        if not isinstance(extracted, dict):
            extracted = {}
        # Every row gets a cell for every key; keys this run lacks stay blank.
        extracted_values = [
            _format_report_cell(extracted.get(key, "")) for key in extracted_var_keys
        ]

        post_values = [
            call_tags,
            _artifact_url(run.public_access_token, "transcript"),
            _artifact_url(run.public_access_token, "recording"),
        ]
        writer.writerow(pre_values + extracted_values + post_values)

    output.seek(0)
    return output
async def generate_campaign_report_csv(
    campaign_id: int,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
    """Produce the CSV report for a single campaign.

    Loads the campaign's completed runs, passing ``start_date``/``end_date``
    unchanged to the database query, and renders them with the shared
    report builder.

    Returns:
        A ``(csv_buffer, filename)`` pair, where the filename is
        ``campaign_<campaign_id>_report.csv``.
    """
    completed_runs = await db_client.get_completed_runs_for_report(
        campaign_id=campaign_id,
        start_date=start_date,
        end_date=end_date,
    )
    report_buffer = _build_run_report_csv(completed_runs)
    report_name = f"campaign_{campaign_id}_report.csv"
    return report_buffer, report_name
async def generate_workflow_report_csv(
    workflow_id: int,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
    """Produce the CSV report covering every completed run of a workflow.

    Loads the workflow's completed runs, passing ``start_date``/``end_date``
    unchanged to the database query, and renders them with the shared
    report builder.

    Returns:
        A ``(csv_buffer, filename)`` pair, where the filename is
        ``workflow_<workflow_id>_report.csv``.
    """
    completed_runs = await db_client.get_completed_runs_for_report(
        workflow_id=workflow_id,
        start_date=start_date,
        end_date=end_date,
    )
    report_buffer = _build_run_report_csv(completed_runs)
    report_name = f"workflow_{workflow_id}_report.csv"
    return report_buffer, report_name