From 65b79f37053edc054daeec5ba9f606d2ed70b03a Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Tue, 3 Feb 2026 22:57:01 +0530
Subject: [PATCH] feat: enhance Google Drive connector with file MIME type
 detection and content-based detection as fallback

---
 .../composio_google_drive_connector.py       | 120 ++++++++++++++----
 .../app/services/composio_service.py         |  77 ++++++++++-
 2 files changed, 173 insertions(+), 24 deletions(-)

diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py
index 369f5d8b3..93d7f360a 100644
--- a/surfsense_backend/app/connectors/composio_google_drive_connector.py
+++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py
@@ -179,13 +179,14 @@ class ComposioGoogleDriveConnector(ComposioConnector):
         )
 
     async def get_drive_file_content(
-        self, file_id: str
+        self, file_id: str, original_mime_type: str | None = None
     ) -> tuple[bytes | None, str | None]:
         """
         Download file content from Google Drive via Composio.
 
         Args:
             file_id: Google Drive file ID.
+            original_mime_type: Original MIME type (used to detect Google Workspace files for export).
 
         Returns:
             Tuple of (file content bytes, error message).
@@ -200,6 +201,31 @@ class ComposioGoogleDriveConnector(ComposioConnector):
             connected_account_id=connected_account_id,
             entity_id=entity_id,
             file_id=file_id,
+            original_mime_type=original_mime_type,
+        )
+
+    async def get_file_metadata(
+        self, file_id: str
+    ) -> tuple[dict[str, Any] | None, str | None]:
+        """
+        Get metadata for a specific file from Google Drive.
+
+        Args:
+            file_id: The ID of the file to get metadata for.
+
+        Returns:
+            Tuple of (metadata dict, error message).
+        """
+        connected_account_id = await self.get_connected_account_id()
+        if not connected_account_id:
+            return None, "No connected account ID found"
+
+        entity_id = await self.get_entity_id()
+        service = await self._get_service()
+        return await service.get_file_metadata(
+            connected_account_id=connected_account_id,
+            entity_id=entity_id,
+            file_id=file_id,
         )
 
     async def get_drive_start_page_token(self) -> tuple[str | None, str | None]:
@@ -292,8 +318,18 @@ async def _process_file_content(
     if isinstance(content, str):
         content = content.encode("utf-8")
 
-    # Check if this is a binary file
-    if _is_binary_file(file_name, mime_type):
+    # Check if this is a binary file based on extension or MIME type
+    is_binary = _is_binary_file(file_name, mime_type)
+
+    # Content-based binary detection as fallback
+    # This catches PDFs and other binary files even if MIME type is missing/incorrect
+    if not is_binary and content:
+        has_pdf_magic = content[:4] == b"%PDF"
+        has_null_bytes = b"\x00" in content[:1000]
+        if has_pdf_magic or has_null_bytes:
+            is_binary = True
+
+    if is_binary:
         # Use ETL service for binary files (PDF, Office docs, etc.)
         temp_file_path = None
         try:
@@ -316,7 +352,7 @@ async def _process_file_content(
                 return extracted_text
             else:
                 # Fallback if extraction fails
-                logger.warning(f"Could not extract text from binary file {file_name}")
+                logger.warning(f"ETL returned empty for binary file {file_name}")
                 return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
 
         except Exception as e:
@@ -329,8 +365,8 @@ async def _process_file_content(
             if temp_file_path and os.path.exists(temp_file_path):
                 try:
                     os.unlink(temp_file_path)
-                except Exception as e:
-                    logger.debug(f"Could not delete temp file {temp_file_path}: {e}")
+                except Exception:
+                    pass
     else:
         # Text file - try to decode as UTF-8
         try:
@@ -372,9 +408,11 @@ async def _extract_text_with_etl(
     from logging import ERROR, getLogger
 
     etl_service = config.ETL_SERVICE
+    logger.debug(f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}")
 
     try:
         if etl_service == "UNSTRUCTURED":
+            logger.debug("[_extract_text_with_etl] Using UNSTRUCTURED ETL")
             from langchain_unstructured import UnstructuredLoader
 
             from app.utils.document_converters import convert_document_to_markdown
@@ -390,11 +428,16 @@ async def _extract_text_with_etl(
             )
 
             docs = await loader.aload()
+            logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs")
             if docs:
-                return await convert_document_to_markdown(docs)
+                result = await convert_document_to_markdown(docs)
+                logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars")
+                return result
+            logger.debug("[_extract_text_with_etl] UNSTRUCTURED returned no docs")
             return None
 
         elif etl_service == "LLAMACLOUD":
+            logger.debug("[_extract_text_with_etl] Using LLAMACLOUD ETL")
             from app.tasks.document_processors.file_processors import (
                 parse_with_llamacloud_retry,
             )
@@ -413,11 +456,16 @@ async def _extract_text_with_etl(
             markdown_documents = await result.aget_markdown_documents(
                 split_by_page=False
             )
+            logger.debug(f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs")
             if markdown_documents:
-                return markdown_documents[0].text
+                text = markdown_documents[0].text
+                logger.debug(f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars")
+                return text
+            logger.debug("[_extract_text_with_etl] LLAMACLOUD returned no markdown docs")
             return None
 
         elif etl_service == "DOCLING":
+            logger.debug("[_extract_text_with_etl] Using DOCLING ETL")
             from app.services.docling_service import create_docling_service
 
             docling_service = create_docling_service()
@@ -441,16 +489,21 @@ async def _extract_text_with_etl(
                 result = await docling_service.process_document(
                     file_path, file_name
                 )
+                logger.debug(f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}")
             finally:
                 pdfminer_logger.setLevel(original_level)
 
-            return result.get("content")
+            content = result.get("content")
+            logger.debug(f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars")
+            return content
 
         else:
-            logger.warning(f"Unknown ETL service: {etl_service}")
+            logger.warning(f"[_extract_text_with_etl] Unknown ETL service: {etl_service}")
             return None
 
     except Exception as e:
-        logger.error(f"ETL extraction failed for {file_name}: {e!s}")
+        logger.error(f"[_extract_text_with_etl] ETL extraction EXCEPTION for {file_name}: {e!s}")
+        import traceback
+        logger.error(f"[_extract_text_with_etl] Traceback: {traceback.format_exc()}")
         return None
 
@@ -979,7 +1032,7 @@ async def _index_composio_drive_full_scan(
                 all_files.extend(folder_files[:max_files_per_folder])
                 logger.info(f"Found {len(folder_files)} files in folder {folder_name}")
 
-            # Add specifically selected files
+            # Add specifically selected files - fetch metadata to get mimeType
             for selected_file in selected_files:
                 file_id = selected_file.get("id")
                 file_name = selected_file.get("name", "Unknown")
@@ -987,14 +1040,35 @@ async def _index_composio_drive_full_scan(
                 if not file_id:
                     continue
 
-                # Add file info (we'll fetch content later during indexing)
-                all_files.append(
-                    {
-                        "id": file_id,
-                        "name": file_name,
-                        "mimeType": "",  # Will be determined later
-                    }
-                )
+                # Fetch file metadata to get proper mimeType
+                metadata, meta_error = await composio_connector.get_file_metadata(file_id)
+                if metadata and not meta_error:
+                    all_files.append(
+                        {
+                            "id": file_id,
+                            "name": metadata.get("name") or file_name,
+                            "mimeType": metadata.get("mimeType", ""),
+                            "modifiedTime": metadata.get("modifiedTime", ""),
+                            "createdTime": metadata.get("createdTime", ""),
+                        }
+                    )
+                    logger.info(
+                        f"Fetched metadata for UI-selected file: {file_name} "
+                        f"(mimeType={metadata.get('mimeType', 'unknown')})"
+                    )
+                else:
+                    # Fallback if metadata fetch fails - content-based detection will handle it
+                    logger.warning(
+                        f"Could not fetch metadata for file {file_name}: {meta_error}. "
+                        f"Falling back to content-based detection."
+                    )
+                    all_files.append(
+                        {
+                            "id": file_id,
+                            "name": file_name,
+                            "mimeType": "",  # Content-based detection will handle this
+                        }
+                    )
         else:
             # No selection specified - fetch all files (original behavior)
             page_token = None
@@ -1128,8 +1202,10 @@ async def _process_single_drive_file(
         session, unique_identifier_hash
     )
 
-    # Get file content
-    content, content_error = await composio_connector.get_drive_file_content(file_id)
+    # Get file content (pass mime_type for Google Workspace export handling)
+    content, content_error = await composio_connector.get_drive_file_content(
+        file_id, original_mime_type=mime_type
+    )
 
     if content_error or not content:
         logger.warning(f"Could not get content for file {file_name}: {content_error}")
diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py
index ad7841a8b..2aedfcb39 100644
--- a/surfsense_backend/app/services/composio_service.py
+++ b/surfsense_backend/app/services/composio_service.py
@@ -449,8 +449,11 @@ class ComposioService:
         """
         try:
             # Composio uses snake_case for parameters
+            # IMPORTANT: Include 'fields' to ensure mimeType is returned in the response
+            # Without this, Google Drive API may not include mimeType for some files
            params = {
                 "page_size": min(page_size, 100),
+                "fields": "files(id,name,mimeType,modifiedTime,createdTime),nextPageToken",
             }
             if folder_id:
                 # List contents of a specific folder (exclude shortcuts - we don't have access to them)
@@ -498,7 +501,11 @@ class ComposioService:
             return [], None, str(e)
 
     async def get_drive_file_content(
-        self, connected_account_id: str, entity_id: str, file_id: str
+        self,
+        connected_account_id: str,
+        entity_id: str,
+        file_id: str,
+        original_mime_type: str | None = None,
     ) -> tuple[bytes | None, str | None]:
         """
         Download file content from Google Drive via Composio.
@@ -507,10 +514,13 @@ class ComposioService:
         The GOOGLEDRIVE_DOWNLOAD_FILE tool downloads the file
         to a local directory, and the local file path is provided in the response.
         Response includes: file_path, file_name, size fields.
+        For Google Workspace files (Docs, Sheets, Slides), exports to PDF format.
+
         Args:
             connected_account_id: Composio connected account ID.
             entity_id: The entity/user ID that owns the connected account.
             file_id: Google Drive file ID.
+            original_mime_type: Original MIME type of the file (used to detect Google Workspace files).
 
         Returns:
             Tuple of (file content bytes, error message).
@@ -518,10 +528,19 @@ class ComposioService:
         from pathlib import Path
 
         try:
+            params = {"file_id": file_id}
+
+            # For Google Workspace files, explicitly export as PDF
+            # This ensures consistent behavior and proper binary detection
+            if original_mime_type and original_mime_type.startswith(
+                "application/vnd.google-apps."
+            ):
+                params["mime_type"] = "application/pdf"
+
             result = await self.execute_tool(
                 connected_account_id=connected_account_id,
                 tool_name="GOOGLEDRIVE_DOWNLOAD_FILE",
-                params={"file_id": file_id},
+                params=params,
                 entity_id=entity_id,
             )
 
@@ -651,6 +670,60 @@ class ComposioService:
             logger.error(f"Failed to get Drive file content: {e!s}")
             return None, str(e)
 
+    async def get_file_metadata(
+        self, connected_account_id: str, entity_id: str, file_id: str
+    ) -> tuple[dict[str, Any] | None, str | None]:
+        """
+        Get metadata for a specific file from Google Drive.
+
+        Args:
+            connected_account_id: Composio connected account ID.
+            entity_id: The entity/user ID that owns the connected account.
+            file_id: The ID of the file to get metadata for.
+
+        Returns:
+            Tuple of (metadata dict, error message).
+        """
+        try:
+            result = await self.execute_tool(
+                connected_account_id=connected_account_id,
+                tool_name="GOOGLEDRIVE_GET_FILE_METADATA",
+                params={
+                    "file_id": file_id,
+                    "fields": "id,name,mimeType,modifiedTime,createdTime,size",
+                },
+                entity_id=entity_id,
+            )
+
+            if not result.get("success"):
+                return None, result.get("error", "Unknown error")
+
+            data = result.get("data", {})
+
+            # Handle nested response structure
+            if isinstance(data, dict):
+                inner_data = data.get("data", data)
+                if isinstance(inner_data, dict):
+                    # Extract metadata fields with fallbacks for camelCase/snake_case
+                    metadata = {
+                        "id": inner_data.get("id") or file_id,
+                        "name": inner_data.get("name", ""),
+                        "mimeType": inner_data.get("mimeType")
+                        or inner_data.get("mime_type", ""),
+                        "modifiedTime": inner_data.get("modifiedTime")
+                        or inner_data.get("modified_time", ""),
+                        "createdTime": inner_data.get("createdTime")
+                        or inner_data.get("created_time", ""),
+                        "size": inner_data.get("size", ""),
+                    }
+                    return metadata, None
+
+            return None, "Could not extract metadata from response"
+
+        except Exception as e:
+            logger.error(f"Failed to get file metadata: {e!s}")
+            return None, str(e)
+
     async def get_drive_start_page_token(
         self, connected_account_id: str, entity_id: str
     ) -> tuple[str | None, str | None]:
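
Note on the detection logic introduced by this patch: it layers three signals, the existing extension/MIME check (_is_binary_file), a content sniff for %PDF magic bytes or a NUL byte in the first 1000 bytes, and a forced PDF export for Google Workspace MIME types (application/vnd.google-apps.*). The standalone sketch below restates the two new heuristics purely for illustration; the helper names looks_binary and needs_pdf_export are not part of the diff, which inlines this logic in _process_file_content and get_drive_file_content.

# Illustrative sketch only (not part of the patch): the heuristics the diff
# inlines in _process_file_content and ComposioService.get_drive_file_content.

GOOGLE_WORKSPACE_PREFIX = "application/vnd.google-apps."


def looks_binary(content: bytes) -> bool:
    # Content-based fallback: PDF magic bytes, or a NUL byte in the first 1000 bytes.
    return content[:4] == b"%PDF" or b"\x00" in content[:1000]


def needs_pdf_export(mime_type: str | None) -> bool:
    # Google Workspace files have no native binary form, so the download request
    # asks Drive to export them as PDF (params["mime_type"] = "application/pdf").
    return bool(mime_type and mime_type.startswith(GOOGLE_WORKSPACE_PREFIX))


if __name__ == "__main__":
    assert looks_binary(b"%PDF-1.7 sample")          # caught by the magic-byte check
    assert looks_binary(b"PK\x03\x04\x00\x00")       # caught by the NUL-byte check
    assert not looks_binary(b"plain text content")   # falls through to UTF-8 decoding
    assert needs_pdf_export("application/vnd.google-apps.document")
    assert not needs_pdf_export("application/pdf")

When metadata is available, the MIME-type check fires first; the byte sniff only matters when mimeType is empty, such as the fallback path taken when GOOGLEDRIVE_GET_FILE_METADATA fails in _index_composio_drive_full_scan.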