chore: ran backend and frontend linting

This commit is contained in:
Anish Sarkar 2026-01-22 16:07:06 +05:30
parent 3d4a8f981c
commit be7ba76417
11 changed files with 119 additions and 127 deletions

View file

@ -6,7 +6,6 @@ For older items (beyond the sync window), use the list endpoint.
"""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
@ -24,14 +23,14 @@ class NotificationResponse(BaseModel):
id: int
user_id: str
search_space_id: Optional[int]
search_space_id: int | None
type: str
title: str
message: str
read: bool
metadata: dict
created_at: str
updated_at: Optional[str]
updated_at: str | None
class Config:
from_attributes = True
@ -43,7 +42,7 @@ class NotificationListResponse(BaseModel):
items: list[NotificationResponse]
total: int
has_more: bool
next_offset: Optional[int]
next_offset: int | None
class MarkReadResponse(BaseModel):
@ -63,9 +62,15 @@ class MarkAllReadResponse(BaseModel):
@router.get("", response_model=NotificationListResponse)
async def list_notifications(
search_space_id: Optional[int] = Query(None, description="Filter by search space ID"),
type_filter: Optional[str] = Query(None, alias="type", description="Filter by notification type"),
before_date: Optional[str] = Query(None, description="Get notifications before this ISO date (for pagination)"),
search_space_id: int | None = Query(
None, description="Filter by search space ID"
),
type_filter: str | None = Query(
None, alias="type", description="Filter by notification type"
),
before_date: str | None = Query(
None, description="Get notifications before this ISO date (for pagination)"
),
limit: int = Query(50, ge=1, le=100, description="Number of items to return"),
offset: int = Query(0, ge=0, description="Number of items to skip"),
user: User = Depends(current_active_user),
@ -73,32 +78,34 @@ async def list_notifications(
) -> NotificationListResponse:
"""
List notifications for the current user with pagination.
This endpoint is used as a fallback for older notifications that are
outside the Electric SQL sync window (2 weeks).
Use `before_date` to paginate through older notifications efficiently.
"""
# Build base query
query = select(Notification).where(Notification.user_id == user.id)
count_query = select(func.count(Notification.id)).where(Notification.user_id == user.id)
count_query = select(func.count(Notification.id)).where(
Notification.user_id == user.id
)
# Filter by search space (include null search_space_id for global notifications)
if search_space_id is not None:
query = query.where(
(Notification.search_space_id == search_space_id) |
(Notification.search_space_id.is_(None))
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
count_query = count_query.where(
(Notification.search_space_id == search_space_id) |
(Notification.search_space_id.is_(None))
(Notification.search_space_id == search_space_id)
| (Notification.search_space_id.is_(None))
)
# Filter by type
if type_filter:
query = query.where(Notification.type == type_filter)
count_query = count_query.where(Notification.type == type_filter)
# Filter by date (for efficient pagination of older items)
if before_date:
try:
@ -110,39 +117,47 @@ async def list_notifications(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid date format. Use ISO format (e.g., 2024-01-15T00:00:00Z)",
) from None
# Get total count
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# Apply ordering and pagination
query = query.order_by(desc(Notification.created_at)).offset(offset).limit(limit + 1)
query = (
query.order_by(desc(Notification.created_at)).offset(offset).limit(limit + 1)
)
# Execute query
result = await session.execute(query)
notifications = result.scalars().all()
# Check if there are more items
has_more = len(notifications) > limit
if has_more:
notifications = notifications[:limit]
# Convert to response format
items = []
for notification in notifications:
items.append(NotificationResponse(
id=notification.id,
user_id=str(notification.user_id),
search_space_id=notification.search_space_id,
type=notification.type,
title=notification.title,
message=notification.message,
read=notification.read,
metadata=notification.notification_metadata or {},
created_at=notification.created_at.isoformat() if notification.created_at else "",
updated_at=notification.updated_at.isoformat() if notification.updated_at else None,
))
items.append(
NotificationResponse(
id=notification.id,
user_id=str(notification.user_id),
search_space_id=notification.search_space_id,
type=notification.type,
title=notification.title,
message=notification.message,
read=notification.read,
metadata=notification.notification_metadata or {},
created_at=notification.created_at.isoformat()
if notification.created_at
else "",
updated_at=notification.updated_at.isoformat()
if notification.updated_at
else None,
)
)
return NotificationListResponse(
items=items,
total=total,