feat: deprecate dograh based quota tracking

This commit is contained in:
Abhishek Kumar 2026-06-11 18:36:08 +05:30
parent fde84387f2
commit 7d4e2e06a9
9 changed files with 90 additions and 313 deletions

View file

@ -23,7 +23,7 @@ from api.schemas.ai_model_configuration import EffectiveAIModelConfiguration
class OrganizationUsageClient(BaseDBClient):
"""Client for managing organization usage and quota operations."""
"""Client for managing organization usage reporting aggregates."""
async def get_or_create_current_cycle(
self, organization_id: int, session=None
@ -49,14 +49,7 @@ class OrganizationUsageClient(BaseDBClient):
self, organization_id: int, session, commit: bool
) -> OrganizationUsageCycleModel:
"""Internal implementation for get_or_create_current_cycle."""
# Get organization to determine quota type
org_result = await session.execute(
select(OrganizationModel).where(OrganizationModel.id == organization_id)
)
org = org_result.scalar_one()
# Calculate current period
period_start, period_end = self._calculate_current_period(org)
period_start, period_end = self._calculate_current_period()
# Try to get existing cycle
cycle_result = await session.execute(
@ -78,7 +71,8 @@ class OrganizationUsageClient(BaseDBClient):
organization_id=organization_id,
period_start=period_start,
period_end=period_end,
quota_dograh_tokens=org.quota_dograh_tokens,
# Deprecated non-null column retained for historical schema compatibility.
quota_dograh_tokens=0,
)
# Handle concurrent inserts gracefully
stmt = stmt.on_conflict_do_nothing(
@ -102,54 +96,6 @@ class OrganizationUsageClient(BaseDBClient):
)
return cycle_result.scalar_one()
async def check_and_reserve_quota(
self, organization_id: int, estimated_tokens: int = 0
) -> bool:
"""
Check if organization has sufficient quota and optionally reserve tokens.
Returns True if quota is available, False otherwise.
This method is fully atomic and safe for concurrent access from multiple processes.
"""
async with self.async_session() as session:
# Get organization
org_result = await session.execute(
select(OrganizationModel).where(OrganizationModel.id == organization_id)
)
org = org_result.scalar_one_or_none()
if not org or not org.quota_enabled:
# No quota enforcement if not enabled
return True
# Get or create current cycle within the same session/transaction
cycle = await self._get_or_create_current_cycle_impl(
organization_id, session, commit=False
)
# Atomic check and update with row-level lock
result = await session.execute(
select(OrganizationUsageCycleModel)
.where(
and_(
OrganizationUsageCycleModel.id == cycle.id,
OrganizationUsageCycleModel.used_dograh_tokens
+ estimated_tokens
<= OrganizationUsageCycleModel.quota_dograh_tokens,
)
)
.with_for_update(skip_locked=False)
)
cycle_locked = result.scalar_one_or_none()
if cycle_locked:
# Update the usage atomically
cycle_locked.used_dograh_tokens += estimated_tokens
await session.commit()
return True
return False
async def update_usage_after_run(
self,
organization_id: int,
@ -188,9 +134,8 @@ class OrganizationUsageClient(BaseDBClient):
await session.commit()
async def get_current_usage(self, organization_id: int) -> dict:
"""Get current period usage information."""
"""Get current reporting-period usage information."""
async with self.async_session() as session:
# Get organization
org_result = await session.execute(
select(OrganizationModel).where(OrganizationModel.id == organization_id)
)
@ -201,42 +146,19 @@ class OrganizationUsageClient(BaseDBClient):
organization_id, session, commit=False
)
# Calculate next refresh date
if org.quota_type == "monthly":
next_refresh = cycle.period_end + relativedelta(days=1)
else: # annual
next_refresh = cycle.period_end + relativedelta(days=1)
result = {
"period_start": cycle.period_start.isoformat(),
"period_end": cycle.period_end.isoformat(),
"used_dograh_tokens": cycle.used_dograh_tokens,
"quota_dograh_tokens": cycle.quota_dograh_tokens,
"percentage_used": (
round(
(cycle.used_dograh_tokens / cycle.quota_dograh_tokens) * 100, 2
)
if cycle.quota_dograh_tokens > 0
else 0
),
"next_refresh_date": next_refresh.date().isoformat(),
"quota_enabled": org.quota_enabled,
"total_duration_seconds": cycle.total_duration_seconds,
}
# Add USD fields if organization has pricing
if org.price_per_second_usd is not None:
result["used_amount_usd"] = cycle.used_amount_usd or 0
result["quota_amount_usd"] = cycle.quota_amount_usd
result["currency"] = "USD"
result["price_per_second_usd"] = org.price_per_second_usd
# Calculate percentage based on USD if available
if cycle.quota_amount_usd and cycle.quota_amount_usd > 0:
result["percentage_used"] = round(
((cycle.used_amount_usd or 0) / cycle.quota_amount_usd) * 100, 2
)
return result
async def get_usage_history(
@ -545,83 +467,11 @@ class OrganizationUsageClient(BaseDBClient):
"currency": "USD",
}
async def update_organization_quota(
self,
organization_id: int,
quota_type: str,
quota_dograh_tokens: int,
quota_reset_day: Optional[int] = None,
quota_start_date: Optional[datetime] = None,
) -> OrganizationModel:
"""Update organization quota settings."""
async with self.async_session() as session:
result = await session.execute(
select(OrganizationModel).where(OrganizationModel.id == organization_id)
)
org = result.scalar_one()
org.quota_type = quota_type
org.quota_dograh_tokens = quota_dograh_tokens
org.quota_enabled = True
if quota_type == "monthly" and quota_reset_day:
org.quota_reset_day = quota_reset_day
elif quota_type == "annual" and quota_start_date:
org.quota_start_date = quota_start_date
await session.commit()
await session.refresh(org)
return org
def _calculate_current_period(
self, org: OrganizationModel
) -> tuple[datetime, datetime]:
"""Calculate the current billing period based on organization settings."""
def _calculate_current_period(self) -> tuple[datetime, datetime]:
"""Calculate the current calendar-month reporting period."""
now = datetime.now(timezone.utc)
if org.quota_type == "monthly":
# Find the start of the current billing month
reset_day = org.quota_reset_day
# Handle month boundaries
if now.day >= reset_day:
period_start = now.replace(
day=reset_day, hour=0, minute=0, second=0, microsecond=0
)
else:
# Previous month
period_start = (now - relativedelta(months=1)).replace(
day=reset_day, hour=0, minute=0, second=0, microsecond=0
)
# End is one month later minus 1 second
period_end = (
period_start + relativedelta(months=1) - relativedelta(seconds=1)
)
else: # annual
if not org.quota_start_date:
# Default to calendar year
period_start = now.replace(
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
)
period_end = (
period_start + relativedelta(years=1) - relativedelta(seconds=1)
)
else:
# Find current annual period
start_date = org.quota_start_date.replace(tzinfo=timezone.utc)
years_diff = now.year - start_date.year
# Adjust for whether we've passed the anniversary
if now.month < start_date.month or (
now.month == start_date.month and now.day < start_date.day
):
years_diff -= 1
period_start = start_date + relativedelta(years=years_diff)
period_end = (
period_start + relativedelta(years=1) - relativedelta(seconds=1)
)
period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
period_end = period_start + relativedelta(months=1) - relativedelta(seconds=1)
return period_start, period_end