"""Campaign event publisher for orchestrator communication. Handles publishing of campaign events to Redis pub/sub channels. """ from typing import Dict, Optional import redis.asyncio as aioredis from loguru import logger from api.constants import REDIS_URL from api.enums import RedisChannel from api.services.campaign.campaign_event_protocol import ( BatchCompletedEvent, CampaignCompletedEvent, RetryNeededEvent, SyncCompletedEvent, ) class CampaignEventPublisher: """Helper class for publishing campaign events.""" def __init__(self, redis_client): self.redis = redis_client async def publish_batch_completed( self, campaign_id: int, processed_count: int, failed_count: int = 0, batch_size: int = 0, metadata: Optional[Dict] = None, ): """Publish batch completed event.""" event = BatchCompletedEvent( campaign_id=campaign_id, processed_count=processed_count, failed_count=failed_count, batch_size=batch_size, metadata=metadata, ) await self.redis.publish(RedisChannel.CAMPAIGN_EVENTS.value, event.to_json()) async def publish_sync_completed( self, campaign_id: int, total_rows: int, source_type: str = "", source_id: str = "", metadata: Optional[Dict] = None, ): """Publish sync completed event.""" event = SyncCompletedEvent( campaign_id=campaign_id, total_rows=total_rows, source_type=source_type, source_id=source_id, metadata=metadata, ) await self.redis.publish(RedisChannel.CAMPAIGN_EVENTS.value, event.to_json()) async def publish_retry_needed( self, workflow_run_id: int, reason: str, campaign_id: Optional[int] = None, queued_run_id: Optional[int] = None, metadata: Optional[Dict] = None, ): """Publish retry needed event.""" event = RetryNeededEvent( campaign_id=campaign_id or 0, workflow_run_id=workflow_run_id, queued_run_id=queued_run_id or 0, reason=reason, metadata=metadata or {}, ) await self.redis.publish(RedisChannel.CAMPAIGN_EVENTS.value, event.to_json()) logger.info( f"Published retry event for workflow_run {workflow_run_id}, " f"reason: {reason}, campaign: {campaign_id}" ) async def publish_campaign_completed( self, campaign_id: int, total_rows: int, processed_rows: int, failed_rows: int, duration_seconds: Optional[float] = None, ): """Publish campaign completed event.""" event = CampaignCompletedEvent( campaign_id=campaign_id, total_rows=total_rows, processed_rows=processed_rows, failed_rows=failed_rows, duration_seconds=duration_seconds, ) await self.redis.publish(RedisChannel.CAMPAIGN_EVENTS.value, event.to_json()) # Global publisher instance with lazy Redis connection async def get_campaign_event_publisher() -> CampaignEventPublisher: """Get or create the campaign event publisher.""" global _campaign_publisher global _campaign_redis_client if "_campaign_publisher" not in globals(): _campaign_redis_client = await aioredis.from_url( REDIS_URL, decode_responses=True ) _campaign_publisher = CampaignEventPublisher(_campaign_redis_client) return _campaign_publisher