dograh/api/services/campaign/sources/csv.py

139 lines
4.6 KiB
Python
Raw Permalink Normal View History

import csv
import hashlib
from io import StringIO
from typing import List, Optional
import httpx
from loguru import logger
from api.db import db_client
from api.services.campaign.source_sync import (
CampaignSourceSyncService,
ValidationError,
ValidationResult,
)
from api.services.storage import storage_fs
class CSVSyncService(CampaignSourceSyncService):
"""Implementation for CSV file synchronization"""
async def _fetch_csv_data(self, file_key: str) -> List[List[str]]:
"""Download and parse CSV file from storage. Returns all rows including header."""
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
raise ValueError(f"Failed to access CSV file: {file_key}")
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
csv_content = response.text
except httpx.HTTPError as e:
logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
raise ValueError(f"Failed to download CSV file from storage: {str(e)}")
return self._parse_csv(csv_content)
async def validate_source(
self, source_id: str, organization_id: Optional[int] = None
) -> ValidationResult:
"""Validate a CSV source file for campaign creation."""
try:
csv_data = await self._fetch_csv_data(source_id)
except ValueError as e:
return ValidationResult(
is_valid=False,
error=ValidationError(message=str(e)),
)
if not csv_data or len(csv_data) < 2:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="CSV file must have a header row and at least one data row"
),
)
headers = csv_data[0]
data_rows = csv_data[1:]
return self.validate_source_data(headers, data_rows)
async def sync_source_data(self, campaign_id: int) -> int:
"""
Fetches data from CSV file in S3/MinIO and creates queued_runs
"""
# Get campaign
campaign = await db_client.get_campaign_by_id(campaign_id)
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
file_key = campaign.source_id
csv_data = await self._fetch_csv_data(file_key)
if not csv_data or len(csv_data) < 2:
logger.warning(f"No data found in CSV for campaign {campaign_id}")
return 0
headers = self.normalize_headers(csv_data[0])
rows = csv_data[1:]
# Create hash of file_key for consistent source_uuid prefix
file_hash = hashlib.md5(file_key.encode()).hexdigest()[:8]
# Convert to queued_runs
queued_runs = []
for idx, row_values in enumerate(rows, 1):
# Pad row to match headers length
padded_row = row_values + [""] * (len(headers) - len(row_values))
# Create context variables dict
context_vars = dict(zip(headers, padded_row))
# Skip if no phone number
if not context_vars.get("phone_number"):
logger.debug(f"Skipping row {idx}: no phone_number")
continue
# Generate unique source UUID: csv_{hash(source_id)}_row_{idx}
source_uuid = f"csv_{file_hash}_row_{idx}"
queued_runs.append(
{
"campaign_id": campaign_id,
"source_uuid": source_uuid,
"context_variables": context_vars,
"state": "queued",
}
)
# Bulk insert
if queued_runs:
await db_client.bulk_create_queued_runs(queued_runs)
logger.info(
f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
)
# Update campaign total_rows
await db_client.update_campaign(
campaign_id=campaign_id,
total_rows=len(queued_runs),
source_sync_status="completed",
)
return len(queued_runs)
def _parse_csv(self, csv_content: str) -> List[List[str]]:
"""Parse CSV content into rows"""
try:
csv_file = StringIO(csv_content)
reader = csv.reader(csv_file)
return list(reader)
except Exception as e:
logger.error(f"Failed to parse CSV: {e}")
raise ValueError(f"Invalid CSV format: {str(e)}")