feat(connectors): add Google Drive content extraction using existing ETL

- Download files from Google Drive to temporary location
- Export Google Workspace files as PDF
- Delegate content extraction to existing process_file_in_background
- Reuse Surfsense's ETL services (Unstructured, LlamaCloud, Docling)
This commit is contained in:
CREDO23 2025-12-28 15:54:50 +02:00
parent 701c3409b3
commit 40304c6795

View file

@ -0,0 +1,122 @@
"""
Content Extraction for Google Drive Files.
Downloads files and delegates to Surfsense's existing file processors.
"""
import logging
import os
import tempfile
from pathlib import Path
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Log
from app.services.task_logging_service import TaskLoggingService
from .client import GoogleDriveClient
from .file_types import get_export_mime_type, is_google_workspace_file, should_skip_file
logger = logging.getLogger(__name__)
async def download_and_process_file(
client: GoogleDriveClient,
file: dict[str, Any],
search_space_id: int,
user_id: str,
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
) -> tuple[Any, str | None]:
"""
Download Google Drive file and process using Surfsense's existing infrastructure.
This is the ONLY function needed - it delegates everything to process_file_in_background.
Args:
client: GoogleDriveClient instance
file: File metadata from Drive API
search_space_id: ID of the search space
user_id: ID of the user
session: Database session
task_logger: Task logging service
log_entry: Log entry for tracking
Returns:
Tuple of (Document object if successful, error message if failed)
"""
file_id = file.get("id")
file_name = file.get("name", "Unknown")
mime_type = file.get("mimeType", "")
# Skip folders and shortcuts
if should_skip_file(mime_type):
return None, f"Skipping {mime_type}"
logger.info(f"Downloading file: {file_name} ({mime_type})")
temp_file_path = None
try:
# Step 1: Download or export the file
if is_google_workspace_file(mime_type):
# Google Workspace files need export (as PDF to preserve formatting & images)
export_mime = get_export_mime_type(mime_type)
if not export_mime:
return None, f"Cannot export Google Workspace type: {mime_type}"
logger.info(f"Exporting Google Workspace file as {export_mime}")
content_bytes, error = await client.export_google_file(file_id, export_mime)
if error:
return None, error
# Set extension based on export format
extension = ".pdf" if export_mime == "application/pdf" else ".txt"
else:
# Regular files - download directly
content_bytes, error = await client.download_file(file_id)
if error:
return None, error
# Preserve original file extension
extension = Path(file_name).suffix or ".bin"
# Save to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file:
tmp_file.write(content_bytes)
temp_file_path = tmp_file.name
# Step 2: Delegate to Surfsense's existing file processor
# This handles ALL file types: markdown, audio, PDFs, Office docs, images, etc.
from app.tasks.document_processors.file_processors import (
process_file_in_background,
)
logger.info(f"Processing {file_name} with Surfsense's file processor")
result = await process_file_in_background(
file_path=temp_file_path,
filename=file_name,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
)
# process_file_in_background returns None on duplicate/error, Document on success
return result, None
except Exception as e:
logger.warning(f"Failed to process {file_name}: {e!s}")
return None, str(e)
finally:
# Cleanup temp file (if process_file_in_background didn't already delete it)
if temp_file_path and os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
except Exception as e:
logger.debug(f"Could not delete temp file {temp_file_path}: {e}")