mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
feat: add csv upload functionality chore: remove redundant arq-worker from docker-compose
140 lines
4.9 KiB
Python
140 lines
4.9 KiB
Python
import csv
|
|
import hashlib
|
|
from io import StringIO
|
|
from typing import Any, Dict, List
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
|
|
from api.db import db_client
|
|
from api.services.campaign.source_sync import CampaignSourceSyncService
|
|
from api.services.storage import storage_fs
|
|
|
|
|
|
class CSVSyncService(CampaignSourceSyncService):
    """Campaign source sync backed by a CSV file stored in S3/MinIO.

    The campaign's ``source_id`` is used as the storage key of the CSV file.
    The first CSV row is the header row; every later row is turned into a
    queued run whose ``context_variables`` map each header to the row's cell.
    """

    # Lifetime, in seconds, requested for the signed download URLs below.
    _SIGNED_URL_EXPIRATION = 3600

    async def sync_source_data(self, campaign_id: int) -> int:
        """Fetch the campaign's CSV file from storage and create queued runs.

        Rows whose ``phone_number`` cell is missing or blank are skipped. Each
        created run gets a ``source_uuid`` of the form
        ``csv_<first 8 hex chars of md5(file_key)>_row_<1-based data row index>``.

        Args:
            campaign_id: ID of the campaign whose ``source_id`` is the CSV file key.

        Returns:
            Number of queued runs created. The campaign's ``total_rows`` and
            ``source_sync_status`` are updated only when the CSV has at least
            one data row; otherwise 0 is returned without updating the campaign.

        Raises:
            ValueError: If the campaign does not exist, no download URL could be
                generated, the download fails, or the content is not valid CSV.
        """
        campaign = await db_client.get_campaign_by_id(campaign_id)
        if not campaign:
            raise ValueError(f"Campaign {campaign_id} not found")

        # 1. Get download URL using internal endpoint (for container-to-container access)
        file_key = campaign.source_id
        signed_url = await self._get_download_url(file_key)
        if not signed_url:
            raise ValueError(f"Failed to generate download URL for file: {file_key}")

        # 2. Download CSV file
        csv_content = await self._download_text(signed_url)

        # 3. Parse CSV
        csv_data = self._parse_csv(csv_content)
        if len(csv_data) < 2:
            # NOTE(review): this path leaves the campaign's source_sync_status
            # untouched, unlike the normal path below - confirm that is intended.
            logger.warning(f"No data found in CSV for campaign {campaign_id}")
            return 0

        # Strip stray whitespace so a header like "phone_number " still matches.
        headers = self._normalize_headers(csv_data[0])
        rows = csv_data[1:]

        # 4. Hash of file_key gives a consistent source_uuid prefix per file
        file_hash = hashlib.md5(file_key.encode()).hexdigest()[:8]

        # 5. Convert to queued_runs
        queued_runs = []
        for idx, row_values in enumerate(rows, 1):
            # Pad short rows so every header gets a value; cells beyond the
            # header count are dropped by zip().
            padded_row = row_values + [""] * (len(headers) - len(row_values))
            context_vars = dict(zip(headers, padded_row))

            # A whitespace-only phone number is as unusable as a missing one.
            phone_number = context_vars.get("phone_number") or ""
            if not phone_number.strip():
                logger.debug(f"Skipping row {idx}: no phone_number")
                continue

            queued_runs.append(
                {
                    "campaign_id": campaign_id,
                    # idx also counts skipped rows, so a row's source_uuid does
                    # not depend on which other rows were skipped.
                    "source_uuid": f"csv_{file_hash}_row_{idx}",
                    "context_variables": context_vars,
                    "state": "queued",
                }
            )

        # 6. Bulk insert
        if queued_runs:
            await db_client.bulk_create_queued_runs(queued_runs)
            logger.info(
                f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
            )

        # 7. Update campaign total_rows
        await db_client.update_campaign(
            campaign_id=campaign_id,
            total_rows=len(queued_runs),
            source_sync_status="completed",
        )

        return len(queued_runs)

    async def _get_download_url(self, file_key: str):
        """Ask storage for a signed URL for ``file_key`` via the internal endpoint.

        Callers treat a falsy return value as "could not generate a URL".
        """
        return await storage_fs.aget_signed_url(
            file_key,
            expiration=self._SIGNED_URL_EXPIRATION,
            use_internal_endpoint=True,
        )

    async def _download_text(self, signed_url: str) -> str:
        """Download ``signed_url`` and return its body as text, minus any leading BOM.

        Raises:
            ValueError: If the request fails or the server returns an error status.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(signed_url)
                response.raise_for_status()
            except httpx.HTTPError as e:
                logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
                raise ValueError(
                    f"Failed to download CSV file from storage: {str(e)}"
                ) from e
            return self._strip_bom(response.text)

    async def _read_first_line(self, signed_url: str) -> str:
        """Stream ``signed_url`` and return only its first line, minus any leading BOM.

        The response is closed as soon as the first line arrives, so the rest
        of the body is not read. Returns "" for an empty body.

        Raises:
            httpx.HTTPError: If the request fails or returns an error status.
        """
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", signed_url) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    return self._strip_bom(line)
        return ""

    def _parse_csv(self, csv_content: str) -> List[List[str]]:
        """Parse CSV text into a list of rows, each a list of cell strings.

        Raises:
            ValueError: If the content is not valid CSV.
        """
        try:
            # newline="" lets the csv module handle \n, \r\n and bare \r line
            # endings itself, as the csv docs require for file-like input.
            return list(csv.reader(StringIO(csv_content, newline="")))
        except csv.Error as e:
            logger.error(f"Failed to parse CSV: {e}")
            raise ValueError(f"Invalid CSV format: {str(e)}") from e

    @staticmethod
    def _normalize_headers(headers: List[str]) -> List[str]:
        """Return the header names with surrounding whitespace removed."""
        return [header.strip() for header in headers]

    @staticmethod
    def _strip_bom(text: str) -> str:
        """Drop a leading byte-order mark (U+FEFF).

        Left in place, it would be glued to the first header name and stop
        that column from matching.
        """
        return text[1:] if text.startswith("\ufeff") else text

    async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
        """Check that the CSV header row contains every required column.

        Args:
            source_config: Mapping whose ``source_id`` is the storage key of the CSV file.

        Returns:
            True if the header row contains ``phone_number``, ``first_name`` and
            ``last_name``. False if ``source_id`` is missing, no download URL
            could be generated, or downloading/parsing the header fails.
        """
        required_columns = ["phone_number", "first_name", "last_name"]

        file_key = source_config.get("source_id")
        if not file_key:
            return False

        # Get download URL using internal endpoint
        signed_url = await self._get_download_url(file_key)
        if not signed_url:
            return False

        try:
            # Only the header row is needed, so stream just the first line.
            first_line = await self._read_first_line(signed_url)
            headers = self._normalize_headers(
                next(csv.reader(StringIO(first_line, newline="")), [])
            )
            return all(col in headers for col in required_columns)
        except Exception as e:
            # Best-effort check: any download or parse failure means "invalid".
            logger.error(f"Failed to validate CSV schema: {e}")
            return False
|