feat: improve Composio file processing and error handling

- Enhanced the handling of file content from Composio, supporting both binary and text files with appropriate processing methods.
- Introduced robust error logging and handling for file content extraction, ensuring better visibility into issues during processing.
- Updated the indexing logic to accommodate new content processing methods, improving overall reliability and user feedback on errors.
- Added temporary file handling for binary files to facilitate text extraction using the ETL service.
This commit is contained in:
Anish Sarkar 2026-01-23 05:28:03 +05:30
parent 7ec7ed5c3b
commit 42752bbeab
3 changed files with 360 additions and 22 deletions

View file

@ -1140,7 +1140,7 @@ async def _run_indexing_with_notifications(
f"Indexing completed successfully: {documents_processed} documents processed" f"Indexing completed successfully: {documents_processed} documents processed"
) )
# Update notification on success # Update notification on success (or partial success with errors)
if notification: if notification:
# Refresh notification to ensure it's not stale after timestamp update commit # Refresh notification to ensure it's not stale after timestamp update commit
await session.refresh(notification) await session.refresh(notification)
@ -1148,7 +1148,7 @@ async def _run_indexing_with_notifications(
session=session, session=session,
notification=notification, notification=notification,
indexed_count=documents_processed, indexed_count=documents_processed,
error_message=None, error_message=error_or_warning, # Show errors even if some documents were indexed
) )
await session.commit() # Commit to ensure Electric SQL syncs the notification update await session.commit() # Commit to ensure Electric SQL syncs the notification update
elif documents_processed > 0: elif documents_processed > 0:
@ -1172,7 +1172,7 @@ async def _run_indexing_with_notifications(
session=session, session=session,
notification=notification, notification=notification,
indexed_count=documents_processed, indexed_count=documents_processed,
error_message=None, error_message=error_or_warning, # Show errors even if some documents were indexed
) )
await session.commit() # Commit to ensure Electric SQL syncs the notification update await session.commit() # Commit to ensure Electric SQL syncs the notification update
else: else:

View file

@ -458,11 +458,76 @@ class ComposioService:
if not result.get("success"): if not result.get("success"):
return None, result.get("error", "Unknown error") return None, result.get("error", "Unknown error")
content = result.get("data") data = result.get("data")
if isinstance(content, str):
content = content.encode("utf-8") # Composio GOOGLEDRIVE_DOWNLOAD_FILE returns a dict with file info
# The actual content is in "downloaded_file_content" field
return content, None if isinstance(data, dict):
# Try known Composio response fields in order of preference
content = None
# Primary field from GOOGLEDRIVE_DOWNLOAD_FILE
if "downloaded_file_content" in data:
content = data["downloaded_file_content"]
# downloaded_file_content might itself be a dict with the actual content inside
if isinstance(content, dict):
# Try to extract actual content from nested dict
# Note: Composio nests downloaded_file_content inside another downloaded_file_content
actual_content = (
content.get("downloaded_file_content") or
content.get("content") or
content.get("data") or
content.get("file_content") or
content.get("body") or
content.get("text")
)
if actual_content is not None:
content = actual_content
else:
# Log structure for debugging
logger.warning(f"downloaded_file_content is dict with keys: {list(content.keys())}")
return None, f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}"
# Fallback fields for compatibility
elif "content" in data:
content = data["content"]
elif "file_content" in data:
content = data["file_content"]
elif "data" in data:
content = data["data"]
if content is None:
# Log available keys for debugging
logger.warning(f"Composio response dict keys: {list(data.keys())}")
return None, f"No file content found in Composio response. Available keys: {list(data.keys())}"
# Convert content to bytes
if isinstance(content, str):
# Check if it's base64 encoded
import base64
try:
# Try to decode as base64 first
content = base64.b64decode(content)
except Exception:
# If not base64, encode as UTF-8
content = content.encode("utf-8")
elif isinstance(content, bytes):
pass # Already bytes
elif isinstance(content, dict):
# Still a dict after all extraction attempts - log structure
logger.warning(f"Content still dict after extraction: {list(content.keys())}")
return None, f"Unexpected nested content structure: {list(content.keys())}"
else:
return None, f"Unexpected content type in Composio response: {type(content).__name__}"
return content, None
elif isinstance(data, str):
return data.encode("utf-8"), None
elif isinstance(data, bytes):
return data, None
elif data is None:
return None, "No data returned from Composio"
else:
return None, f"Unexpected data type from Composio: {type(data).__name__}"
except Exception as e: except Exception as e:
logger.error(f"Failed to get Drive file content: {e!s}") logger.error(f"Failed to get Drive file content: {e!s}")

View file

@ -8,7 +8,10 @@ to avoid circular import issues with the connector_indexers package.
""" """
import logging import logging
import os
import tempfile
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path
from typing import Any from typing import Any
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
@ -21,6 +24,7 @@ from app.connectors.composio_connector import ComposioConnector
from app.db import ( from app.db import (
Document, Document,
DocumentType, DocumentType,
Log,
SearchSourceConnector, SearchSourceConnector,
SearchSourceConnectorType, SearchSourceConnectorType,
) )
@ -81,6 +85,237 @@ async def update_connector_last_indexed(
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
# Binary file extensions that need file processor
BINARY_FILE_EXTENSIONS = {
".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx",
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp",
".zip", ".tar", ".gz", ".rar", ".7z",
".mp3", ".mp4", ".wav", ".avi", ".mov",
".exe", ".dll", ".so", ".bin",
}
# Text file extensions that can be decoded as UTF-8
TEXT_FILE_EXTENSIONS = {
".txt", ".md", ".markdown", ".json", ".xml", ".html", ".htm",
".css", ".js", ".ts", ".py", ".java", ".c", ".cpp", ".h",
".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
".sh", ".bash", ".zsh", ".fish",
".sql", ".csv", ".tsv",
".rst", ".tex", ".log",
}
def _is_binary_file(file_name: str, mime_type: str) -> bool:
"""Check if a file is binary based on extension or mime type."""
extension = Path(file_name).suffix.lower()
# Check extension first
if extension in BINARY_FILE_EXTENSIONS:
return True
if extension in TEXT_FILE_EXTENSIONS:
return False
# Check mime type
if mime_type:
if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")):
return True
if mime_type.startswith(("text/", "application/json", "application/xml")):
return False
# Office documents
if "spreadsheet" in mime_type or "document" in mime_type or "presentation" in mime_type:
return True
# Default to text for unknown types
return False
async def _process_file_content(
content: bytes | str,
file_name: str,
file_id: str,
mime_type: str,
search_space_id: int,
user_id: str,
session: AsyncSession,
task_logger: TaskLoggingService,
log_entry: Log,
processing_errors: list[str],
) -> str:
"""
Process file content and return markdown text.
For binary files (PDFs, images, etc.), uses Surfsense's ETL service.
For text files, decodes as UTF-8.
Args:
content: File content as bytes or string
file_name: Name of the file
file_id: Google Drive file ID
mime_type: MIME type of the file
search_space_id: Search space ID
user_id: User ID
session: Database session
task_logger: Task logging service
log_entry: Log entry for tracking
processing_errors: List to append errors to
Returns:
Markdown content string
"""
# Ensure content is bytes
if isinstance(content, str):
content = content.encode("utf-8")
# Check if this is a binary file
if _is_binary_file(file_name, mime_type):
# Use ETL service for binary files (PDF, Office docs, etc.)
temp_file_path = None
try:
# Get file extension
extension = Path(file_name).suffix or ".bin"
# Write to temp file
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file:
tmp_file.write(content)
temp_file_path = tmp_file.name
# Use the configured ETL service to extract text
extracted_text = await _extract_text_with_etl(
temp_file_path, file_name, task_logger, log_entry
)
if extracted_text:
return extracted_text
else:
# Fallback if extraction fails
logger.warning(f"Could not extract text from binary file {file_name}")
return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
except Exception as e:
error_msg = f"Error processing binary file {file_name}: {e!s}"
logger.error(error_msg)
processing_errors.append(error_msg)
return f"# {file_name}\n\n[Binary file - processing error]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
finally:
# Cleanup temp file
if temp_file_path and os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
except Exception as e:
logger.debug(f"Could not delete temp file {temp_file_path}: {e}")
else:
# Text file - try to decode as UTF-8
try:
return content.decode("utf-8")
except UnicodeDecodeError:
# Try other encodings
for encoding in ["latin-1", "cp1252", "iso-8859-1"]:
try:
return content.decode(encoding)
except UnicodeDecodeError:
continue
# If all encodings fail, treat as binary
error_msg = f"Could not decode text file {file_name} with any encoding"
logger.warning(error_msg)
processing_errors.append(error_msg)
return f"# {file_name}\n\n[File content could not be decoded]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
async def _extract_text_with_etl(
file_path: str,
file_name: str,
task_logger: TaskLoggingService,
log_entry: Log,
) -> str | None:
"""
Extract text from a file using the configured ETL service.
Args:
file_path: Path to the file
file_name: Name of the file
task_logger: Task logging service
log_entry: Log entry for tracking
Returns:
Extracted text as markdown, or None if extraction fails
"""
import warnings
from logging import ERROR, getLogger
etl_service = config.ETL_SERVICE
try:
if etl_service == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
if docs:
return await convert_document_to_markdown(docs)
return None
elif etl_service == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import parse_with_llamacloud_retry
# Estimate pages (rough estimate based on file size)
file_size = os.path.getsize(file_path)
estimated_pages = max(1, file_size // (80 * 1024))
result = await parse_with_llamacloud_retry(
file_path=file_path,
estimated_pages=estimated_pages,
task_logger=task_logger,
log_entry=log_entry,
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if markdown_documents:
return markdown_documents[0].text
return None
elif etl_service == "DOCLING":
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
# Suppress pdfminer warnings
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings("ignore", message=".*Cannot set gray non-stroke color.*")
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, file_name)
finally:
pdfminer_logger.setLevel(original_level)
return result.get("content")
else:
logger.warning(f"Unknown ETL service: {etl_service}")
return None
except Exception as e:
logger.error(f"ETL extraction failed for {file_name}: {e!s}")
return None
# ============ Main indexer function ============ # ============ Main indexer function ============
@ -384,6 +619,7 @@ async def _index_composio_google_drive(
documents_indexed = 0 documents_indexed = 0
documents_skipped = 0 documents_skipped = 0
processing_errors = []
for file_info in all_files: for file_info in all_files:
try: try:
@ -422,11 +658,28 @@ async def _index_composio_google_drive(
markdown_content = f"# {file_name}\n\n" markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n" markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n" markdown_content += f"**Type:** {mime_type}\n"
elif isinstance(content, dict):
# Safety check: if content is still a dict, log error and use fallback
error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
else: else:
try: # Process content based on file type
markdown_content = content.decode("utf-8") markdown_content = await _process_file_content(
except UnicodeDecodeError: content=content,
markdown_content = f"# {file_name}\n\n[Binary file content]\n" file_name=file_name,
file_id=file_id,
mime_type=mime_type,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id) content_hash = generate_content_hash(markdown_content, search_space_id)
@ -531,7 +784,9 @@ async def _index_composio_google_drive(
await session.commit() await session.commit()
except Exception as e: except Exception as e:
logger.error(f"Error processing Drive file: {e!s}", exc_info=True) error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1 documents_skipped += 1
continue continue
@ -549,16 +804,34 @@ async def _index_composio_google_drive(
"Successfully committed all Composio Google Drive document changes to database" "Successfully committed all Composio Google Drive document changes to database"
) )
await task_logger.log_task_success( # If there were processing errors, return them so notification can show them
log_entry, error_message = None
f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", if processing_errors:
{ # Combine all errors into a single message
"documents_indexed": documents_indexed, if len(processing_errors) == 1:
"documents_skipped": documents_skipped, error_message = processing_errors[0]
}, else:
) error_message = f"Failed to process {len(processing_errors)} file(s). First error: {processing_errors[0]}"
await task_logger.log_task_failure(
log_entry,
f"Completed Google Drive indexing with {len(processing_errors)} error(s) for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"errors": processing_errors,
},
)
else:
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google Drive indexing via Composio for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
},
)
return documents_indexed, None return documents_indexed, error_message
except Exception as e: except Exception as e:
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)