From b2b891e4d746b0d2add1f7f3bf0fb6f341e9ee85 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Sun, 28 Dec 2025 17:15:29 +0200 Subject: [PATCH] fix(connectors): properly commit Google Drive document type changes - Return file metadata from content_extractor for indexer to use - Update document type and metadata in indexer after processing - Explicitly commit changes to database - Ensures documents are properly marked as GOOGLE_DRIVE_CONNECTOR type --- .../google_drive/content_extractor.py | 55 +++++++------------ .../google_drive_indexer.py | 26 ++++++++- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 88aca8f46..005e7b0ae 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -29,7 +29,7 @@ async def download_and_process_file( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, -) -> tuple[Any, str | None]: +) -> tuple[Any, str | None, dict[str, Any] | None]: """ Download Google Drive file and process using Surfsense's existing infrastructure. @@ -45,7 +45,7 @@ async def download_and_process_file( log_entry: Log entry for tracking Returns: - Tuple of (Document object if successful, error message if failed) + Tuple of (Document object if successful, error message if failed, file metadata dict) """ file_id = file.get("id") file_name = file.get("name", "Unknown") @@ -53,7 +53,7 @@ async def download_and_process_file( # Skip folders and shortcuts if should_skip_file(mime_type): - return None, f"Skipping {mime_type}" + return None, f"Skipping {mime_type}", None logger.info(f"Downloading file: {file_name} ({mime_type})") @@ -104,42 +104,27 @@ async def download_and_process_file( log_entry=log_entry, ) - # Step 3: Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata - if document: - from app.db import DocumentType - - # Store original file type in metadata before changing document_type - original_type = document.document_type - - # Update document type to mark it as from Google Drive - document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR - - # Add Google Drive specific metadata - if not document.metadata: - document.metadata = {} - - document.metadata.update({ - "google_drive_file_id": file_id, - "google_drive_file_name": file_name, - "google_drive_mime_type": mime_type, - "original_document_type": original_type, - "source_connector": "google_drive", - }) - - # If it was a Google Workspace file, note the export format - if is_google_workspace_file(mime_type): - document.metadata["exported_as"] = "pdf" - document.metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet" - - await session.flush() # Persist the changes - logger.info(f"Updated document type to GOOGLE_DRIVE_CONNECTOR for {file_name}") - + # Note: Document type update happens in the indexer after this returns + # to ensure proper session management and commit timing + + # Prepare file metadata for the indexer to use + file_metadata = { + "google_drive_file_id": file_id, + "google_drive_file_name": file_name, + "google_drive_mime_type": mime_type, + } + + # If it was a Google Workspace file, note the export format + if is_google_workspace_file(mime_type): + file_metadata["exported_as"] = "pdf" + file_metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet" + # process_file_in_background returns None on duplicate/error, Document on success - return document, None + return document, None, file_metadata except Exception as e: logger.warning(f"Failed to process {file_name}: {e!s}") - return None, str(e) + return None, str(e), None finally: # Cleanup temp file (if process_file_in_background didn't already delete it) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 9c4d446de..9ed295424 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -388,7 +388,7 @@ async def _process_single_file( # Download and process using Surfsense's existing infrastructure # This handles: markdown, audio, PDFs, Office docs, images, etc. # It also handles: deduplication, chunking, summarization, embedding - document, error = await download_and_process_file( + document, error, file_metadata = await download_and_process_file( client=drive_client, file=file, search_space_id=search_space_id, @@ -407,7 +407,28 @@ async def _process_single_file( ) return 0, 1 - if document: + if document and file_metadata: + # Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata + original_type = document.document_type + document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR + + # Add Google Drive specific metadata + if not document.metadata: + document.metadata = {} + + document.metadata.update({ + **file_metadata, + "original_document_type": original_type, + "source_connector": "google_drive", + }) + + # Commit the document type and metadata changes + await session.commit() + + logger.info( + f"Updated document {document.id} to GOOGLE_DRIVE_CONNECTOR type with metadata" + ) + # Successfully indexed await task_logger.log_task_progress( log_entry, @@ -416,6 +437,7 @@ async def _process_single_file( "status": "indexed", "document_id": document.id, "file_name": file_name, + "document_type": DocumentType.GOOGLE_DRIVE_CONNECTOR, }, ) return 1, 0