feat: add csv upload functionality for OSS (#29)

feat: add csv upload functionality
chore: remove redundant arq-worker from docker-compose
This commit is contained in:
Abhishek 2025-10-09 17:54:31 +05:30 committed by GitHub
parent 2633ff0a2a
commit 3babb5ced6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 941 additions and 234 deletions

View file

@ -7,7 +7,7 @@ for final completion after 1 hour of inactivity and handles retry events.
from api.logging_config import setup_logging
logging_queue_listener = setup_logging()
setup_logging()
import asyncio
@ -495,7 +495,7 @@ class CampaignOrchestrator:
if self._pubsub:
try:
await self._pubsub.unsubscribe(RedisChannel.CAMPAIGN_EVENTS.value)
await self._pubsub.close()
await self._pubsub.aclose()
except Exception as e:
logger.error(f"Error closing pubsub: {e}")
@ -538,16 +538,12 @@ async def main():
if shutdown_task in done:
logger.info("Shutdown signal received, stopping orchestrator...")
orchestrator._running = False
# Wait for orchestrator to finish gracefully
# Cancel the orchestrator task immediately since it may be blocked
orchestrator_task.cancel()
try:
await asyncio.wait_for(orchestrator_task, timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Orchestrator shutdown timeout, cancelling...")
orchestrator_task.cancel()
try:
await orchestrator_task
except asyncio.CancelledError:
pass
await orchestrator_task
except asyncio.CancelledError:
logger.info("Orchestrator task cancelled successfully")
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")

View file

@ -31,19 +31,3 @@ class CampaignSourceSyncService(ABC):
f"Getting credentials for org {organization_id}, source {source_type}"
)
return {}
def get_sync_service(source_type: str) -> CampaignSourceSyncService:
"""Returns appropriate sync service based on source type"""
from .sources.google_sheets import GoogleSheetsSyncService
services = {
"google-sheet": GoogleSheetsSyncService,
# Add more as needed: "hubspot": HubSpotSyncService,
}
service_class = services.get(source_type)
if not service_class:
raise ValueError(f"Unknown source type: {source_type}")
return service_class()

View file

@ -0,0 +1,19 @@
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.campaign.sources.csv import CSVSyncService
from api.services.campaign.sources.google_sheets import GoogleSheetsSyncService
def get_sync_service(source_type: str) -> CampaignSourceSyncService:
"""Returns appropriate sync service based on source type"""
services = {
"google-sheet": GoogleSheetsSyncService,
"csv": CSVSyncService,
# Add more as needed: "hubspot": HubSpotSyncService,
}
service_class = services.get(source_type)
if not service_class:
raise ValueError(f"Unknown source type: {source_type}")
return service_class()

View file

@ -0,0 +1,140 @@
import csv
import hashlib
from io import StringIO
from typing import Any, Dict, List
import httpx
from loguru import logger
from api.db import db_client
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.storage import storage_fs
class CSVSyncService(CampaignSourceSyncService):
"""Implementation for CSV file synchronization"""
async def sync_source_data(self, campaign_id: int) -> int:
"""
Fetches data from CSV file in S3/MinIO and creates queued_runs
"""
# Get campaign
campaign = await db_client.get_campaign_by_id(campaign_id)
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
# 1. Get download URL using internal endpoint (for container-to-container access)
file_key = campaign.source_id
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
raise ValueError(f"Failed to generate download URL for file: {file_key}")
# 2. Download CSV file
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
csv_content = response.text
except httpx.HTTPError as e:
logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
raise ValueError(f"Failed to download CSV file from storage: {str(e)}")
# 3. Parse CSV
csv_data = self._parse_csv(csv_content)
if not csv_data or len(csv_data) < 2:
logger.warning(f"No data found in CSV for campaign {campaign_id}")
return 0
headers = csv_data[0] # First row is headers
rows = csv_data[1:] # Rest is data
# 4. Create hash of file_key for consistent source_uuid prefix
file_hash = hashlib.md5(file_key.encode()).hexdigest()[:8]
# 5. Convert to queued_runs
queued_runs = []
for idx, row_values in enumerate(rows, 1):
# Pad row to match headers length
padded_row = row_values + [""] * (len(headers) - len(row_values))
# Create context variables dict
context_vars = dict(zip(headers, padded_row))
# Skip if no phone number
if not context_vars.get("phone_number"):
logger.debug(f"Skipping row {idx}: no phone_number")
continue
# Generate unique source UUID: csv_{hash(source_id)}_row_{idx}
source_uuid = f"csv_{file_hash}_row_{idx}"
queued_runs.append(
{
"campaign_id": campaign_id,
"source_uuid": source_uuid,
"context_variables": context_vars,
"state": "queued",
}
)
# 6. Bulk insert
if queued_runs:
await db_client.bulk_create_queued_runs(queued_runs)
logger.info(
f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
)
# 7. Update campaign total_rows
await db_client.update_campaign(
campaign_id=campaign_id,
total_rows=len(queued_runs),
source_sync_status="completed",
)
return len(queued_runs)
def _parse_csv(self, csv_content: str) -> List[List[str]]:
"""Parse CSV content into rows"""
try:
csv_file = StringIO(csv_content)
reader = csv.reader(csv_file)
return list(reader)
except Exception as e:
logger.error(f"Failed to parse CSV: {e}")
raise ValueError(f"Invalid CSV format: {str(e)}")
async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
"""Validate that required columns exist in CSV"""
required_columns = ["phone_number", "first_name", "last_name"]
file_key = source_config.get("source_id")
if not file_key:
return False
# Get download URL using internal endpoint
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
return False
# Download just enough to get headers
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
# Get just the first line for headers
first_line = response.text.split("\n")[0]
csv_file = StringIO(first_line)
reader = csv.reader(csv_file)
headers = next(reader, [])
return all(col in headers for col in required_columns)
except Exception as e:
logger.error(f"Failed to validate CSV schema: {e}")
return False

View file

@ -33,13 +33,19 @@ class BaseFileSystem(ABC):
@abstractmethod
async def aget_signed_url(
self, file_path: str, expiration: int = 3600
self,
file_path: str,
expiration: int = 3600,
force_inline: bool = False,
use_internal_endpoint: bool = False,
) -> Optional[str]:
"""Generate a signed URL for temporary access to a file.
Args:
file_path: Path to the file
expiration: URL expiration time in seconds (default: 1 hour)
force_inline: Force inline display (browser preview vs download)
use_internal_endpoint: Use internal endpoint (for container-to-container access)
Returns:
Optional[str]: Signed URL if successful, None otherwise
@ -58,3 +64,24 @@ class BaseFileSystem(ABC):
Contains: size, created_at, modified_at, etag, etc.
"""
pass
@abstractmethod
async def aget_presigned_put_url(
self,
file_path: str,
expiration: int = 900,
content_type: str = "text/csv",
max_size: int = 10_485_760,
) -> Optional[str]:
"""Generate a presigned PUT URL for direct file upload.
Args:
file_path: Path where the file should be uploaded
expiration: URL expiration time in seconds (default: 15 minutes)
content_type: MIME type of the file (default: text/csv)
max_size: Maximum file size in bytes (default: 10MB)
Returns:
Optional[str]: Presigned PUT URL if successful, None otherwise
"""
pass

View file

@ -48,17 +48,28 @@ class MinioFileSystem(BaseFileSystem):
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
# Set anonymous download policy for local development
# This allows unsigned URLs to work
# Set public read/write policy for local development
# This allows:
# 1. Anonymous downloads (s3:GetObject)
# 2. Anonymous uploads (s3:PutObject) - bypasses presigned URL signature issues
# 3. List bucket contents (s3:ListBucket) for debugging
# Note: This is set on every initialization to ensure policy is correct
# WARNING: Only use in local development, not production!
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["s3:GetObject"],
"Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
"Resource": [f"arn:aws:s3:::{self.bucket_name}/*"],
}
},
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["s3:ListBucket"],
"Resource": [f"arn:aws:s3:::{self.bucket_name}"],
},
],
}
@ -97,14 +108,19 @@ class MinioFileSystem(BaseFileSystem):
return False
async def aget_signed_url(
self, file_path: str, expiration: int = 3600, force_inline: bool = False
self,
file_path: str,
expiration: int = 3600,
force_inline: bool = False,
use_internal_endpoint: bool = False,
) -> Optional[str]:
try:
# For MinIO in local development, return unsigned URLs
# This avoids signature mismatch issues when endpoint differs
# MinIO must be configured to allow anonymous read access
protocol = "https" if self.secure else "http"
url = f"{protocol}://{self.public_endpoint}/{self.bucket_name}/{file_path}"
endpoint = self.endpoint if use_internal_endpoint else self.public_endpoint
url = f"{protocol}://{endpoint}/{self.bucket_name}/{file_path}"
return url
except Exception as e:
logger.error(f"Error generating MinIO URL: {e}")
@ -128,3 +144,29 @@ class MinioFileSystem(BaseFileSystem):
}
except S3Error:
return None
async def aget_presigned_put_url(
self,
file_path: str,
expiration: int = 900,
content_type: str = "text/csv",
max_size: int = 10_485_760,
) -> Optional[str]:
"""Generate an unsigned URL for direct file upload.
For local MinIO development with anonymous upload enabled, we return
a simple unsigned URL instead of a presigned URL. This avoids signature
mismatch issues when the internal endpoint (minio:9000) differs from
the public endpoint (localhost:9000).
The bucket policy allows anonymous s3:PutObject, so no signature is needed.
"""
try:
# Return unsigned URL for anonymous upload
protocol = "https" if self.secure else "http"
url = f"{protocol}://{self.public_endpoint}/{self.bucket_name}/{file_path}"
logger.debug(f"Generated unsigned upload URL: {url}")
return url
except Exception as e:
logger.error(f"Error generating MinIO upload URL: {e}")
return None

View file

@ -45,7 +45,11 @@ class S3FileSystem(BaseFileSystem):
return False
async def aget_signed_url(
self, file_path: str, expiration: int = 3600, force_inline: bool = False
self,
file_path: str,
expiration: int = 3600,
force_inline: bool = False,
use_internal_endpoint: bool = False,
) -> Optional[str]:
"""Generate a presigned GET url for the given object.
@ -97,3 +101,28 @@ class S3FileSystem(BaseFileSystem):
}
except ClientError:
return None
async def aget_presigned_put_url(
self,
file_path: str,
expiration: int = 900,
content_type: str = "text/csv",
max_size: int = 10_485_760,
) -> Optional[str]:
"""Generate a presigned PUT URL for direct file upload."""
try:
async with self.session.client(
"s3", region_name=self.region_name
) as s3_client:
url = await s3_client.generate_presigned_url(
"put_object",
Params={
"Bucket": self.bucket_name,
"Key": file_path,
"ContentType": content_type,
},
ExpiresIn=expiration,
)
return url
except ClientError:
return None

View file

@ -426,16 +426,7 @@ async def setup_ari_client_supervisor(
This is a drop-in replacement for the asyncari-based function.
Uses AsyncARIClient instead of asyncari.
If the *ENABLE_ARI_STASIS* environment variable is not set to ``"true"``
(case-insensitive) the function returns ``None`` and no supervisor is
launched.
"""
if os.getenv("ENABLE_ARI_STASIS", "false").lower() != "true":
logger.info("ARI Stasis integration disabled via environment variable")
return None
logger.info("Starting ARI Client Supervisor with AsyncARIClient")
supervisor = _ARIClientManagerSupervisor(on_channel_start, on_channel_end)

View file

@ -16,7 +16,7 @@ import signal
import time
from typing import Dict, Optional
from api.constants import REDIS_URL
from api.constants import ENABLE_ARI_STASIS, REDIS_URL
# --- Add logging setup before importing loguru ---
from api.logging_config import setup_logging
@ -32,7 +32,7 @@ from api.services.telephony.stasis_event_protocol import (
parse_command,
)
logging_queue_listener = setup_logging()
setup_logging()
import redis.asyncio as aioredis
import redis.exceptions
@ -601,9 +601,13 @@ class ARIManager:
async def run(self):
"""Main run loop for ARI Manager."""
self._running = True
if not ENABLE_ARI_STASIS:
logger.info("ARI Stasis integration disabled via environment variable")
return
# Setup ARI connection with supervisor
self._running = True
try:
self._ari_client_supervisor = await setup_ari_client_supervisor(
self.on_channel_start, self.on_channel_end
@ -737,9 +741,6 @@ async def main():
pass
finally:
await redis.aclose()
# --- Ensure Axiom logging listener is stopped gracefully ---
if logging_queue_listener is not None:
logging_queue_listener.stop()
if __name__ == "__main__":