Merge upstream/dev into feat/migrate-electric-to-zero

Resolve 8 conflicts:
- Accept upstream deletion of the 3 composio_*_connector.py indexers (now unified with the native Google connectors)
- Accept our deletion of ElectricProvider.tsx, use-connectors-electric.ts,
  use-messages-electric.ts (replaced by Zero equivalents)
- Keep both new deps in package.json (@rocicorp/zero + @slate-serializers/html)
- Regenerate pnpm-lock.yaml
CREDO23 2026-03-24 17:40:34 +02:00
commit 5d8a62a4a6
207 changed files with 28023 additions and 12247 deletions

View file

@ -55,7 +55,6 @@ async def _check_and_trigger_schedules():
from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task,
index_clickup_tasks_task,
index_composio_connector_task,
index_confluence_pages_task,
index_crawled_urls_task,
index_discord_messages_task,
@ -88,10 +87,10 @@ async def _check_and_trigger_schedules():
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task,
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
# Composio connector types
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_composio_connector_task,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_composio_connector_task,
# Composio connector types (unified with native Google tasks)
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_google_gmail_messages_task,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
}
# Trigger indexing for each due connector
@ -129,11 +128,11 @@ async def _check_and_trigger_schedules():
f"({connector.connector_type.value})"
)
# Special handling for Google Drive - uses config for folder/file selection
if (
connector.connector_type
== SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
):
# Special handling for Google Drive (native and Composio) - uses config for folder/file selection
if connector.connector_type in [
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]:
connector_config = connector.config or {}
selected_folders = connector_config.get("selected_folders", [])
selected_files = connector_config.get("selected_files", [])

View file

@ -936,6 +936,19 @@ async def _stream_agent_events(
"delete_linear_issue",
"create_google_drive_file",
"delete_google_drive_file",
"create_gmail_draft",
"update_gmail_draft",
"send_gmail_email",
"trash_gmail_email",
"create_calendar_event",
"update_calendar_event",
"delete_calendar_event",
"create_jira_issue",
"update_jira_issue",
"delete_jira_issue",
"create_confluence_page",
"update_confluence_page",
"delete_confluence_page",
):
yield streaming_service.format_tool_output_available(
tool_call_id,

View file

@ -25,6 +25,10 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from .base import (
check_document_by_unique_identifier,
@ -37,6 +41,11 @@ from .base import (
update_connector_last_indexed,
)
ACCEPTED_CALENDAR_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@ -53,7 +62,7 @@ async def index_google_calendar_events(
end_date: str | None = None,
update_last_indexed: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
) -> tuple[int, int, str | None]:
"""
Index Google Calendar events.
@ -69,7 +78,7 @@ async def index_google_calendar_events(
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple containing (number of documents indexed, error message or None)
Tuple containing (number of documents indexed, number of documents skipped, error message or None)
"""
task_logger = TaskLoggingService(session, search_space_id)
@ -87,10 +96,12 @@ async def index_google_calendar_events(
)
try:
# Get the connector from the database
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR
)
# Accept both native and Composio Calendar connectors
connector = None
for ct in ACCEPTED_CALENDAR_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
await task_logger.log_task_failure(
@ -99,71 +110,80 @@ async def index_google_calendar_events(
"Connector not found",
{"error_type": "ConnectorNotFound"},
)
return 0, f"Connector with ID {connector_id} not found"
return 0, 0, f"Connector with ID {connector_id} not found"
# Get the Google Calendar credentials from the connector config
config_data = connector.config
# Decrypt sensitive credentials if encrypted (for backward compatibility)
from app.config import config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Calendar credentials for connector {connector_id}"
)
except Exception as e:
# Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
f"Composio connected_account_id not found for connector {connector_id}",
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, f"Failed to decrypt Google Calendar credentials: {e!s}"
return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
else:
config_data = connector.config
exp = config_data.get("expiry", "").replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes"),
expiry=datetime.fromisoformat(exp) if exp else None,
)
from app.config import config
from app.utils.oauth_security import TokenEncryption
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google Calendar credentials not found in connector config for connector {connector_id}",
"Missing Google Calendar credentials",
{"error_type": "MissingCredentials"},
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Calendar credentials for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
)
return 0, 0, f"Failed to decrypt Google Calendar credentials: {e!s}"
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
return 0, "Google Calendar credentials not found in connector config"
# Initialize Google Calendar client
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google Calendar credentials not found in connector config for connector {connector_id}",
"Missing Google Calendar credentials",
{"error_type": "MissingCredentials"},
)
return 0, 0, "Google Calendar credentials not found in connector config"
await task_logger.log_task_progress(
log_entry,
f"Initializing Google Calendar client for connector {connector_id}",
@ -281,7 +301,7 @@ async def index_google_calendar_events(
f"No Google Calendar events found in date range {start_date_str} to {end_date_str}",
{"events_found": 0},
)
return 0, None
return 0, 0, None
else:
logger.error(f"Failed to get Google Calendar events: {error}")
# Check if this is an authentication error that requires re-authentication
@ -301,13 +321,13 @@ async def index_google_calendar_events(
error,
{"error_type": error_type},
)
return 0, error_message
return 0, 0, error_message
logger.info(f"Retrieved {len(events)} events from Google Calendar API")
except Exception as e:
logger.error(f"Error fetching Google Calendar events: {e!s}", exc_info=True)
return 0, f"Error fetching Google Calendar events: {e!s}"
return 0, 0, f"Error fetching Google Calendar events: {e!s}"
documents_indexed = 0
documents_skipped = 0
@ -363,6 +383,31 @@ async def index_google_calendar_events(
session, unique_identifier_hash
)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
event_id,
search_space_id,
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
existing_document.unique_identifier_hash = (
unique_identifier_hash
)
if (
existing_document.document_type
== DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
):
existing_document.document_type = (
DocumentType.GOOGLE_CALENDAR_CONNECTOR
)
logger.info(
f"Migrated legacy Composio Calendar document: {event_id}"
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
@ -609,7 +654,7 @@ async def index_google_calendar_events(
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return total_processed, warning_message
return total_processed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
@ -620,7 +665,7 @@ async def index_google_calendar_events(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -630,4 +675,4 @@ async def index_google_calendar_events(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Calendar events: {e!s}", exc_info=True)
return 0, f"Failed to index Google Calendar events: {e!s}"
return 0, 0, f"Failed to index Google Calendar events: {e!s}"

View file

@ -31,6 +31,15 @@ from app.tasks.connector_indexers.base import (
update_connector_last_indexed,
)
from app.utils.document_converters import generate_unique_identifier_hash
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
ACCEPTED_DRIVE_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@ -53,7 +62,7 @@ async def index_google_drive_files(
max_files: int = 500,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str | None]:
) -> tuple[int, int, str | None]:
"""
Index Google Drive files for a specific connector.
@ -71,7 +80,7 @@ async def index_google_drive_files(
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple of (number_of_indexed_files, error_message)
Tuple of (number_of_indexed_files, number_of_skipped_files, error_message)
"""
task_logger = TaskLoggingService(session, search_space_id)
@ -89,16 +98,19 @@ async def index_google_drive_files(
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
# Accept both native and Composio Drive connectors
connector = None
for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
return 0, 0, error_msg
await task_logger.log_task_progress(
log_entry,
@ -106,34 +118,51 @@ async def index_google_drive_files(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
# Build credentials based on connector type
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
return 0, 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
# If _token_encrypted is False or not set, treat credentials as plaintext
drive_client = GoogleDriveClient(session, connector_id)
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
if not folder_id:
error_msg = "folder_id is required for Google Drive indexing"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "MissingParameter"}
)
return 0, error_msg
return 0, 0, error_msg
target_folder_id = folder_id
target_folder_name = folder_name or "Selected Folder"
@ -164,7 +193,33 @@ async def index_google_drive_files(
max_files=max_files,
include_subfolders=include_subfolders,
on_heartbeat_callback=on_heartbeat_callback,
enable_summary=connector_enable_summary,
)
documents_indexed, documents_skipped = result
# Reconciliation: full scan re-indexes documents that were manually
# deleted from SurfSense but still exist in Google Drive.
# Already-indexed files are skipped via md5/modifiedTime checks,
# so the overhead is just one API listing call + fast DB lookups.
logger.info("Running reconciliation scan after delta sync")
reconcile_result = await _index_full_scan(
drive_client=drive_client,
session=session,
connector=connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
folder_id=target_folder_id,
folder_name=target_folder_name,
task_logger=task_logger,
log_entry=log_entry,
max_files=max_files,
include_subfolders=include_subfolders,
on_heartbeat_callback=on_heartbeat_callback,
enable_summary=connector_enable_summary,
)
documents_indexed += reconcile_result[0]
documents_skipped += reconcile_result[1]
else:
logger.info(f"Using full scan for connector {connector_id}")
result = await _index_full_scan(
@ -181,9 +236,9 @@ async def index_google_drive_files(
max_files=max_files,
include_subfolders=include_subfolders,
on_heartbeat_callback=on_heartbeat_callback,
enable_summary=connector_enable_summary,
)
documents_indexed, documents_skipped = result
documents_indexed, documents_skipped = result
if documents_indexed > 0 or can_use_delta_sync:
new_token, token_error = await get_start_page_token(drive_client)
@ -217,7 +272,7 @@ async def index_google_drive_files(
logger.info(
f"Google Drive indexing completed: {documents_indexed} files indexed, {documents_skipped} skipped"
)
return documents_indexed, None
return documents_indexed, documents_skipped, None
except SQLAlchemyError as db_error:
await session.rollback()
@ -228,7 +283,7 @@ async def index_google_drive_files(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -238,7 +293,7 @@ async def index_google_drive_files(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google Drive files: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive files: {e!s}"
return 0, 0, f"Failed to index Google Drive files: {e!s}"
async def index_google_drive_single_file(
@ -278,14 +333,17 @@ async def index_google_drive_single_file(
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR
)
# Accept both native and Composio Drive connectors
connector = None
for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Google Drive connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
@ -295,27 +353,42 @@ async def index_google_drive_single_file(
{"stage": "client_initialization"},
)
# Check if credentials are encrypted (only when explicitly marked)
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
# Credentials are explicitly marked as encrypted, will be decrypted during client initialization
if not config.SECRET_KEY:
pre_built_credentials = None
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
error_msg = f"Composio connected_account_id not found for connector {connector_id}"
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
error_msg,
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
return 0, error_msg
pre_built_credentials = build_composio_credentials(connected_account_id)
else:
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted:
if not config.SECRET_KEY:
await task_logger.log_task_failure(
log_entry,
f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}",
"Missing SECRET_KEY for token decryption",
{"error_type": "MissingSecretKey"},
)
return (
0,
"SECRET_KEY not configured but credentials are marked as encrypted",
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
logger.info(
f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization"
)
# If _token_encrypted is False or not set, treat credentials as plaintext
drive_client = GoogleDriveClient(session, connector_id)
connector_enable_summary = getattr(connector, "enable_summary", True)
drive_client = GoogleDriveClient(
session, connector_id, credentials=pre_built_credentials
)
# Fetch the file metadata
file, error = await get_file_by_id(drive_client, file_id)
@ -362,6 +435,7 @@ async def index_google_drive_single_file(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=connector_enable_summary,
)
await session.commit()
@ -433,6 +507,7 @@ async def _index_full_scan(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder.
@ -467,6 +542,7 @@ async def _index_full_scan(
# Queue of folders to process: (folder_id, folder_name)
folders_to_process = [(folder_id, folder_name)]
first_listing_error: str | None = None
logger.info("Phase 1: Collecting files and creating pending documents")
@ -486,6 +562,8 @@ async def _index_full_scan(
if error:
logger.error(f"Error listing files in {current_folder_name}: {error}")
if first_listing_error is None:
first_listing_error = error
break
if not files:
@ -531,6 +609,19 @@ async def _index_full_scan(
if not page_token:
break
if not files_to_process and first_listing_error:
error_lower = first_listing_error.lower()
if (
"401" in first_listing_error
or "invalid credentials" in error_lower
or "authError" in first_listing_error
):
raise Exception(
f"Google Drive authentication failed. Please re-authenticate. "
f"(Error: {first_listing_error})"
)
raise Exception(f"Failed to list Google Drive files: {first_listing_error}")
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
@ -562,6 +653,7 @@ async def _index_full_scan(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=enable_summary,
)
documents_indexed += indexed
@ -592,6 +684,7 @@ async def _index_with_delta_sync(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Perform delta sync indexing using change tracking.
@ -614,7 +707,17 @@ async def _index_with_delta_sync(
if error:
logger.error(f"Error fetching changes: {error}")
return 0, 0
error_lower = error.lower()
if (
"401" in error
or "invalid credentials" in error_lower
or "authError" in error
):
raise Exception(
f"Google Drive authentication failed. Please re-authenticate. "
f"(Error: {error})"
)
raise Exception(f"Failed to fetch Google Drive changes: {error}")
if not changes:
logger.info("No changes detected since last sync")
@ -703,6 +806,7 @@ async def _index_with_delta_sync(
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
enable_summary=enable_summary,
)
documents_indexed += indexed
@ -763,10 +867,25 @@ async def _create_pending_document_for_file(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
# Check if document exists
# Check if document exists (primary hash first, then legacy Composio hash)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
existing_document.unique_identifier_hash = unique_identifier_hash
if (
existing_document.document_type
== DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
logger.info(f"Migrated legacy Composio document to native type: {file_id}")
if existing_document:
# Check if this is a rename-only update (content unchanged)
@ -862,12 +981,26 @@ async def _check_rename_only_update(
)
existing_document = await check_document_by_unique_identifier(session, primary_hash)
# If not found by primary hash, try searching by metadata (for legacy documents)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
# Fallback: metadata search (covers old filename-based hashes)
if not existing_document:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
Document.document_type.in_(
[
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
),
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
@ -876,6 +1009,17 @@ async def _check_rename_only_update(
if existing_document:
logger.debug(f"Found legacy document by metadata for file_id: {file_id}")
# Migrate legacy Composio document to native type
if existing_document:
if existing_document.unique_identifier_hash != primary_hash:
existing_document.unique_identifier_hash = primary_hash
if (
existing_document.document_type
== DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR
):
existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
logger.info(f"Migrated legacy Composio Drive document: {file_id}")
if not existing_document:
# New file, needs full processing
return False, None
@ -957,6 +1101,7 @@ async def _process_single_file(
task_logger: TaskLoggingService,
log_entry: any,
pending_document: Document | None = None,
enable_summary: bool = True,
) -> tuple[int, int, int]:
"""
Process a single file by downloading and using Surfsense's file processor.
@ -1020,6 +1165,7 @@ async def _process_single_file(
task_logger=task_logger,
log_entry=log_entry,
connector_id=connector_id,
enable_summary=enable_summary,
)
if error:
@ -1088,12 +1234,26 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
session, unique_identifier_hash
)
# If not found, search by metadata (for legacy documents with filename-based hash)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
# Fallback: metadata search (covers old filename-based hashes, both native and Composio)
if not existing_document:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
Document.document_type.in_(
[
DocumentType.GOOGLE_DRIVE_FILE,
DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
]
),
cast(Document.document_metadata["google_drive_file_id"], String)
== file_id,
)
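
The return signature change above ripples out to every caller: index_google_drive_files now yields (indexed, skipped, error) instead of a two-value tuple. The Celery task wrappers that consume it are not part of this diff, so the wrapper below is only an illustrative sketch of the call-site adjustment; the module path and the run_drive_indexing wrapper itself are assumptions:

from logging import getLogger

# Module path assumed; index_google_drive_files is the indexer changed above.
from app.tasks.connector_indexers.google_drive_indexer import index_google_drive_files

logger = getLogger(__name__)


async def run_drive_indexing(session, connector_id, search_space_id, user_id, folder_id):
    # Hypothetical call site: unpack the new (indexed, skipped, error) triple.
    indexed, skipped, error = await index_google_drive_files(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        folder_id=folder_id,
    )
    if error:
        logger.warning("Drive indexing finished with a warning: %s", error)
    logger.info("Drive indexing: %s indexed, %s skipped", indexed, skipped)
    return indexed, skipped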

View file

@ -30,6 +30,10 @@ from app.utils.document_converters import (
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.google_credentials import (
COMPOSIO_GOOGLE_CONNECTOR_TYPES,
build_composio_credentials,
)
from .base import (
calculate_date_range,
@ -42,6 +46,11 @@ from .base import (
update_connector_last_indexed,
)
ACCEPTED_GMAIL_CONNECTOR_TYPES = {
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
}
# Type hint for heartbeat callback
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@ -59,7 +68,7 @@ async def index_google_gmail_messages(
update_last_indexed: bool = True,
max_messages: int = 1000,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""
Index Gmail messages for a specific connector.
@ -75,7 +84,7 @@ async def index_google_gmail_messages(
on_heartbeat_callback: Optional callback to update notification during long-running indexing.
Returns:
Tuple of (number_of_indexed_messages, status_message)
Tuple of (number_of_indexed_messages, number_of_skipped_messages, status_message)
"""
task_logger = TaskLoggingService(session, search_space_id)
@ -94,90 +103,98 @@ async def index_google_gmail_messages(
)
try:
# Get connector by id
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR
)
# Accept both native and Composio Gmail connectors
connector = None
for ct in ACCEPTED_GMAIL_CONNECTOR_TYPES:
connector = await get_connector_by_id(session, connector_id, ct)
if connector:
break
if not connector:
error_msg = f"Gmail connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "ConnectorNotFound"}
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, error_msg
return 0, 0, error_msg
# Get the Google Gmail credentials from the connector config
config_data = connector.config
# Decrypt sensitive credentials if encrypted (for backward compatibility)
from app.config import config
from app.utils.oauth_security import TokenEncryption
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
# Decrypt sensitive fields
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Gmail credentials for connector {connector_id}"
)
except Exception as e:
# Build credentials based on connector type
if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES:
connected_account_id = connector.config.get("composio_connected_account_id")
if not connected_account_id:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
f"Composio connected_account_id not found for connector {connector_id}",
"Missing Composio account",
{"error_type": "MissingComposioAccount"},
)
return 0, f"Failed to decrypt Google Gmail credentials: {e!s}"
return 0, 0, "Composio connected_account_id not found"
credentials = build_composio_credentials(connected_account_id)
else:
config_data = connector.config
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
from app.config import config
from app.utils.oauth_security import TokenEncryption
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google gmail credentials not found in connector config for connector {connector_id}",
"Missing Google gmail credentials",
{"error_type": "MissingCredentials"},
token_encrypted = config_data.get("_token_encrypted", False)
if token_encrypted and config.SECRET_KEY:
try:
token_encryption = TokenEncryption(config.SECRET_KEY)
if config_data.get("token"):
config_data["token"] = token_encryption.decrypt_token(
config_data["token"]
)
if config_data.get("refresh_token"):
config_data["refresh_token"] = token_encryption.decrypt_token(
config_data["refresh_token"]
)
if config_data.get("client_secret"):
config_data["client_secret"] = token_encryption.decrypt_token(
config_data["client_secret"]
)
logger.info(
f"Decrypted Google Gmail credentials for connector {connector_id}"
)
except Exception as e:
await task_logger.log_task_failure(
log_entry,
f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}",
"Credential decryption failed",
{"error_type": "CredentialDecryptionError"},
)
return 0, 0, f"Failed to decrypt Google Gmail credentials: {e!s}"
exp = config_data.get("expiry", "")
if exp:
exp = exp.replace("Z", "")
credentials = Credentials(
token=config_data.get("token"),
refresh_token=config_data.get("refresh_token"),
token_uri=config_data.get("token_uri"),
client_id=config_data.get("client_id"),
client_secret=config_data.get("client_secret"),
scopes=config_data.get("scopes", []),
expiry=datetime.fromisoformat(exp) if exp else None,
)
return 0, "Google gmail credentials not found in connector config"
# Initialize Google gmail client
if (
not credentials.client_id
or not credentials.client_secret
or not credentials.refresh_token
):
await task_logger.log_task_failure(
log_entry,
f"Google gmail credentials not found in connector config for connector {connector_id}",
"Missing Google gmail credentials",
{"error_type": "MissingCredentials"},
)
return 0, 0, "Google gmail credentials not found in connector config"
await task_logger.log_task_progress(
log_entry,
f"Initializing Google gmail client for connector {connector_id}",
{"stage": "client_initialization"},
)
# Initialize Google gmail connector
gmail_connector = GoogleGmailConnector(
credentials, session, user_id, connector_id
)
@ -215,14 +232,14 @@ async def index_google_gmail_messages(
await task_logger.log_task_failure(
log_entry, error_message, error, {"error_type": error_type}
)
return 0, error_message
return 0, 0, error_message
if not messages:
success_msg = "No Google gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
return 0, success_msg
return 0, 0, success_msg
logger.info(f"Found {len(messages)} Google gmail messages to index")
@ -293,6 +310,31 @@ async def index_google_gmail_messages(
session, unique_identifier_hash
)
# Fallback: legacy Composio hash
if not existing_document:
legacy_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_GMAIL_CONNECTOR,
message_id,
search_space_id,
)
existing_document = await check_document_by_unique_identifier(
session, legacy_hash
)
if existing_document:
existing_document.unique_identifier_hash = (
unique_identifier_hash
)
if (
existing_document.document_type
== DocumentType.COMPOSIO_GMAIL_CONNECTOR
):
existing_document.document_type = (
DocumentType.GOOGLE_GMAIL_CONNECTOR
)
logger.info(
f"Migrated legacy Composio Gmail document: {message_id}"
)
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
@ -531,10 +573,7 @@ async def index_google_gmail_messages(
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return (
total_processed,
warning_message,
) # Return warning_message (None on success)
return total_processed, documents_skipped, warning_message
except SQLAlchemyError as db_error:
await session.rollback()
@ -545,7 +584,7 @@ async def index_google_gmail_messages(
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, f"Database error: {db_error!s}"
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
@ -555,4 +594,4 @@ async def index_google_gmail_messages(
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Google gmail emails: {e!s}", exc_info=True)
return 0, f"Failed to index Google gmail emails: {e!s}"
return 0, 0, f"Failed to index Google gmail emails: {e!s}"

View file

@ -170,7 +170,34 @@ async def handle_existing_document_update(
logging.info(f"Document for file {filename} unchanged. Skipping.")
return True, existing_document
else:
# Content has changed - need to re-process
# Content has changed — guard against content_hash collision before
# expensive ETL processing. A collision means the exact same content
# already lives in a *different* document (e.g. a manual upload of the
# same file). Proceeding would trigger a unique-constraint violation
# on ix_documents_content_hash.
collision_doc = await check_duplicate_document(session, content_hash)
if collision_doc and collision_doc.id != existing_document.id:
logging.warning(
"Content-hash collision for %s: identical content exists in "
"document #%s (%s). Skipping re-processing.",
filename,
collision_doc.id,
collision_doc.document_type,
)
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
) or DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
# Pending/processing doc has no real content yet — remove it
# so the UI doesn't show a contentless entry.
await session.delete(existing_document)
await session.commit()
return True, None
# Document already has valid content — keep it as-is.
return True, existing_document
logging.info(f"Content changed for file {filename}. Updating document.")
return False, None
@ -411,6 +438,7 @@ async def add_received_file_document_using_unstructured(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store a file document using Unstructured service.
@ -471,9 +499,13 @@ async def add_received_file_document_using_unstructured(
"etl_service": "UNSTRUCTURED",
"document_type": "File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
if enable_summary:
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
else:
summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
@ -493,14 +525,13 @@ async def add_received_file_document_using_unstructured(
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
existing_document.status = DocumentStatus.ready()
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
@ -523,7 +554,7 @@ async def add_received_file_document_using_unstructured(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
status=DocumentStatus.ready(),
)
session.add(document)
@ -533,6 +564,12 @@ async def add_received_file_document_using_unstructured(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (Unstructured). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()
@ -546,6 +583,7 @@ async def add_received_file_document_using_llamacloud(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store document content parsed by LlamaCloud.
@ -605,16 +643,19 @@ async def add_received_file_document_using_llamacloud(
"etl_service": "LLAMACLOUD",
"document_type": "File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
if enable_summary:
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
else:
summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(file_in_markdown)
# Update or create document
if existing_document:
# Update existing document
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content_hash = content_hash
@ -627,14 +668,12 @@ async def add_received_file_document_using_llamacloud(
existing_document.source_markdown = file_in_markdown
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
existing_document.status = DocumentStatus.ready()
await session.commit()
await session.refresh(existing_document)
document = existing_document
else:
# Create new document
# Determine document type based on connector
doc_type = DocumentType.FILE
if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE:
doc_type = DocumentType.GOOGLE_DRIVE_FILE
@ -657,7 +696,7 @@ async def add_received_file_document_using_llamacloud(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
status=DocumentStatus.ready(),
)
session.add(document)
@ -667,6 +706,12 @@ async def add_received_file_document_using_llamacloud(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (LlamaCloud). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()
@ -682,6 +727,7 @@ async def add_received_file_document_using_docling(
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store document content parsed by Docling.
@ -734,33 +780,32 @@ async def add_received_file_document_using_docling(
f"No long context LLM configured for user {user_id} in search_space {search_space_id}"
)
# Generate summary using chunked processing for large documents
from app.services.docling_service import create_docling_service
if enable_summary:
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
docling_service = create_docling_service()
summary_content = await docling_service.process_large_document_summary(
content=file_in_markdown, llm=user_llm, document_title=file_name
)
summary_content = await docling_service.process_large_document_summary(
content=file_in_markdown, llm=user_llm, document_title=file_name
)
# Enhance summary with metadata
document_metadata = {
"file_name": file_name,
"etl_service": "DOCLING",
"document_type": "File Document",
}
metadata_parts = []
metadata_parts.append("# DOCUMENT METADATA")
document_metadata = {
"file_name": file_name,
"etl_service": "DOCLING",
"document_type": "File Document",
}
metadata_parts = ["# DOCUMENT METADATA"]
for key, value in document_metadata.items():
if value:
formatted_key = key.replace("_", " ").title()
metadata_parts.append(f"**{formatted_key}:** {value}")
for key, value in document_metadata.items():
if value: # Only include non-empty values
formatted_key = key.replace("_", " ").title()
metadata_parts.append(f"**{formatted_key}:** {value}")
metadata_section = "\n".join(metadata_parts)
enhanced_summary_content = (
f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
)
metadata_section = "\n".join(metadata_parts)
enhanced_summary_content = (
f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
)
else:
enhanced_summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(enhanced_summary_content)
@ -822,6 +867,12 @@ async def add_received_file_document_using_docling(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (Docling). Skipping.",
file_name,
)
return None
raise db_error
except Exception as e:
await session.rollback()
@ -1219,9 +1270,17 @@ async def process_file_in_background(
print("Error deleting temp file", e)
pass
# Pass the documents to the existing background task
enable_summary = (
connector.get("enable_summary", True) if connector else True
)
result = await add_received_file_document_using_unstructured(
session, filename, docs, search_space_id, user_id, connector
session,
filename,
docs,
search_space_id,
user_id,
connector,
enable_summary=enable_summary,
)
if connector:
@ -1362,7 +1421,9 @@ async def process_file_in_background(
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
enable_summary = (
connector.get("enable_summary", True) if connector else True
)
doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
@ -1370,6 +1431,7 @@ async def process_file_in_background(
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
enable_summary=enable_summary,
)
# Track if this document was successfully created
@ -1516,7 +1578,9 @@ async def process_file_in_background(
session, notification, stage="chunking"
)
# Process the document using our Docling background task
enable_summary = (
connector.get("enable_summary", True) if connector else True
)
doc_result = await add_received_file_document_using_docling(
session,
filename,
@ -1524,6 +1588,7 @@ async def process_file_in_background(
search_space_id=search_space_id,
user_id=user_id,
connector=connector,
enable_summary=enable_summary,
)
if doc_result:

View file

@ -158,6 +158,28 @@ async def _handle_existing_document_update(
logging.info(f"Document for markdown file {filename} unchanged. Skipping.")
return True, existing_document
else:
# Content has changed — guard against content_hash collision (same
# content already lives in a different document).
collision_doc = await check_duplicate_document(session, content_hash)
if collision_doc and collision_doc.id != existing_document.id:
logging.warning(
"Content-hash collision for markdown %s: identical content "
"exists in document #%s (%s). Skipping re-processing.",
filename,
collision_doc.id,
collision_doc.document_type,
)
if DocumentStatus.is_state(
existing_document.status, DocumentStatus.PENDING
) or DocumentStatus.is_state(
existing_document.status, DocumentStatus.PROCESSING
):
await session.delete(existing_document)
await session.commit()
return True, None
return True, existing_document
logging.info(
f"Content changed for markdown file {filename}. Updating document."
)
@ -312,6 +334,12 @@ async def add_received_markdown_file_document(
return document
except SQLAlchemyError as db_error:
await session.rollback()
if "ix_documents_content_hash" in str(db_error):
logging.warning(
"content_hash collision during commit for %s (markdown). Skipping.",
file_name,
)
return None
await task_logger.log_task_failure(
log_entry,
f"Database error processing markdown file: {file_name}",