feat: implement two-phase document indexing across Google connectors with real-time status updates

This commit is contained in:
Anish Sarkar 2026-02-06 02:24:35 +05:30
parent 3bbac0d4ea
commit c12401c1e8
6 changed files with 681 additions and 331 deletions

View file

@ -2127,6 +2127,7 @@ async def run_google_gmail_indexing(
start_date: str | None,
end_date: str | None,
update_last_indexed: bool,
on_heartbeat_callback=None,
) -> tuple[int, str | None]:
# Use a reasonable default for max_messages
max_messages = 1000
@ -2139,6 +2140,7 @@ async def run_google_gmail_indexing(
end_date=end_date,
update_last_indexed=update_last_indexed,
max_messages=max_messages,
on_heartbeat_callback=on_heartbeat_callback,
)
# index_google_gmail_messages returns (int, str) but we need (int, str | None)
return indexed_count, error_message if error_message else None

View file

@ -1,5 +1,9 @@
"""
Google Calendar connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import time
@ -11,7 +15,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.google_calendar_connector import GoogleCalendarConnector
from app.db import Document, DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -27,6 +31,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -284,7 +289,7 @@ async def index_google_calendar_events(
documents_indexed = 0
documents_skipped = 0
skipped_events = []
documents_failed = 0 # Track events that failed processing
duplicate_content_count = (
0 # Track events skipped due to duplicate content_hash
)
@ -292,14 +297,14 @@ async def index_google_calendar_events(
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all events, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
events_to_process = [] # List of dicts with document and event data
new_documents_created = False
for event in events:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:
event_id = event.get("id")
event_summary = event.get("summary", "No Title")
@ -307,14 +312,12 @@ async def index_google_calendar_events(
if not event_id:
logger.warning(f"Skipping event with missing ID: {event_summary}")
skipped_events.append(f"{event_summary} (missing ID)")
documents_skipped += 1
continue
event_markdown = calendar_client.format_event_to_markdown(event)
if not event_markdown.strip():
logger.warning(f"Skipping event with no content: {event_summary}")
skipped_events.append(f"{event_summary} (no content)")
documents_skipped += 1
continue
@ -341,82 +344,27 @@ async def index_google_calendar_events(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Google Calendar event {event_summary} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Google Calendar event {event_summary}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
event_markdown, user_llm, document_metadata
)
else:
summary_content = (
f"Google Calendar Event: {event_summary}\n\n"
)
summary_content += f"Calendar: {calendar_id}\n"
summary_content += f"Start: {start_time}\n"
summary_content += f"End: {end_time}\n"
if location:
summary_content += f"Location: {location}\n"
if description:
desc_preview = description[:1000]
if len(description) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(event_markdown)
# Update existing document
existing_document.title = event_summary
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(
f"Successfully updated Google Calendar event {event_summary}"
)
continue
# Queue existing document for update (will be set to processing in Phase 2)
events_to_process.append({
'document': existing_document,
'is_new': False,
'event_markdown': event_markdown,
'content_hash': content_hash,
'event_id': event_id,
'event_summary': event_summary,
'calendar_id': calendar_id,
'start_time': start_time,
'end_time': end_time,
'location': location,
'description': description,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -434,52 +382,9 @@ async def index_google_calendar_events(
)
duplicate_content_count += 1
documents_skipped += 1
skipped_events.append(
f"{event_summary} (already indexed by another connector)"
)
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"event_id": event_id,
"event_summary": event_summary,
"calendar_id": calendar_id,
"start_time": start_time,
"end_time": end_time,
"location": location or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
event_markdown, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Google Calendar Event: {event_summary}\n\n"
summary_content += f"Calendar: {calendar_id}\n"
summary_content += f"Start: {start_time}\n"
summary_content += f"End: {end_time}\n"
if location:
summary_content += f"Location: {location}\n"
if description:
desc_preview = description[:1000]
if len(description) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(event_markdown)
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=event_summary,
@ -491,23 +396,124 @@ async def index_google_calendar_events(
"start_time": start_time,
"end_time": end_time,
"location": location,
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new event {event_summary}")
new_documents_created = True
# Batch commit every 10 documents
events_to_process.append({
'document': document,
'is_new': True,
'event_markdown': event_markdown,
'content_hash': content_hash,
'event_id': event_id,
'event_summary': event_summary,
'calendar_id': calendar_id,
'start_time': start_time,
'end_time': end_time,
'location': location,
'description': description,
})
except Exception as e:
logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(events_to_process)} documents")
for item in events_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"event_id": item['event_id'],
"event_summary": item['event_summary'],
"calendar_id": item['calendar_id'],
"start_time": item['start_time'],
"end_time": item['end_time'],
"location": item['location'] or "No location",
"document_type": "Google Calendar Event",
"connector_type": "Google Calendar",
}
summary_content, summary_embedding = await generate_document_summary(
item['event_markdown'], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Calendar Event: {item['event_summary']}\n\n"
summary_content += f"Calendar: {item['calendar_id']}\n"
summary_content += f"Start: {item['start_time']}\n"
summary_content += f"End: {item['end_time']}\n"
if item['location']:
summary_content += f"Location: {item['location']}\n"
if item['description']:
desc_preview = item['description'][:1000]
if len(item['description']) > 1000:
desc_preview += "..."
summary_content += f"Description: {desc_preview}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['event_markdown'])
# Update document to READY with actual content
document.title = item['event_summary']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"event_id": item['event_id'],
"event_summary": item['event_summary'],
"calendar_id": item['calendar_id'],
"start_time": item['start_time'],
"end_time": item['end_time'],
"location": item['location'],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Google Calendar events processed so far"
@ -515,19 +521,18 @@ async def index_google_calendar_events(
await session.commit()
except Exception as e:
logger.error(
f"Error processing event {event.get('summary', 'Unknown')}: {e!s}",
exc_info=True,
)
skipped_events.append(
f"{event.get('summary', 'Unknown')} (processing error)"
)
documents_skipped += 1
logger.error(f"Error processing Calendar event: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(
@ -535,6 +540,9 @@ async def index_google_calendar_events(
)
try:
await session.commit()
logger.info(
"Successfully committed all Google Calendar document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
@ -551,10 +559,15 @@ async def index_google_calendar_events(
else:
raise
# Build warning message if duplicates were found
warning_message = None
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_message = f"{duplicate_content_count} skipped (duplicate)"
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
total_processed = documents_indexed
await task_logger.log_task_success(
log_entry,
@ -563,14 +576,15 @@ async def index_google_calendar_events(
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_events_count": len(skipped_events),
},
)
logger.info(
f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped "
f"({duplicate_content_count} due to duplicate content from other connectors)"
f"Google Calendar indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return total_processed, warning_message

View file

@ -1,4 +1,9 @@
"""Google Drive indexer using Surfsense file processors."""
"""Google Drive indexer using Surfsense file processors.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import logging
import time
@ -17,11 +22,12 @@ from app.connectors.google_drive import (
get_files_in_folder,
get_start_page_token,
)
from app.db import DocumentType, SearchSourceConnectorType
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
get_current_timestamp,
update_connector_last_indexed,
)
from app.utils.document_converters import generate_unique_identifier_hash
@ -324,8 +330,29 @@ async def index_google_drive_single_file(
display_name = file_name or file.get("name", "Unknown")
logger.info(f"Indexing Google Drive file: {display_name} ({file_id})")
# Create pending document for status visibility
pending_doc, should_skip = await _create_pending_document_for_file(
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if should_skip:
await task_logger.log_task_progress(
log_entry,
f"File {display_name} is unchanged or not indexable",
{"status": "skipped"},
)
return 0, None
# Commit pending document so it appears in UI
if pending_doc and pending_doc.id is None:
await session.commit()
# Process the file
indexed, skipped = await _process_single_file(
indexed, skipped, failed = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
@ -334,6 +361,7 @@ async def index_google_drive_single_file(
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
)
await session.commit()
@ -341,6 +369,15 @@ async def index_google_drive_single_file(
"Successfully committed Google Drive file indexing changes to database"
)
if failed > 0:
error_msg = f"Failed to index file {display_name}"
await task_logger.log_task_failure(
log_entry,
error_msg,
{"file_name": display_name, "file_id": file_id},
)
return 0, error_msg
if indexed > 0:
await task_logger.log_task_success(
log_entry,
@ -397,7 +434,12 @@ async def _index_full_scan(
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
"""Perform full scan indexing of a folder."""
"""Perform full scan indexing of a folder.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Collect all files and create pending documents (visible in UI immediately)
- Phase 2: Process each file: pending processing ready/failed
"""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name} (include_subfolders={include_subfolders})",
@ -410,29 +452,31 @@ async def _index_full_scan(
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
files_processed = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Collect all files and create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
files_to_process = [] # List of (file, pending_document or None)
new_documents_created = False
# Queue of folders to process: (folder_id, folder_name)
folders_to_process = [(folder_id, folder_name)]
logger.info("Phase 1: Collecting files and creating pending documents")
while folders_to_process and files_processed < max_files:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
current_folder_id, current_folder_name = folders_to_process.pop(0)
logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})")
logger.info(f"Scanning folder: {current_folder_name} ({current_folder_id})")
page_token = None
while files_processed < max_files:
# Get files and folders in current folder
# include_subfolders=True here so we get folder items to queue them
files, next_token, error = await get_files_in_folder(
drive_client,
current_folder_id,
@ -462,35 +506,74 @@ async def _index_full_scan(
logger.debug(f"Queued subfolder: {file.get('name', 'Unknown')}")
continue
# Process the file
files_processed += 1
indexed, skipped = await _process_single_file(
drive_client=drive_client,
# Create pending document for this file
pending_doc, should_skip = await _create_pending_document_for_file(
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
)
documents_indexed += indexed
documents_skipped += skipped
if should_skip:
documents_skipped += 1
continue
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
if pending_doc and pending_doc.id is None:
# New document was created
new_documents_created = True
files_to_process.append((file, pending_doc))
page_token = next_token
if not page_token:
break
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([f for f in files_to_process if f[1] and f[1].id is None])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each file one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(files_to_process)} files")
for file, pending_doc in files_to_process:
# Check if it's time for a heartbeat update
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
indexed, skipped, failed = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
)
documents_indexed += indexed
documents_skipped += skipped
documents_failed += failed
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(
f"Committed batch: {documents_indexed} files indexed so far"
)
logger.info(
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped"
f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed"
)
return documents_indexed, documents_skipped
@ -514,6 +597,10 @@ async def _index_with_delta_sync(
Note: include_subfolders is accepted for API consistency but delta sync
automatically tracks changes across all folders including subfolders.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Collect all changes and create pending documents (visible in UI immediately)
- Phase 2: Process each file: pending processing ready/failed
"""
await task_logger.log_task_progress(
log_entry,
@ -537,19 +624,21 @@ async def _index_with_delta_sync(
documents_indexed = 0
documents_skipped = 0
documents_failed = 0
files_processed = 0
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze changes and create pending documents for new/modified files
# =======================================================================
changes_to_process = [] # List of (change, file, pending_document or None)
new_documents_created = False
logger.info("Phase 1: Analyzing changes and creating pending documents")
for change in changes:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
if files_processed >= max_files:
break
@ -566,7 +655,45 @@ async def _index_with_delta_sync(
if not file:
continue
indexed, skipped = await _process_single_file(
# Create pending document for this file
pending_doc, should_skip = await _create_pending_document_for_file(
session=session,
file=file,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
)
if should_skip:
documents_skipped += 1
continue
if pending_doc and pending_doc.id is None:
# New document was created
new_documents_created = True
changes_to_process.append((change, file, pending_doc))
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each file one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(changes_to_process)} changes")
for change, file, pending_doc in changes_to_process:
# Check if it's time for a heartbeat update
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
indexed, skipped, failed = await _process_single_file(
drive_client=drive_client,
session=session,
file=file,
@ -575,21 +702,123 @@ async def _index_with_delta_sync(
user_id=user_id,
task_logger=task_logger,
log_entry=log_entry,
pending_document=pending_doc,
)
documents_indexed += indexed
documents_skipped += skipped
documents_failed += failed
if documents_indexed % 10 == 0 and documents_indexed > 0:
await session.commit()
logger.info(f"Committed batch: {documents_indexed} changes processed")
logger.info(
f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped"
f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, {documents_failed} failed"
)
return documents_indexed, documents_skipped
async def _create_pending_document_for_file(
session: AsyncSession,
file: dict,
connector_id: int,
search_space_id: int,
user_id: str,
) -> tuple[Document | None, bool]:
"""
Create a pending document for a Google Drive file if it doesn't exist.
This is Phase 1 of the 2-phase document status update pattern.
Creates documents with 'pending' status so they appear in UI immediately.
Args:
session: Database session
file: File metadata from Google Drive API
connector_id: ID of the Drive connector
search_space_id: ID of the search space
user_id: ID of the user
Returns:
Tuple of (document, should_skip):
- (existing_doc, False): Existing document that needs update
- (new_pending_doc, False): New pending document created
- (None, True): File should be skipped (unchanged, rename-only, or folder)
"""
from app.connectors.google_drive.file_types import should_skip_file
file_id = file.get("id")
file_name = file.get("name", "Unknown")
mime_type = file.get("mimeType", "")
# Skip folders and shortcuts
if should_skip_file(mime_type):
return None, True
if not file_id:
return None, True
# Generate unique identifier hash for this file
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
# Check if document exists
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing_document:
# Check if this is a rename-only update (content unchanged)
incoming_md5 = file.get("md5Checksum")
incoming_modified_time = file.get("modifiedTime")
doc_metadata = existing_document.document_metadata or {}
stored_md5 = doc_metadata.get("md5_checksum")
stored_modified_time = doc_metadata.get("modified_time")
# Determine if content changed
content_unchanged = False
if incoming_md5 and stored_md5:
content_unchanged = incoming_md5 == stored_md5
elif not incoming_md5 and incoming_modified_time and stored_modified_time:
# Google Workspace file - use modifiedTime as fallback
content_unchanged = incoming_modified_time == stored_modified_time
if content_unchanged:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
return None, True
# Content changed - return existing document for update
return existing_document, False
# Create new pending document
document = Document(
search_space_id=search_space_id,
title=file_name,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
document_metadata={
"google_drive_file_id": file_id,
"google_drive_file_name": file_name,
"google_drive_mime_type": mime_type,
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=None,
chunks=[], # Empty at creation
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
return document, False
async def _check_rename_only_update(
session: AsyncSession,
file: dict,
@ -725,15 +954,31 @@ async def _process_single_file(
user_id: str,
task_logger: TaskLoggingService,
log_entry: any,
) -> tuple[int, int]:
pending_document: Document | None = None,
) -> tuple[int, int, int]:
"""
Process a single file by downloading and using Surfsense's file processor.
Implements Phase 2 of the 2-phase document status update pattern.
Updates document status: pending processing ready/failed
Args:
drive_client: Google Drive client
session: Database session
file: File metadata from Google Drive API
connector_id: ID of the connector
search_space_id: ID of the search space
user_id: ID of the user
task_logger: Task logging service
log_entry: Log entry for tracking
pending_document: Optional pending document created in Phase 1
Returns:
Tuple of (indexed_count, skipped_count)
Tuple of (indexed_count, skipped_count, failed_count)
"""
file_name = file.get("name", "Unknown")
mime_type = file.get("mimeType", "")
file_id = file.get("id")
try:
logger.info(f"Processing file: {file_name} ({mime_type})")
@ -756,10 +1001,15 @@ async def _process_single_file(
# Return 1 for renamed files (they are "indexed" in the sense that they're updated)
# Return 0 for unchanged files
if "renamed" in (rename_message or "").lower():
return 1, 0
return 0, 1
return 1, 0, 0
return 0, 1, 0
_, error, _ = await download_and_process_file(
# Set document to PROCESSING status if we have a pending document
if pending_document:
pending_document.status = DocumentStatus.processing()
await session.commit()
_, error, metadata = await download_and_process_file(
client=drive_client,
file=file,
search_space_id=search_space_id,
@ -776,14 +1026,43 @@ async def _process_single_file(
f"Skipped {file_name}: {error}",
{"status": "skipped", "reason": error},
)
return 0, 1
# Mark pending document as failed if it exists
if pending_document:
pending_document.status = DocumentStatus.failed(error)
pending_document.updated_at = get_current_timestamp()
await session.commit()
return 0, 1, 0
# The document was created/updated by download_and_process_file
# Find the document and ensure it has READY status
if file_id:
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
)
processed_doc = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if processed_doc:
# Ensure status is READY
if not DocumentStatus.is_state(processed_doc.status, DocumentStatus.READY):
processed_doc.status = DocumentStatus.ready()
processed_doc.updated_at = get_current_timestamp()
await session.commit()
logger.info(f"Successfully indexed Google Drive file: {file_name}")
return 1, 0
return 1, 0, 0
except Exception as e:
logger.error(f"Error processing file {file_name}: {e!s}", exc_info=True)
return 0, 1
# Mark pending document as failed if it exists
if pending_document:
try:
pending_document.status = DocumentStatus.failed(str(e))
pending_document.updated_at = get_current_timestamp()
await session.commit()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
return 0, 0, 1
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):

View file

@ -1,5 +1,9 @@
"""
Google Gmail connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
import time
@ -13,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.google_gmail_connector import GoogleGmailConnector
from app.db import (
Document,
DocumentStatus,
DocumentType,
SearchSourceConnectorType,
)
@ -32,6 +37,7 @@ from .base import (
get_connector_by_id,
get_current_timestamp,
logger,
safe_set_chunks,
update_connector_last_indexed,
)
@ -220,20 +226,21 @@ async def index_google_gmail_messages(
logger.info(f"Found {len(messages)} Google gmail messages to index")
documents_indexed = 0
skipped_messages = []
documents_skipped = 0
documents_failed = 0 # Track messages that failed processing
duplicate_content_count = 0 # Track messages skipped due to duplicate content_hash
# Heartbeat tracking - update notification periodically to prevent appearing stuck
last_heartbeat_time = time.time()
# =======================================================================
# PHASE 1: Analyze all messages, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
new_documents_created = False
for message in messages:
# Check if it's time for a heartbeat update
if (
on_heartbeat_callback
and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
):
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = time.time()
try:
# Extract message information
message_id = message.get("id", "")
@ -259,7 +266,6 @@ async def index_google_gmail_messages(
if not message_id:
logger.warning(f"Skipping message with missing ID: {subject}")
skipped_messages.append(f"{subject} (missing ID)")
documents_skipped += 1
continue
@ -268,7 +274,6 @@ async def index_google_gmail_messages(
if not markdown_content.strip():
logger.warning(f"Skipping message with no content: {subject}")
skipped_messages.append(f"{subject} (no content)")
documents_skipped += 1
continue
@ -288,68 +293,25 @@ async def index_google_gmail_messages(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
logger.info(
f"Document for Gmail message {subject} unchanged. Skipping."
)
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
existing_document.status = DocumentStatus.ready()
documents_skipped += 1
continue
else:
# Content has changed - update the existing document
logger.info(
f"Content changed for Gmail message {subject}. Updating document."
)
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Google Gmail Message: {subject}\n\n"
summary_content += f"Sender: {sender}\n"
summary_content += f"Date: {date_str}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Update existing document
existing_document.title = subject
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"connector_id": connector_id,
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
logger.info(f"Successfully updated Gmail message {subject}")
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append({
'document': existing_document,
'is_new': False,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
})
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
@ -364,45 +326,11 @@ async def index_google_gmail_messages(
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Document doesn't exist - create new one
# Generate summary with metadata
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
# Fallback to simple summary if no LLM configured
summary_content = f"Google Gmail Message: {subject}\n\n"
summary_content += f"Sender: {sender}\n"
summary_content += f"Date: {date_str}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
# Process chunks
chunks = await create_document_chunks(markdown_content)
# Create and store new document
logger.info(f"Creating new document for Gmail message: {subject}")
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=subject,
@ -413,21 +341,111 @@ async def index_google_gmail_messages(
"subject": subject,
"sender": sender,
"date": date_str,
"connector_id": connector_id,
},
content=summary_content,
content_hash=content_hash,
content="Pending...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary unique value - updated when ready
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
embedding=None,
chunks=[], # Empty at creation - safe for async
status=DocumentStatus.pending(), # Pending until processing starts
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector_id,
)
session.add(document)
documents_indexed += 1
logger.info(f"Successfully indexed new email {summary_content}")
new_documents_created = True
# Batch commit every 10 documents
messages_to_process.append({
'document': document,
'is_new': True,
'markdown_content': markdown_content,
'content_hash': content_hash,
'message_id': message_id,
'thread_id': thread_id,
'subject': subject,
'sender': sender,
'date_str': date_str,
})
except Exception as e:
logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True)
documents_failed += 1
continue
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
for item in messages_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat_callback(documents_indexed)
last_heartbeat_time = current_time
document = item['document']
try:
# Set to PROCESSING and commit - shows "processing" in UI for THIS document only
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata_for_summary = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"date": item['date_str'],
"document_type": "Gmail Message",
"connector_type": "Google Gmail",
}
summary_content, summary_embedding = await generate_document_summary(
item['markdown_content'], user_llm, document_metadata_for_summary
)
else:
summary_content = f"Google Gmail Message: {item['subject']}\n\n"
summary_content += f"Sender: {item['sender']}\n"
summary_content += f"Date: {item['date_str']}\n"
summary_embedding = config.embedding_model_instance.embed(
summary_content
)
chunks = await create_document_chunks(item['markdown_content'])
# Update document to READY with actual content
document.title = item['subject']
document.content = summary_content
document.content_hash = item['content_hash']
document.embedding = summary_embedding
document.document_metadata = {
"message_id": item['message_id'],
"thread_id": item['thread_id'],
"subject": item['subject'],
"sender": item['sender'],
"date": item['date_str'],
"connector_id": connector_id,
}
safe_set_chunks(document, chunks)
document.updated_at = get_current_timestamp()
document.status = DocumentStatus.ready()
documents_indexed += 1
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Gmail messages processed so far"
@ -435,45 +453,74 @@ async def index_google_gmail_messages(
await session.commit()
except Exception as e:
logger.error(
f"Error processing the email {message_id}: {e!s}",
exc_info=True,
)
skipped_messages.append(f"{subject} (processing error)")
documents_skipped += 1
continue # Skip this message and continue with others
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(f"Failed to update document status to failed: {status_error}")
documents_failed += 1
continue
# Update the last_indexed_at timestamp for the connector only if requested
total_processed = documents_indexed
if total_processed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed")
await session.commit()
logger.info(
"Successfully committed all Google gmail document changes to database"
)
try:
await session.commit()
logger.info(
"Successfully committed all Google Gmail document changes to database"
)
except Exception as e:
# Handle any remaining integrity errors gracefully (race conditions, etc.)
if (
"duplicate key value violates unique constraint" in str(e).lower()
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"This may occur if the same message was indexed by multiple connectors. "
f"Rolling back and continuing. Error: {e!s}"
)
await session.rollback()
# Don't fail the entire task - some documents may have been successfully indexed
else:
raise
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
warning_message = ", ".join(warning_parts) if warning_parts else None
total_processed = documents_indexed
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully completed Google gmail indexing for connector {connector_id}",
f"Successfully completed Google Gmail indexing for connector {connector_id}",
{
"events_processed": total_processed,
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"skipped_messages_count": len(skipped_messages),
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
},
)
logger.info(
f"Google gmail indexing completed: {documents_indexed} new emails, {documents_skipped} skipped"
f"Google Gmail indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return (
total_processed,
None,
) # Return None as the error message to indicate success
warning_message,
) # Return warning_message (None on success)
except SQLAlchemyError as db_error:
await session.rollback()

View file

@ -17,7 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import Document, DocumentType, Log, Notification
from app.db import Document, DocumentStatus, DocumentType, Log, Notification
from app.services.llm_service import get_user_long_context_llm
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
@ -499,6 +499,7 @@ async def add_received_file_document_using_unstructured(
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
await session.commit()
await session.refresh(existing_document)
@ -528,6 +529,7 @@ async def add_received_file_document_using_unstructured(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
)
session.add(document)
@ -640,6 +642,7 @@ async def add_received_file_document_using_llamacloud(
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
await session.commit()
await session.refresh(existing_document)
@ -669,6 +672,7 @@ async def add_received_file_document_using_llamacloud(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
)
session.add(document)
@ -806,6 +810,7 @@ async def add_received_file_document_using_docling(
existing_document.blocknote_document = blocknote_json
existing_document.content_needs_reindexing = False
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
await session.commit()
await session.refresh(existing_document)
@ -835,6 +840,7 @@ async def add_received_file_document_using_docling(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
)
session.add(document)

View file

@ -7,7 +7,7 @@ import logging
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -270,6 +270,7 @@ async def add_received_markdown_file_document(
existing_document.chunks = chunks
existing_document.blocknote_document = blocknote_json
existing_document.updated_at = get_current_timestamp()
existing_document.status = DocumentStatus.ready() # Mark as ready
await session.commit()
await session.refresh(existing_document)
@ -297,6 +298,7 @@ async def add_received_markdown_file_document(
updated_at=get_current_timestamp(),
created_by_id=user_id,
connector_id=connector.get("connector_id") if connector else None,
status=DocumentStatus.ready(), # Mark as ready
)
session.add(document)