feat: enhance Composio connector functionality with Google Drive delta sync support

- Added methods to retrieve the starting page token and list changes in Google Drive, enabling delta sync capabilities.
- Updated Composio service to handle file download directory configuration.
- Modified indexing tasks to support delta sync, improving efficiency by processing only changed files.
- Adjusted date handling in connector tasks to allow optional start and end dates.
- Improved error handling and logging throughout the Composio indexing process.
This commit is contained in:
Anish Sarkar 2026-01-23 18:37:09 +05:30
parent 9c5c925fca
commit 29382070aa
16 changed files with 905 additions and 471 deletions

View file

@ -561,8 +561,12 @@ async def _index_composio_google_drive(
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
"""Index Google Drive files via Composio.
"""Index Google Drive files via Composio with delta sync support.
Delta Sync Flow:
1. First sync: Full scan + get initial page token
2. Subsequent syncs: Use LIST_CHANGES to process only changed files
Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index
- selected_files: List of {id, name} for individual files to index
@ -576,354 +580,88 @@ async def _index_composio_google_drive(
selected_folders = connector_config.get("selected_folders", [])
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {})
# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
use_delta_sync = stored_page_token and connector.last_indexed_at
max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Drive files via Composio for connector {connector_id}",
{
"stage": "fetching_files",
"selected_folders": len(selected_folders),
"selected_files": len(selected_files),
},
)
all_files = []
# If specific folders/files are selected, fetch from those
if selected_folders or selected_files:
# Fetch files from selected folders
for folder in selected_folders:
folder_id = folder.get("id")
folder_name = folder.get("name", "Unknown")
if not folder_id:
continue
# Handle special case for "root" folder
actual_folder_id = None if folder_id == "root" else folder_id
logger.info(f"Fetching files from folder: {folder_name} ({folder_id})")
# Fetch files from this folder
folder_files = []
page_token = None
while len(folder_files) < max_files_per_folder:
(
files,
next_token,
error,
) = await composio_connector.list_drive_files(
folder_id=actual_folder_id,
page_token=page_token,
page_size=min(100, max_files_per_folder - len(folder_files)),
)
if error:
logger.warning(
f"Failed to fetch files from folder {folder_name}: {error}"
)
break
# Process files
for file_info in files:
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
# If it's a folder and include_subfolders is enabled, recursively fetch
if mime_type == "application/vnd.google-apps.folder":
if include_subfolders:
# Add subfolder files recursively
subfolder_files = await _fetch_folder_files_recursively(
composio_connector,
file_info.get("id"),
max_files=max_files_per_folder,
current_count=len(folder_files),
)
folder_files.extend(subfolder_files)
else:
folder_files.append(file_info)
if not next_token:
break
page_token = next_token
all_files.extend(folder_files[:max_files_per_folder])
logger.info(f"Found {len(folder_files)} files in folder {folder_name}")
# Add specifically selected files
for selected_file in selected_files:
file_id = selected_file.get("id")
file_name = selected_file.get("name", "Unknown")
if not file_id:
continue
# Add file info (we'll fetch content later during indexing)
all_files.append(
{
"id": file_id,
"name": file_name,
"mimeType": "", # Will be determined later
}
)
else:
# No selection specified - fetch all files (original behavior)
page_token = None
while len(all_files) < max_items:
files, next_token, error = await composio_connector.list_drive_files(
page_token=page_token,
page_size=min(100, max_items - len(all_files)),
)
if error:
await task_logger.log_task_failure(
log_entry, f"Failed to fetch Drive files: {error}", {}
)
return 0, f"Failed to fetch Drive files: {error}"
all_files.extend(files)
if not next_token:
break
page_token = next_token
if not all_files:
success_msg = "No Google Drive files found"
await task_logger.log_task_success(
log_entry, success_msg, {"files_count": 0}
# Route to delta sync or full scan
if use_delta_sync:
logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}")
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync for Google Drive via Composio (connector {connector_id})",
{"stage": "delta_sync", "token": stored_page_token[:20] + "..."},
)
documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync(
session=session,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
page_token=stored_page_token,
max_items=max_items,
task_logger=task_logger,
log_entry=log_entry,
)
else:
logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)")
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Drive files via Composio for connector {connector_id}",
{
"stage": "full_scan",
"selected_folders": len(selected_folders),
"selected_files": len(selected_files),
},
)
documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan(
session=session,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
selected_folders=selected_folders,
selected_files=selected_files,
max_files_per_folder=max_files_per_folder,
include_subfolders=include_subfolders,
max_items=max_items,
task_logger=task_logger,
log_entry=log_entry,
)
# CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return (
0,
None,
) # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
# Get new page token for next sync (always update after successful sync)
new_token, token_error = await composio_connector.get_drive_start_page_token()
if new_token and not token_error:
from sqlalchemy.orm.attributes import flag_modified
# Refresh connector to avoid stale state
await session.refresh(connector)
if not connector.config:
connector.config = {}
connector.config["drive_page_token"] = new_token
flag_modified(connector, "config")
logger.info(f"Updated drive_page_token for connector {connector_id}")
elif token_error:
logger.warning(f"Failed to get new page token: {token_error}")
documents_indexed = 0
documents_skipped = 0
processing_errors = []
for file_info in all_files:
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = (
file_info.get("name", "")
or file_info.get("fileName", "")
or "Untitled"
)
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
if not file_id:
documents_skipped += 1
continue
# Skip folders
if mime_type == "application/vnd.google-apps.folder":
continue
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get file content
(
content,
content_error,
) = await composio_connector.get_drive_file_content(file_id)
if content_error or not content:
logger.warning(
f"Could not get content for file {file_name}: {content_error}"
)
# Use metadata as content fallback
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
elif isinstance(content, dict):
# Safety check: if content is still a dict, log error and use fallback
error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
else:
# Process content based on file type
markdown_content = await _process_file_content(
content=content,
file_name=file_name,
file_id=file_id,
mime_type=mime_type,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id)
if existing_document:
if existing_document.content_hash == content_hash:
documents_skipped += 1
continue
# Update existing document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
existing_document.title = f"Drive: {file_name}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
)
await session.commit()
continue
# Create new document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=f"Drive: {file_name}",
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"connector_id": connector_id,
"toolkit_id": "googledrive",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
)
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
)
await session.commit()
except Exception as e:
error_msg = (
f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
)
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
continue
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
# This matches the pattern used in non-Composio Gmail indexer
logger.info(
f"Final commit: Total {documents_indexed} Google Drive files processed"
)
# Final commit
logger.info(f"Final commit: Total {documents_indexed} Google Drive files processed")
await session.commit()
logger.info(
"Successfully committed all Composio Google Drive document changes to database"
)
logger.info("Successfully committed all Composio Google Drive document changes to database")
# If there were processing errors, return them so notification can show them
# Handle processing errors
error_message = None
if processing_errors:
# Combine all errors into a single message
if len(processing_errors) == 1:
error_message = processing_errors[0]
else:
@ -934,6 +672,7 @@ async def _index_composio_google_drive(
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"sync_type": "delta" if use_delta_sync else "full",
"errors": processing_errors,
},
)
@ -944,6 +683,7 @@ async def _index_composio_google_drive(
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"sync_type": "delta" if use_delta_sync else "full",
},
)
@ -954,6 +694,469 @@ async def _index_composio_google_drive(
return 0, f"Failed to index Google Drive via Composio: {e!s}"
async def _index_composio_drive_delta_sync(
session: AsyncSession,
composio_connector: ComposioConnector,
connector_id: int,
search_space_id: int,
user_id: str,
page_token: str,
max_items: int,
task_logger: TaskLoggingService,
log_entry,
) -> tuple[int, int, list[str]]:
"""Index Google Drive files using delta sync (only changed files).
Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync.
Handles: new files, modified files, and deleted files.
"""
documents_indexed = 0
documents_skipped = 0
processing_errors = []
# Fetch all changes with pagination
all_changes = []
current_token = page_token
while len(all_changes) < max_items:
changes, next_token, error = await composio_connector.list_drive_changes(
page_token=current_token,
page_size=100,
include_removed=True,
)
if error:
logger.error(f"Error fetching Drive changes: {error}")
processing_errors.append(f"Failed to fetch changes: {error}")
break
all_changes.extend(changes)
if not next_token or next_token == current_token:
break
current_token = next_token
if not all_changes:
logger.info("No changes detected since last sync")
return 0, 0, []
logger.info(f"Processing {len(all_changes)} changes from delta sync")
for change in all_changes[:max_items]:
try:
# Handle removed files
is_removed = change.get("removed", False)
file_info = change.get("file", {})
file_id = change.get("fileId") or file_info.get("id", "")
if not file_id:
documents_skipped += 1
continue
# Check if file was trashed or removed
if is_removed or file_info.get("trashed", False):
# Remove document from database
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
await session.delete(existing_document)
documents_indexed += 1
logger.info(f"Deleted document for removed/trashed file: {file_id}")
continue
# Process changed file
file_name = file_info.get("name", "") or "Untitled"
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
# Skip folders
if mime_type == "application/vnd.google-apps.folder":
continue
# Process the file
indexed, skipped, errors = await _process_single_drive_file(
session=session,
composio_connector=composio_connector,
file_id=file_id,
file_name=file_name,
mime_type=mime_type,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
documents_indexed += indexed
documents_skipped += skipped
processing_errors.extend(errors)
# Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} changes processed")
except Exception as e:
error_msg = f"Error processing change for file {file_id}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped")
return documents_indexed, documents_skipped, processing_errors
async def _index_composio_drive_full_scan(
session: AsyncSession,
composio_connector: ComposioConnector,
connector_id: int,
search_space_id: int,
user_id: str,
selected_folders: list[dict],
selected_files: list[dict],
max_files_per_folder: int,
include_subfolders: bool,
max_items: int,
task_logger: TaskLoggingService,
log_entry,
) -> tuple[int, int, list[str]]:
"""Index Google Drive files using full scan (first sync or when no delta token)."""
documents_indexed = 0
documents_skipped = 0
processing_errors = []
all_files = []
# If specific folders/files are selected, fetch from those
if selected_folders or selected_files:
# Fetch files from selected folders
for folder in selected_folders:
folder_id = folder.get("id")
folder_name = folder.get("name", "Unknown")
if not folder_id:
continue
# Handle special case for "root" folder
actual_folder_id = None if folder_id == "root" else folder_id
logger.info(f"Fetching files from folder: {folder_name} ({folder_id})")
# Fetch files from this folder
folder_files = []
page_token = None
while len(folder_files) < max_files_per_folder:
(
files,
next_token,
error,
) = await composio_connector.list_drive_files(
folder_id=actual_folder_id,
page_token=page_token,
page_size=min(100, max_files_per_folder - len(folder_files)),
)
if error:
logger.warning(
f"Failed to fetch files from folder {folder_name}: {error}"
)
break
# Process files
for file_info in files:
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
# If it's a folder and include_subfolders is enabled, recursively fetch
if mime_type == "application/vnd.google-apps.folder":
if include_subfolders:
# Add subfolder files recursively
subfolder_files = await _fetch_folder_files_recursively(
composio_connector,
file_info.get("id"),
max_files=max_files_per_folder,
current_count=len(folder_files),
)
folder_files.extend(subfolder_files)
else:
folder_files.append(file_info)
if not next_token:
break
page_token = next_token
all_files.extend(folder_files[:max_files_per_folder])
logger.info(f"Found {len(folder_files)} files in folder {folder_name}")
# Add specifically selected files
for selected_file in selected_files:
file_id = selected_file.get("id")
file_name = selected_file.get("name", "Unknown")
if not file_id:
continue
# Add file info (we'll fetch content later during indexing)
all_files.append(
{
"id": file_id,
"name": file_name,
"mimeType": "", # Will be determined later
}
)
else:
# No selection specified - fetch all files (original behavior)
page_token = None
while len(all_files) < max_items:
files, next_token, error = await composio_connector.list_drive_files(
page_token=page_token,
page_size=min(100, max_items - len(all_files)),
)
if error:
return 0, 0, [f"Failed to fetch Drive files: {error}"]
all_files.extend(files)
if not next_token:
break
page_token = next_token
if not all_files:
logger.info("No Google Drive files found")
return 0, 0, []
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)")
for file_info in all_files:
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = (
file_info.get("name", "")
or file_info.get("fileName", "")
or "Untitled"
)
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
)
if not file_id:
documents_skipped += 1
continue
# Skip folders
if mime_type == "application/vnd.google-apps.folder":
continue
# Process the file
indexed, skipped, errors = await _process_single_drive_file(
session=session,
composio_connector=composio_connector,
file_id=file_id,
file_name=file_name,
mime_type=mime_type,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
documents_indexed += indexed
documents_skipped += skipped
processing_errors.extend(errors)
# Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far")
await session.commit()
except Exception as e:
error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
documents_skipped += 1
logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped")
return documents_indexed, documents_skipped, processing_errors
async def _process_single_drive_file(
session: AsyncSession,
composio_connector: ComposioConnector,
file_id: str,
file_name: str,
mime_type: str,
connector_id: int,
search_space_id: int,
user_id: str,
task_logger: TaskLoggingService,
log_entry,
) -> tuple[int, int, list[str]]:
"""Process a single Google Drive file for indexing.
Returns:
Tuple of (documents_indexed, documents_skipped, processing_errors)
"""
processing_errors = []
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get file content
content, content_error = await composio_connector.get_drive_file_content(file_id)
if content_error or not content:
logger.warning(
f"Could not get content for file {file_name}: {content_error}"
)
# Use metadata as content fallback
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
elif isinstance(content, dict):
# Safety check: if content is still a dict, log error and use fallback
error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}"
logger.error(error_msg)
processing_errors.append(error_msg)
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
markdown_content += f"**Type:** {mime_type}\n"
else:
# Process content based on file type
markdown_content = await _process_file_content(
content=content,
file_name=file_name,
file_id=file_id,
mime_type=mime_type,
search_space_id=search_space_id,
user_id=user_id,
session=session,
task_logger=task_logger,
log_entry=log_entry,
processing_errors=processing_errors,
)
content_hash = generate_content_hash(markdown_content, search_space_id)
if existing_document:
if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped
# Update existing document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
existing_document.title = f"Drive: {file_name}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name, # For compatibility
"mime_type": mime_type,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
return 1, 0, processing_errors # Indexed
# Create new document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"file_id": file_id,
"file_name": file_name,
"mime_type": mime_type,
"document_type": "Google Drive File (Composio)",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = (
f"Google Drive File: {file_name}\n\nType: {mime_type}"
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=f"Drive: {file_name}",
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
"FILE_NAME": file_name, # For compatibility
"mime_type": mime_type,
"connector_id": connector_id,
"toolkit_id": "googledrive",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
)
session.add(document)
return 1, 0, processing_errors # Indexed
async def _fetch_folder_files_recursively(
composio_connector: ComposioConnector,
folder_id: str,
@ -1271,11 +1474,18 @@ async def _index_composio_gmail(
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
# This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior)
if start_date is not None and end_date is not None:
# User provided both dates - use them directly
start_date_str = start_date
end_date_str = end_date
else:
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Build query with date range
query_parts = []
@ -1468,11 +1678,18 @@ async def _index_composio_google_calendar(
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
# This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior)
if start_date is not None and end_date is not None:
# User provided both dates - use them directly
start_date_str = start_date
end_date_str = end_date
else:
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Build time range for API call
time_min = f"{start_date_str}T00:00:00Z"