"""
Content Extraction for Google Drive Files.

Downloads files and delegates to Surfsense's existing file processors.
"""

import logging
import os
import tempfile
from pathlib import Path
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Log
from app.services.task_logging_service import TaskLoggingService

from .client import GoogleDriveClient
from .file_types import get_export_mime_type, is_google_workspace_file, should_skip_file

logger = logging.getLogger(__name__)


async def download_and_process_file(
    client: GoogleDriveClient,
    file: dict[str, Any],
    search_space_id: int,
    user_id: str,
    session: AsyncSession,
    task_logger: TaskLoggingService,
    log_entry: Log,
) -> tuple[Any, str | None]:
    """
    Download a Google Drive file and hand it to Surfsense's file processor.

    The file is exported (Google Workspace types) or downloaded (everything
    else), written to a temporary file, and passed to
    ``process_file_in_background``. The temporary file is always removed
    before returning, whether or not processing succeeded.

    Args:
        client: GoogleDriveClient used to export/download the file
        file: File metadata from the Drive API; the ``id``, ``name`` and
            ``mimeType`` keys are read
        search_space_id: ID of the search space
        user_id: ID of the user
        session: Database session
        task_logger: Task logging service
        log_entry: Log entry for tracking

    Returns:
        Tuple of ``(result, error)``:

        - ``(None, message)`` when the file is skipped, has no ID, cannot be
          exported/downloaded, or processing raised an exception.
        - ``(result, None)`` otherwise, where ``result`` is whatever
          ``process_file_in_background`` returned. Per the note at the call
          site that is a Document on success and ``None`` on duplicate/error,
          so ``(None, None)`` is a possible outcome.
    """
    file_id = file.get("id")
    file_name = file.get("name", "Unknown")
    mime_type = file.get("mimeType", "")

    # Skip folders and shortcuts
    if should_skip_file(mime_type):
        return None, f"Skipping {mime_type}"

    # Without an ID there is nothing to request from the Drive API.
    if not file_id:
        return None, f"File '{file_name}' has no ID in Drive metadata"

    logger.info(f"Downloading file: {file_name} ({mime_type})")

    temp_file_path = None
    try:
        # Step 1: Download or export the file
        if is_google_workspace_file(mime_type):
            # Google Workspace files need export (as PDF to preserve formatting & images)
            export_mime = get_export_mime_type(mime_type)
            if not export_mime:
                return None, f"Cannot export Google Workspace type: {mime_type}"

            logger.info(f"Exporting Google Workspace file as {export_mime}")
            content_bytes, error = await client.export_google_file(file_id, export_mime)
            if error:
                return None, error

            # Set extension based on export format
            extension = ".pdf" if export_mime == "application/pdf" else ".txt"
        else:
            # Regular files - download directly
            content_bytes, error = await client.download_file(file_id)
            if error:
                return None, error

            # Preserve original file extension
            extension = Path(file_name).suffix or ".bin"

        # A client that reports no error but also no payload would otherwise
        # surface as an opaque TypeError from write(); report it explicitly.
        # Empty bytes are still allowed through, as before.
        if content_bytes is None:
            return None, f"No content returned for {file_name}"

        # Save to temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file:
            # Record the path BEFORE writing: the file already exists on disk
            # (delete=False), so if write() fails the finally-block must still
            # know what to remove.
            temp_file_path = tmp_file.name
            tmp_file.write(content_bytes)

        # Step 2: Delegate to Surfsense's existing file processor
        # This handles ALL file types: markdown, audio, PDFs, Office docs, images, etc.
        # NOTE(review): imported here rather than at module level, presumably
        # to avoid a circular import - confirm before hoisting.
        from app.tasks.document_processors.file_processors import (
            process_file_in_background,
        )

        logger.info(f"Processing {file_name} with Surfsense's file processor")
        result = await process_file_in_background(
            file_path=temp_file_path,
            filename=file_name,
            search_space_id=search_space_id,
            user_id=user_id,
            session=session,
            task_logger=task_logger,
            log_entry=log_entry,
        )

        # process_file_in_background returns None on duplicate/error, Document on success
        return result, None

    except Exception as e:
        # Boundary handler: one bad file must not abort the caller's run, so
        # the failure is reported through the return value. The traceback is
        # kept in the log for diagnosis.
        logger.warning(f"Failed to process {file_name}: {e!s}", exc_info=True)
        return None, str(e)

    finally:
        # Cleanup temp file (if process_file_in_background didn't already delete it)
        if temp_file_path:
            try:
                os.unlink(temp_file_path)
            except FileNotFoundError:
                # Already removed elsewhere; nothing left to clean up.
                pass
            except OSError as e:
                # Best effort only: a leftover temp file must not mask the result.
                logger.debug(f"Could not delete temp file {temp_file_path}: {e}")