dograh/api/services/campaign/campaign_event_publisher.py
Abhishek Kumar 4f2a629340 Initial Commit 🚀 🚀
2025-09-09 14:37:32 +05:30

121 lines
3.6 KiB
Python

"""Campaign event publisher for orchestrator communication.
Handles publishing of campaign events to Redis pub/sub channels.
"""
from typing import Dict, Optional
import redis.asyncio as aioredis
from loguru import logger
from api.constants import REDIS_URL
from api.enums import RedisChannel
from api.services.campaign.campaign_event_protocol import (
BatchCompletedEvent,
CampaignCompletedEvent,
RetryNeededEvent,
SyncCompletedEvent,
)
class CampaignEventPublisher:
    """Publish campaign events on the ``RedisChannel.CAMPAIGN_EVENTS`` channel.

    Every ``publish_*`` coroutine builds the corresponding event object from
    its arguments, serializes it with ``to_json()`` and publishes the result
    through the Redis client supplied at construction time.
    """

    def __init__(self, redis_client):
        # The client only needs an awaitable ``publish(channel, message)``.
        self.redis = redis_client

    async def _emit(self, event):
        """Serialize *event* with ``to_json()`` and publish it."""
        send = self.redis.publish
        channel = RedisChannel.CAMPAIGN_EVENTS.value
        payload = event.to_json()
        await send(channel, payload)

    async def publish_batch_completed(
        self,
        campaign_id: int,
        processed_count: int,
        failed_count: int = 0,
        batch_size: int = 0,
        metadata: Optional[Dict] = None,
    ):
        """Build a ``BatchCompletedEvent`` from the arguments and publish it.

        All arguments are forwarded to the event unchanged; ``metadata`` is
        passed through as-is (including ``None``).
        """
        batch_event = BatchCompletedEvent(
            campaign_id=campaign_id,
            processed_count=processed_count,
            failed_count=failed_count,
            batch_size=batch_size,
            metadata=metadata,
        )
        await self._emit(batch_event)

    async def publish_sync_completed(
        self,
        campaign_id: int,
        total_rows: int,
        source_type: str = "",
        source_id: str = "",
        metadata: Optional[Dict] = None,
    ):
        """Build a ``SyncCompletedEvent`` from the arguments and publish it.

        All arguments are forwarded to the event unchanged; ``metadata`` is
        passed through as-is (including ``None``).
        """
        sync_event = SyncCompletedEvent(
            campaign_id=campaign_id,
            total_rows=total_rows,
            source_type=source_type,
            source_id=source_id,
            metadata=metadata,
        )
        await self._emit(sync_event)

    async def publish_retry_needed(
        self,
        workflow_run_id: int,
        reason: str,
        campaign_id: Optional[int] = None,
        queued_run_id: Optional[int] = None,
        metadata: Optional[Dict] = None,
    ):
        """Build a ``RetryNeededEvent``, publish it, then log the publication.

        Falsy ``campaign_id`` / ``queued_run_id`` values (e.g. ``None``) are
        sent as ``0`` and a falsy ``metadata`` is sent as an empty dict. The
        log line reports the ``campaign_id`` exactly as the caller passed it.
        """
        retry_event = RetryNeededEvent(
            campaign_id=campaign_id or 0,
            workflow_run_id=workflow_run_id,
            queued_run_id=queued_run_id or 0,
            reason=reason,
            metadata=metadata or {},
        )
        await self._emit(retry_event)

        # Logged only after the publish call has completed.
        logger.info(
            f"Published retry event for workflow_run {workflow_run_id}, "
            f"reason: {reason}, campaign: {campaign_id}"
        )

    async def publish_campaign_completed(
        self,
        campaign_id: int,
        total_rows: int,
        processed_rows: int,
        failed_rows: int,
        duration_seconds: Optional[float] = None,
    ):
        """Build a ``CampaignCompletedEvent`` from the arguments and publish it.

        All arguments are forwarded to the event unchanged;
        ``duration_seconds`` may be ``None``.
        """
        completed_event = CampaignCompletedEvent(
            campaign_id=campaign_id,
            total_rows=total_rows,
            processed_rows=processed_rows,
            failed_rows=failed_rows,
            duration_seconds=duration_seconds,
        )
        await self._emit(completed_event)
# Process-wide publisher: the Redis client and the publisher wrapping it are
# created on the first call and stored as module globals for later calls.
async def get_campaign_event_publisher() -> CampaignEventPublisher:
    """Return the shared campaign event publisher, creating it on first use.

    On the first call a Redis client is created from ``REDIS_URL`` (with
    ``decode_responses=True``), stored in the module global
    ``_campaign_redis_client`` and wrapped in a ``CampaignEventPublisher``
    stored in ``_campaign_publisher``. Later calls return that same instance.
    """
    global _campaign_publisher, _campaign_redis_client

    # Fast path: the global only exists once a publisher has been created.
    if "_campaign_publisher" in globals():
        return _campaign_publisher

    client = await aioredis.from_url(REDIS_URL, decode_responses=True)
    _campaign_redis_client = client
    _campaign_publisher = CampaignEventPublisher(client)
    return _campaign_publisher