diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 005e7b0ae..04c48f47f 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -92,9 +92,26 @@ async def download_and_process_file( from app.tasks.document_processors.file_processors import ( process_file_in_background, ) + from app.db import DocumentType + + # Prepare connector info + connector_info = { + "type": DocumentType.GOOGLE_DRIVE_CONNECTOR, + "metadata": { + "google_drive_file_id": file_id, + "google_drive_file_name": file_name, + "google_drive_mime_type": mime_type, + "source_connector": "google_drive", + }, + } + + # If it was a Google Workspace file, note the export format + if is_google_workspace_file(mime_type): + connector_info["metadata"]["exported_as"] = "pdf" + connector_info["metadata"]["original_workspace_type"] = mime_type.split(".")[-1] logger.info(f"Processing {file_name} with Surfsense's file processor") - document = await process_file_in_background( + await process_file_in_background( file_path=temp_file_path, filename=file_name, search_space_id=search_space_id, @@ -102,25 +119,11 @@ async def download_and_process_file( session=session, task_logger=task_logger, log_entry=log_entry, + connector=connector_info, # Pass connector info ) - # Note: Document type update happens in the indexer after this returns - # to ensure proper session management and commit timing - - # Prepare file metadata for the indexer to use - file_metadata = { - "google_drive_file_id": file_id, - "google_drive_file_name": file_name, - "google_drive_mime_type": mime_type, - } - - # If it was a Google Workspace file, note the export format - if is_google_workspace_file(mime_type): - file_metadata["exported_as"] = "pdf" - file_metadata["original_workspace_type"] = mime_type.split(".")[-1] # e.g., "document", "spreadsheet" - - # process_file_in_background returns None on duplicate/error, Document on success - return document, None, file_metadata + # process_file_in_background doesn't return the document + return None, None, connector_info["metadata"] except Exception as e: logger.warning(f"Failed to process {file_name}: {e!s}") diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 190792f1a..a2899853e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -388,7 +388,8 @@ async def _process_single_file( # Download and process using Surfsense's existing infrastructure # This handles: markdown, audio, PDFs, Office docs, images, etc. # It also handles: deduplication, chunking, summarization, embedding - document, error, file_metadata = await download_and_process_file( + # Document type is set to GOOGLE_DRIVE_CONNECTOR during processing + _, error, _ = await download_and_process_file( client=drive_client, file=file, search_space_id=search_space_id, @@ -407,58 +408,9 @@ async def _process_single_file( ) return 0, 1 - if document and file_metadata: - # Refresh document from database to ensure it's attached to session - from app.db import Document - from sqlalchemy import select - - # Get fresh document from database - result = await session.execute( - select(Document).where(Document.id == document.id) - ) - document = result.scalar_one_or_none() - - if not document: - logger.error(f"Could not find document {document.id} in database") - return 0, 1 - - # Update document type to GOOGLE_DRIVE_CONNECTOR and add metadata - original_type = document.document_type - document.document_type = DocumentType.GOOGLE_DRIVE_CONNECTOR - - # Add Google Drive specific metadata - if not document.metadata: - document.metadata = {} - - document.metadata.update({ - **file_metadata, - "original_document_type": original_type, - "source_connector": "google_drive", - }) - - # Commit the document type and metadata changes - await session.commit() - - logger.info( - f"Updated document {document.id} to GOOGLE_DRIVE_CONNECTOR type with metadata" - ) - - # Successfully indexed - await task_logger.log_task_progress( - log_entry, - f"Successfully indexed: {file_name}", - { - "status": "indexed", - "document_id": document.id, - "file_name": file_name, - "document_type": DocumentType.GOOGLE_DRIVE_CONNECTOR, - }, - ) - return 1, 0 - else: - # Likely a duplicate or unsupported type - logger.info(f"No document created for {file_name} (duplicate or unsupported)") - return 0, 1 + # File was processed successfully (document type already set in processor) + logger.info(f"Successfully indexed Google Drive file: {file_name}") + return 1, 0 except Exception as e: logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index a32e75a32..61f484ae1 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -447,6 +447,24 @@ async def add_received_file_document_using_docling( ) from e +async def _update_document_from_connector( + document: Document | None, connector: dict | None, session: AsyncSession +) -> None: + """Helper to update document type and metadata from connector info.""" + if document and connector: + if "type" in connector: + document.document_type = connector["type"] + if "metadata" in connector: + # Merge with existing document_metadata (the actual column name) + if not document.document_metadata: + document.document_metadata = connector["metadata"] + else: + # Expand existing metadata with connector metadata + merged = {**document.document_metadata, **connector["metadata"]} + document.document_metadata = merged + await session.commit() + + async def process_file_in_background( file_path: str, filename: str, @@ -455,6 +473,7 @@ async def process_file_in_background( session: AsyncSession, task_logger: TaskLoggingService, log_entry: Log, + connector: dict | None = None, # Optional: {"type": "GOOGLE_DRIVE_CONNECTOR", "metadata": {...}} ): try: # Check if the file is a markdown or text file @@ -492,6 +511,9 @@ async def process_file_in_background( session, filename, markdown_content, search_space_id, user_id ) + # Update from connector if provided + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -608,6 +630,9 @@ async def process_file_in_background( session, filename, transcribed_text, search_space_id, user_id ) + # Update from connector if provided + await _update_document_from_connector(result, connector, session) + if result: await task_logger.log_task_success( log_entry, @@ -753,6 +778,9 @@ async def process_file_in_background( session, filename, docs, search_space_id, user_id ) + # Update from connector if provided + await _update_document_from_connector(result, connector, session) + if result: # Update page usage after successful processing # allow_exceed=True because document was already created after passing initial check @@ -897,6 +925,9 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + # Update from connector if provided + await _update_document_from_connector(last_created_doc, connector, session) + await task_logger.log_task_success( log_entry, f"Successfully processed file with LlamaCloud: {filename}", @@ -1021,6 +1052,9 @@ async def process_file_in_background( user_id, final_page_count, allow_exceed=True ) + # Update from connector if provided + await _update_document_from_connector(doc_result, connector, session) + await task_logger.log_task_success( log_entry, f"Successfully processed file with Docling: {filename}",