From 42752bbeabea23f03e34821143d769b0ec83afc2 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 23 Jan 2026 05:28:03 +0530 Subject: [PATCH] feat: improve Composio file processing and error handling - Enhanced the handling of file content from Composio, supporting both binary and text files with appropriate processing methods. - Introduced robust error logging and handling for file content extraction, ensuring better visibility into issues during processing. - Updated the indexing logic to accommodate new content processing methods, improving overall reliability and user feedback on errors. - Added temporary file handling for binary files to facilitate text extraction using the ETL service. --- .../routes/search_source_connectors_routes.py | 6 +- .../app/services/composio_service.py | 75 ++++- .../app/tasks/composio_indexer.py | 301 +++++++++++++++++- 3 files changed, 360 insertions(+), 22 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 89cdd9f95..ed306c7bc 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1140,7 +1140,7 @@ async def _run_indexing_with_notifications( f"Indexing completed successfully: {documents_processed} documents processed" ) - # Update notification on success + # Update notification on success (or partial success with errors) if notification: # Refresh notification to ensure it's not stale after timestamp update commit await session.refresh(notification) @@ -1148,7 +1148,7 @@ async def _run_indexing_with_notifications( session=session, notification=notification, indexed_count=documents_processed, - error_message=None, + error_message=error_or_warning, # Show errors even if some documents were indexed ) await session.commit() # Commit to ensure Electric SQL syncs the notification update elif documents_processed > 0: @@ -1172,7 +1172,7 @@ async def _run_indexing_with_notifications( session=session, notification=notification, indexed_count=documents_processed, - error_message=None, + error_message=error_or_warning, # Show errors even if some documents were indexed ) await session.commit() # Commit to ensure Electric SQL syncs the notification update else: diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 5a6148533..1173cfb6a 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -458,11 +458,76 @@ class ComposioService: if not result.get("success"): return None, result.get("error", "Unknown error") - content = result.get("data") - if isinstance(content, str): - content = content.encode("utf-8") - - return content, None + data = result.get("data") + + # Composio GOOGLEDRIVE_DOWNLOAD_FILE returns a dict with file info + # The actual content is in "downloaded_file_content" field + if isinstance(data, dict): + # Try known Composio response fields in order of preference + content = None + + # Primary field from GOOGLEDRIVE_DOWNLOAD_FILE + if "downloaded_file_content" in data: + content = data["downloaded_file_content"] + # downloaded_file_content might itself be a dict with the actual content inside + if isinstance(content, dict): + # Try to extract actual content from nested dict + # Note: Composio nests downloaded_file_content inside another downloaded_file_content + actual_content = ( + content.get("downloaded_file_content") or + content.get("content") or + content.get("data") or + content.get("file_content") or + content.get("body") or + content.get("text") + ) + if actual_content is not None: + content = actual_content + else: + # Log structure for debugging + logger.warning(f"downloaded_file_content is dict with keys: {list(content.keys())}") + return None, f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}" + # Fallback fields for compatibility + elif "content" in data: + content = data["content"] + elif "file_content" in data: + content = data["file_content"] + elif "data" in data: + content = data["data"] + + if content is None: + # Log available keys for debugging + logger.warning(f"Composio response dict keys: {list(data.keys())}") + return None, f"No file content found in Composio response. Available keys: {list(data.keys())}" + + # Convert content to bytes + if isinstance(content, str): + # Check if it's base64 encoded + import base64 + try: + # Try to decode as base64 first + content = base64.b64decode(content) + except Exception: + # If not base64, encode as UTF-8 + content = content.encode("utf-8") + elif isinstance(content, bytes): + pass # Already bytes + elif isinstance(content, dict): + # Still a dict after all extraction attempts - log structure + logger.warning(f"Content still dict after extraction: {list(content.keys())}") + return None, f"Unexpected nested content structure: {list(content.keys())}" + else: + return None, f"Unexpected content type in Composio response: {type(content).__name__}" + + return content, None + elif isinstance(data, str): + return data.encode("utf-8"), None + elif isinstance(data, bytes): + return data, None + elif data is None: + return None, "No data returned from Composio" + else: + return None, f"Unexpected data type from Composio: {type(data).__name__}" except Exception as e: logger.error(f"Failed to get Drive file content: {e!s}") diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index f568d4134..6f40e6d66 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -8,7 +8,10 @@ to avoid circular import issues with the connector_indexers package. """ import logging +import os +import tempfile from datetime import UTC, datetime +from pathlib import Path from typing import Any from sqlalchemy.exc import SQLAlchemyError @@ -21,6 +24,7 @@ from app.connectors.composio_connector import ComposioConnector from app.db import ( Document, DocumentType, + Log, SearchSourceConnector, SearchSourceConnectorType, ) @@ -81,6 +85,237 @@ async def update_connector_last_indexed( logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") +# Binary file extensions that need file processor +BINARY_FILE_EXTENSIONS = { + ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", + ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp", + ".zip", ".tar", ".gz", ".rar", ".7z", + ".mp3", ".mp4", ".wav", ".avi", ".mov", + ".exe", ".dll", ".so", ".bin", +} + +# Text file extensions that can be decoded as UTF-8 +TEXT_FILE_EXTENSIONS = { + ".txt", ".md", ".markdown", ".json", ".xml", ".html", ".htm", + ".css", ".js", ".ts", ".py", ".java", ".c", ".cpp", ".h", + ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", + ".sh", ".bash", ".zsh", ".fish", + ".sql", ".csv", ".tsv", + ".rst", ".tex", ".log", +} + + +def _is_binary_file(file_name: str, mime_type: str) -> bool: + """Check if a file is binary based on extension or mime type.""" + extension = Path(file_name).suffix.lower() + + # Check extension first + if extension in BINARY_FILE_EXTENSIONS: + return True + if extension in TEXT_FILE_EXTENSIONS: + return False + + # Check mime type + if mime_type: + if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")): + return True + if mime_type.startswith(("text/", "application/json", "application/xml")): + return False + # Office documents + if "spreadsheet" in mime_type or "document" in mime_type or "presentation" in mime_type: + return True + + # Default to text for unknown types + return False + + +async def _process_file_content( + content: bytes | str, + file_name: str, + file_id: str, + mime_type: str, + search_space_id: int, + user_id: str, + session: AsyncSession, + task_logger: TaskLoggingService, + log_entry: Log, + processing_errors: list[str], +) -> str: + """ + Process file content and return markdown text. + + For binary files (PDFs, images, etc.), uses Surfsense's ETL service. + For text files, decodes as UTF-8. + + Args: + content: File content as bytes or string + file_name: Name of the file + file_id: Google Drive file ID + mime_type: MIME type of the file + search_space_id: Search space ID + user_id: User ID + session: Database session + task_logger: Task logging service + log_entry: Log entry for tracking + processing_errors: List to append errors to + + Returns: + Markdown content string + """ + # Ensure content is bytes + if isinstance(content, str): + content = content.encode("utf-8") + + # Check if this is a binary file + if _is_binary_file(file_name, mime_type): + # Use ETL service for binary files (PDF, Office docs, etc.) + temp_file_path = None + try: + # Get file extension + extension = Path(file_name).suffix or ".bin" + + # Write to temp file + with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file: + tmp_file.write(content) + temp_file_path = tmp_file.name + + # Use the configured ETL service to extract text + extracted_text = await _extract_text_with_etl( + temp_file_path, file_name, task_logger, log_entry + ) + + if extracted_text: + return extracted_text + else: + # Fallback if extraction fails + logger.warning(f"Could not extract text from binary file {file_name}") + return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + + except Exception as e: + error_msg = f"Error processing binary file {file_name}: {e!s}" + logger.error(error_msg) + processing_errors.append(error_msg) + return f"# {file_name}\n\n[Binary file - processing error]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + finally: + # Cleanup temp file + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception as e: + logger.debug(f"Could not delete temp file {temp_file_path}: {e}") + else: + # Text file - try to decode as UTF-8 + try: + return content.decode("utf-8") + except UnicodeDecodeError: + # Try other encodings + for encoding in ["latin-1", "cp1252", "iso-8859-1"]: + try: + return content.decode(encoding) + except UnicodeDecodeError: + continue + + # If all encodings fail, treat as binary + error_msg = f"Could not decode text file {file_name} with any encoding" + logger.warning(error_msg) + processing_errors.append(error_msg) + return f"# {file_name}\n\n[File content could not be decoded]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + + +async def _extract_text_with_etl( + file_path: str, + file_name: str, + task_logger: TaskLoggingService, + log_entry: Log, +) -> str | None: + """ + Extract text from a file using the configured ETL service. + + Args: + file_path: Path to the file + file_name: Name of the file + task_logger: Task logging service + log_entry: Log entry for tracking + + Returns: + Extracted text as markdown, or None if extraction fails + """ + import warnings + from logging import ERROR, getLogger + + etl_service = config.ETL_SERVICE + + try: + if etl_service == "UNSTRUCTURED": + from langchain_unstructured import UnstructuredLoader + + from app.utils.document_converters import convert_document_to_markdown + + loader = UnstructuredLoader( + file_path, + mode="elements", + post_processors=[], + languages=["eng"], + include_orig_elements=False, + include_metadata=False, + strategy="auto", + ) + + docs = await loader.aload() + if docs: + return await convert_document_to_markdown(docs) + return None + + elif etl_service == "LLAMACLOUD": + from app.tasks.document_processors.file_processors import parse_with_llamacloud_retry + + # Estimate pages (rough estimate based on file size) + file_size = os.path.getsize(file_path) + estimated_pages = max(1, file_size // (80 * 1024)) + + result = await parse_with_llamacloud_retry( + file_path=file_path, + estimated_pages=estimated_pages, + task_logger=task_logger, + log_entry=log_entry, + ) + + markdown_documents = await result.aget_markdown_documents(split_by_page=False) + if markdown_documents: + return markdown_documents[0].text + return None + + elif etl_service == "DOCLING": + from app.services.docling_service import create_docling_service + + docling_service = create_docling_service() + + # Suppress pdfminer warnings + pdfminer_logger = getLogger("pdfminer") + original_level = pdfminer_logger.level + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer") + warnings.filterwarnings("ignore", message=".*Cannot set gray non-stroke color.*") + warnings.filterwarnings("ignore", message=".*invalid float value.*") + + pdfminer_logger.setLevel(ERROR) + + try: + result = await docling_service.process_document(file_path, file_name) + finally: + pdfminer_logger.setLevel(original_level) + + return result.get("content") + else: + logger.warning(f"Unknown ETL service: {etl_service}") + return None + + except Exception as e: + logger.error(f"ETL extraction failed for {file_name}: {e!s}") + return None + + # ============ Main indexer function ============ @@ -384,6 +619,7 @@ async def _index_composio_google_drive( documents_indexed = 0 documents_skipped = 0 + processing_errors = [] for file_info in all_files: try: @@ -422,11 +658,28 @@ async def _index_composio_google_drive( markdown_content = f"# {file_name}\n\n" markdown_content += f"**File ID:** {file_id}\n" markdown_content += f"**Type:** {mime_type}\n" + elif isinstance(content, dict): + # Safety check: if content is still a dict, log error and use fallback + error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" + logger.error(error_msg) + processing_errors.append(error_msg) + markdown_content = f"# {file_name}\n\n" + markdown_content += f"**File ID:** {file_id}\n" + markdown_content += f"**Type:** {mime_type}\n" else: - try: - markdown_content = content.decode("utf-8") - except UnicodeDecodeError: - markdown_content = f"# {file_name}\n\n[Binary file content]\n" + # Process content based on file type + markdown_content = await _process_file_content( + content=content, + file_name=file_name, + file_id=file_id, + mime_type=mime_type, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + processing_errors=processing_errors, + ) content_hash = generate_content_hash(markdown_content, search_space_id) @@ -531,7 +784,9 @@ async def _index_composio_google_drive( await session.commit() except Exception as e: - logger.error(f"Error processing Drive file: {e!s}", exc_info=True) + error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + logger.error(error_msg, exc_info=True) + processing_errors.append(error_msg) documents_skipped += 1 continue @@ -549,16 +804,34 @@ async def _index_composio_google_drive( "Successfully committed all Composio Google Drive document changes to database" ) - await task_logger.log_task_success( - log_entry, - f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - }, - ) + # If there were processing errors, return them so notification can show them + error_message = None + if processing_errors: + # Combine all errors into a single message + if len(processing_errors) == 1: + error_message = processing_errors[0] + else: + error_message = f"Failed to process {len(processing_errors)} file(s). First error: {processing_errors[0]}" + await task_logger.log_task_failure( + log_entry, + f"Completed Google Drive indexing with {len(processing_errors)} error(s) for connector {connector_id}", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "errors": processing_errors, + }, + ) + else: + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + }, + ) - return documents_indexed, None + return documents_indexed, error_message except Exception as e: logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)