# dograh/api/services/campaign/sources/csv.py (141 lines, 4.9 KiB, Python)

import csv
import hashlib
from io import StringIO
from typing import Any, Dict, List
import httpx
from loguru import logger
from api.db import db_client
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.storage import storage_fs
class CSVSyncService(CampaignSourceSyncService):
    """Campaign source sync implementation for CSV files.

    The CSV is addressed by a storage file key (the campaign's ``source_id``),
    downloaded through a signed URL, and each data row that carries a
    ``phone_number`` value is turned into one queued run.
    """

    @staticmethod
    def _strip_bom(text: str) -> str:
        """Return *text* without a leading Unicode byte-order mark.

        Spreadsheet exports frequently start with a UTF-8 BOM. Left in place
        it becomes part of the first header name (``"\\ufeffphone_number"``),
        which breaks every header lookup on that column.
        """
        return text[1:] if text.startswith("\ufeff") else text

    async def sync_source_data(self, campaign_id: int) -> int:
        """Fetch the campaign's CSV from S3/MinIO and create queued runs.

        Args:
            campaign_id: ID of the campaign whose CSV source is synced.

        Returns:
            The number of queued runs created. ``0`` is returned early,
            without updating the campaign, when the CSV has no data rows.

        Raises:
            ValueError: If the campaign does not exist, has no source file
                key, no download URL could be generated, the download fails,
                or the content cannot be parsed as CSV.
        """
        campaign = await db_client.get_campaign_by_id(campaign_id)
        if not campaign:
            raise ValueError(f"Campaign {campaign_id} not found")

        # 1. Get download URL using internal endpoint
        #    (for container-to-container access)
        file_key = campaign.source_id
        if not file_key:
            # Fail with a clear message instead of an opaque error further
            # down (e.g. AttributeError on ``file_key.encode()``).
            raise ValueError(f"Campaign {campaign_id} has no CSV source file")

        signed_url = await storage_fs.aget_signed_url(
            file_key, expiration=3600, use_internal_endpoint=True
        )
        if not signed_url:
            raise ValueError(f"Failed to generate download URL for file: {file_key}")

        # 2. Download CSV file
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(signed_url)
                response.raise_for_status()
                csv_content = response.text
            except httpx.HTTPError as e:
                logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
                # Chain the cause so the transport error stays in the traceback.
                raise ValueError(
                    f"Failed to download CSV file from storage: {str(e)}"
                ) from e

        # 3. Parse CSV
        csv_data = self._parse_csv(csv_content)
        if len(csv_data) < 2:
            # Header only (or nothing at all): there is nothing to queue.
            # NOTE(review): this path leaves source_sync_status untouched -
            # confirm that callers expect that.
            logger.warning(f"No data found in CSV for campaign {campaign_id}")
            return 0

        headers = csv_data[0]  # First row is headers
        rows = csv_data[1:]  # Rest is data

        # 4. Hash of file_key gives a stable source_uuid prefix per file
        file_hash = hashlib.md5(file_key.encode()).hexdigest()[:8]

        # 5. Convert to queued_runs
        queued_runs = []
        for idx, row_values in enumerate(rows, 1):
            # Pad short rows so every header gets a value; extra cells beyond
            # the header count are dropped by zip().
            padded_row = row_values + [""] * (len(headers) - len(row_values))
            context_vars = dict(zip(headers, padded_row))

            # A row without a phone number cannot be used; skip it.
            if not context_vars.get("phone_number"):
                logger.debug(f"Skipping row {idx}: no phone_number")
                continue

            # idx is the 1-based position among data rows (skipped rows still
            # consume an index), so the UUID is stable for a given file.
            queued_runs.append(
                {
                    "campaign_id": campaign_id,
                    "source_uuid": f"csv_{file_hash}_row_{idx}",
                    "context_variables": context_vars,
                    "state": "queued",
                }
            )

        # 6. Bulk insert
        if queued_runs:
            await db_client.bulk_create_queued_runs(queued_runs)
            logger.info(
                f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
            )

        # 7. Update campaign total_rows and mark the sync as completed
        await db_client.update_campaign(
            campaign_id=campaign_id,
            total_rows=len(queued_runs),
            source_sync_status="completed",
        )

        return len(queued_runs)

    def _parse_csv(self, csv_content: str) -> List[List[str]]:
        """Parse CSV text into a list of rows (each row a list of cells).

        A leading byte-order mark is removed so it cannot pollute the first
        header name.

        Args:
            csv_content: The full CSV document as text.

        Returns:
            All rows in file order, header row first. Empty for empty input.

        Raises:
            ValueError: If the content cannot be parsed.
        """
        try:
            # newline="" hands line endings to the csv module untranslated,
            # as its documentation recommends; this also lets CR-only files
            # parse instead of raising.
            csv_file = StringIO(self._strip_bom(csv_content), newline="")
            return list(csv.reader(csv_file))
        except Exception as e:
            # Deliberately broad: callers rely on ValueError as the single
            # failure signal for unusable CSV content.
            logger.error(f"Failed to parse CSV: {e}")
            raise ValueError(f"Invalid CSV format: {str(e)}") from e

    async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
        """Check that the CSV behind ``source_config`` has the required columns.

        Args:
            source_config: Source configuration; its ``"source_id"`` entry is
                used as the storage file key.

        Returns:
            ``True`` when the header row contains ``phone_number``,
            ``first_name`` and ``last_name``. ``False`` when the file key or
            signed URL is missing, the download fails, or a column is absent.
        """
        required_columns = ["phone_number", "first_name", "last_name"]

        file_key = source_config.get("source_id")
        if not file_key:
            return False

        # Get download URL using internal endpoint
        signed_url = await storage_fs.aget_signed_url(
            file_key, expiration=3600, use_internal_endpoint=True
        )
        if not signed_url:
            return False

        # The whole body is downloaded, but only the first CSV record is
        # parsed: csv.reader is lazy and next() stops after the header.
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(signed_url)
                response.raise_for_status()

                # Parse the header with the csv module rather than splitting
                # on "\n", so quoted header cells containing line breaks and
                # a leading BOM are handled as in sync_source_data.
                csv_file = StringIO(self._strip_bom(response.text), newline="")
                headers = next(csv.reader(csv_file), [])

                return all(col in headers for col in required_columns)
            except Exception as e:
                # Best-effort validation: any failure means "not valid".
                logger.error(f"Failed to validate CSV schema: {e}")
                return False