mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
|
|
"""Campaign event publisher for orchestrator communication.
|
||
|
|
|
||
|
|
Handles publishing of campaign events to Redis pub/sub channels.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from typing import Dict, Optional
|
||
|
|
|
||
|
|
import redis.asyncio as aioredis
|
||
|
|
from loguru import logger
|
||
|
|
|
||
|
|
from api.constants import REDIS_URL
|
||
|
|
from api.enums import RedisChannel
|
||
|
|
from api.services.campaign.campaign_event_protocol import (
|
||
|
|
BatchCompletedEvent,
|
||
|
|
CampaignCompletedEvent,
|
||
|
|
RetryNeededEvent,
|
||
|
|
SyncCompletedEvent,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
class CampaignEventPublisher:
    """Publish campaign events for the orchestrator over Redis pub/sub.

    Every ``publish_*`` method builds the matching event object, serializes it
    with ``to_json()`` and publishes it on the
    ``RedisChannel.CAMPAIGN_EVENTS`` channel through the injected client.
    """

    def __init__(self, redis_client):
        # The client only needs an awaitable ``publish(channel, message)``.
        self.redis = redis_client

    async def _publish(self, event) -> None:
        """Serialize *event* to JSON and publish it on the campaign channel."""
        await self.redis.publish(RedisChannel.CAMPAIGN_EVENTS.value, event.to_json())

    async def publish_batch_completed(
        self,
        campaign_id: int,
        processed_count: int,
        failed_count: int = 0,
        batch_size: int = 0,
        metadata: Optional[Dict] = None,
    ):
        """Publish a ``BatchCompletedEvent`` for the given campaign.

        All arguments are forwarded unchanged to the event constructor;
        ``metadata`` is passed through as-is, including ``None``.
        """
        await self._publish(
            BatchCompletedEvent(
                campaign_id=campaign_id,
                processed_count=processed_count,
                failed_count=failed_count,
                batch_size=batch_size,
                metadata=metadata,
            )
        )

    async def publish_sync_completed(
        self,
        campaign_id: int,
        total_rows: int,
        source_type: str = "",
        source_id: str = "",
        metadata: Optional[Dict] = None,
    ):
        """Publish a ``SyncCompletedEvent`` for the given campaign.

        All arguments are forwarded unchanged to the event constructor;
        ``metadata`` is passed through as-is, including ``None``.
        """
        await self._publish(
            SyncCompletedEvent(
                campaign_id=campaign_id,
                total_rows=total_rows,
                source_type=source_type,
                source_id=source_id,
                metadata=metadata,
            )
        )

    async def publish_retry_needed(
        self,
        workflow_run_id: int,
        reason: str,
        campaign_id: Optional[int] = None,
        queued_run_id: Optional[int] = None,
        metadata: Optional[Dict] = None,
    ):
        """Publish a ``RetryNeededEvent`` and log that it was sent.

        Unlike the other publishers, missing optional values are normalized
        before the event is built: a falsy ``campaign_id`` or ``queued_run_id``
        becomes ``0`` and a falsy ``metadata`` becomes an empty dict.
        """
        retry_event = RetryNeededEvent(
            campaign_id=campaign_id or 0,
            workflow_run_id=workflow_run_id,
            queued_run_id=queued_run_id or 0,
            reason=reason,
            metadata=metadata or {},
        )
        await self._publish(retry_event)

        # The log reports the caller-supplied campaign_id (possibly None),
        # not the normalized value stored on the event.
        logger.info(
            f"Published retry event for workflow_run {workflow_run_id}, "
            f"reason: {reason}, campaign: {campaign_id}"
        )

    async def publish_campaign_completed(
        self,
        campaign_id: int,
        total_rows: int,
        processed_rows: int,
        failed_rows: int,
        duration_seconds: Optional[float] = None,
    ):
        """Publish a ``CampaignCompletedEvent`` with the final row counts.

        ``duration_seconds`` is optional and forwarded as-is, including
        ``None``.
        """
        await self._publish(
            CampaignCompletedEvent(
                campaign_id=campaign_id,
                total_rows=total_rows,
                processed_rows=processed_rows,
                failed_rows=failed_rows,
                duration_seconds=duration_seconds,
            )
        )
|
||
|
|
|
||
|
|
|
||
|
|
# Process-wide publisher and its Redis client, created lazily on first use.
# Explicit ``None`` sentinels replace the former ``"name" not in globals()``
# check, which depended on the variable's spelling and left these names
# undefined until the first call.
_campaign_redis_client = None
_campaign_publisher: Optional["CampaignEventPublisher"] = None


async def get_campaign_event_publisher() -> CampaignEventPublisher:
    """Return the shared ``CampaignEventPublisher``, creating it on first call.

    The first call opens a Redis client for ``REDIS_URL`` (with
    ``decode_responses=True``) and wraps it in a publisher; both are cached in
    module globals and reused by every later call. If client creation raises,
    nothing is cached and the next call tries again.

    Returns:
        The module-wide ``CampaignEventPublisher`` instance.
    """
    global _campaign_publisher
    global _campaign_redis_client

    if _campaign_publisher is None:
        _campaign_redis_client = await aioredis.from_url(
            REDIS_URL, decode_responses=True
        )
        _campaign_publisher = CampaignEventPublisher(_campaign_redis_client)

    return _campaign_publisher
|