mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
237 lines
8.8 KiB
Python
237 lines
8.8 KiB
Python
from datetime import datetime, time
|
|
from typing import Any, Dict, List, Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from api.db import db_client
|
|
|
|
|
|
class DailyReportService:
|
|
async def get_daily_report(
|
|
self,
|
|
organization_id: int,
|
|
date: str,
|
|
timezone: str,
|
|
workflow_id: Optional[int] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get daily report for a specific date and timezone.
|
|
|
|
Args:
|
|
organization_id: The organization ID to filter by
|
|
date: Date in YYYY-MM-DD format
|
|
timezone: IANA timezone string (e.g., "America/New_York")
|
|
workflow_id: Optional workflow ID to filter by (None means all workflows)
|
|
"""
|
|
# Parse date and timezone
|
|
tz = ZoneInfo(timezone)
|
|
date_obj = datetime.strptime(date, "%Y-%m-%d")
|
|
|
|
# Create start and end datetime in the specified timezone
|
|
start_dt = datetime.combine(date_obj, time.min, tzinfo=tz)
|
|
end_dt = datetime.combine(date_obj, time.max, tzinfo=tz)
|
|
|
|
# Convert to UTC for database queries
|
|
start_utc = start_dt.astimezone(ZoneInfo("UTC"))
|
|
end_utc = end_dt.astimezone(ZoneInfo("UTC"))
|
|
|
|
# Get workflow runs from database (optimized - only required fields)
|
|
runs = await db_client.get_workflow_runs_for_daily_report(
|
|
organization_id=organization_id,
|
|
start_utc=start_utc,
|
|
end_utc=end_utc,
|
|
workflow_id=workflow_id,
|
|
)
|
|
|
|
# Calculate metrics
|
|
total_runs = len(runs)
|
|
xfer_count = sum(
|
|
1
|
|
for run in runs
|
|
if run["gathered_context"]
|
|
and run["gathered_context"].get("mapped_call_disposition") == "XFER"
|
|
)
|
|
|
|
# Calculate disposition distribution
|
|
disposition_counts = {}
|
|
for run in runs:
|
|
if run["gathered_context"]:
|
|
disposition = run["gathered_context"].get(
|
|
"mapped_call_disposition", "UNKNOWN"
|
|
)
|
|
disposition_counts[disposition] = (
|
|
disposition_counts.get(disposition, 0) + 1
|
|
)
|
|
|
|
# Sort dispositions by count and get top 5
|
|
sorted_dispositions = sorted(
|
|
disposition_counts.items(), key=lambda x: x[1], reverse=True
|
|
)
|
|
|
|
disposition_distribution = []
|
|
other_count = 0
|
|
|
|
for i, (disposition, count) in enumerate(sorted_dispositions):
|
|
if i < 5:
|
|
disposition_distribution.append(
|
|
{
|
|
"disposition": disposition,
|
|
"count": count,
|
|
"percentage": round(
|
|
(count / total_runs * 100) if total_runs > 0 else 0, 2
|
|
),
|
|
}
|
|
)
|
|
else:
|
|
other_count += count
|
|
|
|
# Add "Other" category if there are more than 5 dispositions
|
|
if other_count > 0:
|
|
disposition_distribution.append(
|
|
{
|
|
"disposition": "Other",
|
|
"count": other_count,
|
|
"percentage": round(
|
|
(other_count / total_runs * 100) if total_runs > 0 else 0, 2
|
|
),
|
|
}
|
|
)
|
|
|
|
# Calculate call duration distribution
|
|
duration_buckets = {
|
|
"0-10": {"range_start": 0, "range_end": 10, "count": 0},
|
|
"10-30": {"range_start": 10, "range_end": 30, "count": 0},
|
|
"30-60": {"range_start": 30, "range_end": 60, "count": 0},
|
|
"60-120": {"range_start": 60, "range_end": 120, "count": 0},
|
|
"120-180": {"range_start": 120, "range_end": 180, "count": 0},
|
|
">180": {"range_start": 180, "range_end": None, "count": 0},
|
|
}
|
|
|
|
for run in runs:
|
|
if run["usage_info"]:
|
|
duration_str = run["usage_info"].get("call_duration_seconds")
|
|
if duration_str:
|
|
try:
|
|
duration = float(duration_str)
|
|
if duration < 10:
|
|
duration_buckets["0-10"]["count"] += 1
|
|
elif duration < 30:
|
|
duration_buckets["10-30"]["count"] += 1
|
|
elif duration < 60:
|
|
duration_buckets["30-60"]["count"] += 1
|
|
elif duration < 120:
|
|
duration_buckets["60-120"]["count"] += 1
|
|
elif duration < 180:
|
|
duration_buckets["120-180"]["count"] += 1
|
|
else:
|
|
duration_buckets[">180"]["count"] += 1
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Format duration distribution
|
|
call_duration_distribution = []
|
|
total_calls_with_duration = sum(b["count"] for b in duration_buckets.values())
|
|
|
|
for bucket_name, bucket_data in duration_buckets.items():
|
|
call_duration_distribution.append(
|
|
{
|
|
"bucket": bucket_name,
|
|
"range_start": bucket_data["range_start"],
|
|
"range_end": bucket_data["range_end"],
|
|
"count": bucket_data["count"],
|
|
"percentage": round(
|
|
(bucket_data["count"] / total_calls_with_duration * 100)
|
|
if total_calls_with_duration > 0
|
|
else 0,
|
|
2,
|
|
),
|
|
}
|
|
)
|
|
|
|
return {
|
|
"date": date,
|
|
"timezone": timezone,
|
|
"workflow_id": workflow_id,
|
|
"metrics": {"total_runs": total_runs, "xfer_count": xfer_count},
|
|
"disposition_distribution": disposition_distribution,
|
|
"call_duration_distribution": call_duration_distribution,
|
|
}
|
|
|
|
async def get_workflows_for_organization(
|
|
self, organization_id: int
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all workflows for an organization.
|
|
"""
|
|
workflows = await db_client.get_workflows_for_organization(organization_id)
|
|
|
|
return [{"id": workflow.id, "name": workflow.name} for workflow in workflows]
|
|
|
|
async def get_daily_runs_detail(
|
|
self,
|
|
organization_id: int,
|
|
date: str,
|
|
timezone: str,
|
|
workflow_id: Optional[int] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get detailed workflow runs for CSV export.
|
|
|
|
Args:
|
|
organization_id: The organization ID to filter by
|
|
date: Date in YYYY-MM-DD format
|
|
timezone: IANA timezone string (e.g., "America/New_York")
|
|
workflow_id: Optional workflow ID to filter by
|
|
"""
|
|
# Parse date and timezone
|
|
tz = ZoneInfo(timezone)
|
|
date_obj = datetime.strptime(date, "%Y-%m-%d")
|
|
|
|
# Create start and end datetime in the specified timezone
|
|
start_dt = datetime.combine(date_obj, time.min, tzinfo=tz)
|
|
end_dt = datetime.combine(date_obj, time.max, tzinfo=tz)
|
|
|
|
# Convert to UTC for database queries
|
|
start_utc = start_dt.astimezone(ZoneInfo("UTC"))
|
|
end_utc = end_dt.astimezone(ZoneInfo("UTC"))
|
|
|
|
# Get workflow runs from database (optimized - only required fields)
|
|
runs = await db_client.get_workflow_runs_for_daily_report(
|
|
organization_id=organization_id,
|
|
start_utc=start_utc,
|
|
end_utc=end_utc,
|
|
workflow_id=workflow_id,
|
|
)
|
|
|
|
# Format runs for CSV export
|
|
detailed_runs = []
|
|
for run in runs:
|
|
# Phone number is already extracted at the database level
|
|
# Try customer_phone_number first, then fall back to initial_context
|
|
phone_number = run["gathered_context"].get(
|
|
"customer_phone_number", ""
|
|
) or run["initial_context"].get("phone_number", "")
|
|
|
|
# Disposition is already extracted at the database level
|
|
disposition = run["gathered_context"].get("mapped_call_disposition", "")
|
|
|
|
# Duration is already extracted at the database level
|
|
duration_seconds = 0
|
|
duration_str = run["usage_info"].get("call_duration_seconds", "0")
|
|
try:
|
|
duration_seconds = float(duration_str)
|
|
except (ValueError, TypeError):
|
|
duration_seconds = 0
|
|
|
|
detailed_runs.append(
|
|
{
|
|
"phone_number": phone_number,
|
|
"disposition": disposition,
|
|
"duration_seconds": duration_seconds,
|
|
"workflow_id": run["workflow_id"],
|
|
"run_id": run["id"],
|
|
"workflow_name": run["workflow_name"],
|
|
"created_at": run["created_at"].isoformat(),
|
|
}
|
|
)
|
|
|
|
return detailed_runs
|