class GoogleDriveClient:
    """Google Drive API client — download/export portion.

    Blocking ``googleapiclient`` calls are isolated in ``_sync_*`` static
    helpers so the async wrappers can offload them with ``asyncio.to_thread``
    and overlap concurrent transfers instead of stalling the event loop.
    """

    @staticmethod
    def _sync_download_file(service, file_id: str) -> tuple[bytes | None, str | None]:
        """Blocking download — runs on a worker thread via ``to_thread``.

        Args:
            service: Authorized Drive service resource (from ``get_service``).
            file_id: ID of the Drive file to download.

        Returns:
            Tuple of (file content bytes, error message); exactly one is None.
        """
        try:
            # Imported lazily so importing this module never requires
            # googleapiclient to be installed.
            from googleapiclient.http import MediaIoBaseDownload

            request = service.files().get_media(fileId=file_id)
            fh = io.BytesIO()
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                _, done = downloader.next_chunk()
            return fh.getvalue(), None
        except HttpError as e:
            return None, f"HTTP error downloading file: {e.resp.status}"
        except Exception as e:
            return None, f"Error downloading file: {e!s}"

    async def download_file(self, file_id: str) -> tuple[bytes | None, str | None]:
        """Download binary file content without blocking the event loop.

        Args:
            file_id: ID of the Drive file to download.

        Returns:
            Tuple of (file content bytes, error message).
        """
        service = await self.get_service()
        return await asyncio.to_thread(self._sync_download_file, service, file_id)

    @staticmethod
    def _sync_download_file_to_disk(
        service, file_id: str, dest_path: str, chunksize: int,
    ) -> str | None:
        """Blocking download-to-disk — runs on a worker thread via ``to_thread``.

        Args:
            service: Authorized Drive service resource.
            file_id: ID of the Drive file to download.
            dest_path: Filesystem path the content is streamed to.
            chunksize: Download chunk size in bytes.

        Returns:
            Error message on failure, None on success.  On failure the
            partially written file at ``dest_path`` is removed (best effort)
            so callers never observe a truncated download.
        """
        try:
            from googleapiclient.http import MediaIoBaseDownload

            request = service.files().get_media(fileId=file_id)
            with open(dest_path, "wb") as fh:
                downloader = MediaIoBaseDownload(fh, request, chunksize=chunksize)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
            return None
        except HttpError as e:
            # Chunks may already have been written — discard the partial file.
            GoogleDriveClient._discard_partial_download(dest_path)
            return f"HTTP error downloading file: {e.resp.status}"
        except Exception as e:
            GoogleDriveClient._discard_partial_download(dest_path)
            return f"Error downloading file: {e!s}"

    @staticmethod
    def _discard_partial_download(dest_path: str) -> None:
        """Best-effort removal of a partially written download file."""
        import contextlib
        import os

        # Suppress OSError: the file may not exist (failure before the first
        # write) or may be unremovable; cleanup must never mask the original
        # download error.
        with contextlib.suppress(OSError):
            os.remove(dest_path)

    async def download_file_to_disk(
        self, file_id: str, dest_path: str, chunksize: int = 5 * 1024 * 1024,
    ) -> str | None:
        """Download a file directly to ``dest_path`` in ``chunksize`` pieces.

        Returns error message on failure, None on success.
        """
        service = await self.get_service()
        return await asyncio.to_thread(
            self._sync_download_file_to_disk, service, file_id, dest_path, chunksize,
        )

    @staticmethod
    def _sync_export_google_file(
        service, file_id: str, mime_type: str,
    ) -> tuple[bytes | None, str | None]:
        """Blocking export — runs on a worker thread via ``to_thread``.

        Args:
            service: Authorized Drive service resource.
            file_id: ID of the Google Workspace file to export.
            mime_type: Target MIME type for the export.

        Returns:
            Tuple of (exported content as bytes, error message).
        """
        try:
            content = (
                service.files().export(fileId=file_id, mimeType=mime_type).execute()
            )
            # Content is normally bytes already; normalize str to UTF-8 bytes
            # so text and binary exports (e.g. PDF) share one return type.
            if not isinstance(content, bytes):
                content = content.encode("utf-8")
            return content, None
        except HttpError as e:
            return None, f"HTTP error exporting file: {e.resp.status}"
        except Exception as e:
            return None, f"Error exporting file: {e!s}"

    async def export_google_file(
        self, file_id: str, mime_type: str
    ) -> tuple[bytes | None, str | None]:
        """Export a Google Workspace file to ``mime_type``.

        Args:
            file_id: ID of the Google Workspace file to export.
            mime_type: Target MIME type for the export.

        Returns:
            Tuple of (exported content as bytes, error message).
        """
        service = await self.get_service()
        return await asyncio.to_thread(
            self._sync_export_google_file, service, file_id, mime_type,
        )
a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 69f64d9ae..29c7e85a7 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -1,5 +1,6 @@ """Content extraction for Google Drive files.""" +import asyncio import logging import os import tempfile @@ -118,7 +119,7 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str: ) if stt_service_type == "local": from app.services.stt_service import stt_service - result = stt_service.transcribe_file(file_path) + result = await asyncio.to_thread(stt_service.transcribe_file, file_path) text = result.get("text", "") else: with open(file_path, "rb") as audio_file: @@ -170,7 +171,7 @@ async def _parse_file_to_markdown(file_path: str, filename: str) -> str: from docling.document_converter import DocumentConverter converter = DocumentConverter() - result = converter.convert(file_path) + result = await asyncio.to_thread(converter.convert, file_path) return result.document.export_to_markdown() raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}") diff --git a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py index 1183efa9f..9737ca3d2 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py @@ -1,7 +1,8 @@ """Tests for parallel download + indexing in the Google Drive indexer.""" import asyncio -from unittest.mock import AsyncMock, MagicMock +import time +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -586,3 +587,83 @@ async def test_selected_files_skip_rename_counting(selected_files_mocks): call_files = mock.call_args[1].get("files") if "files" in (mock.call_args[1] 
# ---------------------------------------------------------------------------
# asyncio.to_thread verification — prove blocking calls run in parallel
# ---------------------------------------------------------------------------


async def test_client_download_file_runs_in_thread_parallel():
    """Concurrent ``download_file`` calls must overlap on worker threads.

    ``_sync_download_file`` is replaced with a blocking ``time.sleep``; three
    concurrent calls finishing in under the serial total proves the client
    really offloads the blocking work via ``asyncio.to_thread``.
    """
    from app.connectors.google_drive.client import GoogleDriveClient

    block_seconds = 0.2
    num_calls = 3

    def fake_blocking_download(service, file_id):
        time.sleep(block_seconds)
        return b"fake-content", None

    # Bare instance: __init__ is bypassed, so wire up the attributes the
    # async wrapper touches by hand.
    client = GoogleDriveClient.__new__(GoogleDriveClient)
    client.service = MagicMock()
    client._service_lock = asyncio.Lock()

    with patch.object(
        GoogleDriveClient,
        "_sync_download_file",
        staticmethod(fake_blocking_download),
    ):
        started = time.monotonic()
        results = await asyncio.gather(
            *(client.download_file(f"file-{i}") for i in range(num_calls))
        )
        elapsed = time.monotonic() - started

    for content, error in results:
        assert content == b"fake-content"
        assert error is None

    serial_minimum = block_seconds * num_calls
    assert elapsed < serial_minimum, (
        f"Elapsed {elapsed:.2f}s >= serial minimum {serial_minimum:.2f}s — "
        f"downloads are not running in parallel"
    )


async def test_client_export_google_file_runs_in_thread_parallel():
    """Same strategy for ``export_google_file`` — verify to_thread parallelism."""
    from app.connectors.google_drive.client import GoogleDriveClient

    block_seconds = 0.2
    num_calls = 3

    def fake_blocking_export(service, file_id, mime_type):
        time.sleep(block_seconds)
        return b"exported", None

    client = GoogleDriveClient.__new__(GoogleDriveClient)
    client.service = MagicMock()
    client._service_lock = asyncio.Lock()

    with patch.object(
        GoogleDriveClient,
        "_sync_export_google_file",
        staticmethod(fake_blocking_export),
    ):
        started = time.monotonic()
        results = await asyncio.gather(
            *(
                client.export_google_file(f"file-{i}", "application/pdf")
                for i in range(num_calls)
            )
        )
        elapsed = time.monotonic() - started

    for content, error in results:
        assert content == b"exported"
        assert error is None

    serial_minimum = block_seconds * num_calls
    assert elapsed < serial_minimum, (
        f"Elapsed {elapsed:.2f}s >= serial minimum {serial_minimum:.2f}s — "
        f"exports are not running in parallel"
    )