import json from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from loguru import logger from pydantic import BaseModel, Field from api.constants import DEPLOYMENT_MODE from api.db import db_client from api.db.models import UserModel from api.services.auth.depends import get_user from api.services.mps_service_key_client import mps_service_key_client from api.services.reports import generate_usage_runs_report_csv router = APIRouter(prefix="/organizations") class CurrentUsageResponse(BaseModel): period_start: str period_end: str used_dograh_tokens: float quota_dograh_tokens: int percentage_used: float next_refresh_date: str quota_enabled: bool total_duration_seconds: int # New USD fields used_amount_usd: Optional[float] = None quota_amount_usd: Optional[float] = None currency: Optional[str] = None price_per_second_usd: Optional[float] = None class MPSCreditsResponse(BaseModel): total_credits_used: float remaining_credits: float total_quota: float class WorkflowRunUsageResponse(BaseModel): id: int workflow_id: int workflow_name: Optional[str] name: str created_at: str dograh_token_usage: float call_duration_seconds: int recording_url: Optional[str] = None transcript_url: Optional[str] = None phone_number: Optional[str] = Field( default=None, deprecated=True, description="Deprecated. Use caller_number and called_number instead.", ) caller_number: Optional[str] = None called_number: Optional[str] = None call_type: Optional[str] = None disposition: Optional[str] = None initial_context: Optional[Dict[str, Any]] = None gathered_context: Optional[Dict[str, Any]] = None # New USD field charge_usd: Optional[float] = None class UsageHistoryResponse(BaseModel): runs: List[WorkflowRunUsageResponse] total_dograh_tokens: float total_duration_seconds: int total_count: int page: int limit: int total_pages: int class DailyUsageItem(BaseModel): date: str minutes: float cost_usd: Optional[float] = None dograh_tokens: float call_count: int class DailyUsageBreakdownResponse(BaseModel): breakdown: List[DailyUsageItem] total_minutes: float total_cost_usd: Optional[float] = None total_dograh_tokens: float currency: Optional[str] = None @router.get("/usage/current-period", response_model=CurrentUsageResponse) async def get_current_period_usage(user: UserModel = Depends(get_user)): """Get current billing period usage for the user's organization.""" if not user.selected_organization_id: raise HTTPException(status_code=400, detail="No organization selected") try: usage = await db_client.get_current_usage(user.selected_organization_id) return usage except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/usage/mps-credits", response_model=MPSCreditsResponse) async def get_mps_credits(user: UserModel = Depends(get_user)): """Get aggregated usage and quota from MPS. OSS users: queries by provider_id (created_by). Hosted users: queries by organization_id. """ try: if DEPLOYMENT_MODE == "oss": usage = await mps_service_key_client.get_usage_by_created_by( str(user.provider_id) ) else: if not user.selected_organization_id: raise HTTPException(status_code=400, detail="No organization selected") usage = await mps_service_key_client.get_usage_by_organization( user.selected_organization_id ) total_used = usage.get("total_credits_used", 0.0) total_remaining = usage.get("remaining_credits", 0.0) return MPSCreditsResponse( total_credits_used=total_used, remaining_credits=total_remaining, total_quota=total_used + total_remaining, ) except HTTPException: raise except Exception as e: logger.error(f"Failed to fetch MPS credits: {e}") raise HTTPException(status_code=500, detail=str(e)) FILTERS_DESCRIPTION = """\ JSON-encoded array of filter objects. Each object has the shape: ```json { "attribute": "", "type": "", "value": } ``` Supported `attribute` / `type` / `value` combinations: | attribute | type | value shape | matches | |-----------------|---------------|----------------------------------------------|------------------------------------------------------| | `runId` | `number` | `{ "value": 12345 }` | exact run id | | `workflowId` | `number` | `{ "value": 42 }` | exact agent (workflow) id | | `campaignId` | `number` | `{ "value": 7 }` | exact campaign id | | `callerNumber` | `text` | `{ "value": "415555" }` | substring match on `initial_context.caller_number` | | `calledNumber` | `text` | `{ "value": "9911848" }` | substring match on `initial_context.called_number` | | `dispositionCode` | `multiSelect` | `{ "codes": ["XFER", "DNC"] }` | any of the codes in `gathered_context.mapped_call_disposition` | | `duration` | `numberRange` | `{ "min": 60, "max": 300 }` | call duration (seconds), inclusive bounds | Unknown attributes and unsupported `type` values are silently ignored. Date filtering on this endpoint is done via the dedicated `start_date` / `end_date` query params, not via a `dateRange` filter object. """ @router.get("/usage/runs", response_model=UsageHistoryResponse) async def get_usage_history( start_date: Optional[str] = Query( None, description="ISO 8601 date-time string (UTC). Lower bound (inclusive) on `created_at`.", examples=["2026-04-01T00:00:00Z"], ), end_date: Optional[str] = Query( None, description="ISO 8601 date-time string (UTC). Upper bound (inclusive) on `created_at`.", examples=["2026-05-01T00:00:00Z"], ), page: int = Query(1, ge=1), limit: int = Query(50, ge=1, le=100), filters: Optional[str] = Query( None, description=FILTERS_DESCRIPTION, examples=[ '[{"attribute":"callerNumber","type":"text","value":{"value":"415555"}}]', '[{"attribute":"campaignId","type":"number","value":{"value":7}},' '{"attribute":"duration","type":"numberRange","value":{"min":60,"max":300}}]', '[{"attribute":"dispositionCode","type":"multiSelect","value":{"codes":["XFER","DNC"]}}]', ], ), user: UserModel = Depends(get_user), ): """Get paginated workflow runs with usage for the organization.""" if not user.selected_organization_id: raise HTTPException(status_code=400, detail="No organization selected") # Parse dates if provided start_dt = datetime.fromisoformat(start_date) if start_date else None end_dt = datetime.fromisoformat(end_date) if end_date else None # Parse filters if provided parsed_filters = None if filters: try: parsed_filters = json.loads(filters) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid filters format") try: offset = (page - 1) * limit ( runs, total_count, total_tokens, total_duration, ) = await db_client.get_usage_history( user.selected_organization_id, start_date=start_dt, end_date=end_dt, limit=limit, offset=offset, filters=parsed_filters, ) total_pages = (total_count + limit - 1) // limit return { "runs": runs, "total_dograh_tokens": total_tokens, "total_duration_seconds": total_duration, "total_count": total_count, "page": page, "limit": limit, "total_pages": total_pages, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/usage/runs/report") async def download_usage_runs_report( start_date: Optional[str] = Query( None, description="ISO 8601 date-time string (UTC). Lower bound (inclusive) on `created_at`.", ), end_date: Optional[str] = Query( None, description="ISO 8601 date-time string (UTC). Upper bound (inclusive) on `created_at`.", ), filters: Optional[str] = Query( None, description=FILTERS_DESCRIPTION, ), user: UserModel = Depends(get_user), ) -> StreamingResponse: """Download a CSV of runs matching the same filters as `/usage/runs`.""" if not user.selected_organization_id: raise HTTPException(status_code=400, detail="No organization selected") start_dt = datetime.fromisoformat(start_date) if start_date else None end_dt = datetime.fromisoformat(end_date) if end_date else None parsed_filters = None if filters: try: parsed_filters = json.loads(filters) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid filters format") output, filename = await generate_usage_runs_report_csv( user.selected_organization_id, start_date=start_dt, end_date=end_dt, filters=parsed_filters, ) return StreamingResponse( output, media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.get("/usage/daily-breakdown", response_model=DailyUsageBreakdownResponse) async def get_daily_usage_breakdown( days: int = Query(7, ge=1, le=30, description="Number of days to include"), user: UserModel = Depends(get_user), ): """Get daily usage breakdown for the last N days. Only available for organizations with pricing.""" if not user.selected_organization_id: raise HTTPException(status_code=400, detail="No organization selected") try: # Get organization to check if it has pricing org = await db_client.get_organization_by_id(user.selected_organization_id) if not org or org.price_per_second_usd is None: raise HTTPException( status_code=400, detail="Daily breakdown is only available for organizations with pricing configured", ) # Calculate date range end_date = datetime.now() start_date = end_date - timedelta(days=days - 1) start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0) # Get daily breakdown breakdown = await db_client.get_daily_usage_breakdown( user.selected_organization_id, start_date, end_date, org.price_per_second_usd, user_id=user.id, ) return breakdown except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e))