diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py
index 5a9645a66..953e2e8fc 100644
--- a/surfsense_backend/app/connectors/composio_gmail_connector.py
+++ b/surfsense_backend/app/connectors/composio_gmail_connector.py
@@ -611,4 +611,3 @@ async def index_composio_gmail(
except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Gmail via Composio: {e!s}"
-
diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py
index 3ac235848..ec5b22b7f 100644
--- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py
+++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py
@@ -259,7 +259,9 @@ async def index_composio_google_calendar(
documents_indexed = 0
documents_skipped = 0
- duplicate_content_count = 0 # Track events skipped due to duplicate content_hash
+ duplicate_content_count = (
+ 0 # Track events skipped due to duplicate content_hash
+ )
for event in events:
try:
@@ -353,7 +355,7 @@ async def index_composio_google_calendar(
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
)
- await session.commit( )
+ await session.commit()
continue
# Document doesn't exist by unique_identifier_hash
@@ -362,7 +364,7 @@ async def index_composio_google_calendar(
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
-
+
if duplicate_by_content:
# A document with the same content already exists (likely from standard connector)
logger.info(
@@ -458,7 +460,10 @@ async def index_composio_google_calendar(
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
- if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower():
+ if (
+ "duplicate key value violates unique constraint" in str(e).lower()
+ or "uniqueviolationerror" in str(e).lower()
+ ):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
@@ -495,4 +500,3 @@ async def index_composio_google_calendar(
f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True
)
return 0, f"Failed to index Google Calendar via Composio: {e!s}"
-
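
The duplicate-handling reshaped in the hunks above treats unique-constraint violations during commit as an expected race between the standard and Composio calendar connectors indexing the same event. A minimal sketch of that pattern, assuming an async SQLAlchemy session (the helper name is illustrative, not an actual SurfSense function):

    import logging

    from sqlalchemy.ext.asyncio import AsyncSession

    logger = logging.getLogger(__name__)


    async def commit_tolerating_duplicates(session: AsyncSession) -> bool:
        """Commit, downgrading unique-constraint races to a warning.

        Illustrative helper, not part of the SurfSense codebase.
        """
        try:
            await session.commit()
            return True
        except Exception as e:
            msg = str(e).lower()
            if (
                "duplicate key value violates unique constraint" in msg
                or "uniqueviolationerror" in msg
            ):
                # The same event was indexed by another connector concurrently.
                logger.warning("Duplicate content_hash during commit: %s", e)
                await session.rollback()
                return False
            raise
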
diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py
index e19436611..e3b988676 100644
--- a/surfsense_backend/app/connectors/composio_google_drive_connector.py
+++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py
@@ -453,8 +453,8 @@ async def check_document_by_unique_identifier(
session: AsyncSession, unique_identifier_hash: str
) -> Document | None:
"""Check if a document with the given unique identifier hash already exists."""
- from sqlalchemy.orm import selectinload
from sqlalchemy.future import select
+ from sqlalchemy.orm import selectinload
existing_doc_result = await session.execute(
select(Document)
@@ -517,14 +517,20 @@ async def index_composio_google_drive(
# Route to delta sync or full scan
if use_delta_sync:
- logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}")
+ logger.info(
+ f"Using delta sync for Composio Google Drive connector {connector_id}"
+ )
await task_logger.log_task_progress(
log_entry,
f"Starting delta sync for Google Drive via Composio (connector {connector_id})",
{"stage": "delta_sync", "token": stored_page_token[:20] + "..."},
)
- documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync(
+ (
+ documents_indexed,
+ documents_skipped,
+ processing_errors,
+ ) = await _index_composio_drive_delta_sync(
session=session,
composio_connector=composio_connector,
connector_id=connector_id,
@@ -536,7 +542,9 @@ async def index_composio_google_drive(
log_entry=log_entry,
)
else:
- logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)")
+ logger.info(
+ f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)"
+ )
await task_logger.log_task_progress(
log_entry,
f"Fetching Google Drive files via Composio for connector {connector_id}",
@@ -547,7 +555,11 @@ async def index_composio_google_drive(
},
)
- documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan(
+ (
+ documents_indexed,
+ documents_skipped,
+ processing_errors,
+ ) = await _index_composio_drive_full_scan(
session=session,
composio_connector=composio_connector,
connector_id=connector_id,
@@ -580,9 +592,13 @@ async def index_composio_google_drive(
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit
- logger.info(f"Final commit: Total {documents_indexed} Google Drive files processed")
+ logger.info(
+ f"Final commit: Total {documents_indexed} Google Drive files processed"
+ )
await session.commit()
- logger.info("Successfully committed all Composio Google Drive document changes to database")
+ logger.info(
+ "Successfully committed all Composio Google Drive document changes to database"
+ )
# Handle processing errors
error_message = None
@@ -731,7 +747,9 @@ async def _index_composio_drive_delta_sync(
processing_errors.append(error_msg)
documents_skipped += 1
- logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped")
+ logger.info(
+ f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
+ )
return documents_indexed, documents_skipped, processing_errors
@@ -858,20 +876,18 @@ async def _index_composio_drive_full_scan(
logger.info("No Google Drive files found")
return 0, 0, []
- logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)")
+ logger.info(
+ f"Found {len(all_files)} Google Drive files to index via Composio (full scan)"
+ )
for file_info in all_files:
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
file_name = (
- file_info.get("name", "")
- or file_info.get("fileName", "")
- or "Untitled"
- )
- mime_type = file_info.get("mimeType", "") or file_info.get(
- "mime_type", ""
+ file_info.get("name", "") or file_info.get("fileName", "") or "Untitled"
)
+ mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "")
if not file_id:
documents_skipped += 1
@@ -901,7 +917,9 @@ async def _index_composio_drive_full_scan(
# Batch commit every 10 documents
if documents_indexed > 0 and documents_indexed % 10 == 0:
- logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far")
+ logger.info(
+ f"Committing batch: {documents_indexed} Google Drive files processed so far"
+ )
await session.commit()
except Exception as e:
@@ -910,7 +928,9 @@ async def _index_composio_drive_full_scan(
processing_errors.append(error_msg)
documents_skipped += 1
- logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped")
+ logger.info(
+ f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
+ )
return documents_indexed, documents_skipped, processing_errors
@@ -948,9 +968,7 @@ async def _process_single_drive_file(
content, content_error = await composio_connector.get_drive_file_content(file_id)
if content_error or not content:
- logger.warning(
- f"Could not get content for file {file_name}: {content_error}"
- )
+ logger.warning(f"Could not get content for file {file_name}: {content_error}")
# Use metadata as content fallback
markdown_content = f"# {file_name}\n\n"
markdown_content += f"**File ID:** {file_id}\n"
@@ -985,9 +1003,7 @@ async def _process_single_drive_file(
return 0, 1, processing_errors # Skipped
# Update existing document
- user_llm = await get_user_long_context_llm(
- session, user_id, search_space_id
- )
+ user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
@@ -1003,12 +1019,8 @@ async def _process_single_drive_file(
markdown_content, user_llm, document_metadata
)
else:
- summary_content = (
- f"Google Drive File: {file_name}\n\nType: {mime_type}"
- )
- summary_embedding = config.embedding_model_instance.embed(
- summary_content
- )
+ summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
+ summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
@@ -1030,9 +1042,7 @@ async def _process_single_drive_file(
return 1, 0, processing_errors # Indexed
# Create new document
- user_llm = await get_user_long_context_llm(
- session, user_id, search_space_id
- )
+ user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if user_llm:
document_metadata = {
@@ -1048,12 +1058,8 @@ async def _process_single_drive_file(
markdown_content, user_llm, document_metadata
)
else:
- summary_content = (
- f"Google Drive File: {file_name}\n\nType: {mime_type}"
- )
- summary_embedding = config.embedding_model_instance.embed(
- summary_content
- )
+ summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
+ summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
@@ -1159,4 +1165,3 @@ async def _fetch_folder_files_recursively(
except Exception as e:
logger.error(f"Error in recursive folder fetch: {e!s}")
return all_files
-
diff --git a/surfsense_backend/app/connectors/google_calendar_connector.py b/surfsense_backend/app/connectors/google_calendar_connector.py
index ac60b02a8..d8160cf25 100644
--- a/surfsense_backend/app/connectors/google_calendar_connector.py
+++ b/surfsense_backend/app/connectors/google_calendar_connector.py
@@ -144,7 +144,10 @@ class GoogleCalendarConnector:
except Exception as e:
error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked)
- if "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower():
+ if (
+ "invalid_grant" in error_str.lower()
+ or "token has been expired or revoked" in error_str.lower()
+ ):
raise Exception(
"Google Calendar authentication failed. Please re-authenticate."
) from e
@@ -173,7 +176,11 @@ class GoogleCalendarConnector:
except Exception as e:
error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it
- if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower():
+ if (
+ "re-authenticate" in error_str.lower()
+ or "expired or been revoked" in error_str.lower()
+ or "authentication failed" in error_str.lower()
+ ):
raise Exception(error_str) from e
raise Exception(f"Failed to create Google Calendar service: {e!s}") from e
@@ -283,7 +290,11 @@ class GoogleCalendarConnector:
except Exception as e:
error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it
- if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower():
+ if (
+ "re-authenticate" in error_str.lower()
+ or "expired or been revoked" in error_str.lower()
+ or "authentication failed" in error_str.lower()
+ ):
return [], error_str
return [], f"Error fetching events: {e!s}"
diff --git a/surfsense_backend/app/connectors/google_gmail_connector.py b/surfsense_backend/app/connectors/google_gmail_connector.py
index c86a96413..7c7262bff 100644
--- a/surfsense_backend/app/connectors/google_gmail_connector.py
+++ b/surfsense_backend/app/connectors/google_gmail_connector.py
@@ -143,7 +143,10 @@ class GoogleGmailConnector:
except Exception as e:
error_str = str(e)
# Check if this is an invalid_grant error (token expired/revoked)
- if "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower():
+ if (
+ "invalid_grant" in error_str.lower()
+ or "token has been expired or revoked" in error_str.lower()
+ ):
raise Exception(
"Gmail authentication failed. Please re-authenticate."
) from e
@@ -172,7 +175,11 @@ class GoogleGmailConnector:
except Exception as e:
error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it
- if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower():
+ if (
+ "re-authenticate" in error_str.lower()
+ or "expired or been revoked" in error_str.lower()
+ or "authentication failed" in error_str.lower()
+ ):
raise Exception(error_str) from e
raise Exception(f"Failed to create Gmail service: {e!s}") from e
@@ -237,7 +244,11 @@ class GoogleGmailConnector:
except Exception as e:
error_str = str(e)
# If the error already contains a user-friendly re-authentication message, preserve it
- if "re-authenticate" in error_str.lower() or "expired or been revoked" in error_str.lower() or "authentication failed" in error_str.lower():
+ if (
+ "re-authenticate" in error_str.lower()
+ or "expired or been revoked" in error_str.lower()
+ or "authentication failed" in error_str.lower()
+ ):
return [], error_str
return [], f"Error fetching messages list: {e!s}"
diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py
index 14ef9efcf..a28361132 100644
--- a/surfsense_backend/app/routes/composio_routes.py
+++ b/surfsense_backend/app/routes/composio_routes.py
@@ -350,10 +350,10 @@ async def composio_callback(
count = await count_connectors_of_type(
session, connector_type, space_id, user_id
)
-
+
# Generate base name (e.g., "Gmail", "Google Drive")
base_name = get_base_name_for_type(connector_type)
-
+
# Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc.
if count == 0:
connector_name = f"{base_name} (Composio) 1"
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 487a689dc..191c6f954 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -662,16 +662,16 @@ async def index_connector_content(
# Use UTC for "today" to match how last_indexed_at is stored
today_utc = datetime.now(UTC).replace(tzinfo=None).date()
last_indexed_date = last_indexed_naive.date()
-
+
if last_indexed_date == today_utc:
# If last indexed today, go back 1 day to ensure we don't miss anything
indexing_from = (today_utc - timedelta(days=1)).strftime("%Y-%m-%d")
else:
indexing_from = last_indexed_naive.strftime("%Y-%m-%d")
else:
- indexing_from = (datetime.now(UTC).replace(tzinfo=None) - timedelta(days=365)).strftime(
- "%Y-%m-%d"
- )
+ indexing_from = (
+ datetime.now(UTC).replace(tzinfo=None) - timedelta(days=365)
+ ).strftime("%Y-%m-%d")
else:
indexing_from = start_date
@@ -683,7 +683,7 @@ async def index_connector_content(
]:
# Default to today if no end_date provided (users can manually select future dates)
indexing_to = today_str if end_date is None else end_date
-
+
# If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date)
if indexing_from == indexing_to:
@@ -1251,16 +1251,19 @@ async def _run_indexing_with_notifications(
if error_or_warning:
# Check if this is a duplicate warning or empty result (success cases) or an actual error
# Handle both normal and Composio calendar connectors
- error_or_warning_lower = str(error_or_warning).lower() if error_or_warning else ""
+ error_or_warning_lower = (
+ str(error_or_warning).lower() if error_or_warning else ""
+ )
is_duplicate_warning = "skipped (duplicate)" in error_or_warning_lower
# "No X found" messages are success cases - sync worked, just found nothing in date range
- is_empty_result = ("no " in error_or_warning_lower and "found" in error_or_warning_lower)
-
+ is_empty_result = (
+ "no " in error_or_warning_lower
+ and "found" in error_or_warning_lower
+ )
+
if is_duplicate_warning or is_empty_result:
# These are success cases - sync worked, just found nothing new
- logger.info(
- f"Indexing completed successfully: {error_or_warning}"
- )
+ logger.info(f"Indexing completed successfully: {error_or_warning}")
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
if update_timestamp_func:
await update_timestamp_func(session, connector_id)
@@ -1269,7 +1272,11 @@ async def _run_indexing_with_notifications(
# Refresh notification to ensure it's not stale after timestamp update commit
await session.refresh(notification)
# For empty results, use a cleaner message
- notification_message = "No new items found in date range" if is_empty_result else error_or_warning
+ notification_message = (
+ "No new items found in date range"
+ if is_empty_result
+ else error_or_warning
+ )
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
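
The date-window logic earlier in this file follows three rules: back up one day when the connector was already indexed today, default to a 365-day lookback on first sync, and pad a zero-length range so the start stays strictly before the end. A standalone sketch of those rules (the function itself is illustrative):

    from datetime import UTC, datetime, timedelta


    def compute_indexing_window(
        last_indexed_at: datetime | None,
        start_date: str | None,
        end_date: str | None,
    ) -> tuple[str, str]:
        """Illustrative condensation of the route's date-window rules."""
        today = datetime.now(UTC).replace(tzinfo=None).date()
        if start_date is not None:
            start = datetime.strptime(start_date, "%Y-%m-%d").date()
        elif last_indexed_at is not None and last_indexed_at.date() == today:
            # Indexed earlier today: back up one day so nothing is missed.
            start = today - timedelta(days=1)
        elif last_indexed_at is not None:
            start = last_indexed_at.date()
        else:
            start = today - timedelta(days=365)  # first sync: one year back
        if end_date is None:
            end = today
        else:
            end = datetime.strptime(end_date, "%Y-%m-%d").date()
        if start == end:
            # A zero-length range is invalid; end must be strictly after start.
            end += timedelta(days=1)
        return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
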
diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py
index 3ea2d1bf2..ad7841a8b 100644
--- a/surfsense_backend/app/services/composio_service.py
+++ b/surfsense_backend/app/services/composio_service.py
@@ -81,7 +81,9 @@ class ComposioService:
# Default download directory for files from Composio
DEFAULT_DOWNLOAD_DIR = "/tmp/composio_downloads"
- def __init__(self, api_key: str | None = None, file_download_dir: str | None = None):
+ def __init__(
+ self, api_key: str | None = None, file_download_dir: str | None = None
+ ):
"""
Initialize the Composio service.
@@ -90,18 +92,20 @@ class ComposioService:
file_download_dir: Directory for downloaded files. Defaults to /tmp/composio_downloads.
"""
import os
-
+
self.api_key = api_key or config.COMPOSIO_API_KEY
if not self.api_key:
raise ValueError("COMPOSIO_API_KEY is required but not configured")
-
+
# Set up download directory
self.file_download_dir = file_download_dir or self.DEFAULT_DOWNLOAD_DIR
os.makedirs(self.file_download_dir, exist_ok=True)
-
+
# Initialize Composio client with download directory
# Per docs: file_download_dir configures where files are downloaded
- self.client = Composio(api_key=self.api_key, file_download_dir=self.file_download_dir)
+ self.client = Composio(
+ api_key=self.api_key, file_download_dir=self.file_download_dir
+ )
@staticmethod
def is_enabled() -> bool:
@@ -512,7 +516,7 @@ class ComposioService:
Tuple of (file content bytes, error message).
"""
from pathlib import Path
-
+
try:
result = await self.execute_tool(
connected_account_id=connected_account_id,
@@ -532,35 +536,37 @@ class ComposioService:
# Response structure: {data: {...}, error: ..., successful: ...}
# The actual file info is nested inside data["data"]
file_path = None
-
+
if isinstance(data, dict):
# Handle nested response structure: data contains {data, error, successful}
# The actual file info is in data["data"]
inner_data = data
if "data" in data and isinstance(data["data"], dict):
inner_data = data["data"]
- logger.debug(f"Found nested data structure. Inner keys: {list(inner_data.keys())}")
+ logger.debug(
+ f"Found nested data structure. Inner keys: {list(inner_data.keys())}"
+ )
elif "successful" in data and "data" in data:
# Standard Composio response wrapper
inner_data = data["data"] if data["data"] else data
-
+
# Try documented fields: file_path, downloaded_file_content, path, uri
file_path = (
- inner_data.get("file_path") or
- inner_data.get("downloaded_file_content") or
- inner_data.get("path") or
- inner_data.get("uri")
+ inner_data.get("file_path")
+ or inner_data.get("downloaded_file_content")
+ or inner_data.get("path")
+ or inner_data.get("uri")
)
-
+
# Handle nested dict case where downloaded_file_content contains the path
if isinstance(file_path, dict):
file_path = (
- file_path.get("file_path") or
- file_path.get("downloaded_file_content") or
- file_path.get("path") or
- file_path.get("uri")
+ file_path.get("file_path")
+ or file_path.get("downloaded_file_content")
+ or file_path.get("path")
+ or file_path.get("uri")
)
-
+
# If still no path, check if inner_data itself has the nested structure
if not file_path and isinstance(inner_data, dict):
for key in ["downloaded_file_content", "file_path", "path", "uri"]:
@@ -572,15 +578,17 @@ class ComposioService:
elif isinstance(val, dict):
# One more level of nesting
file_path = (
- val.get("file_path") or
- val.get("downloaded_file_content") or
- val.get("path") or
- val.get("uri")
+ val.get("file_path")
+ or val.get("downloaded_file_content")
+ or val.get("path")
+ or val.get("uri")
)
if file_path:
break
-
- logger.debug(f"Composio response keys: {list(data.keys())}, inner keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}, extracted path: {file_path}")
+
+ logger.debug(
+ f"Composio response keys: {list(data.keys())}, inner keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}, extracted path: {file_path}"
+ )
elif isinstance(data, str):
# Direct string response (could be path or content)
file_path = data
@@ -591,24 +599,31 @@ class ComposioService:
# Read file from the path
if file_path and isinstance(file_path, str):
path_obj = Path(file_path)
-
+
# Check if it's a valid file path (absolute or in .composio directory)
- if path_obj.is_absolute() or '.composio' in str(path_obj):
+ if path_obj.is_absolute() or ".composio" in str(path_obj):
try:
if path_obj.exists():
content = path_obj.read_bytes()
- logger.info(f"Successfully read {len(content)} bytes from Composio file: {file_path}")
+ logger.info(
+ f"Successfully read {len(content)} bytes from Composio file: {file_path}"
+ )
return content, None
else:
- logger.warning(f"File path from Composio does not exist: {file_path}")
+ logger.warning(
+ f"File path from Composio does not exist: {file_path}"
+ )
return None, f"File not found at path: {file_path}"
except Exception as e:
- logger.error(f"Failed to read file from Composio path {file_path}: {e!s}")
+ logger.error(
+ f"Failed to read file from Composio path {file_path}: {e!s}"
+ )
return None, f"Failed to read file: {e!s}"
else:
# Not a file path - might be base64 encoded content
try:
import base64
+
content = base64.b64decode(file_path)
return content, None
except Exception:
@@ -625,8 +640,11 @@ class ComposioService:
f"Inner data keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data).__name__}, "
f"Full inner data: {inner_data}"
)
- return None, f"No file path in Composio response. Keys: {list(data.keys())}, inner: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}"
-
+ return (
+ None,
+ f"No file path in Composio response. Keys: {list(data.keys())}, inner: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}",
+ )
+
return None, f"Unexpected data type from Composio: {type(data).__name__}"
except Exception as e:
@@ -638,14 +656,14 @@ class ComposioService:
) -> tuple[str | None, str | None]:
"""
Get the starting page token for Google Drive change tracking.
-
+
This token represents the current state and is used for future delta syncs.
Per Composio docs: Use GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN to get initial token.
-
+
Args:
connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account.
-
+
Returns:
Tuple of (start_page_token, error message).
"""
@@ -656,27 +674,27 @@ class ComposioService:
params={},
entity_id=entity_id,
)
-
+
if not result.get("success"):
return None, result.get("error", "Unknown error")
-
+
data = result.get("data", {})
# Handle nested response: {data: {startPageToken: ...}, successful: ...}
if isinstance(data, dict):
inner_data = data.get("data", data)
token = (
- inner_data.get("startPageToken") or
- inner_data.get("start_page_token") or
- data.get("startPageToken") or
- data.get("start_page_token")
+ inner_data.get("startPageToken")
+ or inner_data.get("start_page_token")
+ or data.get("startPageToken")
+ or data.get("start_page_token")
)
if token:
logger.info(f"Got Drive start page token: {token}")
return token, None
-
+
logger.warning(f"Could not extract start page token from response: {data}")
return None, "No start page token in response"
-
+
except Exception as e:
logger.error(f"Failed to get Drive start page token: {e!s}")
return None, str(e)
@@ -691,18 +709,18 @@ class ComposioService:
) -> tuple[list[dict[str, Any]], str | None, str | None]:
"""
List changes in Google Drive since the given page token.
-
+
Per Composio docs: GOOGLEDRIVE_LIST_CHANGES tracks modifications to files/folders.
If pageToken is not provided, it auto-fetches the current start page token.
Response includes nextPageToken for pagination and newStartPageToken for future syncs.
-
+
Args:
connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account.
page_token: Page token from previous sync (optional - will auto-fetch if not provided).
page_size: Number of changes per page.
include_removed: Whether to include removed items in the response.
-
+
Returns:
Tuple of (changes list, new_start_page_token, error message).
"""
@@ -713,42 +731,44 @@ class ComposioService:
}
if page_token:
params["pageToken"] = page_token
-
+
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLEDRIVE_LIST_CHANGES",
params=params,
entity_id=entity_id,
)
-
+
if not result.get("success"):
return [], None, result.get("error", "Unknown error")
-
+
data = result.get("data", {})
-
+
# Handle nested response structure
changes = []
new_start_token = None
-
+
if isinstance(data, dict):
inner_data = data.get("data", data)
changes = inner_data.get("changes", []) or data.get("changes", [])
-
+
# Get the token for next sync
# newStartPageToken is returned when all changes have been fetched
# nextPageToken is for pagination within the current fetch
new_start_token = (
- inner_data.get("newStartPageToken") or
- inner_data.get("new_start_page_token") or
- inner_data.get("nextPageToken") or
- inner_data.get("next_page_token") or
- data.get("newStartPageToken") or
- data.get("nextPageToken")
+ inner_data.get("newStartPageToken")
+ or inner_data.get("new_start_page_token")
+ or inner_data.get("nextPageToken")
+ or inner_data.get("next_page_token")
+ or data.get("newStartPageToken")
+ or data.get("nextPageToken")
)
-
- logger.info(f"Got {len(changes)} Drive changes, new token: {new_start_token[:20] if new_start_token else 'None'}...")
+
+ logger.info(
+ f"Got {len(changes)} Drive changes, new token: {new_start_token[:20] if new_start_token else 'None'}..."
+ )
return changes, new_start_token, None
-
+
except Exception as e:
logger.error(f"Failed to list Drive changes: {e!s}")
return [], None, str(e)
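
Together, the two methods reformatted above drive Composio's GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN and GOOGLEDRIVE_LIST_CHANGES tools: checkpoint once, then fetch only deltas. A usage sketch; the method names and account IDs here are placeholders inferred from the signatures and docstrings, not confirmed API:

    async def drive_delta_sync_example(service: "ComposioService") -> None:
        """Illustrative flow; method and account names are placeholders."""
        # First sync: checkpoint the drive's current state.
        token, err = await service.get_drive_start_page_token(
            connected_account_id="ca_xxx", entity_id="user_xxx"
        )
        if err or not token:
            return
        # Later syncs: fetch only changes since the checkpoint, then persist
        # the returned token (newStartPageToken) for the next run.
        changes, new_token, err = await service.list_drive_changes(
            connected_account_id="ca_xxx",
            entity_id="user_xxx",
            page_token=token,
            page_size=100,
            include_removed=True,
        )
        for change in changes:
            status = "removed" if change.get("removed") else "changed"
            print(change.get("fileId"), status)
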
diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py
index 9fcf807e7..04f39d8ef 100644
--- a/surfsense_backend/app/services/notification_service.py
+++ b/surfsense_backend/app/services/notification_service.py
@@ -385,7 +385,9 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
metadata_updates = {
"indexed_count": indexed_count,
- "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed",
+ "sync_stage": "completed"
+ if (not error_message or is_warning or indexed_count > 0)
+ else "failed",
"error_message": error_message,
}
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
index ef1f821d2..2365ff984 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
@@ -208,7 +208,7 @@ async def index_google_calendar_events(
# Use provided dates (including future dates)
start_date_str = start_date
end_date_str = end_date
-
+
# If start_date and end_date are the same, adjust end_date to be one day later
# to ensure valid date range (start_date must be strictly before end_date)
if start_date_str == end_date_str:
@@ -269,10 +269,14 @@ async def index_google_calendar_events(
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
- if "re-authenticate" in error.lower() or "expired or been revoked" in error.lower() or "authentication failed" in error.lower():
+ if (
+ "re-authenticate" in error.lower()
+ or "expired or been revoked" in error.lower()
+ or "authentication failed" in error.lower()
+ ):
error_message = "Google Calendar authentication failed. Please re-authenticate."
error_type = "AuthenticationError"
-
+
await task_logger.log_task_failure(
log_entry,
error_message,
@@ -290,7 +294,9 @@ async def index_google_calendar_events(
documents_indexed = 0
documents_skipped = 0
skipped_events = []
- duplicate_content_count = 0 # Track events skipped due to duplicate content_hash
+ duplicate_content_count = (
+ 0 # Track events skipped due to duplicate content_hash
+ )
for event in events:
try:
@@ -417,7 +423,7 @@ async def index_google_calendar_events(
duplicate_by_content = await check_duplicate_document_by_hash(
session, content_hash
)
-
+
if duplicate_by_content:
# A document with the same content already exists (likely from Composio connector)
logger.info(
@@ -528,7 +534,10 @@ async def index_google_calendar_events(
await session.commit()
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
- if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower():
+ if (
+ "duplicate key value violates unique constraint" in str(e).lower()
+ or "uniqueviolationerror" in str(e).lower()
+ ):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same event was indexed by multiple connectors. "
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
index af180c36b..f50e149d3 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
@@ -578,7 +578,7 @@ async def _check_rename_only_update(
- (True, message): Only filename changed, document was updated
- (False, None): Content changed or new file, needs full processing
"""
- from sqlalchemy import cast, select, String
+ from sqlalchemy import String, cast, select
from sqlalchemy.orm.attributes import flag_modified
from app.db import Document
@@ -603,7 +603,8 @@ async def _check_rename_only_update(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
- cast(Document.document_metadata["google_drive_file_id"], String) == file_id,
+ cast(Document.document_metadata["google_drive_file_id"], String)
+ == file_id,
)
)
existing_document = result.scalar_one_or_none()
@@ -755,7 +756,7 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
Handles both new (file_id-based) and legacy (filename-based) hash schemes.
"""
- from sqlalchemy import cast, select, String
+ from sqlalchemy import String, cast, select
from app.db import Document
@@ -774,7 +775,8 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
- cast(Document.document_metadata["google_drive_file_id"], String) == file_id,
+ cast(Document.document_metadata["google_drive_file_id"], String)
+ == file_id,
)
)
existing_document = result.scalar_one_or_none()
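
Both hunks in this file wrap the same JSONB lookup. In isolation, the SQLAlchemy pattern is the following (imports as used in the functions above; the DocumentType import path is assumed):

    from sqlalchemy import String, cast, select

    from app.db import Document, DocumentType  # DocumentType path assumed


    def drive_file_lookup(search_space_id: int, file_id: str):
        """Build the select used to find a Drive document by its file ID."""
        return select(Document).where(
            Document.search_space_id == search_space_id,
            Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
            # Subscripting the JSONB column yields a JSON value; cast it to a
            # SQL string so it can compare against the plain file_id.
            cast(Document.document_metadata["google_drive_file_id"], String)
            == file_id,
        )
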
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
index 6a3057437..08d2904d6 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
@@ -173,15 +173,16 @@ async def index_google_gmail_messages(
# Check if this is an authentication error that requires re-authentication
error_message = error
error_type = "APIError"
- if "re-authenticate" in error.lower() or "expired or been revoked" in error.lower() or "authentication failed" in error.lower():
+ if (
+ "re-authenticate" in error.lower()
+ or "expired or been revoked" in error.lower()
+ or "authentication failed" in error.lower()
+ ):
error_message = "Gmail authentication failed. Please re-authenticate."
error_type = "AuthenticationError"
-
+
await task_logger.log_task_failure(
- log_entry,
- error_message,
- error,
- {"error_type": error_type}
+ log_entry, error_message, error, {"error_type": error_type}
)
return 0, error_message
diff --git a/surfsense_web/components/assistant-ui/connector-popup.tsx b/surfsense_web/components/assistant-ui/connector-popup.tsx
index 68a548409..293d4a243 100644
--- a/surfsense_web/components/assistant-ui/connector-popup.tsx
+++ b/surfsense_web/components/assistant-ui/connector-popup.tsx
@@ -18,7 +18,10 @@ import { ConnectorDialogHeader } from "./connector-popup/components/connector-di
import { ConnectorConnectView } from "./connector-popup/connector-configs/views/connector-connect-view";
import { ConnectorEditView } from "./connector-popup/connector-configs/views/connector-edit-view";
import { IndexingConfigurationView } from "./connector-popup/connector-configs/views/indexing-configuration-view";
-import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS } from "./connector-popup/constants/connector-constants";
+import {
+ COMPOSIO_CONNECTORS,
+ OAUTH_CONNECTORS,
+} from "./connector-popup/constants/connector-constants";
import { useConnectorDialog } from "./connector-popup/hooks/use-connector-dialog";
import { useIndexingConnectors } from "./connector-popup/hooks/use-indexing-connectors";
import { ActiveConnectorsTab } from "./connector-popup/tabs/active-connectors-tab";
diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx
index ce5133a9d..6f282d892 100644
--- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx
+++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx
@@ -12,4 +12,3 @@ interface ComposioCalendarConfigProps {
 export const ComposioCalendarConfig: FC
-
diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx
index 019e6b37f..684f03252 100644
--- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx
+++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx
@@ -170,13 +170,13 @@ export const IndexingConfigurationView: FC
 							Indexing runs in the background
-							You can continue using SurfSense while we sync your data. Check inbox for updates.
+							You can continue using SurfSense while we sync your data. Check inbox for
+							updates.