feat: enhance Google Drive connector with file MIME type detection and content-based detection as fallback

This commit is contained in:
Anish Sarkar 2026-02-03 22:57:01 +05:30
parent c4cf773bb0
commit 65b79f3705
2 changed files with 173 additions and 24 deletions

View file

@ -179,13 +179,14 @@ class ComposioGoogleDriveConnector(ComposioConnector):
)
async def get_drive_file_content(
self, file_id: str
self, file_id: str, original_mime_type: str | None = None
) -> tuple[bytes | None, str | None]:
"""
Download file content from Google Drive via Composio.
Args:
file_id: Google Drive file ID.
original_mime_type: Original MIME type (used to detect Google Workspace files for export).
Returns:
Tuple of (file content bytes, error message).
@ -200,6 +201,31 @@ class ComposioGoogleDriveConnector(ComposioConnector):
connected_account_id=connected_account_id,
entity_id=entity_id,
file_id=file_id,
original_mime_type=original_mime_type,
)
async def get_file_metadata(
self, file_id: str
) -> tuple[dict[str, Any] | None, str | None]:
"""
Get metadata for a specific file from Google Drive.
Args:
file_id: The ID of the file to get metadata for.
Returns:
Tuple of (metadata dict, error message).
"""
connected_account_id = await self.get_connected_account_id()
if not connected_account_id:
return None, "No connected account ID found"
entity_id = await self.get_entity_id()
service = await self._get_service()
return await service.get_file_metadata(
connected_account_id=connected_account_id,
entity_id=entity_id,
file_id=file_id,
)
async def get_drive_start_page_token(self) -> tuple[str | None, str | None]:
@ -292,8 +318,18 @@ async def _process_file_content(
if isinstance(content, str):
content = content.encode("utf-8")
# Check if this is a binary file
if _is_binary_file(file_name, mime_type):
# Check if this is a binary file based on extension or MIME type
is_binary = _is_binary_file(file_name, mime_type)
# Content-based binary detection as fallback
# This catches PDFs and other binary files even if MIME type is missing/incorrect
if not is_binary and content:
has_pdf_magic = content[:4] == b"%PDF"
has_null_bytes = b"\x00" in content[:1000]
if has_pdf_magic or has_null_bytes:
is_binary = True
if is_binary:
# Use ETL service for binary files (PDF, Office docs, etc.)
temp_file_path = None
try:
@ -316,7 +352,7 @@ async def _process_file_content(
return extracted_text
else:
# Fallback if extraction fails
logger.warning(f"Could not extract text from binary file {file_name}")
logger.warning(f"ETL returned empty for binary file {file_name}")
return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n"
except Exception as e:
@ -329,8 +365,8 @@ async def _process_file_content(
if temp_file_path and os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
except Exception as e:
logger.debug(f"Could not delete temp file {temp_file_path}: {e}")
except Exception:
pass
else:
# Text file - try to decode as UTF-8
try:
@ -372,9 +408,11 @@ async def _extract_text_with_etl(
from logging import ERROR, getLogger
etl_service = config.ETL_SERVICE
logger.debug(f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}")
try:
if etl_service == "UNSTRUCTURED":
logger.debug("[_extract_text_with_etl] Using UNSTRUCTURED ETL")
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
@ -390,11 +428,16 @@ async def _extract_text_with_etl(
)
docs = await loader.aload()
logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs")
if docs:
return await convert_document_to_markdown(docs)
result = await convert_document_to_markdown(docs)
logger.debug(f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars")
return result
logger.debug("[_extract_text_with_etl] UNSTRUCTURED returned no docs")
return None
elif etl_service == "LLAMACLOUD":
logger.debug("[_extract_text_with_etl] Using LLAMACLOUD ETL")
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
@ -413,11 +456,16 @@ async def _extract_text_with_etl(
markdown_documents = await result.aget_markdown_documents(
split_by_page=False
)
logger.debug(f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs")
if markdown_documents:
return markdown_documents[0].text
text = markdown_documents[0].text
logger.debug(f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars")
return text
logger.debug("[_extract_text_with_etl] LLAMACLOUD returned no markdown docs")
return None
elif etl_service == "DOCLING":
logger.debug("[_extract_text_with_etl] Using DOCLING ETL")
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
@ -441,16 +489,21 @@ async def _extract_text_with_etl(
result = await docling_service.process_document(
file_path, file_name
)
logger.debug(f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}")
finally:
pdfminer_logger.setLevel(original_level)
return result.get("content")
content = result.get("content")
logger.debug(f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars")
return content
else:
logger.warning(f"Unknown ETL service: {etl_service}")
logger.warning(f"[_extract_text_with_etl] Unknown ETL service: {etl_service}")
return None
except Exception as e:
logger.error(f"ETL extraction failed for {file_name}: {e!s}")
logger.error(f"[_extract_text_with_etl] ETL extraction EXCEPTION for {file_name}: {e!s}")
import traceback
logger.error(f"[_extract_text_with_etl] Traceback: {traceback.format_exc()}")
return None
@ -979,7 +1032,7 @@ async def _index_composio_drive_full_scan(
all_files.extend(folder_files[:max_files_per_folder])
logger.info(f"Found {len(folder_files)} files in folder {folder_name}")
# Add specifically selected files
# Add specifically selected files - fetch metadata to get mimeType
for selected_file in selected_files:
file_id = selected_file.get("id")
file_name = selected_file.get("name", "Unknown")
@ -987,14 +1040,35 @@ async def _index_composio_drive_full_scan(
if not file_id:
continue
# Add file info (we'll fetch content later during indexing)
all_files.append(
{
"id": file_id,
"name": file_name,
"mimeType": "", # Will be determined later
}
)
# Fetch file metadata to get proper mimeType
metadata, meta_error = await composio_connector.get_file_metadata(file_id)
if metadata and not meta_error:
all_files.append(
{
"id": file_id,
"name": metadata.get("name") or file_name,
"mimeType": metadata.get("mimeType", ""),
"modifiedTime": metadata.get("modifiedTime", ""),
"createdTime": metadata.get("createdTime", ""),
}
)
logger.info(
f"Fetched metadata for UI-selected file: {file_name} "
f"(mimeType={metadata.get('mimeType', 'unknown')})"
)
else:
# Fallback if metadata fetch fails - content-based detection will handle it
logger.warning(
f"Could not fetch metadata for file {file_name}: {meta_error}. "
f"Falling back to content-based detection."
)
all_files.append(
{
"id": file_id,
"name": file_name,
"mimeType": "", # Content-based detection will handle this
}
)
else:
# No selection specified - fetch all files (original behavior)
page_token = None
@ -1128,8 +1202,10 @@ async def _process_single_drive_file(
session, unique_identifier_hash
)
# Get file content
content, content_error = await composio_connector.get_drive_file_content(file_id)
# Get file content (pass mime_type for Google Workspace export handling)
content, content_error = await composio_connector.get_drive_file_content(
file_id, original_mime_type=mime_type
)
if content_error or not content:
logger.warning(f"Could not get content for file {file_name}: {content_error}")

View file

@ -449,8 +449,11 @@ class ComposioService:
"""
try:
# Composio uses snake_case for parameters
# IMPORTANT: Include 'fields' to ensure mimeType is returned in the response
# Without this, Google Drive API may not include mimeType for some files
params = {
"page_size": min(page_size, 100),
"fields": "files(id,name,mimeType,modifiedTime,createdTime),nextPageToken",
}
if folder_id:
# List contents of a specific folder (exclude shortcuts - we don't have access to them)
@ -498,7 +501,11 @@ class ComposioService:
return [], None, str(e)
async def get_drive_file_content(
self, connected_account_id: str, entity_id: str, file_id: str
self,
connected_account_id: str,
entity_id: str,
file_id: str,
original_mime_type: str | None = None,
) -> tuple[bytes | None, str | None]:
"""
Download file content from Google Drive via Composio.
@ -507,10 +514,13 @@ class ComposioService:
to a local directory, and the local file path is provided in the response.
Response includes: file_path, file_name, size fields.
For Google Workspace files (Docs, Sheets, Slides), exports to PDF format.
Args:
connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account.
file_id: Google Drive file ID.
original_mime_type: Original MIME type of the file (used to detect Google Workspace files).
Returns:
Tuple of (file content bytes, error message).
@ -518,10 +528,19 @@ class ComposioService:
from pathlib import Path
try:
params = {"file_id": file_id}
# For Google Workspace files, explicitly export as PDF
# This ensures consistent behavior and proper binary detection
if original_mime_type and original_mime_type.startswith(
"application/vnd.google-apps."
):
params["mime_type"] = "application/pdf"
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLEDRIVE_DOWNLOAD_FILE",
params={"file_id": file_id},
params=params,
entity_id=entity_id,
)
@ -651,6 +670,60 @@ class ComposioService:
logger.error(f"Failed to get Drive file content: {e!s}")
return None, str(e)
async def get_file_metadata(
self, connected_account_id: str, entity_id: str, file_id: str
) -> tuple[dict[str, Any] | None, str | None]:
"""
Get metadata for a specific file from Google Drive.
Args:
connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account.
file_id: The ID of the file to get metadata for.
Returns:
Tuple of (metadata dict, error message).
"""
try:
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLEDRIVE_GET_FILE_METADATA",
params={
"file_id": file_id,
"fields": "id,name,mimeType,modifiedTime,createdTime,size",
},
entity_id=entity_id,
)
if not result.get("success"):
return None, result.get("error", "Unknown error")
data = result.get("data", {})
# Handle nested response structure
if isinstance(data, dict):
inner_data = data.get("data", data)
if isinstance(inner_data, dict):
# Extract metadata fields with fallbacks for camelCase/snake_case
metadata = {
"id": inner_data.get("id") or file_id,
"name": inner_data.get("name", ""),
"mimeType": inner_data.get("mimeType")
or inner_data.get("mime_type", ""),
"modifiedTime": inner_data.get("modifiedTime")
or inner_data.get("modified_time", ""),
"createdTime": inner_data.get("createdTime")
or inner_data.get("created_time", ""),
"size": inner_data.get("size", ""),
}
return metadata, None
return None, "Could not extract metadata from response"
except Exception as e:
logger.error(f"Failed to get file metadata: {e!s}")
return None, str(e)
async def get_drive_start_page_token(
self, connected_account_id: str, entity_id: str
) -> tuple[str | None, str | None]: