From 5042fbfb852773edd58c5ead37854a94c3dc661c Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Thu, 5 Feb 2026 22:59:56 +0530
Subject: [PATCH] feat: enhance Gmail and Google Drive connectors with document
status management and duplicate content checks
---
.../connectors/composio_gmail_connector.py | 482 ++++++------
.../composio_google_drive_connector.py | 687 +++++++++++-------
.../components/DocumentsTableShell.tsx | 2 +-
.../(manage)/components/RowActions.tsx | 4 +-
4 files changed, 708 insertions(+), 467 deletions(-)
diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py
index 05395bfba..870053c7f 100644
--- a/surfsense_backend/app/connectors/composio_gmail_connector.py
+++ b/surfsense_backend/app/connectors/composio_gmail_connector.py
@@ -16,11 +16,15 @@ from sqlalchemy.orm import selectinload
from app.config import config
from app.connectors.composio_connector import ComposioConnector
-from app.db import Document, DocumentType
+from app.db import Document, DocumentStatus, DocumentType
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
-from app.tasks.connector_indexers.base import calculate_date_range
+from app.tasks.connector_indexers.base import (
+ calculate_date_range,
+ check_duplicate_document_by_hash,
+ safe_set_chunks,
+)
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
@@ -206,26 +210,24 @@ class ComposioGmailConnector(ComposioConnector):
# ============ Indexer Functions ============
-async def _process_gmail_message_batch(
+async def _analyze_gmail_messages_phase1(
session: AsyncSession,
messages: list[dict[str, Any]],
composio_connector: ComposioGmailConnector,
connector_id: int,
search_space_id: int,
user_id: str,
- total_documents_indexed: int = 0,
-) -> tuple[int, int]:
+) -> tuple[list[dict[str, Any]], int, int]:
"""
- Process a batch of Gmail messages and index them.
-
- Args:
- total_documents_indexed: Running total of documents indexed so far (for batch commits).
+ Phase 1: Analyze all messages, create pending documents.
+ Makes ALL documents visible in the UI immediately with pending status.
Returns:
- Tuple of (documents_indexed, documents_skipped)
+ Tuple of (messages_to_process, documents_skipped, duplicate_content_count)
"""
- documents_indexed = 0
+ messages_to_process = []
documents_skipped = 0
+ duplicate_content_count = 0
for message in messages:
try:
@@ -235,11 +237,7 @@ async def _process_gmail_message_batch(
documents_skipped += 1
continue
- # Composio's GMAIL_FETCH_EMAILS already returns full message content
- # No need for a separate detail API call
-
# Extract message info from Composio response
- # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
payload = message.get("payload", {})
headers = payload.get("headers", [])
@@ -262,7 +260,7 @@ async def _process_gmail_message_batch(
message
)
- # Check for empty content (defensive parsing per Composio best practices)
+ # Check for empty content
if not markdown_content.strip():
logger.warning(f"Skipping Gmail message with no content: {subject}")
documents_skipped += 1
@@ -280,99 +278,51 @@ async def _process_gmail_message_batch(
session, unique_identifier_hash
)
- # Get label IDs from Composio response
+ # Get label IDs and thread_id from Composio response
label_ids = message.get("labelIds", [])
- # Extract thread_id if available (for consistency with non-Composio implementation)
thread_id = message.get("threadId", "") or message.get("thread_id", "")
if existing_document:
if existing_document.content_hash == content_hash:
+ # Ensure status is ready (might have been stuck in processing/pending)
+ if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+ existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
- # Update existing
- user_llm = await get_user_long_context_llm(
- session, user_id, search_space_id
- )
-
- if user_llm:
- document_metadata = {
- "message_id": message_id,
- "thread_id": thread_id,
- "subject": subject,
- "sender": sender,
- "document_type": "Gmail Message (Composio)",
- }
- (
- summary_content,
- summary_embedding,
- ) = await generate_document_summary(
- markdown_content, user_llm, document_metadata
- )
- else:
- summary_content = (
- f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
- )
- summary_embedding = config.embedding_model_instance.embed(
- summary_content
- )
-
- chunks = await create_document_chunks(markdown_content)
-
- existing_document.title = subject
- existing_document.content = summary_content
- existing_document.content_hash = content_hash
- existing_document.embedding = summary_embedding
- existing_document.document_metadata = {
- "message_id": message_id,
- "thread_id": thread_id,
- "subject": subject,
- "sender": sender,
- "date": date_str,
- "labels": label_ids,
- "connector_id": connector_id,
- "source": "composio",
- }
- existing_document.chunks = chunks
- existing_document.updated_at = get_current_timestamp()
-
- documents_indexed += 1
-
- # Batch commit every 10 documents
- current_total = total_documents_indexed + documents_indexed
- if current_total % 10 == 0:
- logger.info(
- f"Committing batch: {current_total} Gmail messages processed so far"
- )
- await session.commit()
+ # Queue existing document for update (will be set to processing in Phase 2)
+ messages_to_process.append({
+ 'document': existing_document,
+ 'is_new': False,
+ 'markdown_content': markdown_content,
+ 'content_hash': content_hash,
+ 'message_id': message_id,
+ 'thread_id': thread_id,
+ 'subject': subject,
+ 'sender': sender,
+ 'date_str': date_str,
+ 'label_ids': label_ids,
+ })
continue
- # Create new document
- user_llm = await get_user_long_context_llm(
- session, user_id, search_space_id
- )
-
- if user_llm:
- document_metadata = {
- "message_id": message_id,
- "thread_id": thread_id,
- "subject": subject,
- "sender": sender,
- "document_type": "Gmail Message (Composio)",
- }
- summary_content, summary_embedding = await generate_document_summary(
- markdown_content, user_llm, document_metadata
- )
- else:
- summary_content = (
- f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
- )
- summary_embedding = config.embedding_model_instance.embed(
- summary_content
+ # Document doesn't exist by unique_identifier_hash
+ # Check if a document with the same content_hash exists (from standard connector)
+ with session.no_autoflush:
+ duplicate_by_content = await check_duplicate_document_by_hash(
+ session, content_hash
)
- chunks = await create_document_chunks(markdown_content)
+ if duplicate_by_content:
+ logger.info(
+ f"Message {subject} already indexed by another connector "
+ f"(existing document ID: {duplicate_by_content.id}, "
+ f"type: {duplicate_by_content.document_type}). Skipping."
+ )
+ duplicate_content_count += 1
+ documents_skipped += 1
+ continue
+ # Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=subject,
@@ -388,39 +338,138 @@ async def _process_gmail_message_batch(
"toolkit_id": "gmail",
"source": "composio",
},
- content=summary_content,
- content_hash=content_hash,
+ content="Pending...", # Placeholder until processed
+ content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
- embedding=summary_embedding,
- chunks=chunks,
+ embedding=None,
+ chunks=[], # Empty at creation - safe for async
+ status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
+
+ messages_to_process.append({
+ 'document': document,
+ 'is_new': True,
+ 'markdown_content': markdown_content,
+ 'content_hash': content_hash,
+ 'message_id': message_id,
+ 'thread_id': thread_id,
+ 'subject': subject,
+ 'sender': sender,
+ 'date_str': date_str,
+ 'label_ids': label_ids,
+ })
+
+ except Exception as e:
+ logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
+ documents_skipped += 1
+ continue
+
+ return messages_to_process, documents_skipped, duplicate_content_count
+
+
+async def _process_gmail_messages_phase2(
+ session: AsyncSession,
+ messages_to_process: list[dict[str, Any]],
+ connector_id: int,
+ search_space_id: int,
+ user_id: str,
+ on_heartbeat_callback: HeartbeatCallbackType | None = None,
+) -> tuple[int, int]:
+ """
+ Phase 2: Process each document one by one.
+ Each document transitions: pending → processing → ready/failed
+
+ Returns:
+ Tuple of (documents_indexed, documents_failed)
+ """
+ documents_indexed = 0
+ documents_failed = 0
+ last_heartbeat_time = time.time()
+
+ for item in messages_to_process:
+ # Send heartbeat periodically
+ if on_heartbeat_callback:
+ current_time = time.time()
+ if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+ await on_heartbeat_callback(documents_indexed)
+ last_heartbeat_time = current_time
+
+ document = item['document']
+ try:
+ # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
+ document.status = DocumentStatus.processing()
+ await session.commit()
+
+ # Heavy processing (LLM, embeddings, chunks)
+ user_llm = await get_user_long_context_llm(
+ session, user_id, search_space_id
+ )
+
+ if user_llm:
+ document_metadata_for_summary = {
+ "message_id": item['message_id'],
+ "thread_id": item['thread_id'],
+ "subject": item['subject'],
+ "sender": item['sender'],
+ "document_type": "Gmail Message (Composio)",
+ }
+ summary_content, summary_embedding = await generate_document_summary(
+ item['markdown_content'], user_llm, document_metadata_for_summary
+ )
+ else:
+ summary_content = (
+ f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}"
+ )
+ summary_embedding = config.embedding_model_instance.embed(
+ summary_content
+ )
+
+ chunks = await create_document_chunks(item['markdown_content'])
+
+ # Update document to READY with actual content
+ document.title = item['subject']
+ document.content = summary_content
+ document.content_hash = item['content_hash']
+ document.embedding = summary_embedding
+ document.document_metadata = {
+ "message_id": item['message_id'],
+ "thread_id": item['thread_id'],
+ "subject": item['subject'],
+ "sender": item['sender'],
+ "date": item['date_str'],
+ "labels": item['label_ids'],
+ "connector_id": connector_id,
+ "source": "composio",
+ }
+ safe_set_chunks(document, chunks)
+ document.updated_at = get_current_timestamp()
+ document.status = DocumentStatus.ready()
+
documents_indexed += 1
- # Batch commit every 10 documents
- current_total = total_documents_indexed + documents_indexed
- if current_total % 10 == 0:
+ # Batch commit every 10 documents (for ready status updates)
+ if documents_indexed % 10 == 0:
logger.info(
- f"Committing batch: {current_total} Gmail messages processed so far"
+ f"Committing batch: {documents_indexed} Gmail messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
- documents_skipped += 1
- # Rollback on error to avoid partial state (per Composio best practices)
+ # Mark document as failed with reason (visible in UI)
try:
- await session.rollback()
- except Exception as rollback_error:
- logger.error(
- f"Error during rollback: {rollback_error!s}", exc_info=True
- )
+ document.status = DocumentStatus.failed(str(e))
+ document.updated_at = get_current_timestamp()
+ except Exception as status_error:
+ logger.error(f"Failed to update document status to failed: {status_error}")
+ documents_failed += 1
continue
- return documents_indexed, documents_skipped
+ return documents_indexed, documents_failed
async def index_composio_gmail(
@@ -437,7 +486,7 @@ async def index_composio_gmail(
max_items: int = 1000,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, str]:
- """Index Gmail messages via Composio with pagination and incremental processing."""
+ """Index Gmail messages via Composio with real-time document status updates."""
try:
composio_connector = ComposioGmailConnector(session, connector_id)
@@ -448,14 +497,10 @@ async def index_composio_gmail(
end_date = None
# Use provided dates directly if both are provided, otherwise calculate from last_indexed_at
- # This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior)
if start_date is not None and end_date is not None:
- # User provided both dates - use them directly
start_date_str = start_date
end_date_str = end_date
else:
- # Calculate date range with defaults (uses last_indexed_at or 365 days back)
- # This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
@@ -473,48 +518,32 @@ async def index_composio_gmail(
f"(start_date={start_date_str}, end_date={end_date_str})"
)
- # Use smaller batch size to avoid 413 payload too large errors
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Gmail messages via Composio for connector {connector_id}",
+ {"stage": "fetching_messages"},
+ )
+
+ # =======================================================================
+ # FETCH ALL MESSAGES FIRST
+ # =======================================================================
batch_size = 50
page_token = None
- total_documents_indexed = 0
- total_documents_skipped = 0
- total_messages_fetched = 0
- result_size_estimate = None # Will be set from first API response
+ all_messages = []
+ result_size_estimate = None
last_heartbeat_time = time.time()
- while total_messages_fetched < max_items:
- # Send heartbeat periodically to indicate task is still alive
+ while len(all_messages) < max_items:
+ # Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
- await on_heartbeat_callback(total_documents_indexed)
+ await on_heartbeat_callback(len(all_messages))
last_heartbeat_time = current_time
- # Calculate how many messages to fetch in this batch
- remaining = max_items - total_messages_fetched
+ remaining = max_items - len(all_messages)
current_batch_size = min(batch_size, remaining)
- # Use result_size_estimate if available, otherwise fall back to max_items
- estimated_total = (
- result_size_estimate if result_size_estimate is not None else max_items
- )
- # Cap estimated_total at max_items to avoid showing misleading progress
- estimated_total = min(estimated_total, max_items)
-
- await task_logger.log_task_progress(
- log_entry,
- f"Fetching Gmail messages batch via Composio for connector {connector_id} "
- f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)",
- {
- "stage": "fetching_messages",
- "batch_size": current_batch_size,
- "total_fetched": total_messages_fetched,
- "total_indexed": total_documents_indexed,
- "estimated_total": estimated_total,
- },
- )
-
- # Fetch batch of messages
(
messages,
next_token,
@@ -533,97 +562,136 @@ async def index_composio_gmail(
return 0, f"Failed to fetch Gmail messages: {error}"
if not messages:
- # No more messages available
break
- # Update result_size_estimate from first response (Gmail provides this estimate)
if result_size_estimate is None and result_size_estimate_batch is not None:
result_size_estimate = result_size_estimate_batch
logger.info(
f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'"
)
- total_messages_fetched += len(messages)
- # Recalculate estimated_total after potentially updating result_size_estimate
- estimated_total = (
- result_size_estimate if result_size_estimate is not None else max_items
- )
- estimated_total = min(estimated_total, max_items)
+ all_messages.extend(messages)
+ logger.info(f"Fetched {len(messages)} messages (total: {len(all_messages)})")
- logger.info(
- f"Fetched batch of {len(messages)} Gmail messages "
- f"(total: {total_messages_fetched}/{estimated_total})"
- )
-
- # Process batch incrementally
- batch_indexed, batch_skipped = await _process_gmail_message_batch(
- session=session,
- messages=messages,
- composio_connector=composio_connector,
- connector_id=connector_id,
- search_space_id=search_space_id,
- user_id=user_id,
- total_documents_indexed=total_documents_indexed,
- )
-
- total_documents_indexed += batch_indexed
- total_documents_skipped += batch_skipped
-
- logger.info(
- f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped "
- f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)"
- )
-
- # Batch commits happen in _process_gmail_message_batch every 10 documents
- # This ensures progress is saved incrementally, preventing data loss on crashes
-
- # Check if we should continue
- if not next_token:
- # No more pages available
+ if not next_token or len(messages) < current_batch_size:
break
- if len(messages) < current_batch_size:
- # Last page had fewer items than requested, we're done
- break
-
- # Continue with next page
page_token = next_token
- if total_messages_fetched == 0:
+ if not all_messages:
success_msg = "No Gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
- # CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
- return 0, None # Return None (not error) when no items found
+ return (
+ 0,
+ None,
+ ) # Return None (not error) when no items found - this is success with 0 items
- # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
- # This ensures the UI shows "Last indexed" instead of "Never indexed"
+ logger.info(f"Found {len(all_messages)} Gmail messages to index via Composio")
+
+ # =======================================================================
+ # PHASE 1: Analyze all messages, create pending documents
+ # This makes ALL documents visible in the UI immediately with pending status
+ # =======================================================================
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Phase 1: Creating pending documents for {len(all_messages)} messages",
+ {"stage": "phase1_pending"},
+ )
+
+ (
+ messages_to_process,
+ documents_skipped,
+ duplicate_content_count,
+ ) = await _analyze_gmail_messages_phase1(
+ session=session,
+ messages=all_messages,
+ composio_connector=composio_connector,
+ connector_id=connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ )
+
+ # Commit all pending documents - they all appear in UI now
+ new_documents_count = len([m for m in messages_to_process if m['is_new']])
+ if new_documents_count > 0:
+ logger.info(f"Phase 1: Committing {new_documents_count} pending documents")
+ await session.commit()
+
+ # =======================================================================
+ # PHASE 2: Process each document one by one
+ # Each document transitions: pending → processing → ready/failed
+ # =======================================================================
+ logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Phase 2: Processing {len(messages_to_process)} documents",
+ {"stage": "phase2_processing"},
+ )
+
+ documents_indexed, documents_failed = await _process_gmail_messages_phase2(
+ session=session,
+ messages_to_process=messages_to_process,
+ connector_id=connector_id,
+ search_space_id=search_space_id,
+ user_id=user_id,
+ on_heartbeat_callback=on_heartbeat_callback,
+ )
+
+ # CRITICAL: Always update timestamp so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
- # Final commit to ensure all documents are persisted (safety net)
- # This matches the pattern used in non-Composio Gmail indexer
+ # Final commit to ensure all documents are persisted
logger.info(
- f"Final commit: Total {total_documents_indexed} Gmail messages processed"
- )
- await session.commit()
- logger.info(
- "Successfully committed all Composio Gmail document changes to database"
+ f"Final commit: Total {documents_indexed} Gmail messages processed"
)
+ try:
+ await session.commit()
+ logger.info(
+ "Successfully committed all Composio Gmail document changes to database"
+ )
+ except Exception as e:
+ # Handle any remaining integrity errors gracefully
+ if (
+ "duplicate key value violates unique constraint" in str(e).lower()
+ or "uniqueviolationerror" in str(e).lower()
+ ):
+ logger.warning(
+ f"Duplicate content_hash detected during final commit. "
+ f"Rolling back and continuing. Error: {e!s}"
+ )
+ await session.rollback()
+ else:
+ raise
+
+ # Build warning message if there were issues
+ warning_parts = []
+ if duplicate_content_count > 0:
+ warning_parts.append(f"{duplicate_content_count} duplicate")
+ if documents_failed > 0:
+ warning_parts.append(f"{documents_failed} failed")
+ warning_message = ", ".join(warning_parts) if warning_parts else None
await task_logger.log_task_success(
log_entry,
f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
{
- "documents_indexed": total_documents_indexed,
- "documents_skipped": total_documents_skipped,
- "messages_fetched": total_messages_fetched,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "documents_failed": documents_failed,
+ "duplicate_content_count": duplicate_content_count,
},
)
- return total_documents_indexed, None
+ logger.info(
+ f"Composio Gmail indexing completed: {documents_indexed} ready, "
+ f"{documents_skipped} skipped, {documents_failed} failed "
+ f"({duplicate_content_count} duplicate content)"
+ )
+ return documents_indexed, warning_message
except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py
index d7299fbfe..26cfd3020 100644
--- a/surfsense_backend/app/connectors/composio_google_drive_connector.py
+++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py
@@ -21,10 +21,14 @@ from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.connectors.composio_connector import ComposioConnector
-from app.db import Document, DocumentType, Log
+from app.db import Document, DocumentStatus, DocumentType, Log
from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
+from app.tasks.connector_indexers.base import (
+ check_duplicate_document_by_hash,
+ safe_set_chunks,
+)
from app.utils.document_converters import (
create_document_chunks,
generate_content_hash,
@@ -537,22 +541,6 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()
-async def check_document_by_content_hash(
- session: AsyncSession, content_hash: str
-) -> Document | None:
- """Check if a document with the given content hash already exists.
-
- This is used to prevent duplicate content from being indexed, regardless
- of which connector originally indexed it.
- """
- from sqlalchemy.future import select
-
- existing_doc_result = await session.execute(
- select(Document).where(Document.content_hash == content_hash)
- )
- return existing_doc_result.scalars().first()
-
-
async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
@@ -843,14 +831,16 @@ async def _index_composio_drive_delta_sync(
log_entry,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
- """Index Google Drive files using delta sync (only changed files).
+ """Index Google Drive files using delta sync with real-time document status updates.
Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync.
Handles: new files, modified files, and deleted files.
"""
documents_indexed = 0
documents_skipped = 0
+ documents_failed = 0
processing_errors = []
+ duplicate_content_count = 0
last_heartbeat_time = time.time()
# Fetch all changes with pagination
@@ -881,14 +871,13 @@ async def _index_composio_drive_delta_sync(
logger.info(f"Processing {len(all_changes)} changes from delta sync")
- for change in all_changes[:max_items]:
- # Send heartbeat periodically to indicate task is still alive
- if on_heartbeat_callback:
- current_time = time.time()
- if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
- await on_heartbeat_callback(documents_indexed)
- last_heartbeat_time = current_time
+ # =======================================================================
+ # PHASE 1: Analyze all changes, handle deletions, create pending documents
+ # =======================================================================
+ files_to_process = []
+ new_documents_created = False
+ for change in all_changes[:max_items]:
try:
# Handle removed files
is_removed = change.get("removed", False)
@@ -899,9 +888,8 @@ async def _index_composio_drive_delta_sync(
documents_skipped += 1
continue
- # Check if file was trashed or removed
+ # Check if file was trashed or removed - handle deletions immediately
if is_removed or file_info.get("trashed", False):
- # Remove document from database
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
@@ -923,37 +911,219 @@ async def _index_composio_drive_delta_sync(
if mime_type == "application/vnd.google-apps.folder":
continue
- # Process the file
- indexed, skipped, errors = await _process_single_drive_file(
- session=session,
- composio_connector=composio_connector,
- file_id=file_id,
- file_name=file_name,
- mime_type=mime_type,
- connector_id=connector_id,
- search_space_id=search_space_id,
- user_id=user_id,
- task_logger=task_logger,
- log_entry=log_entry,
+ # Check for existing document by file ID (from any connector)
+ existing_by_file_id = await check_document_by_google_drive_file_id(
+ session, file_id, search_space_id
)
- documents_indexed += indexed
- documents_skipped += skipped
- processing_errors.extend(errors)
+ # Generate unique identifier hash
+ document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
+ unique_identifier_hash = generate_unique_identifier_hash(
+ document_type, f"drive_{file_id}", search_space_id
+ )
+
+ # Check if document exists by unique identifier
+ existing_document = await check_document_by_unique_identifier(
+ session, unique_identifier_hash
+ )
+
+ if existing_by_file_id and not existing_document:
+ # File already indexed by different connector - skip
+ logger.info(
+ f"Skipping file {file_name} (file_id={file_id}): already indexed "
+ f"by {existing_by_file_id.document_type.value}"
+ )
+ documents_skipped += 1
+ continue
+
+ if existing_document:
+ # Queue existing document for update
+ files_to_process.append({
+ 'document': existing_document,
+ 'is_new': False,
+ 'file_id': file_id,
+ 'file_name': file_name,
+ 'mime_type': mime_type,
+ })
+ continue
+
+ # Create new document with PENDING status
+ document = Document(
+ search_space_id=search_space_id,
+ title=file_name,
+ document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
+ document_metadata={
+ "file_id": file_id,
+ "file_name": file_name,
+ "FILE_NAME": file_name,
+ "mime_type": mime_type,
+ "connector_id": connector_id,
+ "toolkit_id": "googledrive",
+ "source": "composio",
+ },
+ content="Pending...",
+ content_hash=unique_identifier_hash,
+ unique_identifier_hash=unique_identifier_hash,
+ embedding=None,
+ chunks=[],
+ status=DocumentStatus.pending(),
+ updated_at=get_current_timestamp(),
+ created_by_id=user_id,
+ connector_id=connector_id,
+ )
+ session.add(document)
+ new_documents_created = True
+
+ files_to_process.append({
+ 'document': document,
+ 'is_new': True,
+ 'file_id': file_id,
+ 'file_name': file_name,
+ 'mime_type': mime_type,
+ })
+
+ except Exception as e:
+ logger.error(f"Error in Phase 1 for change: {e!s}", exc_info=True)
+ documents_skipped += 1
+ continue
+
+ # Commit all pending documents - they all appear in UI now
+ if new_documents_created:
+ logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
+ await session.commit()
+
+ # =======================================================================
+ # PHASE 2: Process each document one by one
+ # =======================================================================
+ logger.info(f"Phase 2: Processing {len(files_to_process)} documents")
+
+ for item in files_to_process:
+ # Send heartbeat periodically
+ if on_heartbeat_callback:
+ current_time = time.time()
+ if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+ await on_heartbeat_callback(documents_indexed)
+ last_heartbeat_time = current_time
+
+ document = item['document']
+ try:
+ # Set to PROCESSING and commit
+ document.status = DocumentStatus.processing()
+ await session.commit()
+
+ # Get file content
+ content, content_error = await composio_connector.get_drive_file_content(
+ item['file_id'], original_mime_type=item['mime_type']
+ )
+
+ if content_error or not content:
+ logger.warning(f"Could not get content for file {item['file_name']}: {content_error}")
+ markdown_content = f"# {item['file_name']}\n\n"
+ markdown_content += f"**File ID:** {item['file_id']}\n"
+ markdown_content += f"**Type:** {item['mime_type']}\n"
+ elif isinstance(content, dict):
+ error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}"
+ logger.error(error_msg)
+ processing_errors.append(error_msg)
+ markdown_content = f"# {item['file_name']}\n\n"
+ markdown_content += f"**File ID:** {item['file_id']}\n"
+ markdown_content += f"**Type:** {item['mime_type']}\n"
+ else:
+ markdown_content = await _process_file_content(
+ content=content,
+ file_name=item['file_name'],
+ file_id=item['file_id'],
+ mime_type=item['mime_type'],
+ search_space_id=search_space_id,
+ user_id=user_id,
+ session=session,
+ task_logger=task_logger,
+ log_entry=log_entry,
+ processing_errors=processing_errors,
+ )
+
+ content_hash = generate_content_hash(markdown_content, search_space_id)
+
+ # For existing documents, check if content changed
+ if not item['is_new'] and document.content_hash == content_hash:
+ if not DocumentStatus.is_state(document.status, DocumentStatus.READY):
+ document.status = DocumentStatus.ready()
+ documents_skipped += 1
+ continue
+
+ # Check for duplicate content hash (for new documents)
+ if item['is_new']:
+ with session.no_autoflush:
+ duplicate_by_content = await check_duplicate_document_by_hash(
+ session, content_hash
+ )
+ if duplicate_by_content:
+ logger.info(
+ f"File {item['file_name']} already indexed by another connector. Skipping."
+ )
+ await session.delete(document)
+ duplicate_content_count += 1
+ documents_skipped += 1
+ continue
+
+ # Heavy processing (LLM, embeddings, chunks)
+ user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
+
+ if user_llm:
+ document_metadata_for_summary = {
+ "file_id": item['file_id'],
+ "file_name": item['file_name'],
+ "mime_type": item['mime_type'],
+ "document_type": "Google Drive File (Composio)",
+ }
+ summary_content, summary_embedding = await generate_document_summary(
+ markdown_content, user_llm, document_metadata_for_summary
+ )
+ else:
+ summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}"
+ summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+ chunks = await create_document_chunks(markdown_content)
+
+ # Update document to READY
+ document.title = item['file_name']
+ document.content = summary_content
+ document.content_hash = content_hash
+ document.embedding = summary_embedding
+ document.document_metadata = {
+ "file_id": item['file_id'],
+ "file_name": item['file_name'],
+ "FILE_NAME": item['file_name'],
+ "mime_type": item['mime_type'],
+ "connector_id": connector_id,
+ "source": "composio",
+ }
+ safe_set_chunks(document, chunks)
+ document.updated_at = get_current_timestamp()
+ document.status = DocumentStatus.ready()
+
+ documents_indexed += 1
# Batch commit every 10 documents
- if documents_indexed > 0 and documents_indexed % 10 == 0:
+ if documents_indexed % 10 == 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} changes processed")
except Exception as e:
- error_msg = f"Error processing change for file {file_id}: {e!s}"
+ error_msg = f"Error processing change for file {item['file_id']}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
- documents_skipped += 1
+ try:
+ document.status = DocumentStatus.failed(str(e))
+ document.updated_at = get_current_timestamp()
+ except Exception as status_error:
+ logger.error(f"Failed to update document status to failed: {status_error}")
+ documents_failed += 1
+ continue
logger.info(
- f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
+ f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, "
+ f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
)
return documents_indexed, documents_skipped, processing_errors
@@ -973,10 +1143,12 @@ async def _index_composio_drive_full_scan(
log_entry,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
- """Index Google Drive files using full scan (first sync or when no delta token)."""
+ """Index Google Drive files using full scan with real-time document status updates."""
documents_indexed = 0
documents_skipped = 0
+ documents_failed = 0
processing_errors = []
+ duplicate_content_count = 0
last_heartbeat_time = time.time()
all_files = []
@@ -1108,14 +1280,14 @@ async def _index_composio_drive_full_scan(
f"Found {len(all_files)} Google Drive files to index via Composio (full scan)"
)
- for file_info in all_files:
- # Send heartbeat periodically to indicate task is still alive
- if on_heartbeat_callback:
- current_time = time.time()
- if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
- await on_heartbeat_callback(documents_indexed)
- last_heartbeat_time = current_time
+ # =======================================================================
+ # PHASE 1: Analyze all files, create pending documents
+ # This makes ALL documents visible in the UI immediately with pending status
+ # =======================================================================
+ files_to_process = [] # List of dicts with document and file data
+ new_documents_created = False
+ for file_info in all_files:
try:
# Handle both standard Google API and potential Composio variations
file_id = file_info.get("id", "") or file_info.get("fileId", "")
@@ -1132,227 +1304,228 @@ async def _index_composio_drive_full_scan(
if mime_type == "application/vnd.google-apps.folder":
continue
- # Process the file
- indexed, skipped, errors = await _process_single_drive_file(
- session=session,
- composio_connector=composio_connector,
- file_id=file_id,
- file_name=file_name,
- mime_type=mime_type,
- connector_id=connector_id,
- search_space_id=search_space_id,
- user_id=user_id,
- task_logger=task_logger,
- log_entry=log_entry,
+ # ========== EARLY DUPLICATE CHECK BY FILE ID ==========
+ existing_by_file_id = await check_document_by_google_drive_file_id(
+ session, file_id, search_space_id
+ )
+ if existing_by_file_id:
+ logger.info(
+ f"Skipping file {file_name} (file_id={file_id}): already indexed "
+ f"by {existing_by_file_id.document_type.value}"
+ )
+ documents_skipped += 1
+ continue
+
+ # Generate unique identifier hash
+ document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
+ unique_identifier_hash = generate_unique_identifier_hash(
+ document_type, f"drive_{file_id}", search_space_id
)
- documents_indexed += indexed
- documents_skipped += skipped
- processing_errors.extend(errors)
+ # Check if document exists by unique identifier
+ existing_document = await check_document_by_unique_identifier(
+ session, unique_identifier_hash
+ )
+
+ if existing_document:
+ # Queue existing document for update (will be set to processing in Phase 2)
+ files_to_process.append({
+ 'document': existing_document,
+ 'is_new': False,
+ 'file_id': file_id,
+ 'file_name': file_name,
+ 'mime_type': mime_type,
+ })
+ continue
+
+ # Create new document with PENDING status (visible in UI immediately)
+ document = Document(
+ search_space_id=search_space_id,
+ title=file_name,
+ document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
+ document_metadata={
+ "file_id": file_id,
+ "file_name": file_name,
+ "FILE_NAME": file_name,
+ "mime_type": mime_type,
+ "connector_id": connector_id,
+ "toolkit_id": "googledrive",
+ "source": "composio",
+ },
+ content="Pending...", # Placeholder until processed
+ content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
+ unique_identifier_hash=unique_identifier_hash,
+ embedding=None,
+ chunks=[], # Empty at creation - safe for async
+ status=DocumentStatus.pending(), # Pending until processing starts
+ updated_at=get_current_timestamp(),
+ created_by_id=user_id,
+ connector_id=connector_id,
+ )
+ session.add(document)
+ new_documents_created = True
+
+ files_to_process.append({
+ 'document': document,
+ 'is_new': True,
+ 'file_id': file_id,
+ 'file_name': file_name,
+ 'mime_type': mime_type,
+ })
+
+ except Exception as e:
+ logger.error(f"Error in Phase 1 for file: {e!s}", exc_info=True)
+ documents_skipped += 1
+ continue
+
+ # Commit all pending documents - they all appear in UI now
+ if new_documents_created:
+ logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents")
+ await session.commit()
+
+ # =======================================================================
+ # PHASE 2: Process each document one by one
+ # Each document transitions: pending → processing → ready/failed
+ # =======================================================================
+ logger.info(f"Phase 2: Processing {len(files_to_process)} documents")
+
+ for item in files_to_process:
+ # Send heartbeat periodically
+ if on_heartbeat_callback:
+ current_time = time.time()
+ if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+ await on_heartbeat_callback(documents_indexed)
+ last_heartbeat_time = current_time
+
+ document = item['document']
+ try:
+ # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
+ document.status = DocumentStatus.processing()
+ await session.commit()
+
+ # Get file content (pass mime_type for Google Workspace export handling)
+ content, content_error = await composio_connector.get_drive_file_content(
+ item['file_id'], original_mime_type=item['mime_type']
+ )
+
+ if content_error or not content:
+ logger.warning(f"Could not get content for file {item['file_name']}: {content_error}")
+ markdown_content = f"# {item['file_name']}\n\n"
+ markdown_content += f"**File ID:** {item['file_id']}\n"
+ markdown_content += f"**Type:** {item['mime_type']}\n"
+ elif isinstance(content, dict):
+ error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}"
+ logger.error(error_msg)
+ processing_errors.append(error_msg)
+ markdown_content = f"# {item['file_name']}\n\n"
+ markdown_content += f"**File ID:** {item['file_id']}\n"
+ markdown_content += f"**Type:** {item['mime_type']}\n"
+ else:
+ # Process content based on file type
+ markdown_content = await _process_file_content(
+ content=content,
+ file_name=item['file_name'],
+ file_id=item['file_id'],
+ mime_type=item['mime_type'],
+ search_space_id=search_space_id,
+ user_id=user_id,
+ session=session,
+ task_logger=task_logger,
+ log_entry=log_entry,
+ processing_errors=processing_errors,
+ )
+
+ content_hash = generate_content_hash(markdown_content, search_space_id)
+
+ # For existing documents, check if content changed
+ if not item['is_new'] and document.content_hash == content_hash:
+ # Ensure status is ready
+ if not DocumentStatus.is_state(document.status, DocumentStatus.READY):
+ document.status = DocumentStatus.ready()
+ documents_skipped += 1
+ continue
+
+ # Check for duplicate content hash (for new documents)
+ if item['is_new']:
+ with session.no_autoflush:
+ duplicate_by_content = await check_duplicate_document_by_hash(
+ session, content_hash
+ )
+ if duplicate_by_content:
+ logger.info(
+ f"File {item['file_name']} already indexed by another connector. Skipping."
+ )
+ # Remove the pending document we created
+ await session.delete(document)
+ duplicate_content_count += 1
+ documents_skipped += 1
+ continue
+
+ # Heavy processing (LLM, embeddings, chunks)
+ user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
+
+ if user_llm:
+ document_metadata_for_summary = {
+ "file_id": item['file_id'],
+ "file_name": item['file_name'],
+ "mime_type": item['mime_type'],
+ "document_type": "Google Drive File (Composio)",
+ }
+ summary_content, summary_embedding = await generate_document_summary(
+ markdown_content, user_llm, document_metadata_for_summary
+ )
+ else:
+ summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}"
+ summary_embedding = config.embedding_model_instance.embed(summary_content)
+
+ chunks = await create_document_chunks(markdown_content)
+
+ # Update document to READY with actual content
+ document.title = item['file_name']
+ document.content = summary_content
+ document.content_hash = content_hash
+ document.embedding = summary_embedding
+ document.document_metadata = {
+ "file_id": item['file_id'],
+ "file_name": item['file_name'],
+ "FILE_NAME": item['file_name'],
+ "mime_type": item['mime_type'],
+ "connector_id": connector_id,
+ "source": "composio",
+ }
+ safe_set_chunks(document, chunks)
+ document.updated_at = get_current_timestamp()
+ document.status = DocumentStatus.ready()
+
+ documents_indexed += 1
# Batch commit every 10 documents
- if documents_indexed > 0 and documents_indexed % 10 == 0:
+ if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Drive files processed so far"
)
await session.commit()
except Exception as e:
- error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}"
+ error_msg = f"Error processing Drive file {item['file_name']}: {e!s}"
logger.error(error_msg, exc_info=True)
processing_errors.append(error_msg)
- documents_skipped += 1
+ # Mark document as failed with reason (visible in UI)
+ try:
+ document.status = DocumentStatus.failed(str(e))
+ document.updated_at = get_current_timestamp()
+ except Exception as status_error:
+ logger.error(f"Failed to update document status to failed: {status_error}")
+ documents_failed += 1
+ continue
logger.info(
- f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
+ f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, "
+ f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
)
return documents_indexed, documents_skipped, processing_errors
-async def _process_single_drive_file(
- session: AsyncSession,
- composio_connector: ComposioGoogleDriveConnector,
- file_id: str,
- file_name: str,
- mime_type: str,
- connector_id: int,
- search_space_id: int,
- user_id: str,
- task_logger: TaskLoggingService,
- log_entry,
-) -> tuple[int, int, list[str]]:
- """Process a single Google Drive file for indexing.
-
- Returns:
- Tuple of (documents_indexed, documents_skipped, processing_errors)
- """
- processing_errors = []
-
- # ========== EARLY DUPLICATE CHECK BY FILE ID ==========
- # Check if this Google Drive file was already indexed by ANY connector
- # This happens BEFORE download/ETL to save expensive API calls
- existing_by_file_id = await check_document_by_google_drive_file_id(
- session, file_id, search_space_id
- )
- if existing_by_file_id:
- logger.info(
- f"Skipping file {file_name} (file_id={file_id}): already indexed "
- f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
- f"(saved download & ETL cost)"
- )
- return 0, 1, processing_errors # Skip - NO download, NO ETL!
- # ======================================================
-
- # Generate unique identifier hash
- document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
- unique_identifier_hash = generate_unique_identifier_hash(
- document_type, f"drive_{file_id}", search_space_id
- )
-
- # Check if document exists by unique identifier (same connector, same file)
- existing_document = await check_document_by_unique_identifier(
- session, unique_identifier_hash
- )
-
- # Get file content (pass mime_type for Google Workspace export handling)
- content, content_error = await composio_connector.get_drive_file_content(
- file_id, original_mime_type=mime_type
- )
-
- if content_error or not content:
- logger.warning(f"Could not get content for file {file_name}: {content_error}")
- # Use metadata as content fallback
- markdown_content = f"# {file_name}\n\n"
- markdown_content += f"**File ID:** {file_id}\n"
- markdown_content += f"**Type:** {mime_type}\n"
- elif isinstance(content, dict):
- # Safety check: if content is still a dict, log error and use fallback
- error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}"
- logger.error(error_msg)
- processing_errors.append(error_msg)
- markdown_content = f"# {file_name}\n\n"
- markdown_content += f"**File ID:** {file_id}\n"
- markdown_content += f"**Type:** {mime_type}\n"
- else:
- # Process content based on file type
- markdown_content = await _process_file_content(
- content=content,
- file_name=file_name,
- file_id=file_id,
- mime_type=mime_type,
- search_space_id=search_space_id,
- user_id=user_id,
- session=session,
- task_logger=task_logger,
- log_entry=log_entry,
- processing_errors=processing_errors,
- )
-
- content_hash = generate_content_hash(markdown_content, search_space_id)
-
- if existing_document:
- if existing_document.content_hash == content_hash:
- return 0, 1, processing_errors # Skipped - unchanged
-
- # Update existing document
- user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
-
- if user_llm:
- document_metadata = {
- "file_id": file_id,
- "file_name": file_name,
- "mime_type": mime_type,
- "document_type": "Google Drive File (Composio)",
- }
- (
- summary_content,
- summary_embedding,
- ) = await generate_document_summary(
- markdown_content, user_llm, document_metadata
- )
- else:
- summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
- summary_embedding = config.embedding_model_instance.embed(summary_content)
-
- chunks = await create_document_chunks(markdown_content)
-
- existing_document.title = file_name
- existing_document.content = summary_content
- existing_document.content_hash = content_hash
- existing_document.embedding = summary_embedding
- existing_document.document_metadata = {
- "file_id": file_id,
- "file_name": file_name,
- "FILE_NAME": file_name, # For compatibility
- "mime_type": mime_type,
- "connector_id": connector_id,
- "source": "composio",
- }
- existing_document.chunks = chunks
- existing_document.updated_at = get_current_timestamp()
-
- return 1, 0, processing_errors # Indexed - updated
-
- # Check if content_hash already exists (from any connector)
- # This prevents duplicate content and avoids IntegrityError on unique constraint
- existing_by_content_hash = await check_document_by_content_hash(
- session, content_hash
- )
- if existing_by_content_hash:
- logger.info(
- f"Skipping file {file_name} (file_id={file_id}): identical content "
- f"already indexed as '{existing_by_content_hash.title}'"
- )
- return 0, 1, processing_errors # Skipped - duplicate content
-
- # Create new document
- user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
-
- if user_llm:
- document_metadata = {
- "file_id": file_id,
- "file_name": file_name,
- "mime_type": mime_type,
- "document_type": "Google Drive File (Composio)",
- }
- (
- summary_content,
- summary_embedding,
- ) = await generate_document_summary(
- markdown_content, user_llm, document_metadata
- )
- else:
- summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}"
- summary_embedding = config.embedding_model_instance.embed(summary_content)
-
- chunks = await create_document_chunks(markdown_content)
-
- document = Document(
- search_space_id=search_space_id,
- title=file_name,
- document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
- document_metadata={
- "file_id": file_id,
- "file_name": file_name,
- "FILE_NAME": file_name, # For compatibility
- "mime_type": mime_type,
- "toolkit_id": "googledrive",
- "source": "composio",
- },
- content=summary_content,
- content_hash=content_hash,
- unique_identifier_hash=unique_identifier_hash,
- embedding=summary_embedding,
- chunks=chunks,
- updated_at=get_current_timestamp(),
- created_by_id=user_id,
- connector_id=connector_id,
- )
- session.add(document)
-
- return 1, 0, processing_errors # Indexed - new
-
-
async def _fetch_folder_files_recursively(
composio_connector: ComposioGoogleDriveConnector,
folder_id: str,
diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx
index 0bd8189b8..d579fe677 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx
@@ -56,7 +56,7 @@ function StatusIndicator({ status }: { status?: DocumentStatus }) {
- Processing...
+ Syncing
);
case "failed":
diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx
index 867fdc916..4133f2960 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx
@@ -119,7 +119,7 @@ export function RowActions({