chore: ran linting

This commit is contained in:
Anish Sarkar 2026-01-24 04:36:34 +05:30
parent 97d7207bd4
commit a5103da3d7
21 changed files with 259 additions and 181 deletions

View file

@ -611,4 +611,3 @@ async def index_composio_gmail(
except Exception as e: except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True) logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Gmail via Composio: {e!s}" return 0, f"Failed to index Gmail via Composio: {e!s}"

View file

@ -259,7 +259,9 @@ async def index_composio_google_calendar(
documents_indexed = 0 documents_indexed = 0
documents_skipped = 0 documents_skipped = 0
duplicate_content_count = 0 # Track events skipped due to duplicate content_hash duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
for event in events: for event in events:
try: try:
@ -353,7 +355,7 @@ async def index_composio_google_calendar(
logger.info( logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far" f"Committing batch: {documents_indexed} Google Calendar events processed so far"
) )
await session.commit( ) await session.commit()
continue continue
# Document doesn't exist by unique_identifier_hash # Document doesn't exist by unique_identifier_hash
@ -362,7 +364,7 @@ async def index_composio_google_calendar(
duplicate_by_content = await check_duplicate_document_by_hash( duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash session, content_hash
) )
if duplicate_by_content: if duplicate_by_content:
# A document with the same content already exists (likely from standard connector) # A document with the same content already exists (likely from standard connector)
logger.info( logger.info(
@ -458,7 +460,10 @@ async def index_composio_google_calendar(
) )
except Exception as e: except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.) # Handle any remaining integrity errors gracefully (race conditions, etc.)
if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower(): if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning( logger.warning(
f"Duplicate content_hash detected during final commit. " f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. " f"This may occur if the same event was indexed by multiple connectors. "
@ -495,4 +500,3 @@ async def index_composio_google_calendar(
f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True
) )
return 0, f"Failed to index Google Calendar via Composio: {e!s}" return 0, f"Failed to index Google Calendar via Composio: {e!s}"

View file

@ -453,8 +453,8 @@ async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str session: AsyncSession, unique_identifier_hash: str
) -> Document | None: ) -> Document | None:
"""Check if a document with the given unique identifier hash already exists.""" """Check if a document with the given unique identifier hash already exists."""
from sqlalchemy.orm import selectinload
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
existing_doc_result = await session.execute( existing_doc_result = await session.execute(
select(Document) select(Document)
@ -517,14 +517,20 @@ async def index_composio_google_drive(
# Route to delta sync or full scan # Route to delta sync or full scan
if use_delta_sync: if use_delta_sync:
logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}") logger.info(
f"Using delta sync for Composio Google Drive connector {connector_id}"
)
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
f"Starting delta sync for Google Drive via Composio (connector {connector_id})", f"Starting delta sync for Google Drive via Composio (connector {connector_id})",
{"stage": "delta_sync", "token": stored_page_token[:20] + "..."}, {"stage": "delta_sync", "token": stored_page_token[:20] + "..."},
) )
documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync( (
documents_indexed,
documents_skipped,
processing_errors,
) = await _index_composio_drive_delta_sync(
session=session, session=session,
composio_connector=composio_connector, composio_connector=composio_connector,
connector_id=connector_id, connector_id=connector_id,
@ -536,7 +542,9 @@ async def index_composio_google_drive(
log_entry=log_entry, log_entry=log_entry,
) )
else: else:
logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)") logger.info(
f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)"
)
await task_logger.log_task_progress( await task_logger.log_task_progress(
log_entry, log_entry,
f"Fetching Google Drive files via Composio for connector {connector_id}", f"Fetching Google Drive files via Composio for connector {connector_id}",
@ -547,7 +555,11 @@ async def index_composio_google_drive(
}, },
) )
documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan( (
documents_indexed,
documents_skipped,
processing_errors,
) = await _index_composio_drive_full_scan(
session=session, session=session,
composio_connector=composio_connector, composio_connector=composio_connector,
connector_id=connector_id, connector_id=connector_id,
@ -580,9 +592,13 @@ async def index_composio_google_drive(
await update_connector_last_indexed(session, connector, update_last_indexed) await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit # Final commit
logger.info(f"Final commit: Total {documents_indexed} Google Drive files processed") logger.info(
f"Final commit: Total {documents_indexed} Google Drive files processed"
)
await session.commit() await session.commit()
logger.info("Successfully committed all Composio Google Drive document changes to database") logger.info(
"Successfully committed all Composio Google Drive document changes to database"
)
# Handle processing errors # Handle processing errors
error_message = None error_message = None
@ -731,7 +747,9 @@ async def _index_composio_drive_delta_sync(
processing_errors.append(error_msg) processing_errors.append(error_msg)
documents_skipped += 1 documents_skipped += 1
logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped") logger.info(
f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
)
return documents_indexed, documents_skipped, processing_errors return documents_indexed, documents_skipped, processing_errors
@ -858,20 +876,18 @@ async def _index_composio_drive_full_scan(
logger.info("No Google Drive files found") logger.info("No Google Drive files found")
return 0, 0, [] return 0, 0, []
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)") logger.info(
f"Found {len(all_files)} Google Drive files to index via Composio (full scan)"
)
for file_info in all_files: for file_info in all_files:
try: try:
# Handle both standard Google API and potential Composio variations # Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "") file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = ( file_name = (
file_info.get("name", "") file_info.get("name", "") or file_info.get("fileName", "") or "Untitled"
or file_info.get("fileName", "")
or "Untitled"
)
mime_type = file_info.get("mimeType", "") or file_info.get(
"mime_type", ""
) )
mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
if not file_id: if not file_id:
documents_skipped += 1 documents_skipped += 1
@ -901,7 +917,9 @@ async def _index_composio_drive_full_scan(
# Batch commit every 10 documents # Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0: if documents_indexed > 0 and documents_indexed % 10 == 0:
logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far") logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
)
await session.commit() await session.commit()
except Exception as e: except Exception as e:
@ -910,7 +928,9 @@ async def _index_composio_drive_full_scan(
processing_errors.append(error_msg) processing_errors.append(error_msg)
documents_skipped += 1 documents_skipped += 1
logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped") logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
)
return documents_indexed, documents_skipped, processing_errors return documents_indexed, documents_skipped, processing_errors
@ -948,9 +968,7 @@ async def _process_single_drive_file(
content, content_error = await composio_connector.get_drive_file_content(file_id) content, content_error = await composio_connector.get_drive_file_content(file_id)
if content_error or not content: if content_error or not content:
logger.warning( logger.warning(f"Could not get content for file {file_name}: {content_error}")
f"Could not get content for file {file_name}: {content_error}"
)
# Use metadata as content fallback # Use metadata as content fallback
markdown_content = f"# {file_name}\n\n" markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n" markdown_content += f"**File ID:** {file_id}\n"
@ -985,9 +1003,7 @@ async def _process_single_drive_file(
return 0, 1, processing_errors # Skipped return 0, 1, processing_errors # Skipped
# Update existing document # Update existing document
user_llm = await get_user_long_context_llm( user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
session, user_id, search_space_id
)
if user_llm: if user_llm:
document_metadata = { document_metadata = {
@ -1003,12 +1019,8 @@ async def _process_single_drive_file(
markdown_content, user_llm, document_metadata markdown_content, user_llm, document_metadata
) )
else: else:
summary_content = ( summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
f"Google Drive File: {file_name}\n\nType: {mime_type}" summary_embedding = config.embedding_model_instance.embed(summary_content)
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content) chunks = await create_document_chunks(markdown_content)
@ -1030,9 +1042,7 @@ async def _process_single_drive_file(
return 1, 0, processing_errors # Indexed return 1, 0, processing_errors # Indexed
# Create new document # Create new document
user_llm = await get_user_long_context_llm( user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
session, user_id, search_space_id
)
if user_llm: if user_llm:
document_metadata = { document_metadata = {
@ -1048,12 +1058,8 @@ async def _process_single_drive_file(
markdown_content, user_llm, document_metadata markdown_content, user_llm, document_metadata
) )
else: else:
summary_content = ( summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
f"Google Drive File: {file_name}\n\nType: {mime_type}" summary_embedding = config.embedding_model_instance.embed(summary_content)
)
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(markdown_content) chunks = await create_document_chunks(markdown_content)
@ -1159,4 +1165,3 @@ async def _fetch_folder_files_recursively(
except Exception as e: except Exception as e:
logger.error(f"Error in recursive folder fetch: {e!s}") logger.error(f"Error in recursive folder fetch: {e!s}")
return all_files return all_files

View file

@ -144,7 +144,10 @@ class GoogleCalendarConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked) # Check if this is an invalid_grant error (token expired/revoked)
if "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower(): if (
"invalid_grant" in error_str.lower()
or "token has been expired or revoked" in error_str.lower()
):
raise Exception( raise Exception(
"Google Calendar authentication failed. Please re-authenticate." "Google Calendar authentication failed. Please re-authenticate."
) from e ) from e
@ -173,7 +176,11 @@ class GoogleCalendarConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it # If the error already contains a user-friendly re-authentication message, preserve it
if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower(): if (
"re-authenticate" in error_str.lower()
or "expired or been revoked" in error_str.lower()
or "authentication failed" in error_str.lower()
):
raise Exception(error_str) from e raise Exception(error_str) from e
raise Exception(f"Failed to create Google Calendar service: {e!s}") from e raise Exception(f"Failed to create Google Calendar service: {e!s}") from e
@ -283,7 +290,11 @@ class GoogleCalendarConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it # If the error already contains a user-friendly re-authentication message, preserve it
if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower(): if (
"re-authenticate" in error_str.lower()
or "expired or been revoked" in error_str.lower()
or "authentication failed" in error_str.lower()
):
return [], error_str return [], error_str
return [], f"Error fetching events: {e!s}" return [], f"Error fetching events: {e!s}"

View file

@ -143,7 +143,10 @@ class GoogleGmailConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked) # Check if this is an invalid_grant error (token expired/revoked)
if "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower(): if (
"invalid_grant" in error_str.lower()
or "token has been expired or revoked" in error_str.lower()
):
raise Exception( raise Exception(
"Gmail authentication failed. Please re-authenticate." "Gmail authentication failed. Please re-authenticate."
) from e ) from e
@ -172,7 +175,11 @@ class GoogleGmailConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it # If the error already contains a user-friendly re-authentication message, preserve it
if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower(): if (
"re-authenticate" in error_str.lower()
or "expired or been revoked" in error_str.lower()
or "authentication failed" in error_str.lower()
):
raise Exception(error_str) from e raise Exception(error_str) from e
raise Exception(f"Failed to create Gmail service: {e!s}") from e raise Exception(f"Failed to create Gmail service: {e!s}") from e
@ -237,7 +244,11 @@ class GoogleGmailConnector:
except Exception as e: except Exception as e:
error_str = str(e) error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it # If the error already contains a user-friendly re-authentication message, preserve it
if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower(): if (
"re-authenticate" in error_str.lower()
or "expired or been revoked" in error_str.lower()
or "authentication failed" in error_str.lower()
):
return [], error_str return [], error_str
return [], f"Error fetching messages list: {e!s}" return [], f"Error fetching messages list: {e!s}"

View file

@ -350,10 +350,10 @@ async def composio_callback(
count = await count_connectors_of_type( count = await count_connectors_of_type(
session, connector_type, space_id, user_id session, connector_type, space_id, user_id
) )
# Generate base name (e.g., "Gmail", "Google Drive") # Generate base name (e.g., "Gmail", "Google Drive")
base_name = get_base_name_for_type(connector_type) base_name = get_base_name_for_type(connector_type)
# Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc. # Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc.
if count == 0: if count == 0:
connector_name = f"{base_name} (Composio) 1" connector_name = f"{base_name} (Composio) 1"

View file

@ -662,16 +662,16 @@ async def index_connector_content(
# Use UTC for "today" to match how last_indexed_at is stored # Use UTC for "today" to match how last_indexed_at is stored
today_utc = datetime.now(UTC).replace(tzinfo=None).date() today_utc = datetime.now(UTC).replace(tzinfo=None).date()
last_indexed_date = last_indexed_naive.date() last_indexed_date = last_indexed_naive.date()
if last_indexed_date == today_utc: if last_indexed_date == today_utc:
# If last indexed today, go back 1 day to ensure we don't miss anything # If last indexed today, go back 1 day to ensure we don't miss anything
indexing_from = (today_utc - timedelta(days=1)).strftime("%Y-%m-%d") indexing_from = (today_utc - timedelta(days=1)).strftime("%Y-%m-%d")
else: else:
indexing_from = last_indexed_naive.strftime("%Y-%m-%d") indexing_from = last_indexed_naive.strftime("%Y-%m-%d")
else: else:
indexing_from = (datetime.now(UTC).replace(tzinfo=None) - timedelta(days=365)).strftime( indexing_from = (
"%Y-%m-%d" datetime.now(UTC).replace(tzinfo=None) - timedelta(days=365)
) ).strftime("%Y-%m-%d")
else: else:
indexing_from = start_date indexing_from = start_date
@ -683,7 +683,7 @@ async def index_connector_content(
]: ]:
# Default to today if no end_date provided (users can manually select future dates) # Default to today if no end_date provided (users can manually select future dates)
indexing_to = today_str if end_date is None else end_date indexing_to = today_str if end_date is None else end_date
# If start_date and end_date are the same, adjust end_date to be one day later # If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date) # to ensure valid date range (start_date must be strictly before end_date)
if indexing_from == indexing_to: if indexing_from == indexing_to:
@ -1251,16 +1251,19 @@ async def _run_indexing_with_notifications(
if error_or_warning: if error_or_warning:
# Check if this is a duplicate warning or empty result (success cases) or an actual error # Check if this is a duplicate warning or empty result (success cases) or an actual error
# Handle both normal and Composio calendar connectors # Handle both normal and Composio calendar connectors
error_or_warning_lower = str(error_or_warning).lower() if error_or_warning else "" error_or_warning_lower = (
str(error_or_warning).lower() if error_or_warning else ""
)
is_duplicate_warning = "skipped (duplicate)" in error_or_warning_lower is_duplicate_warning = "skipped (duplicate)" in error_or_warning_lower
# "No X found" messages are success cases - sync worked, just found nothing in date range # "No X found" messages are success cases - sync worked, just found nothing in date range
is_empty_result = ("no " in error_or_warning_lower and "found" in error_or_warning_lower) is_empty_result = (
"no " in error_or_warning_lower
and "found" in error_or_warning_lower
)
if is_duplicate_warning or is_empty_result: if is_duplicate_warning or is_empty_result:
# These are success cases - sync worked, just found nothing new # These are success cases - sync worked, just found nothing new
logger.info( logger.info(f"Indexing completed successfully: {error_or_warning}")
f"Indexing completed successfully: {error_or_warning}"
)
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI # Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
if update_timestamp_func: if update_timestamp_func:
await update_timestamp_func(session, connector_id) await update_timestamp_func(session, connector_id)
@ -1269,7 +1272,11 @@ async def _run_indexing_with_notifications(
# Refresh notification to ensure it's not stale after timestamp update commit # Refresh notification to ensure it's not stale after timestamp update commit
await session.refresh(notification) await session.refresh(notification)
# For empty results, use a cleaner message # For empty results, use a cleaner message
notification_message = "No new items found in date range" if is_empty_result else error_or_warning notification_message = (
"No new items found in date range"
if is_empty_result
else error_or_warning
)
await NotificationService.connector_indexing.notify_indexing_completed( await NotificationService.connector_indexing.notify_indexing_completed(
session=session, session=session,
notification=notification, notification=notification,

View file

@ -81,7 +81,9 @@ class ComposioService:
# Default download directory for files from Composio # Default download directory for files from Composio
DEFAULT_DOWNLOAD_DIR = "/tmp/composio_downloads" DEFAULT_DOWNLOAD_DIR = "/tmp/composio_downloads"
def __init__(self, api_key: str | None = None, file_download_dir: str | None = None): def __init__(
self, api_key: str | None = None, file_download_dir: str | None = None
):
""" """
Initialize the Composio service. Initialize the Composio service.
@ -90,18 +92,20 @@ class ComposioService:
file_download_dir: Directory for downloaded files. Defaults to /tmp/composio_downloads. file_download_dir: Directory for downloaded files. Defaults to /tmp/composio_downloads.
""" """
import os import os
self.api_key = api_key or config.COMPOSIO_API_KEY self.api_key = api_key or config.COMPOSIO_API_KEY
if not self.api_key: if not self.api_key:
raise ValueError("COMPOSIO_API_KEY is required but not configured") raise ValueError("COMPOSIO_API_KEY is required but not configured")
# Set up download directory # Set up download directory
self.file_download_dir = file_download_dir or self.DEFAULT_DOWNLOAD_DIR self.file_download_dir = file_download_dir or self.DEFAULT_DOWNLOAD_DIR
os.makedirs(self.file_download_dir, exist_ok=True) os.makedirs(self.file_download_dir, exist_ok=True)
# Initialize Composio client with download directory # Initialize Composio client with download directory
# Per docs: file_download_dir configures where files are downloaded # Per docs: file_download_dir configures where files are downloaded
self.client = Composio(api_key=self.api_key, file_download_dir=self.file_download_dir) self.client = Composio(
api_key=self.api_key, file_download_dir=self.file_download_dir
)
@staticmethod @staticmethod
def is_enabled() -> bool: def is_enabled() -> bool:
@ -512,7 +516,7 @@ class ComposioService:
Tuple of (file content bytes, error message). Tuple of (file content bytes, error message).
""" """
from pathlib import Path from pathlib import Path
try: try:
result = await self.execute_tool( result = await self.execute_tool(
connected_account_id=connected_account_id, connected_account_id=connected_account_id,
@ -532,35 +536,37 @@ class ComposioService:
# Response structure: {data: {...}, error: ..., successful: ...} # Response structure: {data: {...}, error: ..., successful: ...}
# The actual file info is nested inside data["data"] # The actual file info is nested inside data["data"]
file_path = None file_path = None
if isinstance(data, dict): if isinstance(data, dict):
# Handle nested response structure: data contains {data, error, successful} # Handle nested response structure: data contains {data, error, successful}
# The actual file info is in data["data"] # The actual file info is in data["data"]
inner_data = data inner_data = data
if "data" in data and isinstance(data["data"], dict): if "data" in data and isinstance(data["data"], dict):
inner_data = data["data"] inner_data = data["data"]
logger.debug(f"Found nested data structure. Inner keys: {list(inner_data.keys())}") logger.debug(
f"Found nested data structure. Inner keys: {list(inner_data.keys())}"
)
elif "successful" in data and "data" in data: elif "successful" in data and "data" in data:
# Standard Composio response wrapper # Standard Composio response wrapper
inner_data = data["data"] if data["data"] else data inner_data = data["data"] if data["data"] else data
# Try documented fields: file_path, downloaded_file_content, path, uri # Try documented fields: file_path, downloaded_file_content, path, uri
file_path = ( file_path = (
inner_data.get("file_path") or inner_data.get("file_path")
inner_data.get("downloaded_file_content") or or inner_data.get("downloaded_file_content")
inner_data.get("path") or or inner_data.get("path")
inner_data.get("uri") or inner_data.get("uri")
) )
# Handle nested dict case where downloaded_file_content contains the path # Handle nested dict case where downloaded_file_content contains the path
if isinstance(file_path, dict): if isinstance(file_path, dict):
file_path = ( file_path = (
file_path.get("file_path") or file_path.get("file_path")
file_path.get("downloaded_file_content") or or file_path.get("downloaded_file_content")
file_path.get("path") or or file_path.get("path")
file_path.get("uri") or file_path.get("uri")
) )
# If still no path, check if inner_data itself has the nested structure # If still no path, check if inner_data itself has the nested structure
if not file_path and isinstance(inner_data, dict): if not file_path and isinstance(inner_data, dict):
for key in ["downloaded_file_content", "file_path", "path", "uri"]: for key in ["downloaded_file_content", "file_path", "path", "uri"]:
@ -572,15 +578,17 @@ class ComposioService:
elif isinstance(val, dict): elif isinstance(val, dict):
# One more level of nesting # One more level of nesting
file_path = ( file_path = (
val.get("file_path") or val.get("file_path")
val.get("downloaded_file_content") or or val.get("downloaded_file_content")
val.get("path") or or val.get("path")
val.get("uri") or val.get("uri")
) )
if file_path: if file_path:
break break
logger.debug(f"Composio response keys: {list(data.keys())}, inner keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}, extracted path: {file_path}") logger.debug(
f"Composio response keys: {list(data.keys())}, inner keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}, extracted path: {file_path}"
)
elif isinstance(data, str): elif isinstance(data, str):
# Direct string response (could be path or content) # Direct string response (could be path or content)
file_path = data file_path = data
@ -591,24 +599,31 @@ class ComposioService:
# Read file from the path # Read file from the path
if file_path and isinstance(file_path, str): if file_path and isinstance(file_path, str):
path_obj = Path(file_path) path_obj = Path(file_path)
# Check if it's a valid file path (absolute or in .composio directory) # Check if it's a valid file path (absolute or in .composio directory)
if path_obj.is_absolute() or '.composio' in str(path_obj): if path_obj.is_absolute() or ".composio" in str(path_obj):
try: try:
if path_obj.exists(): if path_obj.exists():
content = path_obj.read_bytes() content = path_obj.read_bytes()
logger.info(f"Successfully read {len(content)} bytes from Composio file: {file_path}") logger.info(
f"Successfully read {len(content)} bytes from Composio file: {file_path}"
)
return content, None return content, None
else: else:
logger.warning(f"File path from Composio does not exist: {file_path}") logger.warning(
f"File path from Composio does not exist: {file_path}"
)
return None, f"File not found at path: {file_path}" return None, f"File not found at path: {file_path}"
except Exception as e: except Exception as e:
logger.error(f"Failed to read file from Composio path {file_path}: {e!s}") logger.error(
f"Failed to read file from Composio path {file_path}: {e!s}"
)
return None, f"Failed to read file: {e!s}" return None, f"Failed to read file: {e!s}"
else: else:
# Not a file path - might be base64 encoded content # Not a file path - might be base64 encoded content
try: try:
import base64 import base64
content = base64.b64decode(file_path) content = base64.b64decode(file_path)
return content, None return content, None
except Exception: except Exception:
@ -625,8 +640,11 @@ class ComposioService:
f"Inner data keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data).__name__}, " f"Inner data keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data).__name__}, "
f"Full inner data: {inner_data}" f"Full inner data: {inner_data}"
) )
return None, f"No file path in Composio response. Keys: {list(data.keys())}, inner: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}" return (
None,
f"No file path in Composio response. Keys: {list(data.keys())}, inner: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}",
)
return None, f"Unexpected data type from Composio: {type(data).__name__}" return None, f"Unexpected data type from Composio: {type(data).__name__}"
except Exception as e: except Exception as e:
@ -638,14 +656,14 @@ class ComposioService:
) -> tuple[str | None, str | None]: ) -> tuple[str | None, str | None]:
""" """
Get the starting page token for Google Drive change tracking. Get the starting page token for Google Drive change tracking.
This token represents the current state and is used for future delta syncs. This token represents the current state and is used for future delta syncs.
Per Composio docs: Use GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN to get initial token. Per Composio docs: Use GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN to get initial token.
Args: Args:
connected_account_id: Composio connected account ID. connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account. entity_id: The entity/user ID that owns the connected account.
Returns: Returns:
Tuple of (start_page_token, error message). Tuple of (start_page_token, error message).
""" """
@ -656,27 +674,27 @@ class ComposioService:
params={}, params={},
entity_id=entity_id, entity_id=entity_id,
) )
if not result.get("success"): if not result.get("success"):
return None, result.get("error", "Unknown error") return None, result.get("error", "Unknown error")
data = result.get("data", {}) data = result.get("data", {})
# Handle nested response: {data: {startPageToken: ...}, successful: ...} # Handle nested response: {data: {startPageToken: ...}, successful: ...}
if isinstance(data, dict): if isinstance(data, dict):
inner_data = data.get("data", data) inner_data = data.get("data", data)
token = ( token = (
inner_data.get("startPageToken") or inner_data.get("startPageToken")
inner_data.get("start_page_token") or or inner_data.get("start_page_token")
data.get("startPageToken") or or data.get("startPageToken")
data.get("start_page_token") or data.get("start_page_token")
) )
if token: if token:
logger.info(f"Got Drive start page token: {token}") logger.info(f"Got Drive start page token: {token}")
return token, None return token, None
logger.warning(f"Could not extract start page token from response: {data}") logger.warning(f"Could not extract start page token from response: {data}")
return None, "No start page token in response" return None, "No start page token in response"
except Exception as e: except Exception as e:
logger.error(f"Failed to get Drive start page token: {e!s}") logger.error(f"Failed to get Drive start page token: {e!s}")
return None, str(e) return None, str(e)
@ -691,18 +709,18 @@ class ComposioService:
) -> tuple[list[dict[str, Any]], str | None, str | None]: ) -> tuple[list[dict[str, Any]], str | None, str | None]:
""" """
List changes in Google Drive since the given page token. List changes in Google Drive since the given page token.
Per Composio docs: GOOGLEDRIVE_LIST_CHANGES tracks modifications to files/folders. Per Composio docs: GOOGLEDRIVE_LIST_CHANGES tracks modifications to files/folders.
If pageToken is not provided, it auto-fetches the current start page token. If pageToken is not provided, it auto-fetches the current start page token.
Response includes nextPageToken for pagination and newStartPageToken for future syncs. Response includes nextPageToken for pagination and newStartPageToken for future syncs.
Args: Args:
connected_account_id: Composio connected account ID. connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account. entity_id: The entity/user ID that owns the connected account.
page_token: Page token from previous sync (optional - will auto-fetch if not provided). page_token: Page token from previous sync (optional - will auto-fetch if not provided).
page_size: Number of changes per page. page_size: Number of changes per page.
include_removed: Whether to include removed items in the response. include_removed: Whether to include removed items in the response.
Returns: Returns:
Tuple of (changes list, new_start_page_token, error message). Tuple of (changes list, new_start_page_token, error message).
""" """
@ -713,42 +731,44 @@ class ComposioService:
} }
if page_token: if page_token:
params["pageToken"] = page_token params["pageToken"] = page_token
result = await self.execute_tool( result = await self.execute_tool(
connected_account_id=connected_account_id, connected_account_id=connected_account_id,
tool_name="GOOGLEDRIVE_LIST_CHANGES", tool_name="GOOGLEDRIVE_LIST_CHANGES",
params=params, params=params,
entity_id=entity_id, entity_id=entity_id,
) )
if not result.get("success"): if not result.get("success"):
return [], None, result.get("error", "Unknown error") return [], None, result.get("error", "Unknown error")
data = result.get("data", {}) data = result.get("data", {})
# Handle nested response structure # Handle nested response structure
changes = [] changes = []
new_start_token = None new_start_token = None
if isinstance(data, dict): if isinstance(data, dict):
inner_data = data.get("data", data) inner_data = data.get("data", data)
changes = inner_data.get("changes", []) or data.get("changes", []) changes = inner_data.get("changes", []) or data.get("changes", [])
# Get the token for next sync # Get the token for next sync
# newStartPageToken is returned when all changes have been fetched # newStartPageToken is returned when all changes have been fetched
# nextPageToken is for pagination within the current fetch # nextPageToken is for pagination within the current fetch
new_start_token = ( new_start_token = (
inner_data.get("newStartPageToken") or inner_data.get("newStartPageToken")
inner_data.get("new_start_page_token") or or inner_data.get("new_start_page_token")
inner_data.get("nextPageToken") or or inner_data.get("nextPageToken")
inner_data.get("next_page_token") or or inner_data.get("next_page_token")
data.get("newStartPageToken") or or data.get("newStartPageToken")
data.get("nextPageToken") or data.get("nextPageToken")
) )
logger.info(f"Got {len(changes)} Drive changes, new token: {new_start_token[:20] if new_start_token else 'None'}...") logger.info(
f"Got {len(changes)} Drive changes, new token: {new_start_token[:20] if new_start_token else 'None'}..."
)
return changes, new_start_token, None return changes, new_start_token, None
except Exception as e: except Exception as e:
logger.error(f"Failed to list Drive changes: {e!s}") logger.error(f"Failed to list Drive changes: {e!s}")
return [], None, str(e) return [], None, str(e)

View file

@ -385,7 +385,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
metadata_updates = { metadata_updates = {
"indexed_count": indexed_count, "indexed_count": indexed_count,
"sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", "sync_stage": "completed"
if (not error_message or is_warning or indexed_count > 0)
else "failed",
"error_message": error_message, "error_message": error_message,
} }

View file

@ -208,7 +208,7 @@ async def index_google_calendar_events(
# Use provided dates (including future dates) # Use provided dates (including future dates)
start_date_str = start_date start_date_str = start_date
end_date_str = end_date end_date_str = end_date
# If start_date and end_date are the same, adjust end_date to be one day later # If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date) # to ensure valid date range (start_date must be strictly before end_date)
if start_date_str == end_date_str: if start_date_str == end_date_str:
@ -269,10 +269,14 @@ async def index_google_calendar_events(
# Check if this is an authentication error that requires re-authentication # Check if this is an authentication error that requires re-authentication
error_message = error error_message = error
error_type = "APIError" error_type = "APIError"
if "re-authenticate" in error.lower() or "expired or been revoked" in error.lower() or "authentication failed" in error.lower(): if (
"re-authenticate" in error.lower()
or "expired or been revoked" in error.lower()
or "authentication failed" in error.lower()
):
error_message = "Google Calendar authentication failed. Please re-authenticate." error_message = "Google Calendar authentication failed. Please re-authenticate."
error_type = "AuthenticationError" error_type = "AuthenticationError"
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, log_entry,
error_message, error_message,
@ -290,7 +294,9 @@ async def index_google_calendar_events(
documents_indexed = 0 documents_indexed = 0
documents_skipped = 0 documents_skipped = 0
skipped_events = [] skipped_events = []
duplicate_content_count = 0 # Track events skipped due to duplicate content_hash duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
for event in events: for event in events:
try: try:
@ -417,7 +423,7 @@ async def index_google_calendar_events(
duplicate_by_content = await check_duplicate_document_by_hash( duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash session, content_hash
) )
if duplicate_by_content: if duplicate_by_content:
# A document with the same content already exists (likely from Composio connector) # A document with the same content already exists (likely from Composio connector)
logger.info( logger.info(
@ -528,7 +534,10 @@ async def index_google_calendar_events(
await session.commit() await session.commit()
except Exception as e: except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.) # Handle any remaining integrity errors gracefully (race conditions, etc.)
if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower(): if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning( logger.warning(
f"Duplicate content_hash detected during final commit. " f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. " f"This may occur if the same event was indexed by multiple connectors. "

View file

@ -578,7 +578,7 @@ async def _check_rename_only_update(
- (True, message): Only filename changed, document was updated - (True, message): Only filename changed, document was updated
- (False, None): Content changed or new file, needs full processing - (False, None): Content changed or new file, needs full processing
""" """
from sqlalchemy import cast, select, String from sqlalchemy import String, cast, select
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
from app.db import Document from app.db import Document
@ -603,7 +603,8 @@ async def _check_rename_only_update(
select(Document).where( select(Document).where(
Document.search_space_id == search_space_id, Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE, Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
cast(Document.document_metadata["google_drive_file_id"], String) == file_id, cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
) )
) )
existing_document = result.scalar_one_or_none() existing_document = result.scalar_one_or_none()
@ -755,7 +756,7 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
Handles both new (file_id-based) and legacy (filename-based) hash schemes. Handles both new (file_id-based) and legacy (filename-based) hash schemes.
""" """
from sqlalchemy import cast, select, String from sqlalchemy import String, cast, select
from app.db import Document from app.db import Document
@ -774,7 +775,8 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
select(Document).where( select(Document).where(
Document.search_space_id == search_space_id, Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE, Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
cast(Document.document_metadata["google_drive_file_id"], String) == file_id, cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
) )
) )
existing_document = result.scalar_one_or_none() existing_document = result.scalar_one_or_none()

View file

@ -173,15 +173,16 @@ async def index_google_gmail_messages(
# Check if this is an authentication error that requires re-authentication # Check if this is an authentication error that requires re-authentication
error_message = error error_message = error
error_type = "APIError" error_type = "APIError"
if "re-authenticate" in error.lower() or "expired or been revoked" in error.lower() or "authentication failed" in error.lower(): if (
"re-authenticate" in error.lower()
or "expired or been revoked" in error.lower()
or "authentication failed" in error.lower()
):
error_message = "Gmail authentication failed. Please re-authenticate." error_message = "Gmail authentication failed. Please re-authenticate."
error_type = "AuthenticationError" error_type = "AuthenticationError"
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, log_entry, error_message, error, {"error_type": error_type}
error_message,
error,
{"error_type": error_type}
) )
return 0, error_message return 0, error_message

View file

@ -18,7 +18,10 @@ import { ConnectorDialogHeader } from "./connector-popup/components/connector-di
import { ConnectorConnectView } from "./connector-popup/connector-configs/views/connector-connect-view"; import { ConnectorConnectView } from "./connector-popup/connector-configs/views/connector-connect-view";
import { ConnectorEditView } from "./connector-popup/connector-configs/views/connector-edit-view"; import { ConnectorEditView } from "./connector-popup/connector-configs/views/connector-edit-view";
import { IndexingConfigurationView } from "./connector-popup/connector-configs/views/indexing-configuration-view"; import { IndexingConfigurationView } from "./connector-popup/connector-configs/views/indexing-configuration-view";
import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS } from "./connector-popup/constants/connector-constants"; import {
COMPOSIO_CONNECTORS,
OAUTH_CONNECTORS,
} from "./connector-popup/constants/connector-constants";
import { useConnectorDialog } from "./connector-popup/hooks/use-connector-dialog"; import { useConnectorDialog } from "./connector-popup/hooks/use-connector-dialog";
import { useIndexingConnectors } from "./connector-popup/hooks/use-indexing-connectors"; import { useIndexingConnectors } from "./connector-popup/hooks/use-indexing-connectors";
import { ActiveConnectorsTab } from "./connector-popup/tabs/active-connectors-tab"; import { ActiveConnectorsTab } from "./connector-popup/tabs/active-connectors-tab";

View file

@ -12,4 +12,3 @@ interface ComposioCalendarConfigProps {
export const ComposioCalendarConfig: FC<ComposioCalendarConfigProps> = () => { export const ComposioCalendarConfig: FC<ComposioCalendarConfigProps> = () => {
return <div className="space-y-6" />; return <div className="space-y-6" />;
}; };

View file

@ -1,6 +1,14 @@
"use client"; "use client";
import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation, X } from "lucide-react"; import {
File,
FileSpreadsheet,
FileText,
FolderClosed,
Image,
Presentation,
X,
} from "lucide-react";
import type { FC } from "react"; import type { FC } from "react";
import { useEffect, useState } from "react"; import { useEffect, useState } from "react";
import { ComposioDriveFolderTree } from "@/components/connectors/composio-drive-folder-tree"; import { ComposioDriveFolderTree } from "@/components/connectors/composio-drive-folder-tree";
@ -85,7 +93,10 @@ function getFileIconFromName(fileName: string, className: string = "size-3.5 shr
return <File className={`${className} text-gray-500`} />; return <File className={`${className} text-gray-500`} />;
} }
export const ComposioDriveConfig: FC<ComposioDriveConfigProps> = ({ connector, onConfigChange }) => { export const ComposioDriveConfig: FC<ComposioDriveConfigProps> = ({
connector,
onConfigChange,
}) => {
const isIndexable = connector.config?.is_indexable as boolean; const isIndexable = connector.config?.is_indexable as boolean;
// Initialize with existing selected folders and files from connector config // Initialize with existing selected folders and files from connector config
@ -184,9 +195,7 @@ export const ComposioDriveConfig: FC<ComposioDriveConfigProps> = ({ connector, o
); );
} }
if (selectedFiles.length > 0) { if (selectedFiles.length > 0) {
parts.push( parts.push(`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`);
`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`
);
} }
return parts.length > 0 ? `(${parts.join(", ")})` : ""; return parts.length > 0 ? `(${parts.join(", ")})` : "";
})()} })()}
@ -329,13 +338,10 @@ export const ComposioDriveConfig: FC<ComposioDriveConfigProps> = ({ connector, o
<Switch <Switch
id="include-subfolders" id="include-subfolders"
checked={indexingOptions.include_subfolders} checked={indexingOptions.include_subfolders}
onCheckedChange={(checked) => onCheckedChange={(checked) => handleIndexingOptionChange("include_subfolders", checked)}
handleIndexingOptionChange("include_subfolders", checked)
}
/> />
</div> </div>
</div> </div>
</div> </div>
); );
}; };

View file

@ -12,4 +12,3 @@ interface ComposioGmailConfigProps {
export const ComposioGmailConfig: FC<ComposioGmailConfigProps> = () => { export const ComposioGmailConfig: FC<ComposioGmailConfigProps> = () => {
return <div className="space-y-6" />; return <div className="space-y-6" />;
}; };

View file

@ -1,6 +1,14 @@
"use client"; "use client";
import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation, X } from "lucide-react"; import {
File,
FileSpreadsheet,
FileText,
FolderClosed,
Image,
Presentation,
X,
} from "lucide-react";
import type { FC } from "react"; import type { FC } from "react";
import { useEffect, useState } from "react"; import { useEffect, useState } from "react";
import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree"; import { GoogleDriveFolderTree } from "@/components/connectors/google-drive-folder-tree";

View file

@ -276,7 +276,8 @@ export const ConnectorEditView: FC<ConnectorEditViewProps> = ({
Re-indexing runs in the background Re-indexing runs in the background
</p> </p>
<p className="text-muted-foreground mt-1 text-[10px] sm:text-sm"> <p className="text-muted-foreground mt-1 text-[10px] sm:text-sm">
You can continue using SurfSense while we sync your data. Check inbox for updates. You can continue using SurfSense while we sync your data. Check inbox for
updates.
</p> </p>
</div> </div>
</div> </div>

View file

@ -170,13 +170,13 @@ export const IndexingConfigurationView: FC<IndexingConfigurationViewProps> = ({
{/* Periodic sync - not shown for Google Drive (regular and Composio) */} {/* Periodic sync - not shown for Google Drive (regular and Composio) */}
{config.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && {config.connectorType !== "GOOGLE_DRIVE_CONNECTOR" &&
config.connectorType !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && ( config.connectorType !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && (
<PeriodicSyncConfig <PeriodicSyncConfig
enabled={periodicEnabled} enabled={periodicEnabled}
frequencyMinutes={frequencyMinutes} frequencyMinutes={frequencyMinutes}
onEnabledChange={onPeriodicEnabledChange} onEnabledChange={onPeriodicEnabledChange}
onFrequencyChange={onFrequencyChange} onFrequencyChange={onFrequencyChange}
/> />
)} )}
</> </>
)} )}
@ -189,7 +189,8 @@ export const IndexingConfigurationView: FC<IndexingConfigurationViewProps> = ({
<div className="text-xs sm:text-sm"> <div className="text-xs sm:text-sm">
<p className="font-medium text-xs sm:text-sm">Indexing runs in the background</p> <p className="font-medium text-xs sm:text-sm">Indexing runs in the background</p>
<p className="text-muted-foreground mt-1 text-[10px] sm:text-sm"> <p className="text-muted-foreground mt-1 text-[10px] sm:text-sm">
You can continue using SurfSense while we sync your data. Check inbox for updates. You can continue using SurfSense while we sync your data. Check inbox for
updates.
</p> </p>
</div> </div>
</div> </div>

View file

@ -328,11 +328,7 @@ export const useConnectorDialog = () => {
return; return;
} }
if ( if (params.success === "true" && searchSpaceId && params.modal === "connectors") {
params.success === "true" &&
searchSpaceId &&
params.modal === "connectors"
) {
refetchAllConnectors().then((result) => { refetchAllConnectors().then((result) => {
if (!result.data) return; if (!result.data) return;
@ -346,16 +342,12 @@ export const useConnectorDialog = () => {
if (params.connectorId) { if (params.connectorId) {
const connectorId = parseInt(params.connectorId, 10); const connectorId = parseInt(params.connectorId, 10);
newConnector = result.data.find((c: SearchSourceConnector) => c.id === connectorId); newConnector = result.data.find((c: SearchSourceConnector) => c.id === connectorId);
// If we found the connector, find the matching OAuth/Composio connector by type // If we found the connector, find the matching OAuth/Composio connector by type
if (newConnector) { if (newConnector) {
oauthConnector = oauthConnector =
OAUTH_CONNECTORS.find( OAUTH_CONNECTORS.find((c) => c.connectorType === newConnector!.connector_type) ||
(c) => c.connectorType === newConnector!.connector_type COMPOSIO_CONNECTORS.find((c) => c.connectorType === newConnector!.connector_type);
) ||
COMPOSIO_CONNECTORS.find(
(c) => c.connectorType === newConnector!.connector_type
);
} }
} }
@ -364,7 +356,7 @@ export const useConnectorDialog = () => {
oauthConnector = oauthConnector =
OAUTH_CONNECTORS.find((c) => c.id === params.connector) || OAUTH_CONNECTORS.find((c) => c.id === params.connector) ||
COMPOSIO_CONNECTORS.find((c) => c.id === params.connector); COMPOSIO_CONNECTORS.find((c) => c.id === params.connector);
if (oauthConnector) { if (oauthConnector) {
newConnector = result.data.find( newConnector = result.data.find(
(c: SearchSourceConnector) => c.connector_type === oauthConnector!.connectorType (c: SearchSourceConnector) => c.connector_type === oauthConnector!.connectorType

View file

@ -68,9 +68,7 @@ export function useIndexingConnectors(
// Only check connector_indexing notifications // Only check connector_indexing notifications
if (item.type !== "connector_indexing") continue; if (item.type !== "connector_indexing") continue;
const metadata = isConnectorIndexingMetadata(item.metadata) const metadata = isConnectorIndexingMetadata(item.metadata) ? item.metadata : null;
? item.metadata
: null;
if (!metadata) continue; if (!metadata) continue;
// If status is "in_progress", add connector to indexing set // If status is "in_progress", add connector to indexing set