dograh/api/services/reports/run_report.py

132 lines
4.2 KiB
Python
Raw Permalink Normal View History

"""CSV reports built from completed workflow runs.
Shared by campaign-, workflow-, and organization-usage-scoped reports.
The DB client supplies the row set; this module owns the column layout
so every endpoint emits the same shape.
"""
2026-03-23 11:36:39 +05:30
import csv
import io
from datetime import UTC, datetime
2026-03-27 17:31:17 +05:30
from typing import Any, List, Optional
2026-03-23 11:36:39 +05:30
from api.db import db_client
from api.utils.artifacts import artifact_url
2026-03-23 11:36:39 +05:30
def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
"""Collect all unique extracted variable keys across runs, preserving insertion order."""
keys: dict[str, None] = {}
for run in runs:
gathered = run.gathered_context or {}
extracted = gathered.get("extracted_variables", {})
if isinstance(extracted, dict):
for key in extracted:
keys.setdefault(key, None)
return list(keys)
def build_run_report_csv(runs: List[Any]) -> io.StringIO:
"""Build a CSV from completed workflow runs."""
2026-03-23 11:36:39 +05:30
extracted_var_keys = _collect_extracted_variable_keys(runs)
output = io.StringIO()
writer = csv.writer(output)
pre_headers = [
"Run ID",
2026-04-13 23:25:43 +05:30
"Campaign ID",
"Agent ID",
"Agent Definition ID",
2026-03-23 11:36:39 +05:30
"Created At",
"Phone Number",
"Call Disposition",
"Call Duration (s)",
]
post_headers = [
"Call Tags",
2026-04-13 23:25:43 +05:30
"Transcript URL",
2026-03-23 11:36:39 +05:30
"Recording URL",
]
writer.writerow(pre_headers + extracted_var_keys + post_headers)
for run in runs:
initial = run.initial_context or {}
gathered = run.gathered_context or {}
cost = run.cost_info or {}
call_tags = gathered.get("call_tags", [])
if isinstance(call_tags, list):
call_tags = ", ".join(str(t) for t in call_tags)
pre_values = [
run.id,
2026-04-13 23:25:43 +05:30
run.campaign_id if run.campaign_id is not None else "",
run.workflow_id,
run.definition_id if run.definition_id is not None else "",
2026-03-23 11:36:39 +05:30
run.created_at.isoformat() if run.created_at else "",
initial.get("phone_number", ""),
gathered.get("mapped_call_disposition", ""),
cost.get("call_duration_seconds", ""),
]
extracted = gathered.get("extracted_variables", {})
if not isinstance(extracted, dict):
extracted = {}
extracted_values = [extracted.get(key, "") for key in extracted_var_keys]
post_values = [
call_tags,
artifact_url(run.public_access_token, "transcript") or "",
artifact_url(run.public_access_token, "recording") or "",
2026-03-23 11:36:39 +05:30
]
writer.writerow(pre_values + extracted_values + post_values)
output.seek(0)
2026-04-13 23:25:43 +05:30
return output
async def generate_campaign_report_csv(
campaign_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for a campaign."""
runs = await db_client.get_completed_runs_for_report(
campaign_id=campaign_id, start_date=start_date, end_date=end_date
)
return build_run_report_csv(runs), f"campaign_{campaign_id}_report.csv"
2026-04-13 23:25:43 +05:30
async def generate_workflow_report_csv(
workflow_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for all completed runs of a workflow."""
runs = await db_client.get_completed_runs_for_report(
workflow_id=workflow_id, start_date=start_date, end_date=end_date
)
return build_run_report_csv(runs), f"workflow_{workflow_id}_report.csv"
async def generate_usage_runs_report_csv(
organization_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
filters: Optional[list[dict]] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for runs visible on the org-wide usage page.
Honors the same date / filter inputs as the `/usage/runs` listing.
"""
runs = await db_client.get_usage_runs_for_report(
organization_id,
start_date=start_date,
end_date=end_date,
filters=filters,
)
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
return build_run_report_csv(runs), f"usage_runs_{timestamp}.csv"