mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
180 lines
6.4 KiB
Python
180 lines
6.4 KiB
Python
import re
|
|
from typing import Any, Dict, List
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
|
|
from api.db import db_client
|
|
from api.services.campaign.source_sync import CampaignSourceSyncService
|
|
from api.services.integrations.nango import NangoService
|
|
|
|
|
|
class GoogleSheetsSyncService(CampaignSourceSyncService):
|
|
"""Implementation for Google Sheets synchronization"""
|
|
|
|
def __init__(self):
|
|
self.nango_service = NangoService()
|
|
self.sheets_api_base = "https://sheets.googleapis.com/v4/spreadsheets"
|
|
|
|
async def sync_source_data(self, campaign_id: int) -> int:
|
|
"""
|
|
Fetches data from Google Sheets and creates queued_runs
|
|
"""
|
|
# Get campaign
|
|
campaign = await db_client.get_campaign_by_id(campaign_id)
|
|
if not campaign:
|
|
raise ValueError(f"Campaign {campaign_id} not found")
|
|
|
|
# 1. Get Google Sheets integration for the organization
|
|
integrations = await db_client.get_integrations_by_organization_id(
|
|
campaign.organization_id
|
|
)
|
|
integration = None
|
|
for intg in integrations:
|
|
if intg.provider == "google-sheet" and intg.is_active:
|
|
integration = intg
|
|
break
|
|
|
|
if not integration:
|
|
raise ValueError("Google Sheets integration not found or inactive")
|
|
|
|
# 2. Get OAuth token via Nango using the integration_id (which is the Nango connection ID)
|
|
token_data = await self.nango_service.get_access_token(
|
|
connection_id=integration.integration_id, provider_config_key="google-sheet"
|
|
)
|
|
access_token = token_data["credentials"]["access_token"]
|
|
|
|
# 3. Extract sheet ID from URL
|
|
sheet_id = self._extract_sheet_id(campaign.source_id)
|
|
|
|
# 4. Get sheet metadata (to find data range)
|
|
metadata = await self._get_sheet_metadata(sheet_id, access_token)
|
|
if not metadata.get("sheets"):
|
|
raise ValueError("No sheets found in the spreadsheet")
|
|
|
|
sheet_name = metadata["sheets"][0]["properties"]["title"]
|
|
|
|
# 5. Fetch all data from sheet
|
|
sheet_data = await self._fetch_sheet_data(
|
|
sheet_id,
|
|
f"{sheet_name}!A:Z", # Get all columns A-Z
|
|
access_token,
|
|
)
|
|
|
|
# 6. Convert to queued_runs
|
|
if not sheet_data or len(sheet_data) < 2:
|
|
logger.warning(f"No data found in sheet for campaign {campaign_id}")
|
|
return 0
|
|
|
|
headers = sheet_data[0] # First row is headers
|
|
rows = sheet_data[1:] # Rest is data
|
|
|
|
queued_runs = []
|
|
for idx, row_values in enumerate(rows, 1):
|
|
# Pad row to match headers length
|
|
padded_row = row_values + [""] * (len(headers) - len(row_values))
|
|
|
|
# Create context variables dict
|
|
context_vars = dict(zip(headers, padded_row))
|
|
|
|
# Skip if no phone number
|
|
if not context_vars.get("phone_number"):
|
|
logger.debug(f"Skipping row {idx}: no phone_number")
|
|
continue
|
|
|
|
# Generate unique source UUID
|
|
source_uuid = f"sheet_{sheet_id}_row_{idx}"
|
|
|
|
queued_runs.append(
|
|
{
|
|
"campaign_id": campaign_id,
|
|
"source_uuid": source_uuid,
|
|
"context_variables": context_vars,
|
|
"state": "queued",
|
|
}
|
|
)
|
|
|
|
# 7. Bulk insert
|
|
if queued_runs:
|
|
await db_client.bulk_create_queued_runs(queued_runs)
|
|
logger.info(
|
|
f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
|
|
)
|
|
|
|
# 8. Update campaign total_rows
|
|
await db_client.update_campaign(
|
|
campaign_id=campaign_id,
|
|
total_rows=len(queued_runs),
|
|
source_sync_status="completed",
|
|
)
|
|
|
|
return len(queued_runs)
|
|
|
|
async def _fetch_sheet_data(
|
|
self, sheet_id: str, range: str, access_token: str
|
|
) -> List[List[str]]:
|
|
"""Fetch data from Google Sheets API"""
|
|
url = f"{self.sheets_api_base}/{sheet_id}/values/{range}"
|
|
headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
return data.get("values", [])
|
|
|
|
async def _get_sheet_metadata(
|
|
self, sheet_id: str, access_token: str
|
|
) -> Dict[str, Any]:
|
|
"""Get sheet metadata including sheet names"""
|
|
url = f"{self.sheets_api_base}/{sheet_id}"
|
|
headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
|
logger.debug(f"Fetching sheet metadata from URL: {url}")
|
|
logger.debug(f"Using sheet_id: {sheet_id}")
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
response = await client.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"HTTP error {e.response.status_code} for URL: {url}")
|
|
logger.error(f"Response body: {e.response.text}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error fetching sheet metadata: {e}")
|
|
raise
|
|
|
|
def _extract_sheet_id(self, sheet_url: str) -> str:
|
|
"""
|
|
Extract sheet ID from various Google Sheets URL formats:
|
|
- https://docs.google.com/spreadsheets/d/{id}/edit
|
|
- https://docs.google.com/spreadsheets/d/{id}/edit#gid=0
|
|
"""
|
|
pattern = r"/spreadsheets/d/([a-zA-Z0-9-_]+)"
|
|
match = re.search(pattern, sheet_url)
|
|
if match:
|
|
return match.group(1)
|
|
raise ValueError(f"Invalid Google Sheets URL: {sheet_url}")
|
|
|
|
async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
|
|
"""Validate that required columns exist"""
|
|
required_columns = ["phone_number", "first_name", "last_name"]
|
|
|
|
# Fetch just the header row
|
|
sheet_id = self._extract_sheet_id(source_config["source_id"])
|
|
access_token = source_config["access_token"]
|
|
|
|
headers = await self._fetch_sheet_data(
|
|
sheet_id,
|
|
"A1:Z1", # Just first row
|
|
access_token,
|
|
)
|
|
|
|
if not headers:
|
|
return False
|
|
|
|
header_row = headers[0]
|
|
return all(col in header_row for col in required_columns)
|