dograh/api/services/campaign/campaign_event_protocol.py

259 lines
6.8 KiB
Python
Raw Permalink Normal View History

2025-09-09 14:37:32 +05:30
"""Campaign event protocol for orchestrator communication.
Defines message formats and helpers for campaign event publishing and handling.
"""
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, Optional
class CampaignEventType(str, Enum):
"""Types of campaign events."""
# Batch processing events
BATCH_COMPLETED = "batch_completed"
BATCH_FAILED = "batch_failed"
# Sync events
SYNC_STARTED = "sync_started"
SYNC_COMPLETED = "sync_completed"
SYNC_FAILED = "sync_failed"
# Campaign lifecycle events
CAMPAIGN_STARTED = "campaign_started"
CAMPAIGN_PAUSED = "campaign_paused"
CAMPAIGN_RESUMED = "campaign_resumed"
CAMPAIGN_COMPLETED = "campaign_completed"
CAMPAIGN_FAILED = "campaign_failed"
# Retry events
RETRY_NEEDED = "retry_needed"
RETRY_SCHEDULED = "retry_scheduled"
RETRY_FAILED = "retry_failed"
class RetryReason(str, Enum):
"""Reasons for retry."""
BUSY = "busy"
NO_ANSWER = "no_answer"
VOICEMAIL = "voicemail"
FAILED = "failed"
ERROR = "error"
@dataclass
class BaseCampaignEvent:
"""Base class for all campaign events."""
type: str
campaign_id: int = 0
timestamp: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
from datetime import UTC, datetime
self.timestamp = datetime.now(UTC).isoformat()
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str):
return cls(**json.loads(data))
@dataclass
class BatchCompletedEvent(BaseCampaignEvent):
"""Event sent when a batch processing completes."""
type: str = CampaignEventType.BATCH_COMPLETED
processed_count: int = 0
failed_count: int = 0
batch_size: int = 0
metadata: Optional[Dict[str, Any]] = None
def __post_init__(self):
super().__post_init__()
if self.metadata is None:
self.metadata = {}
@dataclass
class BatchFailedEvent(BaseCampaignEvent):
"""Event sent when a batch processing fails."""
type: str = CampaignEventType.BATCH_FAILED
error: str = ""
processed_count: int = 0
metadata: Optional[Dict[str, Any]] = None
def __post_init__(self):
super().__post_init__()
if self.metadata is None:
self.metadata = {}
@dataclass
class SyncStartedEvent(BaseCampaignEvent):
"""Event sent when campaign source sync starts."""
type: str = CampaignEventType.SYNC_STARTED
source_type: str = ""
source_id: str = ""
@dataclass
class SyncCompletedEvent(BaseCampaignEvent):
"""Event sent when campaign source sync completes."""
type: str = CampaignEventType.SYNC_COMPLETED
total_rows: int = 0
source_type: str = ""
source_id: str = ""
metadata: Optional[Dict[str, Any]] = None
def __post_init__(self):
super().__post_init__()
if self.metadata is None:
self.metadata = {}
@dataclass
class SyncFailedEvent(BaseCampaignEvent):
"""Event sent when campaign source sync fails."""
type: str = CampaignEventType.SYNC_FAILED
error: str = ""
source_type: str = ""
source_id: str = ""
@dataclass
class CampaignStartedEvent(BaseCampaignEvent):
"""Event sent when a campaign starts."""
type: str = CampaignEventType.CAMPAIGN_STARTED
workflow_id: int = 0
total_rows: Optional[int] = None
@dataclass
class CampaignPausedEvent(BaseCampaignEvent):
"""Event sent when a campaign is paused."""
type: str = CampaignEventType.CAMPAIGN_PAUSED
processed_rows: int = 0
failed_rows: int = 0
@dataclass
class CampaignResumedEvent(BaseCampaignEvent):
"""Event sent when a campaign is resumed."""
type: str = CampaignEventType.CAMPAIGN_RESUMED
processed_rows: int = 0
failed_rows: int = 0
@dataclass
class CampaignCompletedEvent(BaseCampaignEvent):
"""Event sent when a campaign completes."""
type: str = CampaignEventType.CAMPAIGN_COMPLETED
total_rows: int = 0
processed_rows: int = 0
failed_rows: int = 0
duration_seconds: Optional[float] = None
@dataclass
class CampaignFailedEvent(BaseCampaignEvent):
"""Event sent when a campaign fails."""
type: str = CampaignEventType.CAMPAIGN_FAILED
error: str = ""
processed_rows: int = 0
failed_rows: int = 0
@dataclass
class RetryNeededEvent(BaseCampaignEvent):
"""Event sent when a call needs retry."""
type: str = CampaignEventType.RETRY_NEEDED
workflow_run_id: int = 0
queued_run_id: int = 0
reason: str = "" # RetryReason value
metadata: Optional[Dict[str, Any]] = None
def __post_init__(self):
super().__post_init__()
if self.metadata is None:
self.metadata = {}
@dataclass
class RetryScheduledEvent(BaseCampaignEvent):
"""Event sent when a retry is scheduled."""
type: str = CampaignEventType.RETRY_SCHEDULED
queued_run_id: int = 0
retry_run_id: int = 0
retry_count: int = 0
scheduled_for: str = "" # ISO timestamp
reason: str = "" # RetryReason value
@dataclass
class RetryFailedEvent(BaseCampaignEvent):
"""Event sent when max retries reached."""
type: str = CampaignEventType.RETRY_FAILED
queued_run_id: int = 0
retry_count: int = 0
last_reason: str = "" # RetryReason value
def parse_campaign_event(data: str) -> Any:
"""Parse a campaign event message."""
try:
parsed = json.loads(data)
event_type = parsed.get("type")
# Map event types to their classes
event_class_map = {
CampaignEventType.BATCH_COMPLETED: BatchCompletedEvent,
CampaignEventType.BATCH_FAILED: BatchFailedEvent,
CampaignEventType.SYNC_STARTED: SyncStartedEvent,
CampaignEventType.SYNC_COMPLETED: SyncCompletedEvent,
CampaignEventType.SYNC_FAILED: SyncFailedEvent,
CampaignEventType.CAMPAIGN_STARTED: CampaignStartedEvent,
CampaignEventType.CAMPAIGN_PAUSED: CampaignPausedEvent,
CampaignEventType.CAMPAIGN_RESUMED: CampaignResumedEvent,
CampaignEventType.CAMPAIGN_COMPLETED: CampaignCompletedEvent,
CampaignEventType.CAMPAIGN_FAILED: CampaignFailedEvent,
CampaignEventType.RETRY_NEEDED: RetryNeededEvent,
CampaignEventType.RETRY_SCHEDULED: RetryScheduledEvent,
CampaignEventType.RETRY_FAILED: RetryFailedEvent,
}
event_class = event_class_map.get(event_type)
if event_class:
return event_class(**parsed)
# Unknown event type
from loguru import logger
logger.warning(f"Unknown campaign event type: {event_type}")
return None
except Exception as e:
from loguru import logger
logger.error(f"Failed to parse campaign event: {e}, data: {data}")
return None