dograh/api/services/reports/run_report.py
Abhishek Kumar 7810923bca chore: return formatted transcript url
- Return formatted transcript and recording URL
- Harden campaign dispatcher logic
2026-05-26 13:24:12 +05:30

131 lines
4.2 KiB
Python

"""CSV reports built from completed workflow runs.
Shared by campaign-, workflow-, and organization-usage-scoped reports.
The DB client supplies the row set; this module owns the column layout
so every endpoint emits the same shape.
"""
import csv
import io
from datetime import UTC, datetime
from typing import Any, List, Optional
from api.db import db_client
from api.utils.artifacts import artifact_url
def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
"""Collect all unique extracted variable keys across runs, preserving insertion order."""
keys: dict[str, None] = {}
for run in runs:
gathered = run.gathered_context or {}
extracted = gathered.get("extracted_variables", {})
if isinstance(extracted, dict):
for key in extracted:
keys.setdefault(key, None)
return list(keys)
def build_run_report_csv(runs: List[Any]) -> io.StringIO:
"""Build a CSV from completed workflow runs."""
extracted_var_keys = _collect_extracted_variable_keys(runs)
output = io.StringIO()
writer = csv.writer(output)
pre_headers = [
"Run ID",
"Campaign ID",
"Agent ID",
"Agent Definition ID",
"Created At",
"Phone Number",
"Call Disposition",
"Call Duration (s)",
]
post_headers = [
"Call Tags",
"Transcript URL",
"Recording URL",
]
writer.writerow(pre_headers + extracted_var_keys + post_headers)
for run in runs:
initial = run.initial_context or {}
gathered = run.gathered_context or {}
cost = run.cost_info or {}
call_tags = gathered.get("call_tags", [])
if isinstance(call_tags, list):
call_tags = ", ".join(str(t) for t in call_tags)
pre_values = [
run.id,
run.campaign_id if run.campaign_id is not None else "",
run.workflow_id,
run.definition_id if run.definition_id is not None else "",
run.created_at.isoformat() if run.created_at else "",
initial.get("phone_number", ""),
gathered.get("mapped_call_disposition", ""),
cost.get("call_duration_seconds", ""),
]
extracted = gathered.get("extracted_variables", {})
if not isinstance(extracted, dict):
extracted = {}
extracted_values = [extracted.get(key, "") for key in extracted_var_keys]
post_values = [
call_tags,
artifact_url(run.public_access_token, "transcript") or "",
artifact_url(run.public_access_token, "recording") or "",
]
writer.writerow(pre_values + extracted_values + post_values)
output.seek(0)
return output
async def generate_campaign_report_csv(
campaign_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for a campaign."""
runs = await db_client.get_completed_runs_for_report(
campaign_id=campaign_id, start_date=start_date, end_date=end_date
)
return build_run_report_csv(runs), f"campaign_{campaign_id}_report.csv"
async def generate_workflow_report_csv(
workflow_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for all completed runs of a workflow."""
runs = await db_client.get_completed_runs_for_report(
workflow_id=workflow_id, start_date=start_date, end_date=end_date
)
return build_run_report_csv(runs), f"workflow_{workflow_id}_report.csv"
async def generate_usage_runs_report_csv(
organization_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
filters: Optional[list[dict]] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for runs visible on the org-wide usage page.
Honors the same date / filter inputs as the `/usage/runs` listing.
"""
runs = await db_client.get_usage_runs_for_report(
organization_id,
start_date=start_date,
end_date=end_date,
filters=filters,
)
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
return build_run_report_csv(runs), f"usage_runs_{timestamp}.csv"