dograh/api/services/reports/daily_report.py
Abhishek Kumar 4f2a629340 Initial Commit 🚀 🚀
2025-09-09 14:37:32 +05:30

237 lines
8.8 KiB
Python

from datetime import datetime, time
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from api.db import db_client
class DailyReportService:
    """Build per-day workflow-run reports for an organization.

    Runs are fetched through ``db_client`` for the UTC window matching one
    calendar day in the caller's timezone and are then aggregated in Python.
    Each run is accessed like a mapping (``run["gathered_context"]`` etc.);
    the JSON-like context fields may be empty or ``None``.
    """

    # (label, range_start, range_end) in seconds. A duration is placed in the
    # first bucket whose ``range_end`` it is strictly below; ``None`` marks the
    # open-ended last bucket. ``range_start`` is only reported, never compared.
    _DURATION_BUCKETS = (
        ("0-10", 0, 10),
        ("10-30", 10, 30),
        ("30-60", 30, 60),
        ("60-120", 60, 120),
        ("120-180", 120, 180),
        (">180", 180, None),
    )

    # Dispositions listed individually; the remainder is folded into "Other".
    _TOP_DISPOSITIONS = 5

    @staticmethod
    def _day_bounds_utc(date: str, timezone: str) -> tuple[datetime, datetime]:
        """Return the UTC (start, end) datetimes spanning ``date`` in ``timezone``.

        Args:
            date: Date in YYYY-MM-DD format.
            timezone: IANA timezone string (e.g., "America/New_York").

        Raises:
            zoneinfo.ZoneInfoNotFoundError: If ``timezone`` is not a known key.
            ValueError: If ``date`` does not match YYYY-MM-DD.
        """
        tz = ZoneInfo(timezone)
        day = datetime.strptime(date, "%Y-%m-%d").date()
        utc = ZoneInfo("UTC")
        # Local midnight through the last microsecond of the local day.
        start_utc = datetime.combine(day, time.min, tzinfo=tz).astimezone(utc)
        end_utc = datetime.combine(day, time.max, tzinfo=tz).astimezone(utc)
        return start_utc, end_utc

    @staticmethod
    def _percentage(count: int, total: int) -> float:
        """Return ``count`` as a percentage of ``total`` (2 decimals), 0 if empty."""
        return round((count / total * 100) if total > 0 else 0, 2)

    @staticmethod
    def _summarize_dispositions(runs, total_runs: int) -> List[Dict[str, Any]]:
        """Return the top dispositions plus an aggregated "Other" entry.

        Runs without a gathered context are not counted towards any
        disposition, but percentages are still relative to ``total_runs``.
        """
        counts: Dict[Any, int] = {}
        for run in runs:
            context = run["gathered_context"]
            if not context:
                continue
            disposition = context.get("mapped_call_disposition", "UNKNOWN")
            counts[disposition] = counts.get(disposition, 0) + 1

        # Stable sort: dispositions with equal counts keep first-seen order.
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        top = ranked[: DailyReportService._TOP_DISPOSITIONS]
        rest = ranked[DailyReportService._TOP_DISPOSITIONS :]

        distribution = [
            {
                "disposition": disposition,
                "count": count,
                "percentage": DailyReportService._percentage(count, total_runs),
            }
            for disposition, count in top
        ]

        other_count = sum(count for _, count in rest)
        if other_count > 0:
            distribution.append(
                {
                    "disposition": "Other",
                    "count": other_count,
                    "percentage": DailyReportService._percentage(
                        other_count, total_runs
                    ),
                }
            )
        return distribution

    @staticmethod
    def _summarize_durations(runs) -> List[Dict[str, Any]]:
        """Return the call-duration histogram over ``_DURATION_BUCKETS``.

        Runs with no usage info, a falsy duration value, or a value that
        cannot be converted to ``float`` are left out of the histogram.
        Percentages are relative to the number of runs that were bucketed.
        """
        buckets = DailyReportService._DURATION_BUCKETS
        counts = {label: 0 for label, _, _ in buckets}

        for run in runs:
            usage = run["usage_info"]
            if not usage:
                continue
            raw_duration = usage.get("call_duration_seconds")
            if not raw_duration:
                continue
            try:
                duration = float(raw_duration)
            except (ValueError, TypeError):
                # Best effort: an unparseable duration is simply not bucketed.
                continue
            for label, _, range_end in buckets:
                if range_end is None or duration < range_end:
                    counts[label] += 1
                    break

        total_with_duration = sum(counts.values())
        return [
            {
                "bucket": label,
                "range_start": range_start,
                "range_end": range_end,
                "count": counts[label],
                "percentage": DailyReportService._percentage(
                    counts[label], total_with_duration
                ),
            }
            for label, range_start, range_end in buckets
        ]

    @staticmethod
    def _format_run_detail(run) -> Dict[str, Any]:
        """Flatten one run into the row shape used for CSV export.

        The context and usage fields are treated as empty when they are
        ``None``, so a single incomplete run cannot abort the whole export.
        """
        gathered = run["gathered_context"] or {}

        # Prefer the gathered customer number; the initial context is only
        # consulted when that one is missing or empty.
        phone_number = gathered.get("customer_phone_number", "") or (
            run["initial_context"] or {}
        ).get("phone_number", "")

        disposition = gathered.get("mapped_call_disposition", "")

        usage = run["usage_info"] or {}
        try:
            duration_seconds = float(usage.get("call_duration_seconds", "0"))
        except (ValueError, TypeError):
            duration_seconds = 0

        return {
            "phone_number": phone_number,
            "disposition": disposition,
            "duration_seconds": duration_seconds,
            "workflow_id": run["workflow_id"],
            "run_id": run["id"],
            "workflow_name": run["workflow_name"],
            "created_at": run["created_at"].isoformat(),
        }

    async def get_daily_report(
        self,
        organization_id: int,
        date: str,
        timezone: str,
        workflow_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Get daily report for a specific date and timezone.

        Args:
            organization_id: The organization ID to filter by
            date: Date in YYYY-MM-DD format
            timezone: IANA timezone string (e.g., "America/New_York")
            workflow_id: Optional workflow ID to filter by (None means all workflows)

        Returns:
            A dict echoing ``date``, ``timezone`` and ``workflow_id`` together
            with ``metrics`` (total runs and "XFER" count),
            ``disposition_distribution`` and ``call_duration_distribution``.
        """
        start_utc, end_utc = self._day_bounds_utc(date, timezone)

        # Get workflow runs from database (optimized - only required fields)
        runs = await db_client.get_workflow_runs_for_daily_report(
            organization_id=organization_id,
            start_utc=start_utc,
            end_utc=end_utc,
            workflow_id=workflow_id,
        )

        total_runs = len(runs)
        xfer_count = sum(
            1
            for run in runs
            if run["gathered_context"]
            and run["gathered_context"].get("mapped_call_disposition") == "XFER"
        )

        return {
            "date": date,
            "timezone": timezone,
            "workflow_id": workflow_id,
            "metrics": {"total_runs": total_runs, "xfer_count": xfer_count},
            "disposition_distribution": self._summarize_dispositions(
                runs, total_runs
            ),
            "call_duration_distribution": self._summarize_durations(runs),
        }

    async def get_workflows_for_organization(
        self, organization_id: int
    ) -> List[Dict[str, Any]]:
        """
        Get all workflows for an organization.

        Returns:
            One ``{"id": ..., "name": ...}`` dict per workflow returned by
            ``db_client.get_workflows_for_organization``.
        """
        workflows = await db_client.get_workflows_for_organization(organization_id)
        return [{"id": workflow.id, "name": workflow.name} for workflow in workflows]

    async def get_daily_runs_detail(
        self,
        organization_id: int,
        date: str,
        timezone: str,
        workflow_id: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get detailed workflow runs for CSV export.

        Args:
            organization_id: The organization ID to filter by
            date: Date in YYYY-MM-DD format
            timezone: IANA timezone string (e.g., "America/New_York")
            workflow_id: Optional workflow ID to filter by

        Returns:
            One flat dict per run with phone number, disposition, duration in
            seconds, workflow id/name, run id and ISO-formatted creation time.
        """
        start_utc, end_utc = self._day_bounds_utc(date, timezone)

        # Get workflow runs from database (optimized - only required fields)
        runs = await db_client.get_workflow_runs_for_daily_report(
            organization_id=organization_id,
            start_utc=start_utc,
            end_utc=end_utc,
            workflow_id=workflow_id,
        )

        # Format runs for CSV export
        return [self._format_run_detail(run) for run in runs]