fix(connectors): properly commit Google Drive document type changes

- Return file metadata from content_extractor for indexer to use
- Update document type and metadata in indexer after processing
- Explicitly commit changes to database
- Ensure documents are properly marked as GOOGLE_DRIVE_CONNECTOR type
This commit is contained in:
CREDO23 2025-12-28 17:15:29 +02:00
parent 9f1fd20944
commit b2b891e4d7
2 changed files with 44 additions and 37 deletions

View file

@ -29,7 +29,7 @@ async def download_and_process_file(
session: AsyncSession, session: AsyncSession,
task_logger: TaskLoggingService, task_logger: TaskLoggingService,
log_entry: Log, log_entry: Log,
) -> tuple[Any, str | None]: ) -> tuple[Any, str | None, dict[str, Any] | None]:
""" """
Download Google Drive file and process using Surfsense's existing infrastructure. Download Google Drive file and process using Surfsense's existing infrastructure.
@ -45,7 +45,7 @@ async def download_and_process_file(
log_entry: Log entry for tracking log_entry: Log entry for tracking
Returns: Returns:
Tuple of (Document object if successful, error message if failed) Tuple of (Document object if successful, error message if failed, file metadata dict)
""" """
file_id = file.get("id") file_id = file.get("id")
file_name = file.get("name", "Unknown") file_name = file.get("name", "Unknown")
@ -53,7 +53,7 @@ async def download_and_process_file(
# Skip folders and shortcuts # Skip folders and shortcuts
if should_skip_file(mime_type): if should_skip_file(mime_type):
return None, f"Skipping {mime_type}" return None, f"Skipping {mime_type}", None
logger.info(f"Downloading file: {file_name} ({mime_type})") logger.info(f"Downloading file: {file_name} ({mime_type})")
@ -104,42 +104,27 @@ async def download_and_process_file(
log_entry=log_entry, log_entry=log_entry,
) )
# Step 3: Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata # Note: Document type update happens in the indexer after this returns
if document: # to ensure proper session management and commit timing
from app.db import DocumentType
# Store original file type in metadata before changing document_type # Prepare file metadata for the indexer to use
original_type = document.document_type file_metadata = {
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
}
# Update document type to mark it as from Google Drive # If it was a Google Workspace file, note the export format
document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR if is_google_workspace_file(mime_type):
file_metadata["exported_as"] = "pdf"
# Add Google Drive specific metadata file_metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet"
if not document.metadata:
document.metadata = {}
document.metadata.update({
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
"original_document_type": original_type,
"source_connector": "google_drive",
})
# If it was a Google Workspace file, note the export format
if is_google_workspace_file(mime_type):
document.metadata["exported_as"] = "pdf"
document.metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet"
await session.flush() # Persist the changes
logger.info(f"Updated document type to GOOGLE_DRIVE_CONNECTOR for {file_name}")
# process_file_in_background returns None on duplicate/error, Document on success # process_file_in_background returns None on duplicate/error, Document on success
return document, None return document, None, file_metadata
except Exception as e: except Exception as e:
logger.warning(f"Failed to process {file_name}: {e!s}") logger.warning(f"Failed to process {file_name}: {e!s}")
return None, str(e) return None, str(e), None
finally: finally:
# Cleanup temp file (if process_file_in_background didn't already delete it) # Cleanup temp file (if process_file_in_background didn't already delete it)

View file

@ -388,7 +388,7 @@ async def _process_single_file(
# Download and process using Surfsense's existing infrastructure # Download and process using Surfsense's existing infrastructure
# This handles: markdown, audio, PDFs, Office docs, images, etc. # This handles: markdown, audio, PDFs, Office docs, images, etc.
# It also handles: deduplication, chunking, summarization, embedding # It also handles: deduplication, chunking, summarization, embedding
document, error = await download_and_process_file( document, error, file_metadata = await download_and_process_file(
client=drive_client, client=drive_client,
file=file, file=file,
search_space_id=search_space_id, search_space_id=search_space_id,
@ -407,7 +407,28 @@ async def _process_single_file(
) )
return 0, 1 return 0, 1
if document: if document and file_metadata:
# Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata
original_type = document.document_type
document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR
# Add Google Drive specific metadata
if not document.metadata:
document.metadata = {}
document.metadata.update({
**file_metadata,
"original_document_type": original_type,
"source_connector": "google_drive",
})
# Commit the document type and metadata changes
await session.commit()
logger.info(
f"Updated document {document.id} to GOOGLE_DRIVE_CONNECTOR type with metadata"
)
# Successfully indexed # Successfully indexed
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
@ -416,6 +437,7 @@ async def _process_single_file(
"status": "indexed", "status": "indexed",
"document_id": document.id, "document_id": document.id,
"file_name": file_name, "file_name": file_name,
"document_type": DocumentType.GOOGLE_DRIVE_CONNECTOR,
}, },
) )
return 1, 0 return 1, 0