feat(gateway): schedule gateway maintenance tasks

This commit is contained in:
Anish Sarkar 2026-05-27 23:40:47 +05:30
parent d32e8c6a90
commit a428f6c05f
2 changed files with 158 additions and 0 deletions

View file

@ -188,6 +188,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.document_reindex_tasks",
"app.tasks.celery_tasks.stale_notification_cleanup_task",
"app.tasks.celery_tasks.stripe_reconciliation_task",
"app.tasks.celery_tasks.gateway_tasks",
],
)
@ -242,6 +243,10 @@ celery_app.conf.update(
"index_obsidian_attachment": {"queue": CONNECTORS_QUEUE},
# Everything else (document processing, podcasts, reindexing,
# schedule checker, cleanup) stays on the default fast queue.
"gateway.process_inbound_event": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
"gateway.reconcile_inbox": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
"gateway.health_check": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
"gateway.retention_sweep": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
},
)
@ -282,4 +287,19 @@ celery_app.conf.beat_schedule = {
"expires": 60,
},
},
"gateway-reconcile-inbox": {
"task": "gateway.reconcile_inbox",
"schedule": crontab(minute="*"),
"options": {"expires": 60},
},
"gateway-health-check": {
"task": "gateway.health_check",
"schedule": crontab(minute="*/5"),
"options": {"expires": 120},
},
"gateway-retention-sweep": {
"task": "gateway.retention_sweep",
"schedule": crontab(hour="3", minute="17"),
"options": {"expires": 600},
},
}

View file

@ -0,0 +1,138 @@
"""Celery tasks for messaging gateway intake and maintenance."""
from __future__ import annotations
import logging
from datetime import UTC, datetime, timedelta
from sqlalchemy import select, update
from app.celery_app import celery_app
from app.db import (
GatewayEventStatus,
GatewayHealthStatus,
GatewayInboundEvent,
GatewayPlatform,
GatewayPlatformAccount,
)
from app.gateway.accounts import account_token
from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key
from app.gateway.telegram.adapter import TelegramAdapter
from app.observability.metrics import (
record_gateway_health_check_failure,
record_gateway_inbound_reconciled,
)
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
logger = logging.getLogger(__name__)
@celery_app.task(
bind=True,
name="gateway.process_inbound_event",
acks_late=True,
max_retries=5,
retry_backoff=True,
)
def process_inbound_event_task(self, inbox_id: int) -> None:
logger.warning(
"Ignoring Celery gateway.process_inbound_event for inbox_id=%s; "
"GatewayRunner owns agent turn processing.",
inbox_id,
)
return None
@celery_app.task(name="gateway.reconcile_inbox")
def reconcile_inbox_task() -> None:
async def _run() -> None:
session_maker = get_celery_session_maker()
async with session_maker() as session:
stale_threshold = datetime.now(UTC) - timedelta(minutes=10)
result = await session.execute(
update(GatewayInboundEvent)
.where(
GatewayInboundEvent.status == GatewayEventStatus.PROCESSING,
GatewayInboundEvent.received_at < stale_threshold,
)
.values(
status=GatewayEventStatus.RECEIVED,
last_error="stale processing reset for gateway runner",
)
)
for _ in range(result.rowcount or 0):
record_gateway_inbound_reconciled(reason="stale_processing_reset")
await session.commit()
return run_async_celery_task(_run)
@celery_app.task(name="gateway.health_check")
def gateway_health_check_task() -> None:
async def _run() -> None:
session_maker = get_celery_session_maker()
async with session_maker() as session:
result = await session.execute(select(GatewayPlatformAccount))
accounts = list(result.scalars())
for account in accounts:
token = account_token(account)
if not token or account.platform != GatewayPlatform.TELEGRAM:
continue
try:
metadata = await TelegramAdapter(token).validate_credentials()
account.health_status = GatewayHealthStatus.OK
account.account_metadata = {
**(account.account_metadata or {}),
"bot_username": metadata.get("username"),
}
except Exception:
logger.warning("Gateway Telegram health check failed", exc_info=True)
account.health_status = GatewayHealthStatus.FAILING
record_gateway_health_check_failure(platform=account.platform.value)
account.last_health_check_at = datetime.now(UTC)
await session.commit()
return run_async_celery_task(_run)
@celery_app.task(name="gateway.retention_sweep")
def gateway_retention_sweep_task() -> None:
async def _run() -> None:
session_maker = get_celery_session_maker()
async with session_maker() as session:
raw_cutoff = datetime.now(UTC) - timedelta(days=30)
delete_cutoff = datetime.now(UTC) - timedelta(days=365)
await session.execute(
update(GatewayInboundEvent)
.where(GatewayInboundEvent.received_at < raw_cutoff)
.values(raw_payload=None)
)
result = await session.execute(
select(GatewayInboundEvent).where(
GatewayInboundEvent.received_at < delete_cutoff
)
)
for event in result.scalars():
await session.delete(event)
await session.commit()
return run_async_celery_task(_run)
async def enqueue_telegram_update(account_id: int, raw_update: dict) -> int | None:
session_maker = get_celery_session_maker()
async with session_maker() as session:
parsed = TelegramAdapter("placeholder").parse_inbound(raw_update)
inbox_id = await persist_inbound_event(
session,
account_id=account_id,
platform=GatewayPlatform.TELEGRAM,
event_dedupe_key=telegram_event_dedupe_key(raw_update["update_id"]),
external_event_id=str(raw_update["update_id"]),
external_message_id=parsed.external_message_id,
event_kind=parsed.event_kind,
raw_payload=raw_update,
)
await session.commit()
return inbox_id